How to Use Celery with Django for Asynchronous and Periodic Tasks: A Comprehensive Guide

In modern web development, the ability to handle time-consuming tasks asynchronously is crucial for enhancing the performance and responsiveness of web applications.

Django, a powerful web framework, integrates well with Celery, a popular distributed task queue, to manage asynchronous and scheduled tasks efficiently.

In this guide, we will walk through the process of using Celery in Django to perform asynchronous and scheduled tasks, complete with code examples for a smooth and straightforward implementation.

Prerequisites

Before we dive into the implementation, ensure that you have the following components set up:

1. A Django project with your desired app(s) already created.

2. A message broker (e.g., RabbitMQ, Redis) to handle task queuing.

Celery supports several message brokers for task queuing, each with its own set of dependencies. You can read about them in the "Backends and Brokers" page of the Celery documentation.

Celery Broker 

A Celery broker is a message queue that Celery uses to communicate with workers. Workers are processes that run in the background and execute tasks. When you send a task to a Celery broker, the broker will send the task to a worker that is available to execute it.

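In other words, a Celery broker acts like a middleman that connects your Django app to the Celery workers: your app puts task messages on the queue, and the broker delivers them to workers. Task results do not travel back through the broker itself; if you need them, they are stored in a separate result backend that your Django app can query.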
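The producer, queue, and worker roles described above can be illustrated with a toy sketch that uses only the Python standard library. This is not how Celery is implemented; the names broker, results, add, and worker are hypothetical stand-ins chosen for this illustration.

```python
import queue
import threading

# Toy stand-ins: `broker` plays the message queue, `results` plays a result store.
broker = queue.Queue()
results = {}
results_lock = threading.Lock()


def add(x, y):
    return x + y


def worker():
    """Pull messages off the queue and run them until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        task_id, func, args = message
        value = func(*args)
        with results_lock:
            results[task_id] = value


# Start two workers; each message is delivered to exactly one of them.
workers = [threading.Thread(target=worker) for _ in range(2)]
for w in workers:
    w.start()

# The "web app" side only enqueues messages and moves on.
for task_id, args in enumerate([(1, 2), (10, 20), (5, 5)]):
    broker.put((task_id, add, args))

# Send one shutdown sentinel per worker and wait for them to finish.
for _ in workers:
    broker.put(None)
for w in workers:
    w.join()
```

The code that enqueues work never calls add() itself. It only hands a message to the queue, which is the same separation Celery provides between your Django process and its worker processes.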

Installing Celery

Install Celery using pip:

pip install celery[redis]

Install Celery using poetry:

poetry add celery[redis]

Note that the [redis] part is an extra specifier that installs the necessary Redis client dependencies along with Celery.

If you prefer another popular broker such as RabbitMQ or Amazon SQS, adjust the command accordingly.

RabbitMQ needs no extra specifier, because Celery installs its AMQP client by default:

pip install celery
# or with poetry
poetry add celery

For Amazon SQS:

pip install celery[sqs]
# or with poetry
poetry add celery[sqs]

Installing Brokers

Installing the brokers (Redis and RabbitMQ) is outside the scope of this article. Please make sure the broker you plan to use is installed properly before proceeding.

To check whether Redis or RabbitMQ is installed correctly on your local Linux machine, you can use the following commands:

Redis:

redis-server --version

If Redis is installed, this command will display the Redis version number. If Redis is not installed, you will get an error message indicating that the command was not found.

RabbitMQ:

rabbitmqctl status

If RabbitMQ is installed and its server is running, this command will display information about the RabbitMQ node, including the version number, node name, and other details. If RabbitMQ is not installed, you will get an error message indicating that the command was not found. If it is installed but not running, the command reports that it cannot reach the node.

Ensure that the broker you chose is properly installed and running on your local system before configuring Celery to use it. If it is not installed, follow the installation instructions for your operating system on the official site.

Setting Up Celery in Django

In the same directory as your Django project's settings.py, create a new file named celery.py.

In the celery.py file, add the following Celery-related configurations:

celery.py

import os
from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project_name.settings')

# Create the Celery application. The broker and result backend are read
# from the Django settings below, not set here.
app = Celery('your_project_name')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

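Replace 'your_project_name' with the name of your Django project.

To make sure this app is loaded whenever Django starts, so that tasks declared with @shared_task use it, import it in your_project_name/__init__.py with the line: from .celery import app as celery_app

Next, add the following configuration to your settings file.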

For Redis as the Broker and Result Backend:

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Replace with your Redis URL
CELERY_RESULT_BACKEND = 'redis://localhost:6379'  # Replace with your Redis URL

Replace 'localhost' with the hostname or IP address of your Redis server if it's running on a different machine. Also, ensure that the port number (6379 in this case) matches the actual port used by your Redis server.
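Because the Redis host and port usually differ between machines, one common option is to read the URLs from environment variables instead of hardcoding them. The following is a minimal sketch. It assumes the environment variables are named after the settings, which is a choice made for this example and not something Celery requires.

```python
# settings.py (sketch)
import os

# Read the URLs from environment variables of the same name (an assumption
# of this sketch) and fall back to a local Redis instance when they are unset.
CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/0")
```

With this in place, a deployment can point Celery at another Redis server by exporting the two variables, without editing settings.py.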

If you are using other popular brokers like RabbitMQ or Amazon SQS, you need to modify the CELERY_BROKER_URL accordingly.

For RabbitMQ as the Broker:

# settings.py
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'  # Replace with your RabbitMQ URL

Replace guest:guest with your RabbitMQ username and password if you have configured them differently. Also, ensure that the hostname (localhost) and port number (5672) match your RabbitMQ server configuration. The final slash in the URL names the virtual host, which is / by default.
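If you are unsure which part of a broker URL is which, you can take it apart with Python's standard urllib.parse module. The helper describe_broker_url below is a hypothetical name used only for this sketch; it is not part of Celery.

```python
from urllib.parse import urlsplit


def describe_broker_url(url):
    """Split a broker URL into the parts discussed above."""
    parts = urlsplit(url)
    return {
        "transport": parts.scheme,
        "username": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        # Redis: "/<database number>"; RabbitMQ: "/" followed by the virtual host.
        "path": parts.path,
    }


redis_info = describe_broker_url("redis://localhost:6379/0")
rabbit_info = describe_broker_url("amqp://guest:guest@localhost:5672//")
```

For the Redis URL the path is /0, meaning database 0. For the RabbitMQ URL the path is //, meaning the default virtual host named /.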

Creating a Background Task in Django

In any Django app, create a tasks.py file (if not already created) and define your Celery async task:

tasks.py

# your_app_name/tasks.py
from celery import shared_task

@shared_task
def your_async_task(arg1, arg2):
    # Your asynchronous task logic here
    result = arg1 + arg2
    return result

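The @shared_task decorator registers the function as a Celery task without requiring you to import the Celery app instance, which keeps the app's tasks.py independent of your project module. The function still behaves like a normal function when called directly; it runs asynchronously only when you send it to a worker, for example with delay().

"Shared" does not mean that one call is split across several workers. Each call produces a single task message, and the broker delivers it to one available worker. Running more workers lets more tasks execute in parallel.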

Start the Celery Worker

Open a new terminal or command prompt and start the Celery worker using the celery command:

celery -A your_project_name.celery worker --loglevel=info

Replace 'your_project_name' with the name of your Django project.

If your setup is correct, the worker starts, connects to the broker, and lists the registered tasks without reporting any errors.

Using Celery Async Tasks in Views

You can now use your Celery async task from any part of your Django app, such as views or other functions:

# your_app_name/views.py
from django.http import JsonResponse
from .tasks import your_async_task

def your_view(request):
    # Call the asynchronous task using delay() method
    task_result = your_async_task.delay(10, 20)
    return JsonResponse({'task_id': task_result.id})

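The delay() method is provided by Celery. You call it with the same arguments as the regular Python function, but instead of running the function it sends a task message to the broker and immediately returns an AsyncResult object. A Celery worker then executes the task in the background. delay() is a shortcut for apply_async(), which accepts additional execution options.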

Unit Testing Async Tasks 

There are many strategies to unit test a Celery task. The one that I use the most is running the task synchronously as a normal function call during testing.

Celery tasks provide a built-in method called apply() that executes the task locally, in the current process, and returns a result object right away, so no broker or worker is needed.

Here's how you can do it:

# your_app_name/tests.py
from django.test import TestCase
from .tasks import your_async_task


class YourAsyncTaskTestCase(TestCase):
    def test_your_async_task(self):
        # Call the Celery shared task synchronously using apply()
        result = your_async_task.apply(args=(10, 20)).get()

        # Assertions to check the task result
        self.assertEqual(result, 30)

By using apply() with .get(), the Celery shared task will be executed synchronously during testing, and you can directly check its result.

This allows you to treat the shared task as a normal function call for testing purposes, without the need for additional mocking.

CELERY_TASK_ALWAYS_EAGER

This setting determines whether Celery will run tasks synchronously (eagerly) or asynchronously. When set to True, Celery will execute tasks immediately as normal function calls rather than queuing them in the broker and processing them asynchronously.

This setting is mainly used for testing and development to simplify debugging and avoid the need for a separate message broker during local testing.

For example, in your Django settings:

# settings.py
CELERY_TASK_ALWAYS_EAGER = True  # Run tasks synchronously for testing and development

Keep in mind that you should set CELERY_TASK_ALWAYS_EAGER to False in production or when you want to utilize Celery's asynchronous task processing capabilities.
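One way to avoid leaving eager mode switched on by accident is to enable it only while the test suite runs. The following is a minimal sketch. It assumes tests are started with python manage.py test, and the TESTING name is a helper introduced for this example.

```python
# settings.py (sketch)
import sys

# Assumption: tests are started with `python manage.py test`, so "test"
# appears in the command-line arguments.
TESTING = "test" in sys.argv

# Run tasks inline only while testing; use the broker otherwise.
CELERY_TASK_ALWAYS_EAGER = TESTING
# Re-raise exceptions from eagerly executed tasks so a failing task fails the test.
CELERY_TASK_EAGER_PROPAGATES = TESTING
```

If you run tests with another runner such as pytest, the check on sys.argv would need to be adapted, for example by using a dedicated test settings module.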

Periodic Tasks With Celery

To schedule periodic tasks, you use Celery Beat, a scheduler that ships with Celery, so there is nothing extra to install.

Celery Beat keeps track of the periodic task schedule and sends each task to the broker when it is due; a worker then executes it. By default, it records the last run times in a local file named celerybeat-schedule. With the django-celery-beat extension covered later in this guide, the schedule is stored in Django's database instead.

Creating a periodic task

First, define the Celery task. In any Django app, create a tasks.py file (if not already created) and define your Celery task using the @app.task decorator:

from your_project_name.celery import app

@app.task
def your_periodic_task():
    # Your periodic task logic here
    # This task will run at the specified interval
    print("This is a periodic task.")

In your Django settings (settings.py), define the schedule for your_periodic_task using the CELERY_BEAT_SCHEDULE setting:

# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'your_periodic_task_name': {
        'task': 'your_app_name.tasks.your_periodic_task',
        'schedule': crontab(minute='*/1'),  # Schedule the task to run every minute
    },
}
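crontab schedules have a resolution of one minute. For a plain fixed interval, including intervals shorter than a minute, the schedule value can instead be a timedelta (or a number of seconds). The sketch below reuses the placeholder task path from this guide; the entry name your_periodic_task_every_30s is hypothetical.

```python
# settings.py (sketch)
from datetime import timedelta

CELERY_BEAT_SCHEDULE = {
    "your_periodic_task_every_30s": {  # hypothetical entry name
        "task": "your_app_name.tasks.your_periodic_task",
        "schedule": timedelta(seconds=30),  # fixed interval instead of crontab
    },
}
```

Use crontab when the task should run at particular clock times, and an interval when only the spacing between runs matters.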

Start Celery Beat

Start the Celery Beat scheduler to activate the periodic task:

celery -A your_project_name.celery beat --loglevel=info

Start Celery Worker

To execute the periodic task, start the Celery worker:

celery -A your_project_name worker --loglevel=info

Replace 'your_project_name' with the name of your Django project.

That's it!

Now your_periodic_task will run every minute as scheduled by Celery Beat. The Celery worker will execute the task accordingly.

Remember to keep both the Celery worker and Celery Beat scheduler running simultaneously for the periodic task to work correctly.

Using django-celery-beat

Using django-celery-beat is optional. You may choose to use it if your project needs to schedule and manage periodic tasks from the Admin panel.

This extension enables the user to store periodic tasks in a Django database and manage the tasks using the Django Admin interface.

Use the following steps to install django-celery-beat in your project.

Using pip:

pip install django-celery-beat

Using poetry:

poetry add django-celery-beat

Next, add the django_celery_beat module to INSTALLED_APPS in your Django project's settings.py:

INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)

Apply Django database migrations so that the necessary tables are created:

python manage.py migrate

Start the celery beat service using the django_celery_beat.schedulers:DatabaseScheduler scheduler:

celery -A your_project_name beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler

Unit Testing Periodic Tasks

To unit test the periodic task, you can apply the same approach we used for testing asynchronous tasks by running them synchronously.

