Introduction
API (Application Programming Interface) consumption is the new normal, as many software products have shifted toward decoupling their backend and frontend codebases. Backend engineers are tasked with writing consumable APIs for their frontend counterparts, and in many cases they consume other API services themselves to accomplish their tasks.
Some services expose enormously large datasets, so making everything available in a single API call is impractical. Pagination comes to the rescue: many APIs now return only a fraction of the data per request, and accessing the remaining fractions takes some extra work.
This article demonstrates how to set up Celery background tasks to consume paginated APIs periodically. We'll explore iterative and recursive approaches for APIs paginated using page
parameters and those using next
URLs. The fetched data will be stored in a Django model, overwriting previous data. Note that persisting historical data is outside the scope of this article but will be addressed in a future post on building a data warehouse.
Prerequisite
Basic familiarity with Django is assumed. Refer to the Django tutorial for an introduction.
Source code
Exporting Django model data as excel file (.xlsx) using openpyxl library and Google Spreadsheet API
Implementation
Step 1: Set up Django project and app with celery
Create a Django project named django_excel
within a virtual environment. Ensure that django
and celery
are installed in your environment.
virtualenv➜ django_excel git:(main) django-admin startproject django_excel .
Create an app named core
:
virtualenv➜ django_excel git:(main) python manage.py startapp core
Register your application in settings.py
.
...
# Application definition
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'core.apps.CoreConfig',  # our new app
]
...
It is time to set up our application to utilize Celery. To do this, create a file aptly named celery.py
in your project's directory and paste the following snippet:
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_excel.settings')

app = Celery('django_excel')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
That snippet was lifted directly from the Celery Django documentation. Ensure you modify the DJANGO_SETTINGS_MODULE value and the name passed to Celery() to reflect your project's name. The namespace argument to config_from_object lets you prefix all celery-related configurations in your settings.py file with CELERY_, such as CELERY_BROKER_URL.
Because you are effectively providing constants, the celery-related configurations in your settings.py file are capitalized. For instance, one of the configurations is beat_schedule, which in Django becomes CELERY_BEAT_SCHEDULE.
Next, open your project's __init__.py and append the following:
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
To conclude celery-related configurations, let's set the following in settings.py
:
...
from typing import Any

from celery.schedules import crontab
from decouple import config
...
CELERY_BROKER_URL: str = config('REDIS_URL', default='redis://localhost:6379/0')
CELERY_RESULT_BACKEND: str = config('REDIS_URL', default='redis://localhost:6379/0')
CELERY_ACCEPT_CONTENT: list[str] = ['application/json']
CELERY_TASK_SERIALIZER: str = 'json'
CELERY_RESULT_SERIALIZER: str = 'json'
We are using redis as our broker. You can opt for RabbitMQ, which celery supports out of the box.
In the settings above, I am linking CELERY_BROKER_URL
to an environment variable named REDIS_URL
. It normally should look like redis://127.0.0.1:6379
on a Linux system. That means I could have set my CELERY_BROKER_URL
and CELERY_RESULT_BACKEND
as:
...
CELERY_BROKER_URL: str = 'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND: str = 'redis://127.0.0.1:6379'
...
Note that CELERY_RESULT_BACKEND is optional, as are CELERY_ACCEPT_CONTENT, CELERY_TASK_SERIALIZER, and CELERY_RESULT_SERIALIZER. However, leaving the last three unset can lead to runtime errors, particularly when dealing with database objects or broadcasting emails asynchronously with celery.
The stage is now set, so let's set up our database. We will be consuming CoinGecko's API and saving some of its data.
Step 2: Define `FullCoin` model
Our model will look like this:
from django.db import models


class FullCoin(models.Model):
    coin_id = models.CharField(max_length=100, primary_key=True)
    symbol = models.CharField(max_length=10, null=True, blank=True)
    name = models.CharField(max_length=100, null=True, blank=True)
    image = models.URLField(null=True, blank=True)
    current_price = models.DecimalField(max_digits=20, decimal_places=2, null=True, blank=True)
    market_cap = models.BigIntegerField(null=True, blank=True)
    market_cap_rank = models.IntegerField(null=True, blank=True)
    fully_diluted_valuation = models.BigIntegerField(null=True, blank=True)
    total_volume = models.BigIntegerField(null=True, blank=True)
    high_24h = models.DecimalField(max_digits=20, decimal_places=2, null=True, blank=True)
    low_24h = models.DecimalField(max_digits=20, decimal_places=2, null=True, blank=True)
    price_change_24h = models.DecimalField(max_digits=20, decimal_places=2, null=True, blank=True)
    price_change_percentage_24h = models.DecimalField(max_digits=10, decimal_places=5, null=True, blank=True)
    market_cap_change_24h = models.BigIntegerField(null=True, blank=True)
    market_cap_change_percentage_24h = models.DecimalField(max_digits=10, decimal_places=5, null=True, blank=True)
    circulating_supply = models.DecimalField(max_digits=20, decimal_places=2, null=True, blank=True)
    total_supply = models.DecimalField(max_digits=20, decimal_places=2, null=True)
    max_supply = models.DecimalField(max_digits=20, decimal_places=2, null=True)
    ath = models.DecimalField(max_digits=20, decimal_places=2, null=True, blank=True)
    ath_change_percentage = models.DecimalField(max_digits=10, decimal_places=5, null=True, blank=True)
    ath_date = models.DateTimeField(null=True, blank=True)
    atl = models.DecimalField(max_digits=20, decimal_places=2, null=True, blank=True)
    atl_change_percentage = models.DecimalField(max_digits=20, decimal_places=5, null=True, blank=True)
    atl_date = models.DateTimeField(null=True, blank=True)
    last_updated = models.DateTimeField(null=True, blank=True)

    def __str__(self):
        return f"{self.name} ({self.symbol.upper()})"
These are all the fields taken directly from CoinGecko's public API for coin markets:
[
    {
        "id": "bitcoin",
        "symbol": "btc",
        "name": "Bitcoin",
        "image": "https://assets.coingecko.com/coins/images/1/large/bitcoin.png?1696501400",
        "current_price": 70187,
        "market_cap": 1381651251183,
        "market_cap_rank": 1,
        "fully_diluted_valuation": 1474623675796,
        "total_volume": 20154184933,
        "high_24h": 70215,
        "low_24h": 68060,
        "price_change_24h": 2126.88,
        "price_change_percentage_24h": 3.12502,
        "market_cap_change_24h": 44287678051,
        "market_cap_change_percentage_24h": 3.31157,
        "circulating_supply": 19675987,
        "total_supply": 21000000,
        "max_supply": 21000000,
        "ath": 73738,
        "ath_change_percentage": -4.77063,
        "ath_date": "2024-03-14T07:10:36.635Z",
        "atl": 67.81,
        "atl_change_percentage": 103455.83335,
        "atl_date": "2013-07-06T00:00:00.000Z",
        "roi": null,
        "last_updated": "2024-04-07T16:49:31.736Z"
    }
]
If you sign up for CoinGecko's API using my referral code, CGSIRNEIJ, I get a commission. That can be a good way to support me.
null=True
makes a column nullable in SQL:
CREATE TABLE IF NOT EXISTS full_coin(
    symbol VARCHAR(10) NULL,
    ...
)
null=False
or leaving it unset makes the column non-nullable:
CREATE TABLE IF NOT EXISTS full_coin(
    symbol VARCHAR(10) NOT NULL,
    ...
)
blank=True
allows the field to be optional in forms and the admin page.
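As a quick, hypothetical illustration (not part of the project code), null controls the database column while blank controls validation:
coin = FullCoin(coin_id='bitcoin', total_supply=21_000_000, max_supply=21_000_000)
coin.full_clean()  # passes: symbol, name, etc. may be left blank because blank=True
coin.save()        # the omitted fields are stored as NULL because null=True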
Speaking of the admin site, let's register the model:
from django.contrib import admin

from core.models import FullCoin


@admin.register(FullCoin)
class FullCoinAdmin(admin.ModelAdmin):
    list_display = (
        'coin_id',
        'symbol',
        'name',
        'current_price',
        'market_cap',
        'market_cap_rank',
        'fully_diluted_valuation',
        'total_volume',
        'high_24h',
        'low_24h',
        'price_change_24h',
        'price_change_percentage_24h',
        'market_cap_change_24h',
        'market_cap_change_percentage_24h',
        'circulating_supply',
        'total_supply',
        'max_supply',
        'ath',
        'ath_change_percentage',
        'ath_date',
        'atl',
        'atl_change_percentage',
        'atl_date',
        'last_updated',
    )
With this, you can migrate your database:
virtualenv➜ django_excel git:(main) python manage.py makemigrations # create migration file(s)
virtualenv➜ django_excel git:(main) python manage.py migrate # create the tables in the db
Optionally, you can create a superuser:
virtualenv➜ django_excel git:(main) python manage.py createsuperuser
Follow the prompts.
Step 3: Create and Register Periodic Tasks
Here is the juicy part:
...
import logging
import time
from typing import Generator

import requests
from celery import shared_task
from django.conf import settings

from core.models import FullCoin

logger = logging.getLogger(__name__)


def build_api_url(page: int) -> str:
    """Build the API URL."""
    market_currency_order = 'markets?vs_currency=usd&order=market_cap_desc&'
    per_page = f'per_page=50&page={page}&sparkline=false'
    return f'{settings.BASE_API_URL}/coins/{market_currency_order}{per_page}'


def fetch_coins_iteratively() -> Generator[dict, None, None]:
    """Fetch coins data from API using generator."""
    page = 1
    while True:
        try:
            url = build_api_url(page)
            response = requests.get(url)
            coin_data = response.json()

            # Check for rate limit response
            if isinstance(coin_data, dict) and coin_data.get('status', {}).get('error_code') == 429:
                logger.warning("Rate limit exceeded. Waiting 60 seconds...")
                time.sleep(60)
                continue

            # Check for empty response (end of pagination)
            if not coin_data:
                break

            yield from coin_data
            logger.info(f"Fetched page {page} with {len(coin_data)} coins")

            page += 1
            time.sleep(1)  # Be nice to the API
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed on page {page}: {e}")
            raise


@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=600,
    max_retries=5,
)
def get_full_coin_data_iteratively_for_page(self) -> None:
    """Get full coin data iteratively for each page."""
    try:
        # Collect coins in batches before handing them off for storage
        batch_size = 100
        coins_batch = []
        for coin in fetch_coins_iteratively():
            coins_batch.append(coin)
            if len(coins_batch) >= batch_size:
                logger.info(f"Processing batch of {len(coins_batch)} coins")
                store_data(coins_batch)
                coins_batch = []

        # Process remaining coins
        if coins_batch:
            logger.info(f"Processing final batch of {len(coins_batch)} coins")
            store_data(coins_batch)
    except Exception as e:
        logger.error(f"Failed to process coins: {e}")
        raise self.retry(exc=e)
The build_api_url helper builds the CoinGecko API URL from the supplied page number. The BASE_API_URL is:
...
BASE_API_URL: str = 'https://api.coingecko.com/api/v3'
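For example, build_api_url(1) resolves to:
https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=50&page=1&sparkline=false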
fetch_coins_iteratively is the core of the program. Using the iterative strategy, it starts at the first page and runs an "infinite" loop that breaks only when the API returns no data.
Its recursive alternative is:
def fetch_coins_recursively(page: int = 1) -> Generator[dict, None, None]:
    """Fetch coins data from API recursively using generator."""
    try:
        url = build_api_url(page)
        response = requests.get(url)
        coin_data = response.json()

        # Check for rate limit response
        if isinstance(coin_data, dict) and coin_data.get('status', {}).get('error_code') == 429:
            logger.warning("Rate limit exceeded. Waiting 60 seconds...")
            time.sleep(60)
            yield from fetch_coins_recursively(page)
            return

        # Base case: empty response (end of pagination)
        if not coin_data:
            return

        # Process current page
        yield from coin_data
        logger.info(f"Fetched page {page} with {len(coin_data)} coins")

        # Be nice to the API
        time.sleep(1)

        # Recursive case: fetch next page
        yield from fetch_coins_recursively(page + 1)
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed on page {page}: {e}")
        raise
Then there is get_full_coin_data_iteratively_for_page, which is decorated with shared_task (for task autodiscovery). We supplied some parameters:
- bind=True to access the task instance via self
- autoretry_for=(Exception,) to auto-retry on exceptions
- retry_backoff=True for exponential backoff (capped by retry_backoff_max=600 seconds)
- max_retries=5 to limit retries to 5
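The task hands each batch to a store_data helper, which isn't shown in this article. A minimal sketch, assuming the FullCoin model from Step 2 and that each run should simply overwrite existing rows, could look like this (the MODEL_FIELDS constant is my own addition):
# Hypothetical sketch of the store_data helper assumed by the task above.
# All FullCoin field names except the primary key match CoinGecko's payload keys.
MODEL_FIELDS = [f.name for f in FullCoin._meta.fields if f.name != 'coin_id']


def store_data(coins: list[dict]) -> None:
    """Upsert a batch of coin dicts into the FullCoin table, overwriting old rows."""
    for coin in coins:
        FullCoin.objects.update_or_create(
            coin_id=coin['id'],  # the API's `id` maps to our primary key
            defaults={field: coin.get(field) for field in MODEL_FIELDS},
        )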
For this task to be periodic, we must add it to the CELERY_BEAT_SCHEDULE
in settings.py
:
...
CELERY_BEAT_SCHEDULE: dict[str, dict[str, Any]] = {
    ...
    'get_full_coin_data_iteratively_for_page': {
        'task': 'core.tasks.get_full_coin_data_iteratively_for_page',
        'schedule': crontab(minute='*/3'),
    },
}
It schedules this task to run every 3 minutes ('*/3'
) using crontab
.
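To see the schedule in action locally, you would typically start a redis server, then run a Celery worker and the beat scheduler in separate terminals, for example:
virtualenv➜ django_excel git:(main) celery -A django_excel worker --loglevel=info
virtualenv➜ django_excel git:(main) celery -A django_excel beat --loglevel=info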
These implementations were written with performance in mind. However, there is still room for improvement.
Step 4: Bonus
There are APIs whose pagination is not page-based but instead uses a next URL (the default DRF pagination strategy). For such systems, the last page of data has an empty next field. That's the termination condition:
from typing import Any

import requests
from django.conf import settings


def get_api_data(url: str) -> None:
    """Make recursive requests until there is no `next` page."""
    headers = {'Authorization': f'Token {settings.API_TOKEN}'}
    response = requests.get(url, headers=headers)
    data: dict[str, Any] = response.json()
    # Process/store the payload (e.g. data.get('results')) here before recursing.
    if 'next' in data and data.get('next') is not None:
        get_api_data(data.get('next'))
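If you prefer the generator style used earlier, a hedged variant (assuming a DRF-style payload with results and next keys, and the same imports plus Generator) could look like this:
def iter_api_data(url: str) -> Generator[dict, None, None]:
    """Yield every item from a DRF-style paginated API by following `next` URLs."""
    headers = {'Authorization': f'Token {settings.API_TOKEN}'}
    while url:
        response = requests.get(url, headers=headers)
        data: dict[str, Any] = response.json()
        yield from data.get('results', [])
        url = data.get('next')  # None on the last page, which ends the loop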
That's it! I hope you enjoyed it.
Outro
Enjoyed this article? I'm a Software Engineer, Technical Writer, and Technical Support Engineer actively seeking new opportunities, particularly in areas related to web security, finance, healthcare, and education. If you think my expertise aligns with your team's needs, let's chat! You can find me on LinkedIn and X. I am also an email away.
If you found this article valuable, consider sharing it with your network to help spread the knowledge!