Source code for esi.clients

from datetime import datetime
from hashlib import md5
import json
import logging
from time import sleep
from urllib import parse as urlparse
from typing import Any, Union, Tuple

from bravado.client import SwaggerClient
from bravado import requests_client
from bravado.exception import (
    HTTPBadGateway, HTTPGatewayTimeout, HTTPServiceUnavailable
)
from bravado_core.response import IncomingResponse
from bravado.swagger_model import Loader
from bravado.http_future import HttpFuture
from bravado_core.spec import Spec, CONFIG_DEFAULTS
from requests.adapters import HTTPAdapter

from django.core.cache import cache

from .errors import TokenExpiredError
from . import app_settings, __version__, __title__


logger = logging.getLogger(__name__)

# Log level applied to the chattier third-party libraries used by this module.
_LIBRARIES_LOG_LEVEL = logging.getLevelName(app_settings.ESI_LOG_LEVEL_LIBRARIES)
for _library_name in ('swagger_spec_validator', 'bravado_core', 'urllib3', 'bravado'):
    logging.getLogger(_library_name).setLevel(_LIBRARIES_LOG_LEVEL)

# Spec configuration passed on whenever a swagger spec is built in this module.
SPEC_CONFIG = {'use_models': False}
RETRY_SLEEP_SECS = 1


class CachingHttpFuture(HttpFuture):
    """Extended wrapper for a FutureAdapter that returns a HTTP response
    and also supports caching.

    This class contains the response for an ESI request with an ESI client.
    """

    def _cache_key(self) -> str:
        """Generate the key name used to cache responses.

        The key is derived from the request method, URL, params, data and json.
        """
        request = self.future.request
        data = (
            request.method
            + request.url
            + str(request.params)
            + str(request.data)
            + str(request.json)
        ).encode('utf-8')
        # The following hash is not used in any security context. It is only used
        # to generate unique values, collisions are acceptable and "data" is not
        # coming from user-generated input
        str_hash = md5(data).hexdigest()  # nosec B303, B303-1
        return f'esi_{str_hash}'

    @staticmethod
    def _time_to_expiry(expires):
        """Determine the seconds until a HTTP header "Expires" timestamp.

        Args:
            expires: HTTP response "Expires" header

        Returns:
            seconds until "Expires" time (negative if already in the past),
            or 0 if the value can not be parsed
        """
        try:
            expires_dt = datetime.strptime(str(expires), '%a, %d %b %Y %H:%M:%S %Z')
            delta = expires_dt - datetime.utcnow()
            return delta.total_seconds()
        except ValueError:
            return 0

    def results(self, **kwargs) -> Union[Any, Tuple[Any, IncomingResponse]]:
        """Executes the request and returns the response from ESI for the
        current route. Response will include all pages if there are more
        available.

        Accepts same parameters in ``kwargs`` as :meth:`result`

        Returns:
            same as :meth:`result`, but for multiple pages
        """
        results = []
        headers = None
        # preserve original value
        _also_return_response = self.request_config.also_return_response
        # override to always get the raw response for the paging header
        self.request_config.also_return_response = True
        try:
            if "page" in self.operation.params:
                current_page = 1
                total_pages = 1
                # loop all pages and add data to output array
                while current_page <= total_pages:
                    self.future.request.params["page"] = current_page
                    # will use cache if applicable
                    result, headers = self.result(**kwargs)
                    total_pages = int(headers.headers['X-Pages'])
                    # append to results list to be seamless to the client
                    results += result
                    current_page += 1
            else:
                # operation is not paged, so just return the single result
                results, headers = self.result(**kwargs)
        finally:
            # restore original value, also when the request failed, so a failure
            # does not silently change the return shape of later calls
            self.request_config.also_return_response = _also_return_response

        # obey the output
        if self.request_config.also_return_response:
            return results, headers
        return results

    def results_localized(self, languages: list = None, **kwargs) -> dict:
        """Executes the request and returns the response from ESI for all
        default languages and pages (if any).

        Accepts same parameters in ``kwargs`` as :meth:`result` plus ``languages``

        Args:
            languages: (optional) list of languages to return
                instead of default languages

        Returns:
            Dict of all responses with the language code as keys.

        Raises:
            ValueError: if a requested language is not in ``ESI_LANGUAGES``
        """
        if not languages:
            my_languages = list(app_settings.ESI_LANGUAGES)
        else:
            my_languages = []
            # dict.fromkeys() removes duplicates while keeping the given order
            for lang in dict.fromkeys(languages):
                if lang not in app_settings.ESI_LANGUAGES:
                    raise ValueError('Invalid language code: %s' % lang)
                my_languages.append(lang)

        return {
            language: self.results(language=language, **kwargs)
            for language in my_languages
        }

    def result(self, **kwargs) -> Union[Any, Tuple[Any, IncomingResponse]]:
        """Executes the request and returns the response from ESI. Response will
        include the requested / first page only if there are more pages available.

        Args:
            timeout: (optional) timeout for ESI request in seconds,
                overwrites default
            retries: (optional) max number of retries, overwrites default
            language: (optional) retrieve result for specific language
            ignore_cache: (optional) set to ``True`` to ignore response caching

        Returns:
            Response from endpoint or a tuple with response from endpoint
            and an incoming response object containing additional meta data
            including the HTTP response headers
        """
        if 'language' in kwargs:
            # this parameter is not supported by bravado, so we can't pass it on
            self.future.request.params['language'] = str(kwargs.pop('language'))

        if 'timeout' not in kwargs:
            kwargs['timeout'] = (
                app_settings.ESI_REQUESTS_CONNECT_TIMEOUT,
                app_settings.ESI_REQUESTS_READ_TIMEOUT
            )

        ignore_cache = kwargs.pop('ignore_cache', False)

        if (
            app_settings.ESI_CACHE_RESPONSE
            and not ignore_cache
            and self.future.request.method == 'GET'
            and self.operation is not None
        ):
            result = None
            response = None
            cache_key = self._cache_key()
            try:
                cached = cache.get(cache_key)
            except Exception:
                # a broken cache must not prevent fetching from ESI
                cached = None
                logger.warning(
                    "Attempt to read ESI results from cache failed", exc_info=True
                )

            if cached:
                result, response = cached
                expiry = self._time_to_expiry(str(response.headers.get('Expires')))
                if expiry < 0:
                    logger.warning(
                        "cache expired by %d seconds, Forcing expiry", expiry
                    )
                    cached = False

            if not cached:
                result, response = self._result_with_retries(**kwargs)
                if response and 'Expires' in response.headers:
                    expires = self._time_to_expiry(response.headers['Expires'])
                    if expires > 0:
                        try:
                            cache.set(cache_key, (result, response), expires)
                        except Exception:
                            logger.warning(
                                "Failed to write ESI result to cache", exc_info=True
                            )

            if self.request_config.also_return_response:
                return result, response
            return result

        elif self.operation is not None:
            result, response = self._result_with_retries(**kwargs)
            if self.request_config.also_return_response:
                return result, response
            return result

        return super().result(**kwargs)

    def _result_with_retries(self, **kwargs) -> Tuple[Any, IncomingResponse]:
        """Execute request and retry on certain HTTP errors.

        ``kwargs`` are passed through to super().result()

        Returns:
            Tuple with response from endpoint and an incoming response object
            containing additional meta data including the HTTP response headers

        Raises:
            HTTPBadGateway, HTTPGatewayTimeout, HTTPServiceUnavailable:
                when the error persists after the last retry
        """
        # preserve original value
        _also_return_response = self.request_config.also_return_response
        # override to always get the raw response for expiry header
        self.request_config.also_return_response = True
        try:
            if 'retries' in kwargs:
                max_retries = int(kwargs.pop('retries'))
            else:
                max_retries = int(app_settings.ESI_SERVER_ERROR_MAX_RETRIES)
            max_retries = max(0, max_retries)
            retries = 0
            while retries <= max_retries:
                try:
                    if app_settings.ESI_INFO_LOGGING_ENABLED:
                        params = self.future.request.params
                        logger.info(
                            'Fetching from ESI: %s%s%s',
                            self.future.request.url,
                            f' - language {params["language"]}'
                            if 'language' in params else '',
                            f' - page {params["page"]}' if 'page' in params else ''
                        )
                    logger.debug(
                        'ESI request: %s - %s',
                        self.future.request.url,
                        self.future.request.params
                    )
                    logger.debug(
                        'ESI request headers: %s', self.future.request.headers
                    )
                    result, response = super().result(**kwargs)
                    logger.debug(
                        'ESI response status code: %s', response.status_code
                    )
                    logger.debug('ESI response headers: %s', response.headers)
                    if app_settings.ESI_DEBUG_RESPONSE_CONTENT_LOGGING:
                        logger.debug('ESI response content: %s', response.text)
                    break

                except (
                    HTTPBadGateway, HTTPGatewayTimeout, HTTPServiceUnavailable
                ) as ex:
                    if retries >= max_retries:
                        # out of retries: re-raise with the original traceback
                        raise
                    retries += 1
                    logger.warning(
                        "ESI error - %s %s - Retry: %d/%d",
                        self.future.request.url,
                        ex.status_code,
                        retries,
                        max_retries
                    )
                    # exponential backoff: factor * 1, 2, 4, ...
                    wait_secs = (
                        app_settings.ESI_SERVER_ERROR_BACKOFF_FACTOR
                        * (2 ** (retries - 1))
                    )
                    sleep(wait_secs)
        finally:
            # restore original value, also when the request ultimately failed
            self.request_config.also_return_response = _also_return_response

        return result, response
# Make bravado's requests client use the caching future defined in this module.
requests_client.HttpFuture = CachingHttpFuture


class TokenAuthenticator(requests_client.Authenticator):
    """
    Adds the authorization header containing access token, if specified.
    Sets the ESI datasource to the given one or the configured default.
    """

    def __init__(self, token=None, datasource=None):
        host = urlparse.urlsplit(app_settings.ESI_API_URL).hostname
        super().__init__(host)
        self.token = token
        self.datasource = datasource

    def apply(self, request):
        """Add authorization header and datasource parameter to the request.

        :param request: Request to be modified
        :return: The modified request
        :raises TokenExpiredError: if the token is expired and can not be refreshed
        """
        if self.token and self.token.expired:
            if self.token.can_refresh:
                self.token.refresh()
            else:
                raise TokenExpiredError()

        if self.token:
            request.headers['Authorization'] = 'Bearer ' + self.token.access_token
        else:
            # without a token no authorization header must be sent;
            # drop it instead of storing None as a header value
            request.headers.pop('Authorization', None)

        request.params['datasource'] = \
            self.datasource or app_settings.ESI_API_DATASOURCE
        return request


class RequestsClientPlus(requests_client.RequestsClient):
    """RequestsClient with ability to set the user agent header for all requests"""

    def __init__(
        self,
        ssl_verify=True,
        ssl_cert=None,
        future_adapter_class=requests_client.RequestsFutureAdapter,
        response_adapter_class=requests_client.RequestsResponseAdapter,
    ):
        super().__init__(
            ssl_verify, ssl_cert, future_adapter_class, response_adapter_class
        )
        self.user_agent = None

    def request(
        self, request_params, operation=None, request_config=None
    ) -> HttpFuture:
        """Issue the request, adding the User-Agent header if one is set."""
        if self.user_agent:
            current_headers = request_params.get("headers", {})
            new_header = {"User-Agent": str(self.user_agent)}
            # our user agent takes precedence over a header set by the caller
            request_params["headers"] = {**current_headers, **new_header}
        return super().request(request_params, operation, request_config)


def build_cache_name(name):
    """
    Cache key name formatter

    :param name: Name of the spec dict to cache, usually version
    :return: String name for cache key
    :rtype: str
    """
    return 'esi_swaggerspec_%s' % name


def cache_spec(name, spec):
    """
    Cache the spec dict for ``ESI_SPEC_CACHE_DURATION``

    :param name: Version name
    :param spec: Spec dict
    :return: Return value of the cache backend's ``set()``
    """
    return cache.set(
        build_cache_name(name), spec, app_settings.ESI_SPEC_CACHE_DURATION
    )


def build_spec_url(spec_version):
    """
    Generates the URL to swagger.json for the ESI version

    :param spec_version: Name of the swagger spec version, like latest or v4
    :return: URL to swagger.json for the requested spec version
    """
    return urlparse.urljoin(app_settings.ESI_API_URL, spec_version + '/swagger.json')


def get_spec(name, http_client=None, config=None):
    """
    Returns the spec for a revision, loading it from the cache if available.

    :param name: Name of the revision of spec, eg latest or v4
    :param http_client: Requests client used for retrieving specs
    :param config: Spec configuration - see Spec.CONFIG_DEFAULTS
    :return: :class:`bravado_core.spec.Spec`
    """
    http_client = http_client or requests_client.RequestsClient()

    def load_spec():
        loader = Loader(http_client)
        return loader.load_spec(build_spec_url(name))

    # only fetches the spec from its URL when it is not in the cache
    spec_dict = cache.get_or_set(
        build_cache_name(name), load_spec, app_settings.ESI_SPEC_CACHE_DURATION
    )
    config = dict(CONFIG_DEFAULTS, **(config or {}))
    return Spec.from_dict(spec_dict, build_spec_url(name), http_client, config)


def build_spec(base_version, http_client=None, **kwargs):
    """
    Generates the Spec used to initialize a SwaggerClient,
    supporting mixed resource versions

    :param http_client: :class:`bravado.requests_client.RequestsClient`
    :param base_version: Version to base the spec on.
        Any resource without an explicit version will be this.
    :param kwargs: Explicit resource versions, by name (eg Character='v4')
    :return: :class:`bravado_core.spec.Spec`
    :raises AttributeError: if a resource does not exist on the requested revision
    """
    base_spec = get_spec(base_version, http_client=http_client, config=SPEC_CONFIG)
    for resource, resource_version in kwargs.items():
        versioned_spec = get_spec(
            resource_version, http_client=http_client, config=SPEC_CONFIG
        )
        resource_name = resource.capitalize()
        try:
            spec_resource = versioned_spec.resources[resource_name]
        except KeyError as ex:
            raise AttributeError(
                'Resource {} not found on API revision {}'.format(
                    resource, resource_version
                )
            ) from ex
        # replace the base version of the resource with the requested one
        base_spec.resources[resource_name] = spec_resource
    return base_spec


def read_spec(path, http_client=None):
    """
    Reads in a swagger spec file and initializes a SwaggerClient from it

    :param path: String path to local swagger spec file.
    :param http_client: :class:`bravado.requests_client.RequestsClient`
    :return: :class:`bravado.client.SwaggerClient`
    """
    with open(path, encoding='utf-8') as f:
        spec_dict = json.load(f)

    return SwaggerClient.from_spec(
        spec_dict, http_client=http_client, config=SPEC_CONFIG
    )
def esi_client_factory(
    token=None,
    datasource: str = None,
    spec_file: str = None,
    version: str = None,
    app_info_text: str = None,
    **kwargs
) -> SwaggerClient:
    """Generate a new ESI client.

    Args:
        token(esi.models.Token): used to access authenticated endpoints.
        datasource: Name of the ESI datasource to access.
        spec_file: Absolute path to a swagger spec file to load.
        version: Base ESI API version, e.g. 'legacy' or 'latest'. Falls back
            to the configured default version when not given.
        app_info_text: Text identifying the application using ESI which will be
            included in the User-Agent header. Should contain name and version
            of the application using ESI. e.g. `"my-app v1.0.0"`.
            Note that spaces are used as delimiter.
        kwargs: Explicit resource versions to build, in the form Character='v4'.
            Same values accepted as version.

    If a spec_file is specified, specific versioning is not available. Meaning
    the version and resource version kwargs are ignored in favour of the
    versions available in the spec_file.

    Returns:
        New ESI client
    """
    if app_settings.ESI_INFO_LOGGING_ENABLED:
        logger.info('Generating an ESI client...')

    http_client = RequestsClientPlus()

    # user agent: application info (or this library) plus optional contact email
    if app_info_text:
        agent = str(app_info_text)
    else:
        agent = f"{__title__} v{__version__}"
    if app_settings.ESI_USER_CONTACT_EMAIL:
        agent = agent + f" {app_settings.ESI_USER_CONTACT_EMAIL}"
    http_client.user_agent = agent

    # connection pool size and retries on connection errors for https requests
    http_client.session.mount(
        'https://',
        HTTPAdapter(
            pool_maxsize=app_settings.ESI_CONNECTION_POOL_MAXSIZE,
            max_retries=app_settings.ESI_CONNECTION_ERROR_MAX_RETRIES
        )
    )

    if token or datasource:
        http_client.authenticator = TokenAuthenticator(
            token=token, datasource=datasource
        )

    base_version = version or app_settings.ESI_API_VERSION
    if spec_file:
        return read_spec(spec_file, http_client=http_client)

    return SwaggerClient(build_spec(base_version, http_client=http_client, **kwargs))
def minimize_spec(spec_dict, operations=None, resources=None):
    """
    Trims down a source spec dict to only the operations or resources indicated.

    An operation is retained when its ``operationId`` is listed in
    ``operations`` or when any of its ``tags`` is listed in ``resources``.
    Operations without ``operationId`` or ``tags`` are tolerated. Members of
    a path which are not operations (e.g. a path-level ``parameters`` list)
    are carried over for every path that retains at least one operation.

    :param spec_dict: The source spec dict to minimize.
    :type spec_dict: dict
    :param operations: A list of operation IDs to retain.
    :type operations: list of str
    :param resources: A list of resource names to retain.
    :type resources: list of str
    :return: Minimized swagger spec dict
    :rtype: dict
    """
    operations = operations or []
    resources = resources or []

    # keep the ugly overhead for now but only add paths we need
    minimized = {key: value for key, value in spec_dict.items() if key != 'paths'}
    minimized['paths'] = {}

    for path_name, path in spec_dict['paths'].items():
        shared = {}    # path-level members which are not operations
        retained = {}  # operations matching the requested IDs or resources
        for method, data in path.items():
            if not isinstance(data, dict):
                shared[method] = data
                continue
            operation_id = data.get('operationId')
            tags = data.get('tags') or ()
            if (
                (operation_id is not None and operation_id in operations)
                or any(tag in resources for tag in tags)
            ):
                retained[method] = data
        if retained:
            minimized['paths'][path_name] = {**shared, **retained}

    return minimized
class EsiClientProvider:
    """Class for providing a single ESI client instance for the whole app

    The client is generated on first access of :attr:`client` and then
    re-used for all later accesses on the same provider.

    Args:
        datasource: Name of the ESI datasource to access.
        spec_file: Absolute path to a swagger spec file to load.
        version: Base ESI API version, e.g. 'legacy' or 'latest'.
        app_info_text: Text identifying the application using ESI which will be
            included in the User-Agent header. Should contain name and version
            of the application using ESI. e.g. `"my-app v1.0.0"`.
            Note that spaces are used as delimiter.
        kwargs: Explicit resource versions to build, in the form Character='v4'.
            Same values accepted as version.

    If a spec_file is specified, specific versioning is not available. Meaning
    the version and resource version kwargs are ignored in favour of the
    versions available in the spec_file.
    """

    # no client yet; replaced per instance once the client has been generated
    _client = None

    def __init__(
        self,
        datasource=None,
        spec_file=None,
        version=None,
        app_info_text=None,
        **kwargs
    ):
        # only remember the arguments here, the client is generated on demand
        self._kwargs = kwargs
        self._app_text = app_info_text
        self._version = version
        self._spec_file = spec_file
        self._datasource = datasource

    @property
    def client(self):
        """The ESI client of this provider, generated on first access."""
        if self._client is not None:
            return self._client

        self._client = esi_client_factory(
            datasource=self._datasource,
            spec_file=self._spec_file,
            version=self._version,
            app_info_text=self._app_text,
            **self._kwargs,
        )
        return self._client

    def __str__(self):
        return 'EsiClientProvider'