FastAPI Background Tasks with Celery (2026)

FastAPI Background Tasks with Celery Integration Guide

Building modern web applications often requires handling time-consuming operations without blocking the main request-response cycle. Whether you’re processing large datasets, sending emails, or generating reports, background tasks are essential for maintaining responsive user experiences. This guide explores how to integrate Celery with FastAPI for solid asynchronous task processing, with practical examples you can apply immediately.

Why Background Tasks Matter in FastAPI

FastAPI’s async capabilities handle I/O-bound operations efficiently, but CPU-intensive tasks still block the event loop. When your endpoint needs to process data that takes seconds or minutes, users face frustrating wait times or timeouts. Background tasks solve this by moving heavy processing to separate workers that run independently of HTTP requests.

Common use cases include:

  • Image/video processing: Resizing, transcoding, or applying filters
  • Data exports: Generating large CSV or Excel files
  • Email notifications: Sending welcome emails, password resets, or newsletters
  • Web scraping: Collecting data from external sources
  • Machine learning inference: Running model predictions asynchronously

Celery provides a battle-tested solution for managing these distributed task queues, with support for task scheduling, retries, and result persistence.

Setting Up Celery with FastAPI

Prerequisites and Installation

First, install the required dependencies:

pip install fastapi celery redis uvicorn

You’ll also need Redis as the message broker (though Celery supports other brokers like RabbitMQ):

Using Docker
docker run -d -p 6379:6379 redis:7-alpine

Project Structure

A well-organized structure separates your FastAPI app from Celery configuration:

project/
 app/
 __init__.py
 main.py # FastAPI application
 tasks.py # Celery tasks
 config.py # Configuration
 celery_app/
 celery_config.py # Celery configuration
 requirements.txt

Basic Celery Configuration

Create your Celery configuration file:

celery_app/celery_config.py
from celery import Celery
celery_app = Celery(
 "worker",
 broker="redis://localhost:6379/0",
 backend="redis://localhost:6379/0"
)
celery_app.conf.update(
 task_serializer="json",
 accept_content=["json"],
 result_serializer="json",
 timezone="UTC",
 enable_utc=True,
 task_track_started=True,
 task_time_limit=3600, # 1 hour max
 task_soft_time_limit=3000, # Soft limit at 50 minutes
)

Creating the FastAPI Application

Now build your FastAPI app with task endpoints:

app/main.py
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import Optional
import asyncio
app = FastAPI(title="FastAPI Celery Integration")
class TaskRequest(BaseModel):
 user_id: int
 email: str
 action: str
Import tasks after they're defined
from app.tasks import process_user_task, send_notification
@app.post("/tasks/submit")
async def submit_task(request: TaskRequest, background_tasks: BackgroundTasks):
 """Submit a task for background processing."""
 
 # Queue the task with Celery
 task = process_user_task.delay(
 user_id=request.user_id,
 email=request.email,
 action=request.action
 )
 
 return {
 "task_id": task.id,
 "status": "submitted",
 "message": "Task queued successfully"
 }
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
 """Check the status of a background task."""
 from celery_app.celery_config import celery_app
 
 result = celery_app.AsyncResult(task_id)
 
 return {
 "task_id": task_id,
 "status": result.status,
 "result": result.result if result.ready() else None
 }

Defining Celery Tasks

Create your task definitions:

app/tasks.py
from celery_app.celery_config import celery_app
import time
import logging
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, max_retries=3)
def process_user_task(self, user_id: int, email: str, action: str):
 """Process a user action in the background."""
 try:
 logger.info(f"Processing task for user {user_id}, action: {action}")
 
 # Simulate processing time
 time.sleep(5)
 
 # Your business logic here
 result = {
 "user_id": user_id,
 "email": email,
 "action": action,
 "processed": True,
 "timestamp": time.time()
 }
 
 return result
 
 except Exception as exc:
 logger.error(f"Task failed: {exc}")
 # Retry with exponential backoff
 raise self.retry(exc=exc, countdown=60 * (2 self.request.retries))
@celery_app.task
def send_notification(user_id: int, message: str):
 """Send notification email (mock implementation)."""
 # In production, integrate with SendGrid, AWS SES, etc.
 logger.info(f"Sending notification to user {user_id}: {message}")
 return {"sent": True, "user_id": user_id}

FastAPI BackgroundTasks vs Celery

FastAPI includes a built-in BackgroundTasks class for simple use cases. Understanding when to use each option helps you choose the right tool:

Feature FastAPI BackgroundTasks Celery
Complexity Low High
Scalability Single process Distributed workers
Persistence In-memory Redis/database
Scheduling No Yes (celery beat)
Retry logic Manual Built-in
Monitoring Limited Flower dashboard

Use FastAPI’s BackgroundTasks for simple, single-instance applications. Choose Celery when you need horizontal scaling, persistent queues, scheduled tasks, or solid retry mechanisms.

Advanced Patterns

Task Chaining and Chords

Chain multiple tasks together for complex workflows:

from celery import chain, chord
Sequential execution
workflow = chain(
 process_user_task.s(user_id=1, email="[email protected]", action="signup"),
 send_notification.s(message="Welcome!")
)
result = workflow.apply_async()
Parallel execution with callback
workflow = chord(
 [process_user_task.s(user_id=i, email=f"user{i}@example.com", action="batch") 
 for i in range(10)],
 send_notification.s(message="Batch processing complete")
)
result = workflow.apply_async()

Scheduled Tasks with Celery Beat

For recurring tasks, integrate Celery Beat:

celery_app/celery_config.py additions
celery_app.conf.beat_schedule = {
 "cleanup-old-data-daily": {
 "task": "app.tasks.cleanup_old_data",
 "schedule": 86400.00, # Daily
 },
 "generate-reports-hourly": {
 "task": "app.tasks.generate_daily_report",
 "schedule": 3600.00, # Hourly
 },
}

Error Handling and Monitoring

Implement comprehensive error handling:

@celery_app.task(bind=True, max_retries=3, autoretry_for=(Exception,), 
 retry_backoff=True, retry_backoff_max=600)
def robust_task(self, data: dict):
 """Task with automatic retry logic."""
 try:
 # Process data
 result = process_data_safely(data)
 return {"status": "success", "result": result}
 except ValidationError as exc:
 # Don't retry validation errors
 return {"status": "failed", "error": str(exc)}

Best Practices

  1. Keep tasks idempotent: Design tasks that can run safely multiple times without side effects

  2. Use descriptive task names: Enable easy identification in monitoring tools

  3. Implement proper logging: Track task execution for debugging

  4. Set appropriate timeouts: Prevent runaway tasks from consuming resources

  5. Use task routing: Route specific tasks to dedicated queues based on resource needs

  6. Monitor with Flower: Install Flower for real-time Celery cluster monitoring

pip install flower
celery -A celery_app.celery_config flower --port=5555

Common Pitfalls to Avoid

  • Blocking the event loop: Never use synchronous I/O within Celery tasks; it’s designed for separate processes

  • Forgetting task serialization: Ensure all task arguments are JSON-serializable

  • Ignoring result backend: Set up a result backend to track task status and retrieve results

  • Resource exhaustion: Configure prefetch limits to prevent worker overload

  • Missing error handling: Always wrap task logic in try-except blocks

Running the Application

Start the components separately:

Terminal 1: Redis
redis-server
Terminal 2: Celery worker
celery -A celery_app.celery_config worker --loglevel=info
Terminal 3: Celery Beat (if using scheduled tasks)
celery -A celery_app.celery_config beat --loglevel=info
Terminal 4: FastAPI
uvicorn app.main:app --reload

Conclusion

Integrating Celery with FastAPI provides a powerful foundation for handling background processing at scale. Start with simple tasks and progressively adopt advanced patterns like task chaining, scheduling, and comprehensive monitoring as your needs grow. The combination of FastAPI’s async capabilities and Celery’s solid task queue management enables you to build responsive applications that handle demanding workloads efficiently.


Try it: Paste your error into our Error Diagnostic for an instant fix.

I'm a solo developer in Vietnam. 50K Chrome extension users. $500K+ on Upwork. 5 Claude Max subscriptions running agent fleets in parallel. These are my actual CLAUDE.md templates, orchestration configs, and prompts. Not a course. Not theory. The files I copy into every project before I write a line of code. **[See what's inside →](https://zovo.one/lifetime?utm_source=ccg&utm_medium=cta-default&utm_campaign=claude-code-fastapi-background-tasks-celery-integration)** $99 once. Free forever. 47/500 founding spots left.

Related Reading

Built by theluckystrike. More at zovo.one

Find the right skill → Browse 155+ skills in our Skill Finder.

Configure MCP → Build your server config with our MCP Config Generator.