Claude Code for Celery Chord Workflow (2026)
Setting up Celery chords correctly requires understanding chord configuration, integration testing, and ongoing maintenance. Below is a Claude Code workflow for Celery chords that handles each of these concerns step by step.
Celery chord workflows are one of the most powerful patterns in distributed task processing, allowing you to execute a group of tasks in parallel and then run a final callback when all tasks complete. However, building and debugging these workflows can be challenging. This tutorial shows you how to use Claude Code to build, test, and optimize Celery chord workflows efficiently.
Understanding Celery Chords
A chord is essentially a callback that’s executed after all tasks in a group finish. This pattern is incredibly useful when you need to:
- Process multiple items in parallel, then aggregate results
- Execute a workflow where each step depends on the completion of multiple prior tasks
- Build complex pipelines that require synchronization points
The basic structure involves a header (the group of tasks to run) and a body (the callback that runs after completion).
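To make the header/body split concrete, here is a minimal self-contained sketch; the `add` and `tsum` task names and the Redis URLs are illustrative, not from a specific project:

```python
from celery import Celery, chord, group

# Assumed local Redis broker/backend; chords require a result backend
app = Celery('demo',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

@app.task
def add(x, y):
    return x + y

@app.task
def tsum(numbers):
    return sum(numbers)

# header: ten add() tasks run in parallel
# body: tsum() receives the list of their return values
result = chord(group(add.s(i, i) for i in range(10)))(tsum.s())
# result.get() == 90 once every header task has finished
```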
Before exploring more complex patterns, it helps to understand where chords fit among Celery’s workflow primitives:
| Primitive | Purpose | When to Use |
|---|---|---|
| `group` | Run tasks in parallel, collect individual results | Parallel fan-out with no post-processing |
| `chain` | Run tasks sequentially, passing output forward | Step-by-step pipeline |
| `chord` | Run a group in parallel, then run a callback | Parallel fan-out with aggregation |
| `chunks` | Split an iterable into parallel sub-batches | High-volume data processing |
Chords are the right choice whenever you need a “fan-out then fan-in” shape: launch many workers at once, wait for all of them, then do something with the combined results.
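As a quick illustration of the difference, here is a sketch using the `process_item` and `aggregate_results` tasks defined later in this tutorial (the `tasks` module path is an assumption based on the project layout below):

```python
from celery import chord, group
from tasks import process_item, aggregate_results  # defined below

items = [1, 2, 3]

# group alone: fan-out only; .get() returns a list of individual results
group_result = group(process_item.s(i) for i in items).apply_async()

# chord: same fan-out, but the callback receives the combined list
chord_result = chord(group(process_item.s(i) for i in items))(aggregate_results.s())
```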
Setting Up Your Celery Project
Before diving into chords, ensure you have a proper Celery setup. Claude Code can help you scaffold this quickly. A production-ready Celery project separates concerns across a few files:
```
myproject/
├── celeryconfig.py       # Broker, backend, and worker settings
├── tasks.py              # Task definitions
├── workflows.py          # Chord and pipeline constructors
└── tests/
    └── test_workflows.py
```
Here is a minimal but complete celeryconfig.py for local development using Redis:
```python
# celeryconfig.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'

# Required for chords to work correctly
result_extended = True

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'UTC'
enable_utc = True

# Prevent chord hang on task failure (Celery 5+)
chord_propagates = True
```
The result_extended = True and chord_propagates = True settings are easy to miss and cause hard-to-diagnose problems when absent. Claude Code will include them if you ask it to generate a production-ready config.
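A quick way to confirm a worker will pick these settings up is to load the config and inspect it; this is a minimal sketch assuming the file layout above:

```python
from celery import Celery

app = Celery('workflows')
app.config_from_object('celeryconfig')

# Chords silently hang without a result backend, so fail fast here
assert app.conf.result_backend, "chords require a configured result backend"
print(app.conf.result_extended)  # should print True
```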
Here is the core tasks file:
```python
# tasks.py
from celery import Celery, chord, group

app = Celery('workflows')
app.config_from_object('celeryconfig')

@app.task
def process_item(item):
    """Process a single item."""
    result = item * 2
    return {'item': item, 'result': result}

@app.task
def aggregate_results(results):
    """Aggregate all results from the chord header."""
    total = sum(r['result'] for r in results)
    return {'total': total, 'count': len(results)}
```
The chord combines a group of process_item tasks with an aggregate_results callback that runs after all items complete.
Building Your First Chord Workflow
Creating a chord workflow is straightforward with Celery. Here’s how to structure it:
```python
# Building and executing a chord
def run_workflow(items):
    # Create a group of tasks (the chord header)
    task_group = group(process_item.s(item) for item in items)
    # chord(header)(callback) dispatches the workflow immediately and
    # returns an AsyncResult for the callback task
    return chord(task_group)(aggregate_results.s())
```
The .s() notation creates a signature (a lazy task), and the chord automatically handles the coordination. Each process_item task runs in parallel, and once all complete, aggregate_results receives the list of results.
One important subtlety: when you call chord(header)(callback), the callback is invoked with a single argument, a list of all return values from the header tasks, in the order the tasks were defined (not the order they completed). If a header task returns None, that None still appears in the list. Write your callback to handle None values defensively.
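A minimal defensive variant of the callback, assuming the same result shape as `aggregate_results` above:

```python
@app.task
def aggregate_results_safe(results):
    """Aggregate results, skipping None entries from header tasks."""
    valid = [r for r in results if r is not None]
    return {'total': sum(r['result'] for r in valid), 'count': len(valid)}
```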
Using Claude Code to Generate Chord Workflows
Claude Code excels at generating boilerplate and explaining complex patterns. You can ask Claude to create a chord workflow for your specific use case. For example:
“Create a Celery chord workflow that processes user uploads in parallel and generates a summary report when all uploads are processed.”
Claude will generate appropriate code, including error handling and retry strategies:
```python
from celery import chord, group
from celery.exceptions import MaxRetriesExceededError

@app.task(bind=True, max_retries=3)
def process_upload(self, upload_id):
    """Process a single file upload."""
    try:
        # Your processing logic here
        return {'upload_id': upload_id, 'status': 'success'}
    except Exception as exc:
        # Retry with exponential backoff
        try:
            self.retry(exc=exc, countdown=2 ** self.request.retries)
        except MaxRetriesExceededError:
            return {'upload_id': upload_id, 'status': 'failed'}

@app.task
def generate_summary(results):
    """Generate summary after all uploads processed."""
    successful = [r for r in results if r and r.get('status') == 'success']
    return {
        'total': len(results),
        'successful': len(successful),
        'failed': len(results) - len(successful)
    }

def process_all_uploads(upload_ids):
    """Execute the chord workflow."""
    # chord(header)(callback) dispatches immediately and returns an AsyncResult
    return chord(
        group(process_upload.s(upload_id) for upload_id in upload_ids)
    )(generate_summary.s())
```
When asking Claude Code to generate this kind of pattern, be explicit about your failure strategy. Saying "if any task fails, the callback should still run with partial results" produces different code than "if any task fails, abort the chord." Claude handles both, but you need to tell it which behavior you want.
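For the abort-style behavior, Celery lets you attach an error callback to the chord body with `Signature.on_error()`; this sketch reuses the `generate_summary` task above:

```python
@app.task
def on_chord_error(request, exc, traceback):
    """Runs once if any header task fails; the summary callback is skipped."""
    print(f'Chord task {request.id!r} raised: {exc!r}')

def process_all_uploads_strict(upload_ids):
    header = group(process_upload.s(uid) for uid in upload_ids)
    return chord(header)(generate_summary.s().on_error(on_chord_error.s()))
```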
Practical Example: Image Processing Pipeline
A real-world scenario that maps perfectly to chords is batch image processing: resize and watermark a set of uploaded images in parallel, then notify the user when all are ready.
```python
import io

import boto3
from celery import chord, group
from PIL import Image

s3 = boto3.client('s3')

@app.task(bind=True, max_retries=2, soft_time_limit=30)
def resize_and_watermark(self, bucket, key, user_id):
    """Download, resize, watermark, and re-upload one image."""
    try:
        obj = s3.get_object(Bucket=bucket, Key=key)
        img = Image.open(io.BytesIO(obj['Body'].read()))
        # Resize to fit within a 1200x1200 box, preserving aspect ratio
        img.thumbnail((1200, 1200))
        # Simple watermark using text overlay (placeholder)
        output = io.BytesIO()
        img.save(output, format='JPEG', quality=85)
        output.seek(0)
        dest_key = key.replace('uploads/', 'processed/')
        s3.put_object(Bucket=bucket, Key=dest_key, Body=output)
        return {'key': dest_key, 'status': 'ok'}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=5)

@app.task
def notify_user(results, user_id):
    """Send notification once all images are ready."""
    successful = [r for r in results if r and r['status'] == 'ok']
    failed_count = len(results) - len(successful)
    # send_email(user_id, successful, failed_count)  # your notification logic
    return {
        'user_id': user_id,
        'processed': len(successful),
        'failed': failed_count
    }

def process_user_upload_batch(bucket, keys, user_id):
    """Kick off the chord for a batch of uploaded images."""
    header = group(resize_and_watermark.s(bucket, k, user_id) for k in keys)
    callback = notify_user.s(user_id)
    # chord(header)(callback) dispatches the workflow and returns an AsyncResult
    return chord(header)(callback)
```
Notice that notify_user takes user_id as a second argument; it does not come from the chord results list. To pass extra arguments to the callback, include them in .s() when defining it: notify_user.s(user_id). Celery prepends the results list automatically.
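Conversely, if a callback should ignore the results list entirely, make its signature immutable with `.si()`; the `cleanup_temp_files` task here is a hypothetical example:

```python
@app.task
def cleanup_temp_files(user_id):
    """Callback that needs only user_id, not the chord results."""
    # remove_temp_dir(user_id)  # hypothetical cleanup logic
    return {'user_id': user_id, 'cleaned': True}

# .si() creates an immutable signature: Celery will not prepend the
# results list, so the callback runs with exactly the args given here.
# chord(header)(cleanup_temp_files.si(user_id))
```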
Advanced Patterns: Sharing Context Across Header Tasks
You can pass shared data to every task in the chord header by including it in each signature, rather than repeating a lookup inside every task:
```python
@app.task
def process_with_context(item, context):
    """Task receives both the item and shared context."""
    return {'item': item, 'context': context, 'processed': True}

@app.task
def finalize(results, context):
    """Final callback receives all results plus the context."""
    return {'context': context, 'results': results}

def run_workflow_with_context(items, context):
    """Execute a chord whose header tasks share common context."""
    # chord(header, body) builds a signature; apply_async() dispatches it
    workflow = chord(
        group(process_with_context.s(item, context) for item in items),
        finalize.s(context),
    )
    return workflow.apply_async()
```
This pattern is useful when all parallel tasks need access to shared configuration or metadata such as a request ID, tenant ID, or a set of feature flags. Embedding the context directly in each signature keeps the workflow self-contained and avoids hitting a shared store on every task start.
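A typical invocation, with an assumed request-scoped context dict:

```python
# Every header task and the callback see the same context
ctx = {'request_id': 'req-123', 'tenant_id': 'acme', 'flags': {'beta': True}}
result = run_workflow_with_context([1, 2, 3], ctx)
```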
Nested Chords and Multi-Stage Pipelines
For complex pipelines, you can nest chords inside chains. This creates a multi-stage workflow where each stage fans out then fans in before proceeding:
```python
from celery import chain, chord, group

@app.task
def fetch_data(source_id):
    """Stage 1: fetch raw data from a source."""
    return {'source_id': source_id, 'records': [1, 2, 3]}  # example

@app.task
def enrich_record(record, source_id):
    """Stage 2: enrich a single record."""
    return {'record': record, 'enriched': True, 'source_id': source_id}

@app.task
def merge_enriched(results):
    """Stage 2 callback: merge enriched records."""
    return {'merged': results}

@app.task
def write_to_db(merged_data, job_id):
    """Stage 3: persist merged data."""
    # db.insert(merged_data, job_id)
    return {'job_id': job_id, 'rows_written': len(merged_data['merged'])}

@app.task(bind=True)
def stage2_dispatch(self, fetch_results):
    """Intermediate task: re-fan-out over every fetched record.

    Defined at module level so workers can register it; tasks defined
    inside functions are invisible to workers.
    """
    all_records = [
        (record, r['source_id'])
        for r in fetch_results
        for record in r['records']
    ]
    inner = chord(
        group(enrich_record.s(rec, sid) for rec, sid in all_records),
        merge_enriched.s(),
    )
    # replace() swaps this task for the inner chord; the outer chain
    # resumes with the chord's final (merged) result
    raise self.replace(inner)

def full_pipeline(source_ids, job_id):
    """
    Stage 1: fetch all sources in parallel
    Stage 2: enrich every record in parallel, then merge
    Stage 3: write the final merged data
    """
    stage1 = chord(
        group(fetch_data.s(sid) for sid in source_ids),
        stage2_dispatch.s(),
    )
    return chain(stage1, write_to_db.s(job_id)).apply_async()
```
Nested chords require care: calling a chord with its body dispatches it immediately and returns an AsyncResult, so compose chords as signatures inside chains and let Celery apply them, and use self.replace() when a task needs to fan out again mid-pipeline. Ask Claude Code to draw out the dependency graph before writing nested chord code; it will explain the execution order clearly and point out where results flow.
Debugging Chord Workflows
Chords can be tricky to debug. Here are practical strategies:
- Enable chord synchronization: Use the `chord_propagates = True` setting to handle errors properly
- Log intermediate results: Add logging to both header tasks and callbacks
- Use Canvas inspection: Celery provides tools to visualize and inspect workflow execution
- Test header tasks in isolation: Run each task individually with `.apply()` before running the full chord; see the sketch after this list
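For the isolation step, `.apply()` runs a task synchronously in the current process, no worker needed:

```python
# Run one header task eagerly and inspect its return value
res = process_item.apply(args=(21,))
print(res.get())  # {'item': 21, 'result': 42}
```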
Claude Code can help you add comprehensive logging:
```python
import logging

from celery import chord, group

logger = logging.getLogger(__name__)

@app.task
def monitored_process_item(item):
    """Process item with detailed logging."""
    logger.info(f"Starting processing for item: {item}")
    result = item * 2
    logger.info(f"Completed item {item}, result: {result}")
    return {'item': item, 'result': result}

@app.task
def monitored_aggregate(results):
    """Aggregate with logging."""
    logger.info(f"Aggregating {len(results)} results")
    total = sum(r['result'] for r in results)
    logger.info(f"Aggregation complete: {total}")
    return {'total': total}
```
A common silent failure mode: the chord callback never fires even though all header tasks complete. This usually means the result backend is misconfigured or result_extended is not set. To diagnose it quickly, run the header group without a callback first and check that results land in the backend:
```python
# Diagnostic: run just the group, no chord
g = group(process_item.s(i) for i in [1, 2, 3])
result = g.apply_async()
print(result.get(timeout=10))  # Should print a list of dicts
```
If this returns results correctly but your chord callback never fires, the issue is in chord backend configuration, not your task logic.
Writing Tests for Chord Workflows
Testing async workflows requires a synchronous test mode. Use the task_always_eager setting (Celery 4+; older versions used the uppercase CELERY_ALWAYS_EAGER) to execute tasks inline during tests:
```python
# tests/test_workflows.py
import pytest
from celery import current_app

from workflows import run_workflow

@pytest.fixture(autouse=True)
def celery_eager():
    """Force tasks to run synchronously in tests."""
    current_app.conf.task_always_eager = True
    current_app.conf.task_eager_propagates = True
    yield
    current_app.conf.task_always_eager = False

def test_chord_basic():
    result = run_workflow([1, 2, 3, 4, 5])
    assert result.get()['total'] == 30  # 2+4+6+8+10
    assert result.get()['count'] == 5

def test_chord_empty_input():
    result = run_workflow([])
    assert result.get()['total'] == 0
    assert result.get()['count'] == 0
```
Ask Claude Code: “Write pytest fixtures and test cases for this Celery chord, covering empty input, partial failure, and full success.” It will produce a complete test file including mocks for external services.
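Because chord callbacks are ordinary tasks, you can also unit-test them as plain functions; this sketch assumes `generate_summary` is importable from the tasks module:

```python
from tasks import generate_summary

def test_generate_summary_counts_failures():
    # Calling a task directly executes its body locally, no broker involved
    results = [
        {'upload_id': 1, 'status': 'success'},
        {'upload_id': 2, 'status': 'failed'},
        None,  # a header task that returned nothing
    ]
    summary = generate_summary(results)
    assert summary == {'total': 3, 'successful': 1, 'failed': 2}
```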
Best Practices for Production
When deploying chord workflows to production, consider these recommendations:
- Set appropriate timeouts: Chords can hang if tasks never complete. Use `soft_time_limit` and `time_limit` on header tasks; see the sketch after this list
- Implement proper error handling: Use retry policies and dead-letter queues for failed tasks; set `acks_late = True` on tasks that must not be lost
- Monitor task state: Use Flower or similar tools to visualize chord execution; Celery's `inspect` API can dump active, reserved, and scheduled tasks at runtime
- Test thoroughly: Mock external dependencies and test failure scenarios; always test with an empty header group
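A sketch of a header task wired with those limits; `do_work` and `cleanup` are hypothetical placeholders for your own logic:

```python
from celery.exceptions import SoftTimeLimitExceeded

@app.task(bind=True, soft_time_limit=60, time_limit=120, acks_late=True)
def hardened_task(self, item):
    """Header task with graceful-cleanup and hard-kill limits."""
    try:
        return do_work(item)  # hypothetical processing function
    except SoftTimeLimitExceeded:
        cleanup(item)  # hypothetical cleanup before the hard limit hits
        raise
```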
| Setting | Recommended Value | Why |
|---|---|---|
| `chord_propagates` | `True` | Callback receives exception info on failure |
| `result_extended` | `True` | Required for correct chord tracking in Celery 5 |
| `task_acks_late` | `True` (long tasks) | Prevents loss if worker crashes mid-task |
| `task_soft_time_limit` | 60–300 s | Raises `SoftTimeLimitExceeded` for graceful cleanup |
| `task_time_limit` | 120–600 s | Hard kill; prevents zombie workers |
| `task_always_eager` | `False` in prod | Ensures async execution; `True` in tests only |
Conclusion
Celery chord workflows enable powerful parallel processing patterns in Python applications. By combining Celery’s built-in coordination with Claude Code’s ability to generate, explain, and debug code, you can build solid asynchronous workflows more efficiently. Start with simple chords, add error handling incrementally, and use Claude Code as your pair programmer for complex implementations.
The key to success is understanding that chords are just groups with callbacks; once you grasp this fundamental concept, building complex pipelines becomes straightforward. Claude Code can help you at every step, from initial scaffolding to production debugging. When you get stuck, paste the full chord definition and the error traceback into Claude and ask it to explain the execution flow step by step. It will trace the data path from header task return values through the backend to the callback invocation, which is usually enough to pinpoint the root cause.