Django Celery: Asynchronous Task Queue and Background Jobs

Celery transforms Django applications from purely synchronous request-response cycles into asynchronous systems that execute background tasks independently, keeping response times fast while heavy workloads are processed out of band. Without Celery, long-running operations such as sending emails, processing images, generating reports, or calling external APIs block the request, causing timeouts and a poor user experience while users wait for the work to finish. Celery decouples task execution from web requests: tasks are queued and picked up by worker processes, so the application responds instantly while background processes handle the time-consuming operations. Real-world use cases include sending bulk emails without blocking checkout, processing uploaded images asynchronously, generating PDF reports in the background, periodically cleaning expired sessions, calling third-party payment APIs with retries, and aggregating analytics data on a schedule. The Celery architecture consists of task producers that create work, message brokers such as Redis or RabbitMQ that queue tasks, workers that execute tasks concurrently, and result backends that store task outcomes, enabling distributed processing across multiple servers.
This guide covers Celery with Django: asynchronous task concepts and benefits, installing and configuring Celery with a Redis broker, creating tasks with the @shared_task decorator, calling tasks asynchronously with delay() and apply_async(), implementing periodic tasks with the Celery Beat scheduler, handling task results and errors with retries, monitoring tasks through the Flower dashboard, task priorities and routing, worker configuration, and best practices for reliable background job processing, from simple email sending through complex distributed workflows integrated with business logic.
Celery Installation and Configuration
Celery setup requires installing the celery package, configuring a message broker for task queuing, creating a Celery app instance, and starting worker processes that execute tasks. Redis commonly serves as both broker and result backend, offering simplicity and performance, while RabbitMQ provides advanced features for complex routing. Configuring Celery through Django settings keeps the task queue setup in one place and easy to maintain.
# Install Celery and Redis
pip install celery redis
# Install Redis server
# Ubuntu/Debian
sudo apt install redis-server
sudo systemctl start redis
# macOS
brew install redis
brew services start redis
# Celery configuration
# myproject/celery.py
import os
from celery import Celery
# Set Django settings module
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# Create Celery app
app = Celery('myproject')
# Load config from Django settings with CELERY namespace
app.config_from_object('django.conf:settings', namespace='CELERY')
# Auto-discover tasks in all installed apps
app.autodiscover_tasks()
@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
# myproject/__init__.py
# Ensure Celery app is imported when Django starts
from .celery import app as celery_app
__all__ = ('celery_app',)
# Django settings.py - Celery Configuration
# Celery broker (Redis)
CELERY_BROKER_URL = 'redis://localhost:6379/0'
# Result backend (Redis)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# Task serialization
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
# Timezone
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
# Task results expiration
CELERY_RESULT_EXPIRES = 3600 # 1 hour
# Task routing
CELERY_TASK_ROUTES = {
    'myapp.tasks.send_email': {'queue': 'emails'},
    'myapp.tasks.process_image': {'queue': 'images'},
}
# Worker configuration
CELERY_WORKER_PREFETCH_MULTIPLIER = 4
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
# Task execution limits
CELERY_TASK_TIME_LIMIT = 300 # 5 minutes hard limit
CELERY_TASK_SOFT_TIME_LIMIT = 240 # 4 minutes soft limit
# Start Celery worker
celery -A myproject worker -l info
# Start worker with specific queue
celery -A myproject worker -Q emails -l info
# Start worker with concurrency
celery -A myproject worker --concurrency=4 -l info
# Start worker in background (daemon)
celery -A myproject worker --detach --pidfile=/var/run/celery/celery.pid
# Start multiple workers
celery multi start worker1 -A myproject -l info
celery multi start worker2 -A myproject -Q emails -l info
celery multi stopwait worker1 worker2
Creating and Calling Tasks
Celery tasks are Python functions decorated with @shared_task so they can be executed asynchronously by worker processes. Tasks are defined in tasks.py files within Django apps and discovered automatically through autodiscover_tasks(). Calling a task uses delay() for simple invocation or apply_async() for advanced options such as countdown, eta, and expires that control scheduling. Offloading heavy operations from views this way keeps response times fast.
# Creating Celery Tasks
# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail
from django.contrib.auth.models import User
from .models import Article, Report
import time
# Basic task
@shared_task
def send_email_task(subject, message, recipient):
    """
    Send email asynchronously
    """
    send_mail(
        subject=subject,
        message=message,
        from_email='[email protected]',
        recipient_list=[recipient],
    )
    return f"Email sent to {recipient}"
# Task with retry
@shared_task(bind=True, max_retries=3)
def send_notification(self, user_id, message):
    """
    Send notification with automatic retry on failure
    """
    try:
        user = User.objects.get(id=user_id)
        # External API call (send_push_notification is a project-specific helper)
        send_push_notification(user.phone, message)
        return f"Notification sent to {user.username}"
    except Exception as exc:
        # Retry after 60 seconds
        raise self.retry(exc=exc, countdown=60)
# Task with custom error handling
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True)
def process_payment(self, order_id, amount):
    """
    Process payment with exponential backoff retry
    """
    from .models import Order
    order = Order.objects.get(id=order_id)
    # Call payment gateway
    result = payment_gateway.charge(order.user, amount)
    if result.success:
        order.status = 'paid'
        order.save()
        return f"Payment processed for order {order_id}"
    else:
        raise PaymentError(f"Payment failed: {result.error}")
# Long-running task
@shared_task
def generate_report(report_id):
    """
    Generate PDF report in background
    """
    report = Report.objects.get(id=report_id)
    report.status = 'processing'
    report.save()
    try:
        # Heavy computation
        data = collect_report_data(report)
        pdf_content = generate_pdf(data)
        # Save result
        report.file.save(f'report_{report_id}.pdf', pdf_content)
        report.status = 'completed'
        report.save()
        return f"Report {report_id} generated"
    except Exception as e:
        report.status = 'failed'
        report.error = str(e)
        report.save()
        raise
# Task with progress tracking
@shared_task(bind=True)
def process_bulk_upload(self, file_path):
    """
    Process large file with progress updates
    """
    import csv
    with open(file_path, 'r') as f:
        reader = csv.DictReader(f)
        rows = list(reader)
    total = len(rows)
    for i, row in enumerate(rows):
        # Process row
        process_row(row)
        # Update progress
        self.update_state(
            state='PROGRESS',
            meta={'current': i + 1, 'total': total}
        )
    return {'status': 'completed', 'total': total}
# Calling tasks
# views.py
from django.shortcuts import render, redirect
from .models import Report
from .tasks import send_email_task, generate_report, process_bulk_upload
def contact_view(request):
    if request.method == 'POST':
        email = request.POST['email']
        message = request.POST['message']
        # Call task asynchronously with delay()
        send_email_task.delay(
            subject='Contact Form',
            message=message,
            recipient=email
        )
        return render(request, 'success.html')
    return render(request, 'contact.html')
def create_report_view(request):
    report = Report.objects.create(user=request.user)
    # Queue the task and keep the AsyncResult handle
    result = generate_report.delay(report.id)
    # Store task ID for status checking
    request.session['report_task_id'] = result.id
    return redirect('report_status', report_id=report.id)
# Advanced task calling with apply_async()
def schedule_task_view(request):
    from datetime import timedelta
    from django.utils import timezone
    # Execute after 10 seconds
    send_email_task.apply_async(
        args=['Subject', 'Message', '[email protected]'],
        countdown=10
    )
    # Execute at specific time
    eta = timezone.now() + timedelta(hours=1)
    send_email_task.apply_async(
        args=['Subject', 'Message', '[email protected]'],
        eta=eta
    )
    # Execute with expiration
    send_email_task.apply_async(
        args=['Subject', 'Message', '[email protected]'],
        expires=60  # Expire after 60 seconds
    )
    # Execute on specific queue
    send_email_task.apply_async(
        args=['Subject', 'Message', '[email protected]'],
        queue='emails'
    )
    # Execute with priority
    send_email_task.apply_async(
        args=['Subject', 'Message', '[email protected]'],
        priority=9  # 0 (lowest) to 9 (highest)
    )
    return render(request, 'scheduled.html')
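# A minimal sketch of a JSON progress endpoint for the process_bulk_upload task
# above. The view name and URL wiring are assumptions; AsyncResult.state and
# AsyncResult.info (the meta dict passed to update_state) are standard Celery.
from django.http import JsonResponse
from celery.result import AsyncResult
def bulk_upload_progress_view(request, task_id):
    result = AsyncResult(task_id)
    if result.state == 'PROGRESS':
        # meta={'current': ..., 'total': ...} from update_state is exposed via result.info
        return JsonResponse({'state': result.state, **result.info})
    return JsonResponse({'state': result.state})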
# Check task status
from celery.result import AsyncResult
def task_status_view(request, task_id):
    result = AsyncResult(task_id)
    if result.ready():
        # Task completed
        if result.successful():
            output = result.get()
            return render(request, 'success.html', {'output': output})
        else:
            # Task failed
            return render(request, 'error.html', {'error': result.info})
    else:
        # Task still running
        return render(request, 'pending.html', {'state': result.state})
Periodic Tasks with Celery Beat
Celery Beat schedules periodic tasks that execute at regular intervals, like cron jobs: maintenance operations, scheduled emails, data cleanup, or metric aggregation. With django-celery-beat, the scheduler stores schedules in the Django database, so they can be managed dynamically through the admin interface. Periodic tasks automate routine operations and keep the application healthy without manual intervention.
# Periodic Tasks with Celery Beat
# Install django-celery-beat
pip install django-celery-beat
# settings.py
INSTALLED_APPS = [
    # ... existing apps
    'django_celery_beat',
]
# Run migrations
python manage.py migrate django_celery_beat
# Define periodic tasks
# myapp/tasks.py
from celery import shared_task
from django.utils import timezone
from datetime import timedelta
@shared_task
def cleanup_old_sessions():
    """
    Delete expired sessions daily
    """
    from django.contrib.sessions.models import Session
    Session.objects.filter(expire_date__lt=timezone.now()).delete()
    return "Old sessions cleaned"
@shared_task
def send_daily_digest():
    """
    Send daily email digest to users
    """
    from django.contrib.auth.models import User
    from .models import Article
    yesterday = timezone.now() - timedelta(days=1)
    articles = Article.objects.filter(created_at__gte=yesterday)
    # email_notifications and send_digest_email are project-specific placeholders
    recipients = User.objects.filter(email_notifications=True)
    for user in recipients:
        send_digest_email(user, articles)
    return f"Digest sent to {recipients.count()} users"
@shared_task
def aggregate_daily_stats():
    """
    Calculate daily statistics
    """
    from .models import DailyStats, Article
    today = timezone.now().date()
    stats = DailyStats.objects.create(
        date=today,
        articles_published=Article.objects.filter(
            created_at__date=today
        ).count()
    )
    return f"Stats aggregated for {today}"
# Configure schedules in settings.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
    # Execute every 30 seconds
    'test-every-30-seconds': {
        'task': 'myapp.tasks.debug_task',
        'schedule': 30.0,
    },
    # Execute every minute
    'cleanup-every-minute': {
        'task': 'myapp.tasks.cleanup_temp_files',
        'schedule': crontab(minute='*'),
    },
    # Execute daily at midnight
    'daily-cleanup': {
        'task': 'myapp.tasks.cleanup_old_sessions',
        'schedule': crontab(hour=0, minute=0),
    },
    # Execute every Monday at 8 AM
    'weekly-report': {
        'task': 'myapp.tasks.send_weekly_report',
        'schedule': crontab(hour=8, minute=0, day_of_week=1),
    },
    # Execute on 1st of every month
    'monthly-billing': {
        'task': 'myapp.tasks.process_monthly_billing',
        'schedule': crontab(hour=0, minute=0, day_of_month=1),
    },
    # Execute every hour
    'hourly-sync': {
        'task': 'myapp.tasks.sync_external_data',
        'schedule': crontab(minute=0),
    },
}
# Start Celery Beat scheduler
celery -A myproject beat -l info
# Or with scheduler storing schedule in database
celery -A myproject beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# Run worker and beat together
celery -A myproject worker --beat -l info
# Manage schedules through Django admin
# Admin will show Periodic Tasks, Intervals, Crontabs
# Dynamically create periodic tasks
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json
def create_periodic_task():
    # Create schedule
    schedule, created = CrontabSchedule.objects.get_or_create(
        minute='0',
        hour='*/2',  # Every 2 hours
        day_of_week='*',
        day_of_month='*',
        month_of_year='*',
    )
    # Create periodic task
    PeriodicTask.objects.create(
        crontab=schedule,
        name='Sync data every 2 hours',
        task='myapp.tasks.sync_data',
        args=json.dumps([]),
        kwargs=json.dumps({}),
    )
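# A small companion sketch (the task name is assumed from the example above):
# schedules created through django-celery-beat can be paused or removed at
# runtime, since PeriodicTask rows carry an enabled flag.
def pause_sync_task():
    task = PeriodicTask.objects.get(name='Sync data every 2 hours')
    task.enabled = False  # the DatabaseScheduler stops dispatching it
    task.save()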
# One-time scheduled tasks
from datetime import timedelta
from django_celery_beat.models import PeriodicTask, ClockedSchedule
def schedule_one_time_task(user_id):
    # Create clocked schedule
    schedule_time = timezone.now() + timedelta(hours=24)
    schedule = ClockedSchedule.objects.create(
        clocked_time=schedule_time
    )
    # Create one-time task
    PeriodicTask.objects.create(
        clocked=schedule,
        name='Send reminder in 24 hours',
        task='myapp.tasks.send_reminder',
        one_off=True,  # Task runs only once
        args=json.dumps([user_id]),
    )
| Task Option | Purpose | Example | Use Case |
|---|---|---|---|
| bind=True | Access task instance (self) | @shared_task(bind=True) | Retry logic, progress updates |
| max_retries | Maximum retry attempts | max_retries=3 | API calls with failures |
| autoretry_for | Auto-retry on exceptions | autoretry_for=(Exception,) | Network errors |
| retry_backoff | Exponential backoff | retry_backoff=True | Rate-limited APIs |
| time_limit | Hard execution timeout | time_limit=300 | Prevent hung tasks |
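These options combine on a single task. The sketch below is illustrative rather than taken from the guide's example app (sync_remote_orders and fetch_and_store_orders are hypothetical names); it pairs retries and backoff with a soft time limit so the task can react to SoftTimeLimitExceeded before the hard limit kills it.
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
@shared_task(
    bind=True,
    max_retries=3,
    autoretry_for=(ConnectionError,),
    retry_backoff=True,   # 1s, 2s, 4s, ... between automatic retries
    soft_time_limit=240,  # raises SoftTimeLimitExceeded inside the task
    time_limit=300,       # hard limit: the worker child process is killed
)
def sync_remote_orders(self, shop_id):
    try:
        fetch_and_store_orders(shop_id)  # hypothetical helper doing the slow work
    except SoftTimeLimitExceeded:
        # Out of time: retry later instead of leaving the sync half-finished
        raise self.retry(countdown=60)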
Task Monitoring and Management
Monitoring Celery through the Flower web interface provides real-time visibility into task execution, worker status, and queue depth, which supports performance optimization and troubleshooting. Celery's inspection and control APIs also allow checking task state, revoking tasks, and querying worker statistics from application code, keeping background processing reliable in production.
# Install Flower for monitoring
pip install flower
# Start Flower
celery -A myproject flower
# Access Flower web interface
# http://localhost:5555
# Flower with authentication
celery -A myproject flower --basic_auth=admin:password
# Flower configuration in settings.py
FLOWER_BASIC_AUTH = ['admin:password', 'user:userpass']
FLOWER_PORT = 5555
# Task result inspection
from celery.result import AsyncResult
def check_task_status(task_id):
    result = AsyncResult(task_id)
    return {
        'state': result.state,
        'ready': result.ready(),
        'successful': result.successful(),
        'failed': result.failed(),
        'info': result.info
    }
# Revoke tasks
def cancel_task(task_id):
    from celery import current_app
    current_app.control.revoke(task_id, terminate=True)
# Inspect active tasks
from celery import current_app
def get_active_tasks():
    inspect = current_app.control.inspect()
    return inspect.active()
# Get worker statistics
def get_worker_stats():
    inspect = current_app.control.inspect()
    return {
        'active': inspect.active(),
        'scheduled': inspect.scheduled(),
        'reserved': inspect.reserved(),
        'stats': inspect.stats(),
    }
Celery Best Practices
- Keep tasks idempotent: Ensure a task can run multiple times and produce the same result, so retries cannot corrupt data
- Use timeouts: Set time_limit and soft_time_limit so hung tasks cannot hold worker resources indefinitely
- Implement retries: Configure max_retries and exponential backoff to handle transient failures gracefully
- Avoid passing large objects: Pass IDs instead of model instances to keep messages small and avoid serialization issues
- Use separate queues: Route tasks to dedicated queues so critical tasks are not stuck behind slow ones
- Monitor task execution: Use Flower to track task performance, failures, and worker health and identify bottlenecks
- Handle failures gracefully: Log errors, notify admins, and provide fallback mechanisms to protect the user experience
- Test tasks thoroughly: Write tests for task logic with Django's test framework, mocking external dependencies
- Use task signatures: Compose complex workflows with chains, groups, and chords (see the sketch after this list)
- Scale workers appropriately: Match worker count to workload and server resources to avoid overload or underutilization
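A minimal sketch of the signature primitives mentioned above. chain, group, and chord are Celery's standard canvas tools; the task names (fetch_data, transform_data, load_data, resize_image, notify_user) are hypothetical stand-ins for tasks in your own apps.
from celery import chain, group, chord
from myapp.tasks import fetch_data, transform_data, load_data, resize_image, notify_user
# chain: run tasks sequentially, each result passed as the first argument of the next
chain(fetch_data.s(source_id=1), transform_data.s(), load_data.s()).delay()
# group: run independent tasks in parallel
group(resize_image.s(image_id) for image_id in [1, 2, 3]).delay()
# chord: run a group, then a callback that receives the list of group results
chord(resize_image.s(image_id) for image_id in [1, 2, 3])(notify_user.s())
For the testing bullet, setting CELERY_TASK_ALWAYS_EAGER = True in test settings makes delay() and apply_async() run tasks synchronously in-process, which keeps unit tests simple and deterministic.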
Conclusion
Celery turns Django applications from synchronous request-response systems into asynchronous ones that execute background tasks through distributed worker processes, keeping responses fast while heavy workloads run independently. The architecture consists of task producers (typically Django views), a message broker such as Redis that queues tasks, worker processes that execute them concurrently across servers, and a result backend that stores outcomes for status checks and result retrieval. Setup involves installing the celery package and a Redis broker, creating the Celery app instance at the project root so it loads Django settings, and letting autodiscover_tasks() register task definitions across installed apps. Tasks are Python functions decorated with @shared_task whose arguments are serialized to JSON so they can run in separate processes; they are invoked with delay() for simple calls or apply_async() for options such as countdown, eta, queue routing, and priority. Options like bind=True for accessing the task instance, max_retries, autoretry_for, retry_backoff, and time_limit control how failures and long-running work are handled. Periodic tasks run through the Celery Beat scheduler, configured via CELERY_BEAT_SCHEDULE with crontab entries or through django-celery-beat, which stores schedules in the database and exposes them in the Django admin for maintenance jobs, daily digests, and scheduled cleanups. Flower provides real-time visibility into active tasks, worker status, success rates, and execution times for optimization and troubleshooting. Best practices include keeping tasks idempotent, setting timeouts, implementing retry strategies, passing IDs rather than large objects, routing tasks to separate queues, monitoring execution, handling errors gracefully, testing tasks with the Django test framework, composing workflows with signatures, and scaling workers to match the workload. With these fundamentals, from task creation through worker management and production deployment, Django applications can process millions of background jobs, from email sending to image processing, while remaining responsive throughout the development lifecycle.