In modern web development, the ability to handle time-consuming tasks asynchronously is crucial for enhancing the performance and responsiveness of web applications.
Django, a powerful web framework, provides seamless integration with Celery, a popular distributed task queue, to manage asynchronous and scheduled tasks efficiently.
In this guide, we will walk through the process of using Celery in Django to perform asynchronous and scheduled tasks, complete with code examples for a smooth and straightforward implementation.
Prerequisites
Before we dive into the implementation, ensure that you have the following components set up:
1. A Django project with your desired app(s) already created.
2. A message broker (e.g., RabbitMQ, Redis) to handle task queuing.
Celery supports various message brokers for task queuing, each with its own set of dependencies; you can read more about them in the Celery docs under Backends and Brokers.
Celery Broker
A Celery broker is a message queue that Celery uses to communicate with workers. Workers are processes that run in the background and execute tasks. When you send a task to a Celery broker, the broker will send the task to a worker that is available to execute it.
In other words, a Celery broker acts like a middleman or a bridge that connects your Django app to Celery workers. It sends tasks to workers and then returns the results back to your Django app.
Installing Celery
Install Celery using pip:
pip install celery[redis]
Install Celery using poetry:
poetry add celery[redis]
Note that the [redis] part is an extra specifier that installs the necessary Redis broker dependencies along with Celery. If you prefer another popular broker such as RabbitMQ or Amazon SQS, you can modify the command accordingly.
For example, RabbitMQ requires no extra at all, because its AMQP transport (py-amqp) ships with Celery by default:
pip install celery
# or with poetry
poetry add celery
For Amazon SQS:
pip install celery[sqs]
# or with poetry
poetry add celery[sqs]
Installing Brokers
Installing the brokers (Redis and RabbitMQ) is outside the scope of this article. Please make sure you have them installed properly before proceeding.
To check if Redis and RabbitMQ are installed correctly on your local Linux machine, you can use the following commands:
Redis:
redis-server --version
If Redis is installed, this command will display the Redis version number. If Redis is not installed, you will get an error message indicating that the command was not found.
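To confirm that the Redis server is not just installed but actually running, you can also ping it:
redis-cli ping
A running server replies with PONG.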
RabbitMQ:
rabbitmqctl status
If RabbitMQ is installed, this command will display information about the RabbitMQ server, including the version number, node name, and other details. If RabbitMQ is not installed, you will get an error message indicating that the command was not found.
Ensure that both Redis and RabbitMQ are properly installed and running on your local system before configuring Celery to use them as brokers. If they are not installed, you can follow the installation instructions for your operating system from the official sites.
Setting Up Celery in Django
In the same directory as your Django project's settings.py, create a new file named celery.py and add the following Celery-related configuration:
celery.py
# your_project_name/celery.py
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project_name.settings')

# Create the Celery application instance. The broker and result backend
# are picked up from your Django settings below.
app = Celery('your_project_name')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# namespace='CELERY' means all Celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
Replace 'your_project_name' with the name of your Django project.
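The official Celery documentation also recommends importing this app in your project's __init__.py, so that it loads whenever Django starts and tasks sent from Django code (for example via delay()) pick up your broker settings:

# your_project_name/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)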
Next, add the following configuration to your settings file.
For Redis as the Broker and Result Backend:
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0' # Replace with your Redis URL
CELERY_RESULT_BACKEND = 'redis://localhost:6379' # Replace with your Redis URL
Replace 'localhost' with the hostname or IP address of your Redis server if it's running on a different machine. Also, ensure that the port number (6379 in this case) matches the actual port used by your Redis server.
If you are using other popular brokers like RabbitMQ or Amazon SQS, you need to modify the CELERY_BROKER_URL accordingly.
For RabbitMQ as the Broker:
# settings.py
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//' # Replace with your RabbitMQ URL
Replace the guest:guest credentials with your RabbitMQ username and password if you have configured them differently. Also, ensure that the hostname (localhost) and port number (5672) match your RabbitMQ server configuration.
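Optionally, you can add a few other commonly used Celery settings here as well; the values below are illustrative defaults rather than requirements:

# settings.py
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'UTC'  # Match your project's TIME_ZONE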
Creating a Background Task in Django
In any Django app, create a tasks.py file (if not already created) and define your Celery async task:
tasks.py
# your_app_name/tasks.py
from celery import shared_task


@shared_task
def your_async_task(arg1, arg2):
    # Your asynchronous task logic here
    result = arg1 + arg2
    return result
The @shared_task decorator tells Celery that the function should be executed asynchronously.
Unlike @app.task, @shared_task does not tie the task to a specific Celery app instance, which makes it the right choice inside reusable Django apps. When you call a shared task, Celery places a message on the broker queue, and the first available worker picks it up and executes it.
Start the Celery Worker
Open a new terminal or command prompt and start the Celery worker using the celery command:
celery -A your_project_name.celery worker --loglevel=info
Replace 'your_project_name' with the name of your Django project.
If your setup is correct, this command should start the worker without errors and list your discovered tasks in its startup banner.
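As a quick smoke test, you can queue the debug_task defined earlier from a Django shell (python manage.py shell) and watch the worker terminal:

from your_project_name.celery import debug_task

debug_task.delay()

If everything is wired up, the worker window should log the received task and print its Request: ... details.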
Using Celery Async Tasks in Views
You can now use your Celery async task from any part of your Django app, such as views or other functions:
# your_app_name/views.py
from django.http import JsonResponse

from .tasks import your_async_task


def your_view(request):
    # Call the asynchronous task using the delay() method
    task_result = your_async_task.delay(10, 20)
    return JsonResponse({'task_id': task_result.id})
The delay() method is provided by Celery and allows you to call the async task just like a regular Python function, but it will be executed asynchronously by a Celery worker.
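If you later want to check on a queued task from another view, Celery's AsyncResult lets you look it up by id (this relies on the result backend configured earlier; the view and parameter names here are illustrative):

# your_app_name/views.py
from celery.result import AsyncResult
from django.http import JsonResponse


def task_status_view(request, task_id):
    # Look up the task by the id returned from delay()
    result = AsyncResult(task_id)
    response = {'task_id': task_id, 'status': result.status}
    if result.successful():
        response['result'] = result.result
    return JsonResponse(response)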
Unit Testing Async Tasks
There are many strategies for unit testing a Celery task. The one I use most often is running the task synchronously, as a normal function call, during testing.
Celery provides a built-in method called apply() that allows you to call the task synchronously and get the result immediately.
Here's how you can do it:
# your_app_name/tests.py
from django.test import TestCase

from .tasks import your_async_task


class YourAsyncTaskTestCase(TestCase):
    def test_your_async_task(self):
        # Call the Celery shared task synchronously using apply()
        result = your_async_task.apply(args=(10, 20)).get()

        # Assertions to check the task result
        self.assertEqual(result, 30)
By using apply() with .get(), the Celery shared task will be executed synchronously during testing, and you can directly check its result.
This allows you to treat the shared task as a normal function call for testing purposes, without the need for additional mocking.
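Another common strategy, useful when testing views rather than the task logic itself, is to mock the task's delay() method so nothing is actually queued. A sketch using unittest.mock against the view from earlier (the URL is hypothetical and depends on your routing):

# your_app_name/tests.py
from unittest.mock import patch

from django.test import TestCase


class YourViewTestCase(TestCase):
    @patch('your_app_name.views.your_async_task.delay')
    def test_view_queues_task(self, mock_delay):
        # Give the mocked result a fake id for the JSON response
        mock_delay.return_value.id = 'fake-task-id'

        response = self.client.get('/your-view-url/')  # Hypothetical URL

        self.assertEqual(response.status_code, 200)
        mock_delay.assert_called_once_with(10, 20)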
CELERY_TASK_ALWAYS_EAGER
This setting determines whether Celery will run tasks synchronously (eagerly) or asynchronously. When set to True, Celery will execute tasks immediately as normal function calls rather than queuing them in the broker and processing them asynchronously.
This setting is mainly used for testing and development to simplify debugging and avoid the need for a separate message broker during local testing.
For example, in your Django settings:
# settings.py
CELERY_TASK_ALWAYS_EAGER = True # Run tasks synchronously for testing and development
Keep in mind that you should set CELERY_TASK_ALWAYS_EAGER to False in production, or whenever you want to utilize Celery's asynchronous task processing capabilities.
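One convenient pattern (an assumption about your setup, not something Celery requires) is to enable eager mode only when the test runner is active, and to let task exceptions propagate so failures surface in your tests:

# settings.py
import sys

if 'test' in sys.argv:
    CELERY_TASK_ALWAYS_EAGER = True
    CELERY_TASK_EAGER_PROPAGATES = True  # Re-raise task exceptions in eager mode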
Periodic Tasks With Celery
To schedule Celery periodic tasks, you need to install and use Celery Beat, which is an additional component of Celery designed specifically for handling periodic tasks.
Celery Beat is responsible for managing the periodic task schedule and triggering task execution at the specified intervals. By default it persists the schedule in a local file (celerybeat-schedule); with the django-celery-beat extension covered later, the schedule can live in Django's database instead.
Creating a Periodic Task
Define the Celery task: in any Django app, create a tasks.py file (if not already created) and define your Celery task using the @app.task decorator:
# your_app_name/tasks.py
from your_project_name.celery import app


@app.task
def your_periodic_task():
    # Your periodic task logic here
    # This task will run at the specified interval
    print("This is a periodic task.")
In your Django settings (settings.py), define the schedule for your_periodic_task using the CELERY_BEAT_SCHEDULE setting:

# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'your_periodic_task_name': {
        'task': 'your_app_name.tasks.your_periodic_task',
        'schedule': crontab(minute='*/1'),  # Schedule the task to run every minute
    },
}
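The schedule value doesn't have to be a crontab(); it can also be a plain number of seconds, and crontab() itself accepts finer-grained fields. A couple of illustrative alternatives:

# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'every_30_seconds': {
        'task': 'your_app_name.tasks.your_periodic_task',
        'schedule': 30.0,  # Run every 30 seconds
    },
    'every_monday_morning': {
        'task': 'your_app_name.tasks.your_periodic_task',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),  # Mondays at 7:30
    },
}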
Start Celery Beat
Start the Celery Beat scheduler to activate the periodic task:
celery -A your_project_name.celery beat --loglevel=info
Start Celery Worker
To execute the periodic task, start the Celery worker:
celery -A your_project_name worker --loglevel=info
Replace 'your_project_name' with the name of your Django project.
That's it!
Now your_periodic_task will run every minute as scheduled by Celery Beat. The Celery worker will execute the task accordingly.
Remember to keep both the Celery worker and Celery Beat scheduler running simultaneously for the periodic task to work correctly.
Using django-celery-beat
Using django-celery-beat is optional; you may choose to use it if your project requires periodic task scheduling and management from the Django Admin panel.
This extension enables the user to store periodic tasks in a Django database and manage the tasks using the Django Admin interface.
Use the following steps to install django-celery-beat in your project.
Using pip:
pip install django-celery-beat
Using poetry:
poetry add django-celery-beat
Next, add the django_celery_beat module to INSTALLED_APPS in your Django project's settings.py:
INSTALLED_APPS = (
...,
'django_celery_beat',
)
Apply Django database migrations so that the necessary tables are created:
python manage.py migrate
Start the Celery Beat service using the django_celery_beat.schedulers:DatabaseScheduler scheduler:
celery -A your_project_name beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
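With the database scheduler, periodic tasks can be created from the Django admin, or programmatically via django-celery-beat's models. A minimal sketch using the task from earlier:

from django_celery_beat.models import IntervalSchedule, PeriodicTask

# Create (or reuse) an every-10-seconds interval
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)

# Register the periodic task against that interval
PeriodicTask.objects.get_or_create(
    interval=schedule,
    name='Run your_periodic_task every 10 seconds',
    task='your_app_name.tasks.your_periodic_task',
)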
Unit Testing Periodic Tasks
To unit test the periodic task, you can apply the same approach we used for testing asynchronous tasks by running them synchronously.
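For example, calling the task with apply() runs it synchronously and returns a result you can assert against:

# your_app_name/tests.py
from django.test import TestCase

from .tasks import your_periodic_task


class YourPeriodicTaskTestCase(TestCase):
    def test_your_periodic_task(self):
        # Run the periodic task synchronously, like a normal function call
        result = your_periodic_task.apply()
        self.assertTrue(result.successful())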