import logging
import pathlib
import warnings
from datetime import date, datetime, timedelta, timezone
from hashlib import md5
from timeit import default_timer
from typing import Any
from aiopenapi3 import FileSystemLoader, OpenAPI
from aiopenapi3._types import ResponseDataType, ResponseHeadersType
from aiopenapi3.errors import (
HTTPClientError as base_HTTPClientError,
HTTPServerError as base_HTTPServerError,
)
from aiopenapi3.request import OperationIndex, RequestBase
from httpx import (
AsyncClient, Client, HTTPStatusError, Limits, RequestError, Response,
Timeout,
)
from tenacity import (
AsyncRetrying, Retrying, retry_if_exception, stop_after_attempt,
wait_combine, wait_exponential,
)
from django.conf import settings
from django.core.cache import cache
from django.utils.text import slugify
from esi import app_settings
from esi.aiopenapi3.client import SpecCachingClient
from esi.aiopenapi3.plugins import (
Add304ContentType, DjangoESIInit, MinifySpec, PatchCompatibilityDatePlugin,
Trim204ContentType,
)
from esi.exceptions import (
ESIErrorLimitException, HTTPClientError, HTTPNotModified, HTTPServerError,
)
from esi.models import Token
from esi.rate_limiting import (
ESIRateLimitBucket, ESIRateLimits, interval_to_seconds,
)
from esi.signals import esi_request_statistics
from esi.stubs import ESIClientStub
from . import __title__, __url__, __version__
from .helpers import pascal_case_string
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Lifetime of a stored ETag in the cache, in seconds (7 days).
ETAG_EXPIRY = 7 * 24 * 60 * 60
# Upper bound applied to a max-age derived cache TTL, in seconds (24 hours).
MAX_CACHE_TIME = 24 * 60 * 60
def _time_to_expiry(expires_header: str) -> int:
    """Calculate a cache TTL from an HTTP ``Expires`` header value.

    The value is parsed with :func:`email.utils.parsedate_to_datetime`, which
    understands the RFC 5322 / HTTP date formats (``GMT`` as well as numeric
    offsets such as ``+0000``) and, unlike ``strptime`` with ``%a``/``%b``,
    does not depend on the process locale.

    Args:
        expires_header (str): The value of the Expires header,
            e.g. ``'Thu, 01 Jan 2099 00:00:00 GMT'``.

    Returns:
        int: Seconds until the expiry date, never negative. ``0`` is returned
        when the date is in the past or the value cannot be parsed.
    """
    # Function-scope import so this helper does not rely on a new file-level import.
    from email.utils import parsedate_to_datetime

    try:
        expires_dt = parsedate_to_datetime(str(expires_header))
    except (TypeError, ValueError):
        # Unparsable values (including the string "None" produced by callers
        # that stringify a missing header) mean "no usable expiry".
        return 0
    if expires_dt.tzinfo is None:
        # A naive result carries no offset information; treat it as UTC.
        expires_dt = expires_dt.replace(tzinfo=timezone.utc)
    remaining = (expires_dt - datetime.now(timezone.utc)).total_seconds()
    return max(int(remaining), 0)
def _unpack_cache_control(headers: dict[str, Any]) -> int:
    """Calculate a cache TTL from the Cache-Control header.

    Falls back to the legacy Expires header when Cache-Control carries neither
    ``max-age`` nor ``no-store``.
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Guides/Caching#expires_or_max-age

    Args:
        headers (dict): response headers to derive the TTL from. Reads
            ``cache-control`` (e.g. ``'private, max-age=300, immutable'``),
            ``date`` (``'%a, %d %b %Y %H:%M:%S %Z'``) and ``Expires``.

    Returns:
        int: The cache TTL in seconds, never negative. A ``max-age`` is capped
        at ``MAX_CACHE_TIME``. ``0`` when the header is missing or malformed.
    """
    response_date = None
    if "date" in headers:
        date_format = "%a, %d %b %Y %H:%M:%S %Z"
        try:
            response_date = datetime.strptime(headers.get("date"), date_format)
            response_date = response_date.replace(tzinfo=timezone.utc)
        except (TypeError, ValueError) as e:
            logger.warning(f"Error converting date string: {e}")
            response_date = None

    if "cache-control" not in headers:
        return 0  # please only call this function if cache-control header exists

    # Split the header into "name=value" directives and bare flag directives.
    # Directive names are case-insensitive, so they are compared in lower case.
    valued: dict[str, str] = {}
    flags: set[str] = set()
    for raw in str(headers.get("cache-control") or "").split(","):
        directive = raw.strip()
        if not directive:
            continue
        name, sep, value = directive.partition("=")
        name = name.strip().lower()
        if sep:
            valued[name] = value
        else:
            flags.add(name)

    if "max-age" in valued:
        try:
            max_age = min(MAX_CACHE_TIME, int(valued["max-age"]))
        except ValueError as e:
            logger.warning(f"Error converting date strings: {e}")
            return 0
        if response_date:
            # Calculate expiry from date of request + max age
            expire_date = response_date + timedelta(seconds=max_age)
            expires = int((expire_date - datetime.now(timezone.utc)).total_seconds())
        else:
            # Date header missing or unparsable, so just use max-age as the ttl
            expires = max_age
    elif "no-store" in flags:
        # "no-store" is a bare flag, so it has to be looked up among the
        # flag directives; it never appears among the name=value directives.
        expires = 0
    else:
        # "no-cache" is intentionally not special-cased: it is mostly delegated
        # to ETags handled elsewhere.
        # Cache-Control exists, but no max-age is defined: fall back to the
        # legacy Expires header.
        expires_value = headers.get("expires", headers.get("Expires"))
        expires = _time_to_expiry(str(expires_value))
    return max(expires, 0)
def _httpx_exceptions(exc: BaseException) -> bool:
    """
    Retry predicate for HTTP calls: decide whether ``exc`` is worth retrying.

    Returns True for any httpx ``RequestError`` and for an ``HTTPStatusError``
    whose response status is 502, 503 or 504. An ``ESIErrorLimitException`` is
    never retried; every other exception is not retried either.
    """
    # Checked first so an open error-limit window is never retried.
    if isinstance(exc, ESIErrorLimitException):
        return False
    if isinstance(exc, RequestError):
        return True
    if not isinstance(exc, HTTPStatusError):
        return False
    status = getattr(exc.response, "status_code", None)
    return status in {502, 503, 504}
def http_retry_sync() -> Retrying:
    """Build the tenacity ``Retrying`` controller used for synchronous requests.

    Retries are decided by ``_httpx_exceptions``, wait exponentially between
    1 and 10 seconds (multiplier from ``ESI_SERVER_ERROR_WAIT_EXPONENT``) and
    stop after ``ESI_SERVER_ERROR_MAX_RETRIES`` attempts; the last exception is
    re-raised.
    """
    should_retry = retry_if_exception(_httpx_exceptions)
    backoff = wait_exponential(
        multiplier=app_settings.ESI_SERVER_ERROR_WAIT_EXPONENT,
        min=1,
        max=10,
    )
    give_up = stop_after_attempt(app_settings.ESI_SERVER_ERROR_MAX_RETRIES)
    return Retrying(
        retry=should_retry,
        wait=wait_combine(backoff),
        stop=give_up,
        reraise=True,
    )
async def http_retry_async() -> AsyncRetrying:  # pragma: no cover
    """Build the tenacity ``AsyncRetrying`` controller used for async requests.

    Uses the same policy as the synchronous variant: retry decided by
    ``_httpx_exceptions``, exponential wait between 1 and 10 seconds, stop after
    ``ESI_SERVER_ERROR_MAX_RETRIES`` attempts, re-raise the last exception.

    Note: this is a coroutine function, so callers have to ``await`` it to
    obtain the ``AsyncRetrying`` object before iterating over it.
    """
    should_retry = retry_if_exception(_httpx_exceptions)
    backoff = wait_exponential(
        multiplier=app_settings.ESI_SERVER_ERROR_WAIT_EXPONENT,
        min=1,
        max=10,
    )
    give_up = stop_after_attempt(app_settings.ESI_SERVER_ERROR_MAX_RETRIES)
    return AsyncRetrying(
        retry=should_retry,
        wait=wait_combine(backoff),
        stop=give_up,
        reraise=True,
    )
def _load_plugins(app_name, tags: list[str] | None = None, operations: list[str] | None = None):
    """Load the plugins to make ESI work with this lib.

    Args:
        app_name (str): app name to use for internal ETag
        tags (list[str] | None): passed to ``MinifySpec``; ``None`` means an
            empty list.
        operations (list[str] | None): passed to ``MinifySpec``; ``None`` means
            an empty list.

    Returns:
        list: plugin instances, in the order they are handed to aiopenapi3.
    """
    # Fresh lists per call: a shared mutable default would be handed to (and
    # possibly kept by) every MinifySpec instance.
    tags = [] if tags is None else tags
    operations = [] if operations is None else operations
    return [
        PatchCompatibilityDatePlugin(),
        Trim204ContentType(),
        Add304ContentType(),
        DjangoESIInit(app_name),
        MinifySpec(tags, operations),
    ]
def _load_aiopenapi_client_sync(
        spec_url: str,
        compatibility_date: str,
        app_name: str,
        user_agent: str,
        tenant: str,
        spec_file: str | None = None,
        tags: list[str] | None = None,
        operations: list[str] | None = None,
        additional_spec_headers: dict | None = None) -> OpenAPI:
    """Create an OpenAPI3 Client from Spec

    Args:
        spec_url (str): URL of the OpenAPI specification.
        compatibility_date (str): sent as the ``X-Compatibility-Date`` header.
        app_name (str): application name handed to the plugins.
        user_agent (str): sent as the ``User-Agent`` header.
        tenant (str): sent as the ``X-Tenant`` header.
        spec_file (str | None, optional): when set, the spec is loaded from this
            file instead of being fetched from ``spec_url``. Defaults to None.
        tags (list[str] | None, optional): forwarded to the plugins. Defaults to
            None (empty list).
        operations (list[str] | None, optional): forwarded to the plugins.
            Defaults to None (empty list).
        additional_spec_headers (dict | None, optional): extra headers merged
            over the default ones. Defaults to None (no extra headers).

    Returns:
        OpenAPI: aiopenapi3 Client Class
    """
    # Normalise the optional collections; never share a mutable default.
    tags = [] if tags is None else tags
    operations = [] if operations is None else operations
    extra_headers = {} if additional_spec_headers is None else additional_spec_headers

    headers = {
        "User-Agent": user_agent,
        "X-Tenant": tenant,
        "X-Compatibility-Date": compatibility_date,
        **extra_headers
    }

    def session_factory(**kwargs) -> Client:
        # Headers supplied by the caller of the factory are replaced by ours.
        kwargs.pop("headers", None)
        return SpecCachingClient(
            headers=headers,
            timeout=Timeout(
                connect=app_settings.ESI_REQUESTS_CONNECT_TIMEOUT,
                read=app_settings.ESI_REQUESTS_READ_TIMEOUT,
                write=app_settings.ESI_REQUESTS_WRITE_TIMEOUT,
                pool=app_settings.ESI_REQUESTS_POOL_TIMEOUT
            ),
            limits=Limits(
                max_connections=app_settings.ESI_CONNECTION_POOL_MAX_CONNECTIONS,
                max_keepalive_connections=app_settings.ESI_CONNECTION_POOL_MAX_KEEPALIVE,
                keepalive_expiry=app_settings.ESI_CONNECTION_POOL_KEEPALIVE_EXPIRY
            ),
            http2=True,
            **kwargs
        )

    plugins = _load_plugins(app_name, tags, operations)
    if spec_file:
        return OpenAPI.load_file(
            url=spec_url,
            path=spec_file,
            session_factory=session_factory,
            loader=FileSystemLoader(pathlib.Path(spec_file)),
            use_operation_tags=True,
            plugins=plugins
        )
    return OpenAPI.load_sync(
        url=spec_url,
        session_factory=session_factory,
        use_operation_tags=True,
        plugins=plugins
    )
async def _load_aiopenapi_client_async(
        spec_url: str,
        compatibility_date: str,
        app_name: str,
        user_agent: str,
        tenant: str,
        spec_file: str | None = None) -> OpenAPI:  # pragma: no cover
    """Create an OpenAPI3 Client from Spec Async

    Args:
        spec_url (str): URL of the OpenAPI specification.
        compatibility_date (str): sent as the ``X-Compatibility-Date`` header.
        app_name (str): application name handed to the plugins.
        user_agent (str): sent as the ``User-Agent`` header.
        tenant (str): sent as the ``X-Tenant`` header.
        spec_file (str | None, optional): when set, the spec is loaded from this
            file instead of being fetched from ``spec_url``. Defaults to None.

    Returns:
        OpenAPI: aiopenapi3 Client Class
    """
    headers = {
        "User-Agent": user_agent,
        "X-Tenant": tenant,
        "X-Compatibility-Date": compatibility_date
    }

    def session_factory(**kwargs) -> AsyncClient:
        # Headers supplied by the caller of the factory are replaced by ours.
        kwargs.pop("headers", None)
        return AsyncClient(
            headers=headers,
            timeout=Timeout(
                connect=app_settings.ESI_REQUESTS_CONNECT_TIMEOUT,
                read=app_settings.ESI_REQUESTS_READ_TIMEOUT,
                write=app_settings.ESI_REQUESTS_WRITE_TIMEOUT,
                pool=app_settings.ESI_REQUESTS_POOL_TIMEOUT
            ),
            limits=Limits(
                max_connections=app_settings.ESI_CONNECTION_POOL_MAX_CONNECTIONS,
                max_keepalive_connections=app_settings.ESI_CONNECTION_POOL_MAX_KEEPALIVE,
                keepalive_expiry=app_settings.ESI_CONNECTION_POOL_KEEPALIVE_EXPIRY
            ),
            http2=True,
            **kwargs
        )

    if spec_file:
        # TODO find a async way to load from file?
        # Pass the same file-system loader as the synchronous loader does, so
        # both code paths read a local spec file the same way.
        return OpenAPI.load_file(
            url=spec_url,
            path=spec_file,
            session_factory=session_factory,
            loader=FileSystemLoader(pathlib.Path(spec_file)),
            use_operation_tags=True,
            plugins=_load_plugins(app_name)
        )
    return await OpenAPI.load_async(
        url=spec_url,
        session_factory=session_factory,
        use_operation_tags=True,
        plugins=_load_plugins(app_name)
    )
def _build_user_agent(ua_appname: str, ua_version: str, ua_url: str | None = None) -> str:
    """
    Compose the User-Agent header value, for example:
    AppName/1.2.3 (foo@example.com; +https://gitlab.com/) Django-ESI/1.2.3 (+https://gitlab.com/allianceauth/django-esi)
    The contact email is taken from app_settings.ESI_USER_CONTACT_EMAIL.
    Args:
        ua_appname (str): Application Name, PascalCase
        ua_version (str): Application Version, SemVer
        ua_url (str | None): Application URL (Optional)
    Returns:
        str: User-Agent string
    """
    # Both product names are run through pascal_case_string before use.
    app_product = pascal_case_string(ua_appname)
    lib_product = pascal_case_string(__title__)

    contact = f"{app_settings.ESI_USER_CONTACT_EMAIL}"
    if ua_url:
        comment = f"({contact}; +{ua_url})"
    else:
        comment = f"({contact})"

    return f"{app_product}/{ua_version} {comment} {lib_product}/{__version__} (+{__url__})"
def _get_spec_url() -> str:
    """Return the OpenAPI spec URL: ``ESI_API_URL`` followed by ``meta/openapi.json``."""
    base_url = app_settings.ESI_API_URL
    return f"{base_url}meta/openapi.json"
def esi_client_factory_sync(
        compatibility_date: str,
        ua_appname: str, ua_version: str, ua_url: str | None = None,
        spec_file: str | None = None,
        tenant: str = "tranquility",
        tags: list[str] | None = None,
        operations: list[str] | None = None,
        additional_spec_headers: dict | None = None,
        **kwargs) -> OpenAPI:
    """Generate a new OpenAPI ESI client.

    Args:
        compatibility_date (str): "YYYY-MM-DD" The latest version of ESI your client is tested with
        ua_appname (str): Application Name, PascalCase
        ua_version (str): Application Version, SemVer
        ua_url (str, optional): Application URL (Optional). Defaults to None.
        spec_file (str | None, optional): Specification file path (Optional). Defaults to None.
        tenant (str, optional): Tenant ID (Optional). Defaults to "tranquility".
        tags (list[str] | None, optional): forwarded to the client loader.
            Defaults to None (empty list).
        operations (list[str] | None, optional): forwarded to the client loader.
            Defaults to None (empty list).
        additional_spec_headers (dict | None, optional): extra headers forwarded
            to the client loader. Defaults to None (no extra headers).
        **kwargs: accepted but not used by this function.

    Returns:
        OpenAPI: OpenAPI ESI Client
    """
    user_agent = _build_user_agent(ua_appname, ua_version, ua_url)
    spec_url = _get_spec_url()
    # Always hand concrete, per-call containers to the loader instead of
    # sharing mutable default objects between calls.
    return _load_aiopenapi_client_sync(
        spec_url,
        compatibility_date,
        ua_appname,
        user_agent,
        tenant,
        spec_file,
        [] if tags is None else tags,
        [] if operations is None else operations,
        {} if additional_spec_headers is None else additional_spec_headers,
    )
async def esi_client_factory_async(
        compatibility_date: str,
        ua_appname: str, ua_version: str, ua_url: str | None = None,
        spec_file: str | None = None,
        tenant: str = "tranquility",
        **kwargs) -> OpenAPI:  # pragma: no cover
    """Generate a new asynchronous OpenAPI ESI client.

    Args:
        compatibility_date (str): "YYYY-MM-DD" The latest version of ESI your client is tested with
        ua_appname (str): Application Name, PascalCase
        ua_version (str): Application Version, SemVer
        ua_url (str | None, optional): Application URL (Optional). Defaults to None.
        spec_file (str | None, optional): Specification file path (Optional). Defaults to None.
        tenant (str, optional): Tenant ID (Optional). Defaults to "tranquility".
        **kwargs: accepted but not used by this function.

    Returns:
        OpenAPI: OpenAPI ESI Client
    """
    user_agent = _build_user_agent(ua_appname, ua_version, ua_url)
    spec_url = _get_spec_url()
    client = await _load_aiopenapi_client_async(
        spec_url=spec_url,
        compatibility_date=compatibility_date,
        app_name=ua_appname,
        user_agent=user_agent,
        tenant=tenant,
        spec_file=spec_file,
    )
    return client
class BaseEsiOperation():
    """Shared state plus cache / ETag / rate-limit helpers for one ESI operation.

    Holds the unpacked operation tuple (method, url, operation object, extra),
    the aiopenapi3 client, the arguments captured by ``__call__`` and the
    optional token and rate-limit bucket. Executing the request is left to
    subclasses.
    """

    def __init__(self, operation, api: OpenAPI) -> None:
        self.method, self.url, self.operation, self.extra = operation
        self.api = api
        self.token: Token | None = None
        self.bucket: ESIRateLimitBucket | None = None
        # Request body; initialised here so the attribute always exists.
        self.body: Any | None = None
        self._args = []
        self._kwargs = {}
        self._set_bucket()

    def __call__(self, *args, **kwargs) -> "BaseEsiOperation":
        """Capture the call arguments for a later request and return ``self``."""
        self._args = args
        self._kwargs = kwargs
        return self

    def _unnormalize_parameters(self, params: dict[str, Any]) -> dict[str, Any]:
        """UN-Normalize Pythonic parameter names back to OpenAPI names.

        Converts pythonic keys like "Accept_Language" to "Accept-Language" when/if
        a non pythonic (usually) hyphenated form exists in the operation's
        parameter list. Matching order per key: exact name, hyphenated name,
        case-insensitive name, case-insensitive hyphenated name. Keys unknown to
        the spec (and normal snake_case params such as "type_id" that match
        exactly) are passed through untouched.

        Args:
            params: Raw parameters collected from the call
        Returns:
            dict: Parameters with keys aligned to the OpenAPI spec
        """
        try:
            spec_param_names = [p.name for p in getattr(self.operation, "parameters", [])]
        except Exception:
            # Best effort: without a usable parameter list nothing is renamed.
            spec_param_names = []

        # Exact and case-insensitive lookup maps
        spec_param_set = set(spec_param_names)
        spec_param_map_ci = {n.lower(): n for n in spec_param_names}

        normalized: dict[str, Any] = {}
        for key, value in params.items():
            dashed = key.replace("_", "-")
            if key in spec_param_set:
                target = key
            elif dashed in spec_param_set:
                target = dashed
            elif key.lower() in spec_param_map_ci:
                target = spec_param_map_ci[key.lower()]
            elif dashed.lower() in spec_param_map_ci:
                target = spec_param_map_ci[dashed.lower()]
            else:
                # Unknown to the spec; pass through as-is (aiopenapi3 will validate)
                target = key
            normalized[target] = value
        return normalized

    def _etag_key(self) -> str:
        """Generate a key name used to cache ETag responses based on app_name and cache_key

        Returns:
            str: Key
        """
        # ignore the token this will break the cache
        return f"{slugify(self.api.app_name)}_etag_{self._cache_key()}"  # type: ignore app_name is added by a plugin

    def _cache_key(self) -> str:
        """Generate a key name used to cache responses based on method, url, args, kwargs

        Returns:
            str: Key
        """
        # ignore the token this will break the cache
        ignore_keys = [
            "token",
        ]
        _kwargs = {key: value for key, value in self._kwargs.items() if key not in ignore_keys}
        data = (self.method + self.url + str(self._args) + str(_kwargs)).encode('utf-8')
        str_hash = md5(data).hexdigest()  # nosec B303
        return f'esi_{str_hash}'

    def _extract_body_param(self) -> Any | None:
        """Pop the request body from parameters to be able to check the param validity

        Raises:
            ValueError: a body was supplied but the operation has no requestBody.
        Returns:
            Any | None: the request body
        """
        _body = self._kwargs.pop("body", None)
        if _body and not getattr(self.operation, "requestBody", False):
            raise ValueError("Request Body provided on endpoint with no request body parameter.")
        return _body

    def _extract_token_param(self) -> Token | None:
        """Pop token from parameters or use the Client wide token if set

        Raises:
            ValueError: a token was supplied for an operation without security
                requirements (not raised when ``settings.DEBUG`` is truthy).
        Returns:
            Token | None: The token to use for the request
        """
        _token = self._kwargs.pop("token", None)
        if _token and not getattr(self.operation, "security", False):
            # Don't throw in debug for ESI Test Purposes.
            if not getattr(settings, "DEBUG", False):
                raise ValueError("Token provided on public endpoint")
        return self.token or _token

    def _has_page_param(self) -> bool:
        """Check if this operation supports Offset Based Pagination.

        Returns:
            bool: True if page parameters are present, False otherwise
        """
        return any(p.name == "page" for p in self.operation.parameters)

    def _has_cursor_param(self) -> bool:
        """Check if this operation supports Cursor Based Pagination.

        Returns:
            bool: True if cursor parameters are present, False otherwise
        """
        return any(p.name in ("before", "after") for p in self.operation.parameters)

    def _get_cache(self, cache_key: str, etag: str | None) -> tuple[ResponseHeadersType | None, Any, Response | None]:
        """Retrieve cached response and validate expiry

        Args:
            cache_key (str): The cache key to retrieve
            etag (str | None): ETag known to the caller; when it equals the
                cached response's ETag a 304 is raised instead of returning data
        Raises:
            HTTPNotModified: the cached response carries the same ETag as ``etag``
        Returns:
            tuple[ResponseHeadersType | None, Any, Response | None]: The cached response,
                or (None, None, None) if not found, expired or caching is disabled
        """
        if not app_settings.ESI_CACHE_RESPONSE:
            return None, None, None

        try:
            cached_response = cache.get(cache_key)
        except Exception as e:
            logger.error(f"Cache retrieve failed {e}", exc_info=True)
            return None, None, None

        if not cached_response:
            return None, None, None

        logger.debug(f"Cache Hit {self.url}")
        expiry = self._get_cache_expiry(cached_response.headers)

        # force check to ensure cache isn't expired. The computed TTL is clamped
        # to be non-negative, so "expired" shows up as 0, never as a negative
        # number. Entries are only ever stored with a TTL above 0.
        if expiry <= 0:
            logger.warning("Cache entry for %s has expired, forcing expiry", self.url)
            return None, None, None

        # check if ETag is same before building models from cache
        if etag and cached_response.headers.get('ETag') == etag:
            self._send_signal(
                status_code=0,  # this is a cached response less a 304
                headers=cached_response.headers,
                latency=0
            )
            # refresh/store the ETag's TTL
            self._store_etag(cached_response.headers)
            raise HTTPNotModified(
                status_code=304,
                headers=cached_response.headers
            )

        # build models
        headers, data = self.parse_cached_request(cached_response)
        return headers, data, cached_response

    def _store_etag(self, headers: dict) -> None:
        """
        Store response ETag in cache for 7 days
        """
        if "ETag" in headers:
            cache.set(self._etag_key(), headers["ETag"], timeout=ETAG_EXPIRY)

    def _update_etag_ttl(self) -> None:
        """
        reset ETag ttl in cache.

        Uses Django's backend-independent ``cache.touch``. Best effort: a
        failing cache backend is logged, not raised.
        """
        try:
            cache.touch(self._etag_key(), timeout=ETAG_EXPIRY)
        except Exception as e:
            logger.error(f"Failed to refresh ETag TTL {e}", exc_info=True)

    def _clear_etag(self):
        """ Delete the cached ETag for this operation.
        """
        try:
            cache.delete(self._etag_key())
        except Exception as e:
            logger.error(f"Failed to delete ETag {e}", exc_info=True)

    def _store_cache(self, cache_key: str, response) -> None:
        """ Store the response in cache for expiry TTL.

        Nothing is stored when response caching is disabled or the TTL derived
        from the response headers is not positive.

        Args:
            cache_key (str): The cache key to store the response under
            response (Response): The response object to cache
        """
        if not app_settings.ESI_CACHE_RESPONSE:
            return

        ttl = self._get_cache_expiry(response.headers)
        if ttl > 0:
            try:
                cache.set(cache_key, response, ttl)
            except Exception as e:
                logger.error(f"Failed to cache {e}", exc_info=True)

    def _clear_cache(self) -> None:
        """ Delete the cached data for this operation.
        """
        try:
            cache.delete(self._cache_key())
        except Exception as e:
            logger.error(f"Failed to delete cache {e}", exc_info=True)

    def _validate_token_scopes(self, token: Token) -> None:
        """Validate that the token provided has the required scopes for this ESI operation.

        Raises:
            ValueError: the token lacks one or more of the OAuth2 scopes listed
                in the operation's first security requirement.
        """
        token_scopes = set(token.scopes.all().values_list("name", flat=True))
        try:
            security = getattr(self.operation, "security", []) or []
            requirement = security[0] if security else {}
            required_scopes = set(getattr(requirement, "root", {}).get("OAuth2", []))
        except KeyError:
            required_scopes = set()
        # Sorted so the error message is deterministic.
        missing_scopes = sorted(required_scopes - token_scopes)
        if missing_scopes:
            raise ValueError(f"Token Missing Scopes - {missing_scopes}")

    def parse_cached_request(self, cached_response) -> tuple[ResponseHeadersType, ResponseDataType]:
        """Run a cached response through aiopenapi3's response processing.

        Returns:
            tuple: the headers and data produced by the request object's
            ``_process_request`` for this operation
        """
        req = self.api.createRequest(
            f"{self.operation.tags[0]}.{self.operation.operationId}"
        )
        return req._process_request(cached_response)

    def _set_bucket(self) -> None:
        """Setup the rate bucket from the operation's "rate-limit" extension.

        The bucket key is the rate-limit group, suffixed with the token's
        character id when a token is set.
        """
        extensions = getattr(self.operation, "extensions", None) or {}
        _rate_limit = extensions.get("rate-limit", False)
        if _rate_limit:
            _key = _rate_limit["group"]
            if self.token:
                _key = f"{_key}:{self.token.character_id}"
            self.bucket = ESIRateLimitBucket(
                _key,
                _rate_limit["max-tokens"],
                interval_to_seconds(_rate_limit["window-size"])
            )

    def _send_signal(self, status_code: int, headers: dict | None = None, latency: float = 0) -> None:
        """
        Dispatch the esi request statistics signal

        Args:
            status_code (int): status to report (callers use 0 for cache hits)
            headers (dict | None): response headers; ``None`` is sent as ``{}``
            latency (float): elapsed time to report
        """
        esi_request_statistics.send(
            sender=self.__class__,
            operation=self.operation.operationId,
            status_code=status_code,
            headers={} if headers is None else headers,
            latency=latency,
            bucket=self.bucket.slug if self.bucket else ""
        )

    def _get_cache_expiry(self, headers: dict[str, Any] | None = None) -> int:
        """Derive the cache TTL in seconds from response headers.

        Returns 0 when neither Cache-Control nor Expires is present.
        """
        if headers is None:
            return 0
        if "cache-control" in headers:
            # If both Expires and Cache-Control: max-age are available,
            # max-age is defined to be preferred.
            # https://developer.mozilla.org/en-US/docs/Web/HTTP/Guides/Caching#expires_or_max-age
            # This function will handle falling back to Expires if max age is missing
            return _unpack_cache_control(headers)
        if "expires" in headers:
            # If cache-control is missing entirely this is Legacy.
            # Read the value under the same key that was just tested, so a
            # plain (case-sensitive) dict works as well.
            return _time_to_expiry(str(headers.get("expires")))
        return 0
class EsiOperation(BaseEsiOperation):
    """Synchronous ESI operation: request execution, caching, ETag and paging."""

    def __skip__process__headers__(
        self, result, headers: dict[str, str], expected_response
    ):
        """Return all headers always"""
        return headers

    def _make_request(
        self,
        parameters: dict[str, Any],
        etag: str | None = None,
        last_modified: datetime | None = None) -> RequestBase.Response:
        """Send the request to ESI, retrying according to ``http_retry_sync``.

        Args:
            parameters: request parameters (pythonic names are mapped back to
                the spec names before sending)
            etag: value for the ``If-None-Match`` header, if any
            last_modified: value for the ``If-Modified-Since`` header, if any
        Raises:
            ESIErrorLimitException: an error-limit reset is still stored in cache
        Returns:
            the value returned by the aiopenapi3 request
        """
        reset = cache.get("esi_error_limit_reset")
        if reset is not None:
            # Hard exception here if there is still an open Error Limit
            # developers need to either decorators.wait_for_esi_error_limit_reset()
            # or handle this by pushing their celery tasks back
            raise ESIErrorLimitException(reset=reset)

        if self.bucket:
            # Check Rate Limit
            ESIRateLimits.check_bucket(self.bucket)

        retry = http_retry_sync()

        def _send():
            req = self.api.createRequest(f"{self.operation.tags[0]}.{self.operation.operationId}")

            # We want all headers from ESI
            # don't check/parse them against the spec and return them all
            # TODO Investigate if this is a bug with aiopenapi or a spec compliance issue
            req._process__headers = self.__skip__process__headers__

            if self.token:
                self.api.authenticate(OAuth2=True)  # make the lib happy
                if isinstance(self.token, str):
                    # Fallback older Django-ESI Behaviour
                    # Deprecated
                    req.req.headers["Authorization"] = f"Bearer {self.token}"
                    warnings.warn(
                        "Passing an Access Token string directly is deprecated. "
                        "Doing so will Skip Validation of Scopes. "
                        "Please use a Token object instead.",
                        DeprecationWarning,
                        stacklevel=2
                    )
                else:
                    self._validate_token_scopes(self.token)
                    req.req.headers["Authorization"] = f"Bearer {self.token.valid_access_token()}"
            if etag:
                req.req.headers["If-None-Match"] = etag
            if last_modified:
                try:
                    req.req.headers["If-Modified-Since"] = last_modified.strftime("%a, %d %b %Y %H:%M:%S GMT")
                except Exception as e:
                    logger.error(f"Error formatting last_modified: {e}")

            _response = req.request(data=self.body, parameters=self._unnormalize_parameters(parameters))

            if self.bucket and "x-ratelimit-remaining" in _response.result.headers:
                logger.debug(
                    "ESI Rate-Limit: "
                    f"'{_response.result.headers.get('x-ratelimit-group')}' - "
                    f"Used {_response.result.headers.get('x-ratelimit-used')} - "
                    f"{_response.result.headers.get('x-ratelimit-remaining')} / "
                    f"({_response.result.headers.get('x-ratelimit-limit')})"
                )
                ESIRateLimits.set_bucket(
                    self.bucket,
                    _response.result.headers.get("x-ratelimit-remaining")
                )
            return _response

        return retry(_send)

    def result(
        self,
        use_etag: bool = True,
        return_response: bool = False,
        force_refresh: bool = False,
        use_cache: bool = True,
        store_cache: bool = True,
        last_modified: datetime | None = None,
        **extra) -> tuple[Any, Response] | Any:
        """Executes the request and returns the response from ESI for the current operation.

        Keyword Arguments:
            use_etag -- Use the inbuilt ETag matching system (default True)
            return_response -- return the headers and request information (default False)
            force_refresh -- clear ETag and cache, force a re-fetch from ESI (default False)
            use_cache -- check cache prior to fetching from ESI (default True)
            store_cache -- store the returned data from ESI in cache (default True)
            last_modified -- Optional datetime to send as If-Modified-Since
            **extra -- additional request parameters for this call only

        Raises:
            ESIErrorLimitException: an error limit is open, or ESI answered 420 with a reset
            ESIBucketLimitException: presumably raised by the rate-limit bucket check -- confirm
            HTTPNotModified: ESI (or the cache) reports the data as unchanged
            HTTPServerError: aiopenapi3 reported a server error
            HTTPClientError: aiopenapi3 reported a client error

        Returns:
            the response data, or ``(data, response)`` when ``return_response`` is True
        """
        started = default_timer()
        self.token = self._extract_token_param()

        if self.token:
            self._set_bucket()

        self.body = self._extract_body_param()

        # Per-call parameters change what ESI returns, so they must be part of
        # the cache/ETag keys. Those keys are derived from self._kwargs, hence
        # the merged parameters are installed for the duration of this call and
        # the caller's kwargs are restored afterwards. Without ``extra`` the
        # keys are identical to the ones built from the plain kwargs.
        call_kwargs = self._kwargs
        parameters = call_kwargs | extra
        self._kwargs = parameters
        try:
            data, response = self._run_result(
                parameters, started, use_etag, force_refresh,
                use_cache, store_cache, last_modified,
            )
        finally:
            self._kwargs = call_kwargs

        return (data, response) if return_response else data

    def _run_result(
        self,
        parameters: dict[str, Any],
        started: float,
        use_etag: bool,
        force_refresh: bool,
        use_cache: bool,
        store_cache: bool,
        last_modified: datetime | None) -> tuple[Any, Any]:
        """Serve one request from cache or ESI and return ``(data, response)``."""
        cache_key = self._cache_key()
        etag_key = self._etag_key()
        etag = None

        if force_refresh:
            self._clear_cache()
            self._clear_etag()
            last_modified = None

        if use_etag:
            etag = cache.get(etag_key)

        # _get_cache validates the expiry of the cached entry itself.
        headers, data, response = self._get_cache(cache_key, etag=etag) if use_cache else (None, None, None)

        if response:
            # send signal for cached data too
            self._send_signal(
                status_code=0,
                headers=response.headers,
                latency=default_timer() - started
            )
            return data, response

        if etag:
            logger.debug(f"Cache Miss, E-Tag Hit {self.url}")
        else:
            logger.debug(f"Cache Miss {self.url}")

        try:
            headers, data, response = self._make_request(parameters, etag, last_modified)
        # Shim our exceptions into Django-ESI
        except base_HTTPServerError as e:
            self._send_signal(
                status_code=e.status_code,
                headers=e.headers,
                latency=default_timer() - started
            )
            raise HTTPServerError(
                status_code=e.status_code,
                headers=e.headers,
                data=e.data
            ) from e
        except base_HTTPClientError as e:
            self._send_signal(
                status_code=e.status_code,
                headers=e.headers,
                latency=default_timer() - started
            )
            if e.status_code == 420:
                reset = e.headers.get("X-RateLimit-Reset", None)
                if reset:
                    reset = int(reset)
                    cache.set("esi_error_limit_reset", reset, timeout=reset)
                    raise ESIErrorLimitException(reset=reset) from e
            raise HTTPClientError(
                status_code=e.status_code,
                headers=e.headers,
                data=e.data
            ) from e

        self._send_signal(
            status_code=response.status_code,
            headers=response.headers,
            latency=default_timer() - started
        )

        # store the ETag in cache if using it.
        if use_etag:
            self._store_etag(response.headers)

        # Throw a 304 exception for catching.
        if response.status_code == 304:
            # refresh/store the ETag's TTL
            self._update_etag_ttl()
            raise HTTPNotModified(
                status_code=304,
                headers=response.headers
            )

        # last step store cache after 304 logic, we dont want to catch the 304 `None` responses
        if store_cache:
            self._store_cache(cache_key, response)

        return data, response

    def results(
        self,
        use_etag: bool = True,
        return_response: bool = False,
        force_refresh: bool = False,
        use_cache: bool = True,
        store_cache: bool = True,
        last_modified: datetime | None = None,
        **extra) -> tuple[list[Any], Response | Any | None] | list[Any]:
        """Executes the request and returns the response from ESI for the current
        operation. Response will include all pages if there are more available.

        Keyword Arguments:
            use_etag -- Use the inbuilt ETag matching system (default True)
            return_response -- return the headers and request information (default False)
            force_refresh -- clear ETag and cache, force a re-fetch from ESI (default False)
            use_cache -- check cache prior to fetching from ESI (default True)
            store_cache -- store the returned data from ESI in cache (default True)
            last_modified -- Optional datetime to send as If-Modified-Since
            **extra -- additional request parameters passed to every request

        Raises:
            ESIErrorLimitException: see ``result``
            ESIBucketLimitException: see ``result``
            HTTPNotModified: every page reported "not modified"
            HTTPServerError: see ``result``
            HTTPClientError: see ``result``

        Returns:
            list of all results, or ``(results, last_response)`` when
            ``return_response`` is True
        """
        all_results: list[Any] = []
        last_response: Response | None = None

        if self._has_page_param():
            current_page = 1
            total_pages = 1
            count_pages_etag_hit = 0
            force_refetch = False
            last_headers = None

            while current_page <= total_pages:
                self._kwargs["page"] = current_page
                # If-Modified-Since is treated like an extra ETag: it is
                # dropped on a forced refresh and on the refetch pass, otherwise
                # the refetch could answer 304 again and never terminate.
                send_conditional = not (force_refresh or force_refetch)
                try:
                    data, response = self.result(
                        # use cache where we can, but ignore ETag if we are re-fetching
                        use_etag=use_etag and not force_refetch,
                        return_response=True,
                        force_refresh=force_refresh,
                        use_cache=use_cache,
                        store_cache=store_cache,
                        last_modified=last_modified if send_conditional else None,
                        **extra
                    )
                    last_response = response
                    all_results.extend(data if isinstance(data, list) else [data])
                    total_pages = int(response.headers.get("X-Pages", 1))
                    logger.debug(
                        f"ESI Page Fetched {self.url} - {current_page}/{total_pages}"
                    )
                    current_page += 1

                except HTTPNotModified as e:
                    count_pages_etag_hit += 1
                    total_pages = int(e.headers.get("X-Pages", 1))
                    logger.debug(
                        f"ESI Page hit ETAG {self.url} - {current_page}/{total_pages}"
                    )
                    current_page += 1
                    last_headers = e.headers

                if current_page > total_pages:
                    if 0 < count_pages_etag_hit != total_pages:
                        # Not all pages hit ETag, so refetch all
                        force_refetch = True
                        current_page = 1
                        count_pages_etag_hit = 0
                        all_results = []
                        last_response = None
                    elif count_pages_etag_hit == total_pages:
                        # All ETags hit raise 304
                        raise HTTPNotModified(
                            status_code=304,
                            headers=last_headers
                        )

        elif self._has_cursor_param():
            # Untested, there are no cursor based endpoints in ESI
            for cursor_param in ("after", "before"):
                if extra.get(cursor_param):
                    break
            else:
                cursor_param = "after"

            while True:
                data, response = self.result(
                    use_etag=use_etag,
                    return_response=True,
                    force_refresh=force_refresh,
                    use_cache=use_cache,
                    store_cache=store_cache,
                    last_modified=last_modified if not force_refresh else None,  # Im treating this like an extra ETag here
                    **extra
                )
                last_response = response
                if not data:
                    break
                all_results.extend(data if isinstance(data, list) else [data])
                # Data without a ``cursor`` attribute simply ends the paging.
                cursor_token = getattr(getattr(data, "cursor", None), cursor_param, False)
                if not cursor_token:
                    break
                extra[cursor_param] = cursor_token

        else:
            data, response = self.result(
                use_etag=use_etag,
                return_response=True,
                force_refresh=force_refresh,
                use_cache=use_cache,
                store_cache=store_cache,
                last_modified=last_modified,
                **extra
            )
            all_results.extend(data if isinstance(data, list) else [data])
            last_response = response

        return (all_results, last_response) if return_response else all_results

    def results_localized(
            self,
            languages: list[str] | str | None = None,
            **kwargs
    ) -> dict[str, list[Any]]:
        """Executes the request and returns the response from ESI for all default languages and pages (if any).

        Args:
            languages: (list[str], str, optional) language(s) to return instead of default languages
        Raises:
            ValueError: Invalid or Not Supported Language Code ...
        Returns:
            dict[str, list[Any]]: Dict of all responses with the language code as keys.
        """
        if not languages:
            my_languages = list(app_settings.ESI_LANGUAGES)
        else:
            if isinstance(languages, str):
                # A single language code; iterating it would yield characters.
                languages = [languages]
            my_languages = []
            # dict.fromkeys de-duplicates while keeping the given order.
            for lang in dict.fromkeys(languages):
                if lang not in app_settings.ESI_LANGUAGES:
                    raise ValueError('Invalid or Not Supported Language Code: %s' % lang)
                my_languages.append(lang)

        return {
            language: self.results(accept_language=language, **kwargs)
            for language in my_languages
        }

    def required_scopes(self) -> list[str]:
        """Return a simple list of scopes required for an endpoint. #Requires loading and processing a client

        Returns:
            list[str]: List of Scopes Required
        """
        try:
            security = getattr(self.operation, "security", False)
            if not security:
                return []  # No Scopes Required
            return list(getattr(security[0], "root", {}).get("OAuth2", []))
        except (IndexError, KeyError):
            return []
class EsiOperationAsync(BaseEsiOperation): # pragma: no cover
async def _make_request(
    self,
    parameters: dict[str, Any],
    etag: str | None = None,
    last_modified: datetime | None = None
) -> RequestBase.Response:
    """Send the request to ESI asynchronously, retrying per ``http_retry_async``.

    Args:
        parameters: request parameters (pythonic names are mapped back to the
            spec names before sending)
        etag: value for the ``If-None-Match`` header, if any
        last_modified: value for the ``If-Modified-Since`` header, if any
    Raises:
        ESIErrorLimitException: an error-limit reset is still stored in cache
        RuntimeError: the retry loop ended without attempting a request
    Returns:
        the value produced by the aiopenapi3 request
    """
    # Function-scope import so this method does not need a new file-level import.
    import inspect

    reset = cache.get("esi_error_limit_reset")
    if reset is not None:
        # Hard exception here if there is still an open rate limit
        # developers need to either decorators.wait_for_esi_error_limit_reset()
        # or handle this by pushing their celery tasks back
        raise ESIErrorLimitException(reset=reset)

    # http_retry_async is a coroutine function: it has to be awaited to get the
    # AsyncRetrying object; a bare coroutine cannot be used with ``async for``.
    retrying = await http_retry_async()
    async for attempt in retrying:
        with attempt:
            req = self.api.createRequest(f"{self.operation.tags[0]}.{self.operation.operationId}")
            if self.token:
                self.api.authenticate(OAuth2=True)  # make the lib happy
                self._validate_token_scopes(self.token)
                req.req.headers["Authorization"] = f"Bearer {self.token.valid_access_token()}"
            if etag:
                req.req.headers["If-None-Match"] = etag
            if last_modified:
                try:
                    req.req.headers["If-Modified-Since"] = last_modified.strftime("%a, %d %b %Y %H:%M:%S GMT")
                except Exception as e:
                    logger.error(f"Error formatting last_modified: {e}")
            outcome = req.request(parameters=self._unnormalize_parameters(parameters))
            # With an async session the request is a coroutine and must be
            # awaited inside the attempt so failures are seen by the retry
            # logic; a plain (already computed) result is returned unchanged.
            if inspect.isawaitable(outcome):
                outcome = await outcome
            return outcome
    # Should never be reached because AsyncRetrying always yields at least once
    raise RuntimeError("Retry loop exited without performing a request")
async def result(
self,
etag: str | None = None,
return_response: bool = False,
use_cache: bool = True,
last_modified: datetime | None = None,
**extra
) -> tuple[Any, Response] | Any:
self.token = self._extract_token_param()
parameters = self._kwargs | extra
cache_key = self._cache_key()
etag_key = f"{cache_key}_etag"
if not etag and app_settings.ESI_CACHE_RESPONSE:
etag = cache.get(etag_key)
headers, data, response = self._get_cache(cache_key, etag)
if response and use_cache:
expiry = _time_to_expiry(str(headers.get('Expires')))
if expiry < 0:
logger.warning(
"cache expired by %d seconds, Forcing expiry", expiry
)
response = None
headers = None
data = None
if not response:
logger.debug(f"Cache Miss {self.url}")
try:
headers, data, response = await self._make_request(parameters, etag, last_modified)
if response.status_code == 420:
reset = response.headers.get("X-RateLimit-Reset", None)
cache.set("esi_error_limit_reset", reset, timeout=reset)
raise ESIErrorLimitException(reset=reset)
self._store_cache(cache_key, response)
self._store_etag(response.headers)
# Shim our exceptions into Django-ESI
except base_HTTPServerError as e:
raise HTTPServerError(
status_code=e.status_code,
headers=e.headers,
data=e.data
)
except base_HTTPClientError as e:
raise HTTPClientError(
status_code=e.status_code,
headers=e.headers,
data=e.data
)
# Throw a 304 exception for catching.
if response.status_code == 304:
# refresh/store the ETag's TTL
self._store_etag(response.headers)
raise HTTPNotModified(
status_code=304,
headers=response.headers
)
return (data, response) if return_response else data
async def results(
self,
etag: str | None = None,
return_response: bool = False,
use_cache: bool = True,
**extra
) -> tuple[list[Any], Response | Any | None] | list[Any]:
all_results = []
last_response = None
if self._has_page_param():
current_page = 1
total_pages = 1
while current_page <= total_pages:
self._kwargs["page"] = current_page
data, response = await self.result(etag=etag, return_response=True, use_cache=use_cache, **extra)
last_response = response
all_results.extend(data if isinstance(data, list) else [data])
total_pages = int(response.headers.get("X-Pages", 1))
logger.debug(
f"ESI Page Fetched {self.url} - {current_page}/{total_pages}"
)
current_page += 1
# elif self._has_cursor_param():
# TODO
else:
data, response = await self.result(etag=etag, return_response=True, use_cache=use_cache, **extra)
all_results.extend(data if isinstance(data, list) else [data])
last_response = response
return (all_results, last_response) if return_response else all_results
def results_localized(
self,
languages: list[str] | str | None = None,
**extra) -> dict[str, list[Any]]:
"""Executes the request and returns the response from ESI for all default languages and pages (if any).
Args:
languages: (list[str], str, optional) language(s) to return instead of default languages
Raises:
ValueError: Invalid or Not Supported Language Code ...
Returns:
dict[str, list[Any]]: Dict of all responses with the language code as keys.
"""
if not languages:
my_languages = list(app_settings.ESI_LANGUAGES)
else:
my_languages = []
for lang in dict.fromkeys(languages):
if lang not in app_settings.ESI_LANGUAGES:
raise ValueError('Invalid or Not Supported Language Code: %s' % lang)
my_languages.append(lang)
return {
language: self.results(accept_language=language, **extra)
for language in my_languages
}
def required_scopes(self) -> list[str]:
"""Return a simple list of scopes required for an endpoint. #Requires loading and processing a client
Returns:
list[str]: List of Scopes Required
"""
try:
if not getattr(self.operation, "security", False):
return [] # No Scopes Required
else:
return list(getattr(getattr(self.operation, "security", [])[0], "root", {}).get("OAuth2", []))
except (IndexError, KeyError):
return []
class ESITag:
    """
    Wrapper around one API tag (Assets, Characters, etc.).
    Operations of the tag are reached as attributes of this object.
    """

    def __init__(self, operation, api) -> None:
        self._oi, self._operations = operation._oi, operation._operations
        self.api = api

    def __getattr__(self, name: str) -> EsiOperation:
        # Known operation: wrap it together with the API object.
        if name in self._operations:
            return EsiOperation(self._operations[name], self.api)
        # Unknown name: fail with the list of operations this tag offers.
        raise AttributeError(
            f"Operation '{name}' not found in tag '{self._oi}'. "
            f"Available operations: {', '.join(sorted(self._operations.keys()))}"
        )
class ESITagAsync:  # pragma: no cover
    """
    Async wrapper around one API tag (Assets, Characters, etc.).
    Operations of the tag are reached as attributes of this object.
    """

    def __init__(self, operation, api) -> None:
        self._oi, self._operations = operation._oi, operation._operations
        self.api = api

    def __getattr__(self, name: str) -> EsiOperationAsync:
        # Known operation: wrap it together with the API object.
        if name in self._operations:
            return EsiOperationAsync(self._operations[name], self.api)
        # Unknown name: fail with the list of operations this tag offers.
        raise AttributeError(
            f"Operation '{name}' not found in tag '{self._oi}'. "
            f"Available operations: {', '.join(sorted(self._operations.keys()))}"
        )
class ESIClient(ESIClientStub):
    """
    Base ESI Client, provides access to Tags Assets, Characters, etc.
    or Raw aiopenapi3 via sad smiley ._.
    """

    def __init__(self, api: OpenAPI) -> None:
        self.api = api
        # Snapshot of the tag names, used for the "Available tags" error message.
        self._tags = set(api._operationindex._tags.keys())

    def __getattr__(self, tag: str) -> ESITag | OperationIndex:
        """Resolve an attribute name to a tag wrapper.

        Args:
            tag (str): Tag name; underscores are treated as spaces, and the
                single name ``_`` returns the raw aiopenapi3 operation index.
        Raises:
            AttributeError: No tag of that name exists.
        Returns:
            ESITag | OperationIndex: The wrapped tag, or the raw operation index.
        """
        # underscore returns the raw aiopenapi3 client
        if tag == "_":
            return self.api._operationindex

        # convert pythonic Planetary_Interaction to Planetary Interaction
        if "_" in tag:
            tag = tag.replace("_", " ")

        # Membership is tested on the tag mapping itself; no throwaway set per lookup.
        tags = self.api._operationindex._tags
        if tag in tags:
            return ESITag(tags[tag], self.api)

        raise AttributeError(
            f"Tag '{tag}' not found. "
            f"Available tags: {', '.join(sorted(self._tags))}"
        )

    def purge_all_etags(self) -> Any:
        """ Delete all stored ETags from the cache for this application

        TODO: consider making this more config agnostic

        Returns:
            The value returned by the client's ``delete`` call, or 0 when no
            ETag keys matched.
        """
        try:
            # new lib
            from django_redis import get_redis_connection
            _client = get_redis_connection("default")
        except (NotImplementedError, ModuleNotFoundError):
            # old lib
            from django.core.cache import caches
            default_cache = caches['default']
            _client = default_cache.get_master_client()  # type: ignore

        keys = _client.keys(f":?:{slugify(self.api.app_name)}_etag_*")  # type: ignore app_name is added by a plugin
        # Start from 0 so the return below is defined when nothing matched.
        deleted = 0
        if keys:
            deleted = _client.delete(*keys)
            logger.info(f"Deleted {deleted} ETag keys")
        return deleted
class ESIClientAsync(ESIClientStub):  # pragma: no cover
    """
    Async Base ESI Client: tags (Assets, Characters, etc.) are reached as
    attributes, and the raw aiopenapi3 operation index via ``._``.
    """

    def __init__(self, api: OpenAPI) -> None:
        self.api = api
        self._tags = {*api._operationindex._tags.keys()}

    def __getattr__(self, tag: str) -> ESITagAsync | OperationIndex:
        operation_index = self.api._operationindex

        # A lone underscore hands back the raw aiopenapi3 operation index.
        if tag == "_":
            return operation_index

        # Pythonic Planetary_Interaction becomes Planetary Interaction.
        tag = tag.replace("_", " ")

        if tag not in set(operation_index._tags.keys()):
            raise AttributeError(
                f"Tag '{tag}' not found. "
                f"Available tags: {', '.join(sorted(self._tags))}"
            )
        return ESITagAsync(operation_index._tags[tag], self.api)
# (documentation-viewer "[docs]" link residue removed; presumably not part of the module source)
class ESIClientProvider:
    """Class for providing a single ESI client instance for a whole app

    * Note that one of either `tags` or `operations` must be provided to reduce memory footprint of the client
    * When `DEBUG=False`, not supplying either will raise an AttributeError.

    Args:
        compatibility_date (str | date): The compatibility date for the ESI client.
            ``date`` objects (including ``datetime``) are converted to ``YYYY-MM-DD``.
        ua_appname (str): Name of the App for generating a User-Agent,
        ua_version (str): Version of the App for generating a User-Agent,
        ua_url (str, Optional): URL To the Source Code or Documentation for generating a User-Agent,
        spec_file (str, Optional): Absolute path to a OpenApi 3.1 spec file to load.
        tenant (str, Optional): The ESI tenant to use (default: "tranquility").
        operations (list[str], Optional*): List of operations to filter the spec down.
        tags (list[str], Optional*): List of tags to filter the spec down.
        additional_spec_headers (dict, Optional): Passed to the sync client factory
            as ``additional_spec_headers``.
        **kwargs: Passed through unchanged to the client factories.

    Properties:
        client: ESIClient, built on first access and then reused.
        client_async: awaitable resolving to an ESIClientAsync, built on first
            await and then reused.
    """

    _client: ESIClient | None = None
    _client_async: ESIClientAsync | None = None

    def __init__(
        self,
        compatibility_date: str | date,
        ua_appname: str,
        ua_version: str,
        ua_url: str | None = None,
        spec_file: None | str = None,
        tenant: str = "tranquility",
        operations: list[str] | None = None,
        tags: list[str] | None = None,
        additional_spec_headers: dict | None = None,
        **kwargs
    ) -> None:
        # isinstance also covers date subclasses such as datetime, which would
        # otherwise be stringified together with their time component.
        if isinstance(compatibility_date, date):
            self._compatibility_date: str = self._date_to_string(compatibility_date)
        else:
            self._compatibility_date = str(compatibility_date)
        self._ua_appname = ua_appname
        self._ua_version = ua_version
        self._ua_url = ua_url
        self._spec_file = spec_file
        self._tenant = tenant
        self._kwargs = kwargs
        # Each provider gets its own containers; nothing is shared through
        # default argument values.
        self._operations = list(operations) if operations is not None else []
        self._tags = list(tags) if tags is not None else []
        self._spec_headers = dict(additional_spec_headers) if additional_spec_headers is not None else {}

    @property
    def client(self) -> ESIClient:
        """The sync client; created through ``esi_client_factory_sync`` on first access."""
        if self._client is None:
            api = esi_client_factory_sync(
                compatibility_date=self._compatibility_date,
                ua_appname=self._ua_appname,
                ua_version=self._ua_version,
                ua_url=self._ua_url,
                spec_file=self._spec_file,
                tenant=self._tenant,
                operations=self._operations,
                tags=self._tags,
                additional_spec_headers=self._spec_headers,
                **self._kwargs)
            self._client = ESIClient(api)
        return self._client

    @property
    async def client_async(self) -> ESIClientAsync:  # pragma: no cover
        """The async client; created through ``esi_client_factory_async`` on first await.

        Accessing the property yields a coroutine: ``client = await provider.client_async``.
        """
        if self._client_async is None:
            # NOTE(review): unlike ``client``, additional_spec_headers is not
            # forwarded here - confirm whether the async factory should receive it.
            api = await esi_client_factory_async(
                compatibility_date=self._compatibility_date,
                ua_appname=self._ua_appname,
                ua_version=self._ua_version,
                ua_url=self._ua_url,
                spec_file=self._spec_file,
                tenant=self._tenant,
                operations=self._operations,
                tags=self._tags,
                **self._kwargs)
            self._client_async = ESIClientAsync(api)
        return self._client_async

    @classmethod
    def _date_to_string(cls, compatibility_date: date) -> str:
        """Turns a date object in a compatibility_date string"""
        return f"{compatibility_date.year}-{compatibility_date.month:02}-{compatibility_date.day:02}"

    def __str__(self) -> str:
        return f"ESIClientProvider - {self._ua_appname} ({self._ua_version})"