Developer Guide

The developer guide describes how to develop apps with django-esi.

Usage in views

Single token

When views require a token, wrap with the token_required decorator and accept a token arg:

from esi.decorators import token_required

@token_required(scopes="esi-characters.read_medals.v1")
def my_view(request, token):
    # my code
    ...

This prompts the user to select one of their existing tokens or, if none exist, to create a new one via SSO.

To specify scopes, add either a list of names or a space-delimited string:

# as a list of scope names
@token_required(scopes=['esi-location.read_ship_type.v1', 'esi-location.read_location.v1'])

# or as a space-delimited string
@token_required(scopes='esi-location.read_ship_type.v1 esi-location.read_location.v1')

New token

To require a new token, such as for logging in, pass new=True:

@token_required(new=True)

Multiple tokens

To request all of a user’s tokens which have the required scopes, wrap instead with the tokens_required decorator and accept a tokens arg:

from esi.decorators import tokens_required

@tokens_required(scopes='esi-location.read_ship_type.v1')
def my_view(request, tokens):
    # my code
    ...

This skips prompting for token selection and instead passes that responsibility to the view. Tokens are provided as a queryset.

Single use token

It is also possible to request a token for single use. Single use tokens do not require a user to be logged in and are only available to the current view.

from esi.decorators import single_use_token

@single_use_token(scopes=['publicData'])
def my_view(request, token):
    # my code
    ...

See also

See the decorators section of the API documentation for more details on all provided decorators.

Task rate limiting

The rate_limited_task decorator enforces a maximum execution rate on Celery tasks, using Redis as the coordination backend. When a task is invoked more often than the configured limit allows, it is automatically retried after the rate window expires rather than being dropped or raising an error.

Note

This decorator requires Redis as your cache backend. It will not function correctly with LocMemCache or other non-shared cache backends.

Basic usage

Apply rate_limited_task below @app.task so it wraps the task function body. The task must be bound (bind=True) because the decorator needs access to self to call retry().

from esi.decorators import rate_limited_task

@app.task(bind=True)
@rate_limited_task(rate="10/m")
def fetch_character_data(self, character_id):
    # called at most 10 times per minute, globally across all workers
    ...

Rate string format

The rate argument is a string of the form "N/window", where:

  • N is the maximum number of executions allowed within the window

  • window is a duration unit, optionally prefixed with a number (for example, 15m): s (second), m (minute), h (hour), or d (day)

Rate string    Meaning
"10/s"         10 per second
"100/m"        100 per minute
"1/15m"        1 per 15 minutes
"500/h"        500 per hour

The effective per-call delay is window_seconds / N. Tasks within the same bucket are spaced at least this far apart.
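The rate string format and the per-call delay formula can be sketched in plain Python. This is an illustration only, not the parser django-esi uses: the names parse_rate, per_call_delay, and UNIT_SECONDS are hypothetical, and the regular expression is an assumption based on the format described above.

```python
import re

# Seconds per unit letter, matching the units listed above.
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def parse_rate(rate: str) -> tuple[int, int]:
    """Hypothetical helper: return (N, window_seconds) for a rate such as "1/15m"."""
    match = re.fullmatch(r"(\d+)/(\d*)([smhd])", rate.strip())
    if match is None:
        raise ValueError(f"invalid rate string: {rate!r}")
    count = int(match.group(1))
    # The numeric prefix on the window is optional; "m" means "1m".
    multiplier = int(match.group(2) or 1)
    if count <= 0 or multiplier <= 0:
        raise ValueError(f"rate values must be positive: {rate!r}")
    return count, multiplier * UNIT_SECONDS[match.group(3)]


def per_call_delay(rate: str) -> float:
    """Minimum spacing between calls in one bucket: window_seconds / N."""
    count, window_seconds = parse_rate(rate)
    return window_seconds / count
```

Under these assumptions, parse_rate("1/15m") returns (1, 900) and per_call_delay("10/m") returns 6.0, so ten calls per minute are spaced six seconds apart.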

Scoping the rate limit with keys

By default, the bucket is shared by all calls to that task across all worker threads, regardless of arguments. This default is mostly useful for public ESI endpoints, which do not benefit from per-character rate limiting. Pass keys to scope the bucket to specific kwargs: each unique combination of those kwarg values gets its own independent bucket.

@app.task(bind=True)
@rate_limited_task(rate="1/10s", keys=["corp_id"])
def fetch_corp_data(self, corp_id, character_id):
    # rate limited per corp_id — different corps do not share the same bucket
    ...

In this example, calls for corp_id=1 and corp_id=2 each have their own 1/10s limit and do not interfere with each other.
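One way to picture the scoping is as a cache key built from the task name plus the selected kwarg values. The sketch below is an assumption about how such a key could be derived, not the key format django-esi actually uses; bucket_key and the "rate_limit" prefix are hypothetical.

```python
def bucket_key(task_name: str, keys: list[str], kwargs: dict) -> str:
    """Hypothetical helper: build a cache key identifying one rate-limit bucket."""
    # Only the kwargs named in `keys` take part; all other arguments are ignored.
    # Sorting makes the result independent of the order in which keys are listed.
    parts = [f"{name}={kwargs[name]}" for name in sorted(keys)]
    return ":".join(["rate_limit", task_name, *parts])


# Two corps get different buckets; character_id does not affect the bucket.
key_a = bucket_key("fetch_corp_data", ["corp_id"], {"corp_id": 1, "character_id": 10})
key_b = bucket_key("fetch_corp_data", ["corp_id"], {"corp_id": 2, "character_id": 10})
key_c = bucket_key("fetch_corp_data", ["corp_id"], {"corp_id": 1, "character_id": 99})

# With no keys, every call to the task shares a single bucket.
key_shared = bucket_key("fetch_corp_data", [], {"corp_id": 1, "character_id": 10})
```

Here key_a and key_c are identical while key_b differs, which mirrors the behaviour described above: the limit applies per corp_id, whatever the other arguments are.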

Retry behaviour

When the rate limit is exceeded, the decorator calls task.retry(countdown=<seconds_remaining>) and decrements task.request.retries beforehand so that rate-limit retries do not count against the task’s max_retries. This means a task that hits the rate limit multiple times will not exhaust its retry budget prematurely.
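The effect of decrementing the counter before retrying can be shown with stand-in objects. This sketch does not use Celery: FakeTask, FakeRequest, and Retry are hypothetical stubs, and the stub's retry() mutates the counter directly, which is a simplification of how Celery carries the retry count on the next task message.

```python
class Retry(Exception):
    """Stand-in for the exception a Celery task raises when it retries."""


class FakeRequest:
    def __init__(self):
        self.retries = 0


class FakeTask:
    """Stand-in for a bound task: retry() counts one attempt against max_retries."""

    max_retries = 3

    def __init__(self):
        self.request = FakeRequest()

    def retry(self, countdown):
        if self.request.retries >= self.max_retries:
            raise RuntimeError("max_retries exceeded")
        self.request.retries += 1
        raise Retry(countdown)


def retry_for_rate_limit(task, seconds_remaining):
    """Retry without spending retry budget, as described above."""
    task.request.retries -= 1  # offsets the increment performed by retry()
    task.retry(countdown=seconds_remaining)


def simulate_rate_limit_hits(hits):
    """Hit the rate limit `hits` times and return the final retry count."""
    task = FakeTask()
    for _ in range(hits):
        try:
            retry_for_rate_limit(task, seconds_remaining=5)
        except Retry:
            pass  # the task would be re-queued and run again later
    return task.request.retries
```

With these stubs, simulate_rate_limit_hits(10) returns 0 even though max_retries is 3, so the full retry budget remains available for genuine failures.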

Cleaning the database

Three tasks are available:

  • cleanup_callbackredirect removes all CallbackRedirect models older than a specified age (in seconds). Default is 300.

  • cleanup_token checks all Token models and attempts to refresh any that have expired. A token that cannot be refreshed, or whose refresh fails, is deleted. Prefer running cleanup_token_subset more frequently instead, to spread out the load on CCP.

  • cleanup_token_subset checks a fractional subset of Token models and attempts to refresh any that have expired. A token that cannot be refreshed, or whose refresh fails, is deleted.

To schedule these automatically with celerybeat, add them to your settings.py CELERYBEAT_SCHEDULE dict like so:

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    ...
    'esi_cleanup_callbackredirect': {
        'task': 'esi.tasks.cleanup_callbackredirect',
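        # minute=0 runs once every four hours; without it, crontab defaults to every minute of those hours
        'schedule': crontab(minute=0, hour='*/4'),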
    },
    'esi_cleanup_token_subset': {  # 1/48th of tokens per hourly run: a full refresh cycle takes 48 hours (2 days)
        'task': 'esi.tasks.cleanup_token_subset',
        'schedule': crontab(minute="0", hour="*/1"),
    },
}

Recommended intervals are four hours for callback redirect cleanup and hourly for the fractional token cleanup (token cleanup can get quite slow with a large database, so adjust as needed). If your app does not require background token validation, it may be advantageous not to schedule the token cleanup task and instead rely on the validation check performed by the @token_required decorators, or to add .require_valid() to the end of a query.