Claude Code for OpenLineage Workflow (2026)
Claude Code for OpenLineage Workflow Tutorial Guide
OpenLineage has become the de facto standard for metadata collection in modern data ecosystems. This comprehensive guide shows you how to use Claude Code to accelerate your OpenLineage workflow implementation, from initial setup to advanced integrations.
What is OpenLineage?
OpenLineage is an open-source framework that provides uniform metadata collection across your entire data stack. It enables data teams to track data lineage from source to destination, understand data flows, and maintain compliance. Whether you’re running Spark jobs, Airflow DAGs, or dbt transformations, OpenLineage captures the relationships between datasets, jobs, and runs.
The core model is straightforward: every job execution emits events that describe what data was read, what was written, and metadata about the run itself. These events flow to a backend (Marquez is the reference implementation) where lineage graphs are assembled and made queryable. The standard is governed by the OpenLineage project under the Linux Foundation, which means the spec is stable and growing in adoption.
The OpenLineage Data Model
Before writing any code, it helps to understand the three entities the spec tracks:
| Entity | What it represents | Example |
|---|---|---|
| Job | A named unit of work | etl.sales_daily_aggregation |
| Run | One execution of a job | Run ID abc-123, started 2026-03-20 02:00 UTC |
| Dataset | A named data asset | s3://datalake/raw/sales/ |
Events tie these together. A START event says “run abc-123 of job etl.sales_daily_aggregation began.” A COMPLETE event says “that run finished and produced dataset X from dataset Y.” This is everything you need to reconstruct full data lineage after the fact.
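To make this concrete, here is roughly what a START event looks like on the wire. It is shown as a Python dict for readability; the run ID, names, and producer URL are illustrative placeholders rather than values from a real system.

```python
# Approximate shape of a START event payload (illustrative values only).
start_event = {
    "eventType": "START",
    "eventTime": "2026-03-20T02:00:00Z",
    "run": {"runId": "3a7c6c1e-58b2-4f6a-9d2e-0f1b2c3d4e5f"},  # the spec expects a UUID
    "job": {"namespace": "etl", "name": "etl.sales_daily_aggregation"},
    "inputs": [],   # datasets read; often empty on START and filled in on COMPLETE
    "outputs": [],  # datasets written
    "producer": "https://github.com/yourorg/yourrepo",
}
```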
Why Use Claude Code with OpenLineage?
Claude Code brings AI-assisted development to your OpenLineage workflow. It can help you:
- Generate OpenLineage integration code quickly
- Debug lineage collection issues
- Create custom emitters and receivers
- Write tests for your OpenLineage implementations
- Document your lineage workflows
But there are more specific productivity gains worth calling out. OpenLineage integrations involve a fair amount of boilerplate: run IDs, timestamps, facet construction, and error handling around emission failures. Claude Code handles that boilerplate reliably so you can focus on the semantics of your specific pipeline. When you describe your pipeline's shape (sources, transformations, sinks), Claude generates the structural scaffolding immediately.
Setting Up Your Environment
Before diving into OpenLineage, ensure you have Claude Code installed and your development environment configured:
# Install Claude Code
npm install -g @anthropic-ai/claude-code

# Verify installation
claude --version

# Initialize a new project with OpenLineage
mkdir my-openlineage-project && cd my-openlineage-project
pip install openlineage-python openlineage-airflow
For projects using Marquez as the backend, you can spin up a local instance with Docker to test against before pointing at your production lineage server:
# Start Marquez locally. Marquez needs a Postgres database, so the simplest path is
# the docker-compose setup that ships with the Marquez repository (API on port 5000):
git clone https://github.com/MarquezProject/marquez && cd marquez
./docker/up.sh
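Once the containers are up, confirm the API responds before wiring anything else. A minimal check, assuming the `requests` package is installed and Marquez is listening on its default API port 5000:

```python
import requests

# List namespaces; a fresh Marquez install typically answers with at least a
# "default" namespace. A connection error here means the backend isn't reachable.
resp = requests.get("http://localhost:5000/api/v1/namespaces", timeout=5)
resp.raise_for_status()
print(resp.json())
```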
Creating Your First OpenLineage Integration
Let’s build a basic OpenLineage integration using Claude Code. Describe your workflow to Claude, and it will produce a skeleton along these lines:
from openlineage.client import OpenLineageClient
from openlineage.client.run import (
RunEvent, RunState, Run, Job,
InputDataset, OutputDataset
)
from openlineage.client.facet import (
SchemaDatasetFacet, SchemaField,
SourceCodeLocationJobFacet
)
import uuid
from datetime import datetime, timezone
# Initialize the client pointing at your Marquez instance
client = OpenLineageClient(url="http://localhost:5000")
run_id = str(uuid.uuid4())
namespace = "my-data-namespace"
job_name = "etl_pipeline"
# Emit the START event
client.emit(
RunEvent(
eventType=RunState.START,
eventTime=datetime.now(timezone.utc).isoformat(),
run=Run(runId=run_id),
job=Job(namespace=namespace, name=job_name),
producer="https://github.com/yourorg/yourrepo",
inputs=[],
outputs=[]
)
)
# ... your actual ETL work here ...
# Emit the COMPLETE event with input/output datasets
client.emit(
RunEvent(
eventType=RunState.COMPLETE,
eventTime=datetime.now(timezone.utc).isoformat(),
run=Run(runId=run_id),
job=Job(namespace=namespace, name=job_name),
producer="https://github.com/yourorg/yourrepo",
inputs=[
InputDataset(
namespace="s3://datalake",
name="raw/sales/data"
)
],
outputs=[
OutputDataset(
namespace="s3://datalake",
name="processed/sales/summary"
)
]
)
)
Claude Code can help you extend this basic pattern to handle complex workflows with multiple tasks and dependencies. The pattern to establish early: one run_id per logical job execution, emitted at START and reused on COMPLETE or FAIL. The most common mistake in new integrations is generating a new run ID at completion time, which breaks lineage graph assembly in Marquez.
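One way to make that rule hard to violate is to wrap the run lifecycle in a small helper. The sketch below builds only on the client calls already shown; it is not an official OpenLineage utility, and the `lineage_run` name is ours.

```python
import uuid
from contextlib import contextmanager
from datetime import datetime, timezone

from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job

@contextmanager
def lineage_run(client: OpenLineageClient, namespace: str, job_name: str, producer: str):
    """Emit START on entry and FAIL on an unhandled exception, always reusing one run_id."""
    run = Run(runId=str(uuid.uuid4()))
    job = Job(namespace=namespace, name=job_name)

    def emit(state, inputs=None, outputs=None):
        client.emit(RunEvent(
            eventType=state,
            eventTime=datetime.now(timezone.utc).isoformat(),
            run=run,
            job=job,
            producer=producer,
            inputs=inputs or [],
            outputs=outputs or [],
        ))

    emit(RunState.START)
    try:
        yield emit  # the caller emits COMPLETE (with datasets) through this
    except Exception:
        emit(RunState.FAIL)
        raise
```

The calling code then does its ETL work inside the `with` block and finishes with `emit(RunState.COMPLETE, inputs=[...], outputs=[...])`, so the same run ID reaches Marquez on both events.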
Integrating with Apache Airflow
Airflow is one of the most common use cases for OpenLineage. Here’s how to set up the integration:
# airflow_dag.py
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
# No OpenLineage-specific code is needed inside the DAG file itself: once the
# OpenLineage provider is installed and configured (see the next section), a
# listener emits run events for every task automatically.
def extract(**context):
    """Extract data from source"""
    data = [1, 2, 3, 4, 5]
    context['task_instance'].xcom_push(key='extracted_data', value=data)
    return data

def transform(**context):
    """Transform the extracted data"""
    data = context['task_instance'].xcom_pull(task_ids='extract', key='extracted_data')
    transformed = [x * 2 for x in data]
    return transformed

def load(**context):
    """Load transformed data"""
    data = context['task_instance'].xcom_pull(task_ids='transform')
    print(f"Loaded: {data}")
with DAG(
dag_id="etl_pipeline",
start_date=datetime(2026, 1, 1),
schedule_interval="@daily"
) as dag:
extract_task = PythonOperator(
task_id="extract",
python_callable=extract
)
transform_task = PythonOperator(
task_id="transform",
python_callable=transform
)
load_task = PythonOperator(
task_id="load",
python_callable=load
)
extract_task >> transform_task >> load_task
The Airflow integration uses listeners and extractors. Listeners hook into task lifecycle events (start, success, failure) to emit RunEvents automatically. Extractors are operator-specific classes that know how to read an operator’s configuration and produce the right InputDataset or OutputDataset facets.
For built-in operators (BigQueryOperator, S3FileTransformOperator, etc.), extractors exist already. For custom PythonOperator tasks, you write a custom extractor or emit events manually as shown above.
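Here is a rough sketch of what a custom extractor can look like. The import paths and the `TaskMetadata`/`Dataset` types below match recent openlineage-airflow releases but have moved between versions, so treat them as assumptions to verify against the version you run; `SalesExportOperator` and its `destination_path` attribute are hypothetical.

```python
from typing import List, Optional

from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata
from openlineage.client.run import Dataset

class SalesExportExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        # Operator class names this extractor handles (hypothetical operator).
        return ["SalesExportOperator"]

    def extract(self) -> Optional[TaskMetadata]:
        # self.operator is the operator instance; read its config to build datasets.
        return TaskMetadata(
            name=f"{self.operator.dag_id}.{self.operator.task_id}",
            inputs=[Dataset(namespace="s3://datalake", name="raw/sales/data")],
            outputs=[Dataset(namespace="s3://datalake", name=self.operator.destination_path)],
        )
```

Registration is typically done by pointing the OPENLINEAGE_EXTRACTORS environment variable at the extractor's import path; check the integration docs for the exact mechanism in your version.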
Configuring the OpenLineage Airflow Provider
In your airflow.cfg or environment variables, set:
OPENLINEAGE_URL=http://your-marquez-host:5000
OPENLINEAGE_NAMESPACE=your-airflow-namespace
The provider picks these up automatically. Once configured, every operator that has an extractor starts emitting lineage without changes to your DAG code.
Custom Lineage Events for Complex Workflows
For workflows beyond standard integrations, you can emit custom lineage events with rich schema facets:
from openlineage.client.facet import (
SchemaDatasetFacet, SchemaField,
SourceCodeLocationJobFacet,
DocumentationJobFacet
)
# Build schema facets for input and output datasets
sales_input_schema = SchemaDatasetFacet(
fields=[
SchemaField(name="id", type="STRING"),
SchemaField(name="amount", type="DECIMAL"),
SchemaField(name="transaction_date", type="DATE"),
SchemaField(name="customer_id", type="STRING")
]
)
summary_output_schema = SchemaDatasetFacet(
fields=[
SchemaField(name="date", type="DATE"),
SchemaField(name="total", type="DECIMAL"),
SchemaField(name="transaction_count", type="INTEGER")
]
)
client.emit(
RunEvent(
eventType=RunState.COMPLETE,
eventTime=datetime.now(timezone.utc).isoformat(),
run=Run(runId=run_id),
job=Job(
namespace=namespace,
name="custom_etl_job",
facets={
"sourceCodeLocation": SourceCodeLocationJobFacet(
type="git",
url="https://github.com/yourorg/yourrepo",
branch="main",
path="pipelines/sales_aggregation.py"
),
"documentation": DocumentationJobFacet(
description="Daily sales aggregation job. Reads raw transactions, "
"aggregates by date, writes to processed layer."
)
}
),
producer="https://github.com/yourorg/yourrepo",
inputs=[
InputDataset(
namespace="s3://datalake",
name="raw/sales/data",
facets={"schema": sales_input_schema}
)
],
outputs=[
OutputDataset(
namespace="s3://datalake",
name="processed/sales/summary",
facets={"schema": summary_output_schema}
)
]
)
)
Schema facets are particularly valuable. When Marquez stores schema alongside lineage, you can detect breaking schema changes by comparing facets across runs. A column that existed in run 1 but not run 2 is a signal worth surfacing in your data quality pipeline.
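As an illustration, comparing two schema facets is a few lines of set arithmetic. Here `sales_input_schema` is the facet built above, while `previous_schema` stands in for the facet recorded for the prior run (fetched from Marquez or cached by your pipeline; how you retrieve it depends on your setup).

```python
from openlineage.client.facet import SchemaDatasetFacet

def dropped_columns(previous: SchemaDatasetFacet, current: SchemaDatasetFacet) -> set:
    """Columns present in the previous run's schema but missing from the current one."""
    return {f.name for f in previous.fields} - {f.name for f in current.fields}

missing = dropped_columns(previous_schema, sales_input_schema)
if missing:
    print(f"Breaking change: columns dropped since last run: {sorted(missing)}")
```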
Using dbt with OpenLineage
dbt is another high-value integration target. The dbt-ol package wraps dbt commands and emits OpenLineage events for each model run:
pip install openlineage-dbt
# Run dbt with lineage emission
dbt-ol run --profiles-dir .
Under the hood, dbt-ol reads the dbt manifest to understand model dependencies and maps them to OpenLineage job and dataset concepts. Each model becomes a job; each ref() becomes an input dataset; each model output becomes an output dataset. The result is that your full dbt DAG is visible in Marquez alongside your Airflow DAGs and any custom pipelines, giving you one unified lineage graph across your stack.
Ask Claude to help you configure dbt-ol for a project:
Claude, I have a dbt project with profiles stored in ~/.dbt/profiles.yml.
The target warehouse is BigQuery. How do I configure dbt-ol to emit events
to my Marquez instance at http://marquez.internal:5000?
Claude will produce the correct environment variable configuration and explain how to handle service account credentials for the BigQuery connection.
Debugging OpenLineage Issues with Claude Code
When your lineage collection isn’t working as expected, Claude Code can help diagnose common issues:
- Connection problems: Check your client configuration and ensure the OpenLineage backend is reachable. Use `curl http://your-marquez-host:5000/api/v1/namespaces` to verify the Marquez API responds before blaming the client library (a minimal smoke-test emission is sketched after this list).
- Missing events: Verify that your operators are properly decorated or extended. For Airflow, check that the OpenLineage provider is installed and that `OPENLINEAGE_URL` is set in the environment where the scheduler and workers run.
- Incomplete metadata: Ensure all required facets are being populated. A run that emits START but never emits COMPLETE or FAIL will appear in Marquez as a perpetually running job.
- Duplicate run IDs: If your run IDs are not globally unique (e.g., you use a pipeline name instead of a UUID), Marquez merges events with the same run ID, which can corrupt lineage graphs.
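For connection problems specifically, a quick smoke test is to emit a throwaway START/COMPLETE pair and check that the job appears in Marquez. If this works but your pipeline's events never show up, the problem is in your integration code rather than connectivity. The namespace and job name below are arbitrary.

```python
import uuid
from datetime import datetime, timezone

from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job

client = OpenLineageClient(url="http://localhost:5000")
run = Run(runId=str(uuid.uuid4()))
job = Job(namespace="debug", name="openlineage_smoke_test")

# Send START then COMPLETE with the same run ID and no datasets.
for state in (RunState.START, RunState.COMPLETE):
    client.emit(RunEvent(
        eventType=state,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=run,
        job=job,
        producer="https://github.com/yourorg/yourrepo",
        inputs=[],
        outputs=[],
    ))
```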
Ask Claude to review your integration code and suggest improvements:
Claude, can you review this OpenLineage integration and identify why
the input datasets aren't being captured in the run events?
Paste the relevant code and Claude will walk through the event structure, check that InputDataset objects are being added to the inputs list (not inputs_facets), and verify that the run_id from the START event is being reused on COMPLETE.
Writing Tests for Your OpenLineage Integrations
Untested lineage code tends to silently emit wrong or incomplete events for months before anyone notices. Write tests that verify the structure of emitted events rather than just checking that no exception was raised.
import unittest
from unittest.mock import patch
from openlineage.client.run import RunEvent, RunState
class TestSalesPipelineLineage(unittest.TestCase):
@patch("openlineage.client.OpenLineageClient.emit")
def test_emits_start_and_complete(self, mock_emit):
from my_pipeline import run_sales_aggregation
run_sales_aggregation(date="2026-03-20")
self.assertEqual(mock_emit.call_count, 2)
start_event = mock_emit.call_args_list[0][0][0]
complete_event = mock_emit.call_args_list[1][0][0]
self.assertIsInstance(start_event, RunEvent)
self.assertEqual(start_event.eventType, RunState.START)
self.assertEqual(complete_event.eventType, RunState.COMPLETE)
@patch("openlineage.client.OpenLineageClient.emit")
def test_complete_event_has_output_datasets(self, mock_emit):
from my_pipeline import run_sales_aggregation
run_sales_aggregation(date="2026-03-20")
complete_event = mock_emit.call_args_list[1][0][0]
self.assertEqual(len(complete_event.outputs), 1)
self.assertEqual(complete_event.outputs[0].name, "processed/sales/summary")
@patch("openlineage.client.OpenLineageClient.emit")
def test_run_ids_match_between_events(self, mock_emit):
from my_pipeline import run_sales_aggregation
run_sales_aggregation(date="2026-03-20")
start_run_id = mock_emit.call_args_list[0][0][0].run.runId
complete_run_id = mock_emit.call_args_list[1][0][0].run.runId
self.assertEqual(start_run_id, complete_run_id)
That last test, verifying that run IDs match, catches the most common lineage bug in production integrations.
Best Practices for OpenLineage Workflows
Follow these recommendations for solid lineage tracking:
- Use consistent namespaces across all your data sources. A dataset called `s3://datalake/raw/sales` emitted by Airflow and the same dataset emitted by a Spark job need identical namespace and name strings, or Marquez treats them as different datasets.
- Include schema facets to capture data structure changes over time.
- Add documentation facets for business context. Future analysts will thank you.
- Emit events at appropriate granularity: one job per logical pipeline stage, not one job per row processed.
- Implement error handling to catch and log failed lineage emissions without crashing the pipeline. Lineage emission failures should log a warning, not abort the job.
- Test your integrations with mock runs before production deployment.
- Use UTC timestamps everywhere. Mixed timezone events cause ordering bugs in lineage viewers.
# Safe emission pattern: never let lineage failure kill the pipeline
import logging

def safe_emit(client, event):
    try:
        client.emit(event)
    except Exception as e:
        logging.warning(f"OpenLineage emission failed: {e}")
Comparing OpenLineage Client Libraries
| Language | Package | Notes |
|---|---|---|
| Python | openlineage-python | Most mature, used in Airflow and dbt integrations |
| Java/Scala | openlineage-java | Used by Spark integration |
| JavaScript | @openlineage/client | Less common, useful for Node.js ETL tools |
| Go | Community packages | No official client as of early 2026 |
For most data engineering teams working in Python, the openlineage-python package is the right starting point. Use Claude Code to generate integration code against this package; it has good coverage in Claude’s training data, so Claude produces correct, idiomatic usage.
Conclusion
Integrating OpenLineage into your data workflows doesn’t have to be complex. With Claude Code as your development partner, you can rapidly implement lineage tracking, debug issues, and maintain comprehensive metadata coverage across your data ecosystem.
Start small with basic event emission, then progressively add more sophisticated facets and custom integrations as your lineage requirements grow. The key is to begin collecting metadata early so you can build a complete picture of your data’s journey.
Establish the habit of including schema facets from the start. Adding them to an existing integration after months of production usage means historical runs lack schema information, which limits retroactive impact analysis. It is far easier to include them on day one.
Remember: good lineage is an investment in data governance, debugging, and overall data platform reliability. Let Claude Code help you make that investment efficiently.
Related Reading
- AI Assisted Architecture Design Workflow Guide
- AI Assisted Code Review Workflow Best Practices
- Best Way to Integrate Claude Code into Team Workflow