Source code for esi.models

import datetime
import re
import logging

from bravado.client import SwaggerClient
from requests.auth import HTTPBasicAuth
from requests_oauthlib import OAuth2Session
from oauthlib.oauth2.rfc6749.errors import (
    InvalidClientError,
    InvalidClientIdError,
    InvalidGrantError,
    InvalidTokenError,
    MissingTokenError,
)

from django.core.exceptions import ImproperlyConfigured
from django.conf import settings
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _

from . import app_settings
from .clients import esi_client_factory
from .managers import TokenManager
from .errors import (
    IncompleteResponseError,
    NotRefreshableTokenError,
    TokenExpiredError,
    TokenInvalidError,
    TokenError
)


logger = logging.getLogger(__name__)


class Scope(models.Model):
    """
    Represents an access scope granted by SSO.
    """
    name = models.CharField(
        max_length=100, unique=True, help_text="The official EVE name for the scope."
    )
    help_text = models.TextField(help_text="The official EVE description of the scope.")

    @property
    def friendly_name(self):
        return self._friendly_name(self.name)

    @classmethod
    def _friendly_name(cls, name):
        try:
            return re.sub('_', ' ', name.split('.')[1]).strip()
        except IndexError:
            return name

    def __str__(self):
        return self.name


[docs] class Token(models.Model): """EVE Swagger Interface Access Token Contains information about the authenticating character and scopes granted to this token. Contains the access token required for ESI authentication as well as refreshing. """ TOKEN_TYPE_CHARACTER = "character" TOKEN_TYPE_CORPORATION = "corporation" TOKEN_TYPE_CHOICES = [ (TOKEN_TYPE_CHARACTER, _('Character')), (TOKEN_TYPE_CORPORATION, _('Corporation')), ] created = models.DateTimeField(auto_now_add=True) access_token = models.TextField( help_text="The access token granted by SSO.", editable=False ) refresh_token = models.TextField( null=True, # refresh tokens returned from SSO can be null help_text="A re-usable token to generate new access tokens upon expiry.", editable=False ) user = models.ForeignKey( settings.AUTH_USER_MODEL, on_delete=models.CASCADE, blank=True, null=True, help_text="The user to whom this token belongs." ) character_id = models.IntegerField( db_index=True, help_text="The ID of the EVE character who authenticated by SSO." ) character_name = models.CharField( max_length=100, db_index=True, help_text="The name of the EVE character who authenticated by SSO." ) token_type = models.CharField( max_length=100, choices=TOKEN_TYPE_CHOICES, default=TOKEN_TYPE_CHARACTER, help_text="The applicable range of the token." ) character_owner_hash = models.CharField( max_length=254, db_index=True, help_text=( "The unique string identifying this character and its owning EVE " "account. Changes if the owning account changes." ) ) scopes = models.ManyToManyField( Scope, blank=True, help_text="The access scopes granted by this token." ) sso_version = models.IntegerField( help_text="EVE SSO Version.", default=2 ) objects = TokenManager() def __str__(self): try: scopes = sorted(s.name for s in self.scopes.all()) except ValueError: scopes = [] return f'{self.character_name} - {", ".join(scopes)}' def __repr__(self): return "<{}(id={}): {}, {}>".format( self.__class__.__name__, self.pk, self.character_id, self.character_name, ) @property def can_refresh(self) -> bool: """Determine if this token can be refreshed upon expiry.""" return bool(self.refresh_token) @property def expires(self) -> datetime.datetime: """Determines when this token expires. Returns: Date & time when this token expires """ return ( self.created + datetime.timedelta(seconds=app_settings.ESI_TOKEN_VALID_DURATION) ) @property def expired(self) -> bool: """Determines if this token has expired.""" return self.expires < timezone.now()
[docs] def valid_access_token(self) -> str: """Refresh and return access token to be used in an authed ESI call. Example: .. code-block:: python # fetch medals for a character medals = esi.client.Character.get_characters_character_id_medals( # required parameter for endpoint character_id = token.character_id, # provide a valid access token, which will be refreshed if required token = token.valid_access_token() ).results() Returns: Valid access token Raises: TokenExpiredError: When token can not be refreshed """ if self.expired: if self.can_refresh: self.refresh() else: raise TokenExpiredError() return self.access_token
[docs] def refresh( self, session: OAuth2Session = None, auth: HTTPBasicAuth = None ) -> None: """Refresh this token. Args: session: session for refreshing token with auth: ESI authentication """ logger.debug("Attempting refresh of %r", self) if self.can_refresh: if not session: session = OAuth2Session(app_settings.ESI_SSO_CLIENT_ID) if not auth: auth = HTTPBasicAuth( app_settings.ESI_SSO_CLIENT_ID, app_settings.ESI_SSO_CLIENT_SECRET ) try: token = session.refresh_token( app_settings.ESI_TOKEN_URL, refresh_token=self.refresh_token, auth=auth ) logger.debug("Retrieved new token from SSO servers.") # logger.debug(token) token_data = TokenManager.validate_access_token(token['access_token']) # TODO verify token properly if token_data is not None: if self.character_owner_hash != token_data['owner']: logger.warning("Invalid Owner") raise InvalidTokenError("Ownership Changed! Revoke me!") self.access_token = token['access_token'] self.refresh_token = token['refresh_token'] self.sso_version = 2 # we will never be ssov1 again self.created = timezone.now() self.save() logger.debug("Successfully refreshed %r", self) except (InvalidGrantError) as e: # this token is gone forever logger.error("Refresh impossible for %r: %r", self, e) raise TokenInvalidError() except (InvalidTokenError, InvalidClientIdError) as e: # these may be recoverable? logger.warning("Refresh failed for %r: %r", self, e) raise TokenInvalidError() except MissingTokenError as e: logger.info("Refresh failed for %r: %r", self, e) raise IncompleteResponseError() except InvalidClientError: logger.debug( "ESI client ID and secret rejected by remote. Cannot refresh." ) raise ImproperlyConfigured( 'Verify ESI_SSO_CLIENT_ID and ESI_SSO_CLIENT_SECRET settings.' ) else: logger.debug("Not a refreshable token.") raise NotRefreshableTokenError()
[docs] def refresh_or_delete(self): """Refresh this token or delete it if it can not be refreshed.""" try: self.refresh() except TokenError: self.delete() logger.warning("%s: Refresh failed. Token deleted.", repr(self)) else: logging.info("%s: Successfully refreshed", self)
[docs] def get_esi_client(self, **kwargs) -> SwaggerClient: """Creates an authenticated ESI client with this token. Args: **kwargs: Extra spec versioning as per \ :class:`esi.clients.esi_client_factory` Returns: New ESI client """ return esi_client_factory(token=self, **kwargs)
@classmethod def get_token_data(cls, access_token): return TokenManager.validate_access_token(access_token) # unused? def update_token_data(self, commit=True): logger.debug("Updating token data for %r", self) if self.expired: if self.can_refresh: self.refresh() else: raise TokenExpiredError() token_data = self.get_token_data(self.access_token) logger.debug(token_data) self.character_id = token_data['character_id'] self.character_name = token_data['name'] self.character_owner_hash = token_data['owner'] self.token_type = token_data['token_type'] logger.debug("Successfully updated token data.") if commit: self.save()
[docs] @classmethod def get_token(cls, character_id: int, scopes: list) -> "Token": """Helper method to get a token for a specific character with specific scopes. Args: character_id: Character to filter on. scopes: array of ESI scope strings to search for. Returns: Matching token or `False` when token is not found """ token = ( Token.objects .filter(character_id=character_id) .require_scopes(scopes) .first() ) if token: return token else: return False
class CallbackRedirect(models.Model): """ Records the intended destination for the SSO callback. Used to internally redirect SSO callbacks. """ session_key = models.CharField( max_length=254, unique=True, help_text="Session key identifying the session this redirect was created for." ) url = models.TextField( default="/", help_text="The internal URL to redirect this callback towards." ) state = models.CharField( max_length=128, help_text="OAuth2 state string representing this session." ) created = models.DateTimeField(auto_now_add=True) token = models.ForeignKey( Token, on_delete=models.CASCADE, blank=True, null=True, help_text=( "Token generated by a completed code exchange " "from callback processing." ) ) def __str__(self): return f"{self.session_key}: {self.url}" def __repr__(self): return "<{}(pk={}): {} to {}>".format( self.__class__.__name__, self.pk, self.session_key, self.url )