annotating-task-lineage

Annotieren von Airflow-Tasks mit Data Lineage mithilfe von Inlets und Outlets. Unterstützt OpenLineage-Dataset-Objekte, Airflow-Assets und Airflow-Datasets zur Definition von Ein- und Ausgaben über Datenbanken, Data Warehouses und Cloud-Speicher hinweg. Verwenden Sie es als Fallback, wenn Operatoren keine integrierten OpenLineage-Extraktoren besitzen; folgt einem vierstufigen Prioritätssystem, bei dem benutzerdefinierte Extraktoren und OpenLineage-Methoden Vorrang haben. Enthält Dataset-Namenshilfen für Snowflake, BigQuery, S3 und PostgreSQL, um eine konsistente...

npx skills add https://github.com/astronomer/agents --skill annotating-task-lineage

Annotating Task Lineage with Inlets & Outlets

This skill guides you through adding manual lineage annotations to Airflow tasks using inlets and outlets.

Reference: See the OpenLineage provider developer guide for the latest supported operators and patterns.

On Astro

Lineage annotations defined with inlets and outlets are visualized in Astro's enhanced Lineage tab, which provides cross-DAG and cross-deployment lineage views. This means your annotations are immediately visible in the Astro UI, giving you a unified view of data flow across your entire Astro organization.

When to Use This Approach

ScenarioUse Inlets/Outlets?
Operator has OpenLineage methods (get_openlineage_facets_on_*)❌ Modify the OL method directly
Operator has no built-in OpenLineage extractor✅ Yes
Simple table-level lineage is sufficient✅ Yes
Quick lineage setup without custom code✅ Yes
Need column-level lineage❌ Use OpenLineage methods or custom extractor
Complex extraction logic needed❌ Use OpenLineage methods or custom extractor

Note: Inlets/outlets are the lowest-priority fallback. If an OpenLineage extractor or method exists for the operator, it takes precedence. Use this approach for operators without extractors.


Supported Types for Inlets/Outlets

You can use OpenLineage Dataset objects or Airflow Assets for inlets and outlets:

OpenLineage Datasets (Recommended)

from openlineage.client.event_v2 import Dataset

# Database tables
source_table = Dataset(
    namespace="postgres://mydb:5432",
    name="public.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)

# Files
input_file = Dataset(
    namespace="s3://my-bucket",
    name="raw/events/2024-01-01.json",
)

Airflow Assets (Airflow 3+)

from airflow.sdk import Asset

# Using Airflow's native Asset type
orders_asset = Asset(uri="s3://my-bucket/data/orders")

Airflow Datasets (Airflow 2.4+)

from airflow.datasets import Dataset

# Using Airflow's Dataset type (Airflow 2.4-2.x)
orders_dataset = Dataset(uri="s3://my-bucket/data/orders")

Basic Usage

Setting Inlets and Outlets on Operators

from airflow import DAG
from airflow.operators.bash import BashOperator
from openlineage.client.event_v2 import Dataset
import pendulum

# Define your lineage datasets
source_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="raw.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)
output_file = Dataset(
    namespace="s3://my-bucket",
    name="exports/orders.parquet",
)

with DAG(
    dag_id="etl_with_lineage",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
) as dag:

    transform = BashOperator(
        task_id="transform_orders",
        bash_command="echo 'transforming...'",
        inlets=[source_table],           # What this task reads
        outlets=[target_table],          # What this task writes
    )

    export = BashOperator(
        task_id="export_to_s3",
        bash_command="echo 'exporting...'",
        inlets=[target_table],           # Reads from previous output
        outlets=[output_file],           # Writes to S3
    )

    transform >> export

Multiple Inputs and Outputs

Tasks often read from multiple sources and write to multiple destinations:

from openlineage.client.event_v2 import Dataset

# Multiple source tables
customers = Dataset(namespace="postgres://crm:5432", name="public.customers")
orders = Dataset(namespace="postgres://sales:5432", name="public.orders")
products = Dataset(namespace="postgres://inventory:5432", name="public.products")

# Multiple output tables
daily_summary = Dataset(namespace="snowflake://account", name="analytics.daily_summary")
customer_metrics = Dataset(namespace="snowflake://account", name="analytics.customer_metrics")

aggregate_task = PythonOperator(
    task_id="build_daily_aggregates",
    python_callable=build_aggregates,
    inlets=[customers, orders, products],      # All inputs
    outlets=[daily_summary, customer_metrics], # All outputs
)

Setting Lineage in Custom Operators

When building custom operators, you have two options:

Option 1: Implement OpenLineage Methods (Recommended)

This is the preferred approach as it gives you full control over lineage extraction:

from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")

    def get_openlineage_facets_on_complete(self, task_instance):
        """Return lineage after successful execution."""
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="warehouse://db", name=self.source_table)],
            outputs=[Dataset(namespace="warehouse://db", name=self.target_table)],
        )

Option 2: Set Inlets/Outlets Dynamically

For simpler cases, set lineage within the execute method (non-deferrable operators only):

from airflow.models import BaseOperator
from openlineage.client.event_v2 import Dataset


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # Set lineage dynamically based on operator parameters
        self.inlets = [
            Dataset(namespace="warehouse://db", name=self.source_table)
        ]
        self.outlets = [
            Dataset(namespace="warehouse://db", name=self.target_table)
        ]

        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")

Dataset Naming Helpers

Use the OpenLineage dataset naming helpers to ensure consistent naming across platforms:

from openlineage.client.event_v2 import Dataset

# Snowflake
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming

naming = SnowflakeDatasetNaming(
    account_identifier="myorg-myaccount",
    database="mydb",
    schema="myschema",
    table="mytable",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "snowflake://myorg-myaccount", name: "mydb.myschema.mytable"

# BigQuery
from openlineage.client.naming.bigquery import BigQueryDatasetNaming

naming = BigQueryDatasetNaming(
    project="my-project",
    dataset="my_dataset",
    table="my_table",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "bigquery", name: "my-project.my_dataset.my_table"

# S3
from openlineage.client.naming.s3 import S3DatasetNaming

naming = S3DatasetNaming(bucket="my-bucket", key="path/to/file.parquet")
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "s3://my-bucket", name: "path/to/file.parquet"

# PostgreSQL
from openlineage.client.naming.postgres import PostgresDatasetNaming

naming = PostgresDatasetNaming(
    host="localhost",
    port=5432,
    database="mydb",
    schema="public",
    table="users",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "postgres://localhost:5432", name: "mydb.public.users"

Note: Always use the naming helpers instead of constructing namespaces manually. If a helper is missing for your platform, check the OpenLineage repo or request it.


Precedence Rules

OpenLineage uses this precedence for lineage extraction:

  1. Custom Extractors (highest) - User-registered extractors
  2. OpenLineage Methods - get_openlineage_facets_on_* in operator
  3. Hook-Level Lineage - Lineage collected from hooks via HookLineageCollector
  4. Inlets/Outlets (lowest) - Falls back to these if nothing else extracts lineage

Note: If an extractor or method exists but returns no datasets, OpenLineage will check hook-level lineage, then fall back to inlets/outlets.


Best Practices

Use the Naming Helpers

Always use OpenLineage naming helpers for consistent dataset creation:

from openlineage.client.event_v2 import Dataset
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming


def snowflake_dataset(schema: str, table: str) -> Dataset:
    """Create a Snowflake Dataset using the naming helper."""
    naming = SnowflakeDatasetNaming(
        account_identifier="mycompany",
        database="analytics",
        schema=schema,
        table=table,
    )
    return Dataset(namespace=naming.get_namespace(), name=naming.get_name())


# Usage
source = snowflake_dataset("raw", "orders")
target = snowflake_dataset("staging", "orders_clean")

Document Your Lineage

Add comments explaining the data flow:

transform = SqlOperator(
    task_id="transform_orders",
    sql="...",
    # Lineage: Reads raw orders, joins with customers, writes to staging
    inlets=[
        snowflake_dataset("raw", "orders"),
        snowflake_dataset("raw", "customers"),
    ],
    outlets=[
        snowflake_dataset("staging", "order_details"),
    ],
)

Keep Lineage Accurate

  • Update inlets/outlets when SQL queries change
  • Include all tables referenced in JOINs as inlets
  • Include all tables written to (including temp tables if relevant)
  • Outlet-only and inlet-only annotations are valid. One-sided annotations are encouraged for lineage visibility even without a corresponding inlet or outlet in another DAG.

Limitations

LimitationWorkaround
Table-level only (no column lineage)Use OpenLineage methods or custom extractor
Overridden by extractors/methodsOnly use for operators without extractors
Static at DAG parse timeSet dynamically in execute() or use OL methods
Deferrable operators lose dynamic lineageUse OL methods instead; attributes set in execute() are lost when deferring

Related Skills

  • creating-openlineage-extractors: For column-level lineage or complex extraction
  • tracing-upstream-lineage: Investigate where data comes from
  • tracing-downstream-lineage: Investigate what depends on data

Mehr Skills von astronomer

airflow
astronomer
Apache Airflow-DAGs, Ausführungen, Aufgaben und Systemkonfiguration abfragen, verwalten und Fehler beheben. Unterstützt über 30 Befehle für DAG-Inspektion, Ausführungsverwaltung, Aufgabenprotokollierung, Konfigurationsabfragen und direkten REST-API-Zugriff. Mehrere Airflow-Instanzen mit persistenter Konfiguration verwalten; lokale und Astro-Bereitstellungen automatisch erkennen. DAG-Ausführungen synchron (warten auf Abschluss) oder asynchron auslösen, Fehler diagnostizieren, Ausführungen für Wiederholungen löschen und Aufgabenprotokolle mit Wiederholungs-/Kartenindex-Filterung abrufen. Ausgabe...
official
airflow-hitl
astronomer
Menschliche Genehmigungstore, Formulareingaben und Verzweigungen in Airflow-DAGs unter Verwendung von aufschiebbaren Operatoren. Vier Operatortypen: ApprovalOperator für Genehmigen/Ablehnen-Entscheidungen, HITLOperator für Mehrfachauswahl mit Formularen, HITLBranchOperator für menschlich gesteuerte Aufgabenweiterleitung und HITLEntryOperator für Formulardatenerfassung. Alle Operatoren sind aufschiebbar und geben Worker-Slots frei, während sie auf menschliche Antworten über den Bereich "Erforderliche Aktionen" der Airflow-Benutzeroberfläche oder die REST-API warten. Unterstützt optionale Funktionen einschließlich benutzerdefinierter...
official
airflow-plugins
astronomer
Erstellen Sie Airflow 3.1+-Plugins, die FastAPI-Apps, benutzerdefinierte UI-Seiten, React-Komponenten, Middleware, Makros und Operator-Links direkt in die Airflow-Oberfläche einbetten. Verwenden Sie…
official
analyzing-data
astronomer
Fragen Sie Ihr Data Warehouse, um Geschäftsfragen mit zwischengespeicherten Mustern und Konzeptzuordnungen zu beantworten. Unterstützt Mustersuche und Zwischenspeicherung für wiederkehrende Fragetypen, mit Aufzeichnung der Ergebnisse zur Verbesserung zukünftiger Abfragen. Enthält eine Konzept-zu-Tabelle-Zuordnungs-Cache und Tabellenschema-Erkennung über INFORMATION_SCHEMA oder Codebase-Grep. Bietet run_sql()- und run_sql_pandas()-Kernel-Funktionen, die Polars- oder Pandas-DataFrames für Analysen zurückgeben. CLI-Befehle zur Verwaltung von Konzept-, Muster- und Tabellen-Caches, plus...
official
authoring-dags
astronomer
Geführter Workflow zur Erstellung von Apache Airflow DAGs mit Validierungs- und Testintegration. Strukturierter Sechs-Phasen-Ansatz: Umgebung und bestehende Muster erkunden, DAG-Struktur planen, Implementierung nach Best Practices, Validierung mit af CLI-Befehlen, Testen mit Benutzereinwilligung und Iteration über Fehlerbehebungen. CLI-Befehle zur Erkundung (af config connections, af config providers, af dags list) und Validierung (af dags errors, af dags get, af dags explore) bieten sofortiges Feedback zu DAG...
official
blueprint
astronomer
Wiederverwendbare Airflow-Task-Gruppen-Vorlagen mit Pydantic-Validierung definieren und DAGs aus YAML zusammenstellen. Verwenden beim Erstellen von Blueprint-Vorlagen, Zusammenstellen von DAGs aus…
official
checking-freshness
astronomer
Überprüft die Datenaktualität durch Abgleich von Tabellenzeitstempeln und Aktualisierungsmustern mit einer Veraltungsskala. Identifiziert Zeitstempelspalten anhand gängiger ETL-Benennungsmuster (_loaded_at, _updated_at, created_at usw.) und fragt deren Maximalwerte ab, um das Alter zu bestimmen. Klassifiziert Daten in vier Aktualitätsstatus: Frisch (< 4 Stunden), Veraltet (4–24 Stunden), Stark veraltet (> 24 Stunden) oder Unbekannt (kein Zeitstempel gefunden). Stellt SQL-Vorlagen zur Überprüfung der letzten Aktualisierungszeit und der Zeilenanzahltrends der letzten Tage bereit, um...
official
cosmos-dbt-core
astronomer
Konvertieren Sie dbt Core-Projekte mit Astronomer Cosmos in Airflow-DAGs oder TaskGroups. Unterstützt drei Assembly-Muster: eigenständigen DbtDag, DbtTaskGroup innerhalb bestehender DAGs und einzelne Cosmos-Operatoren für feingranulare Steuerung. Wählen Sie aus acht Ausführungsmodi (WATCHER, LOCAL, VIRTUALENV, KUBERNETES, AIRFLOW_ASYNC und andere) basierend auf Isolations- und Leistungsanforderungen. Bietet drei Parsing-Strategien (dbt_manifest, dbt_ls, dbt_ls_file, automatisch), um Geschwindigkeit und Selektorkomplexität auszugleichen...
official