annotating-task-lineage

작성자: astronomer

Airflow 태스크에 인렛과 아웃렛을 사용하여 데이터 계보를 주석 처리합니다. 입력 및 출력을 데이터베이스, 데이터 웨어하우스, 클라우드 스토리지 전반에 걸쳐 정의하기 위해 OpenLineage Dataset 객체, Airflow Assets 및 Airflow Datasets를 지원합니다. 운영자에 내장된 OpenLineage 추출기가 없는 경우 대체 수단으로 사용되며, 사용자 정의 추출기와 OpenLineage 메서드가 우선 적용되는 4단계 우선순위 시스템을 따릅니다. Snowflake, BigQuery, S3 및 PostgreSQL에 대한 일관된 명명을 보장하는 데이터셋 명명 헬퍼를 포함합니다.

npx skills add https://github.com/astronomer/agents --skill annotating-task-lineage

Annotating Task Lineage with Inlets & Outlets

This skill guides you through adding manual lineage annotations to Airflow tasks using inlets and outlets.

Reference: See the OpenLineage provider developer guide for the latest supported operators and patterns.

On Astro

Lineage annotations defined with inlets and outlets are visualized in Astro's enhanced Lineage tab, which provides cross-DAG and cross-deployment lineage views. This means your annotations are immediately visible in the Astro UI, giving you a unified view of data flow across your entire Astro organization.

When to Use This Approach

ScenarioUse Inlets/Outlets?
Operator has OpenLineage methods (get_openlineage_facets_on_*)❌ Modify the OL method directly
Operator has no built-in OpenLineage extractor✅ Yes
Simple table-level lineage is sufficient✅ Yes
Quick lineage setup without custom code✅ Yes
Need column-level lineage❌ Use OpenLineage methods or custom extractor
Complex extraction logic needed❌ Use OpenLineage methods or custom extractor

Note: Inlets/outlets are the lowest-priority fallback. If an OpenLineage extractor or method exists for the operator, it takes precedence. Use this approach for operators without extractors.


Supported Types for Inlets/Outlets

You can use OpenLineage Dataset objects or Airflow Assets for inlets and outlets:

OpenLineage Datasets (Recommended)

from openlineage.client.event_v2 import Dataset

# Database tables
source_table = Dataset(
    namespace="postgres://mydb:5432",
    name="public.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)

# Files
input_file = Dataset(
    namespace="s3://my-bucket",
    name="raw/events/2024-01-01.json",
)

Airflow Assets (Airflow 3+)

from airflow.sdk import Asset

# Using Airflow's native Asset type
orders_asset = Asset(uri="s3://my-bucket/data/orders")

Airflow Datasets (Airflow 2.4+)

from airflow.datasets import Dataset

# Using Airflow's Dataset type (Airflow 2.4-2.x)
orders_dataset = Dataset(uri="s3://my-bucket/data/orders")

Basic Usage

Setting Inlets and Outlets on Operators

from airflow import DAG
from airflow.operators.bash import BashOperator
from openlineage.client.event_v2 import Dataset
import pendulum

# Define your lineage datasets
source_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="raw.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)
output_file = Dataset(
    namespace="s3://my-bucket",
    name="exports/orders.parquet",
)

with DAG(
    dag_id="etl_with_lineage",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
) as dag:

    transform = BashOperator(
        task_id="transform_orders",
        bash_command="echo 'transforming...'",
        inlets=[source_table],           # What this task reads
        outlets=[target_table],          # What this task writes
    )

    export = BashOperator(
        task_id="export_to_s3",
        bash_command="echo 'exporting...'",
        inlets=[target_table],           # Reads from previous output
        outlets=[output_file],           # Writes to S3
    )

    transform >> export

Multiple Inputs and Outputs

Tasks often read from multiple sources and write to multiple destinations:

from openlineage.client.event_v2 import Dataset

# Multiple source tables
customers = Dataset(namespace="postgres://crm:5432", name="public.customers")
orders = Dataset(namespace="postgres://sales:5432", name="public.orders")
products = Dataset(namespace="postgres://inventory:5432", name="public.products")

# Multiple output tables
daily_summary = Dataset(namespace="snowflake://account", name="analytics.daily_summary")
customer_metrics = Dataset(namespace="snowflake://account", name="analytics.customer_metrics")

aggregate_task = PythonOperator(
    task_id="build_daily_aggregates",
    python_callable=build_aggregates,
    inlets=[customers, orders, products],      # All inputs
    outlets=[daily_summary, customer_metrics], # All outputs
)

Setting Lineage in Custom Operators

When building custom operators, you have two options:

Option 1: Implement OpenLineage Methods (Recommended)

This is the preferred approach as it gives you full control over lineage extraction:

from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")

    def get_openlineage_facets_on_complete(self, task_instance):
        """Return lineage after successful execution."""
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="warehouse://db", name=self.source_table)],
            outputs=[Dataset(namespace="warehouse://db", name=self.target_table)],
        )

Option 2: Set Inlets/Outlets Dynamically

For simpler cases, set lineage within the execute method (non-deferrable operators only):

from airflow.models import BaseOperator
from openlineage.client.event_v2 import Dataset


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # Set lineage dynamically based on operator parameters
        self.inlets = [
            Dataset(namespace="warehouse://db", name=self.source_table)
        ]
        self.outlets = [
            Dataset(namespace="warehouse://db", name=self.target_table)
        ]

        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")

Dataset Naming Helpers

Use the OpenLineage dataset naming helpers to ensure consistent naming across platforms:

from openlineage.client.event_v2 import Dataset

# Snowflake
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming

naming = SnowflakeDatasetNaming(
    account_identifier="myorg-myaccount",
    database="mydb",
    schema="myschema",
    table="mytable",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "snowflake://myorg-myaccount", name: "mydb.myschema.mytable"

# BigQuery
from openlineage.client.naming.bigquery import BigQueryDatasetNaming

naming = BigQueryDatasetNaming(
    project="my-project",
    dataset="my_dataset",
    table="my_table",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "bigquery", name: "my-project.my_dataset.my_table"

# S3
from openlineage.client.naming.s3 import S3DatasetNaming

naming = S3DatasetNaming(bucket="my-bucket", key="path/to/file.parquet")
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "s3://my-bucket", name: "path/to/file.parquet"

# PostgreSQL
from openlineage.client.naming.postgres import PostgresDatasetNaming

naming = PostgresDatasetNaming(
    host="localhost",
    port=5432,
    database="mydb",
    schema="public",
    table="users",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "postgres://localhost:5432", name: "mydb.public.users"

Note: Always use the naming helpers instead of constructing namespaces manually. If a helper is missing for your platform, check the OpenLineage repo or request it.


Precedence Rules

OpenLineage uses this precedence for lineage extraction:

  1. Custom Extractors (highest) - User-registered extractors
  2. OpenLineage Methods - get_openlineage_facets_on_* in operator
  3. Hook-Level Lineage - Lineage collected from hooks via HookLineageCollector
  4. Inlets/Outlets (lowest) - Falls back to these if nothing else extracts lineage

Note: If an extractor or method exists but returns no datasets, OpenLineage will check hook-level lineage, then fall back to inlets/outlets.


Best Practices

Use the Naming Helpers

Always use OpenLineage naming helpers for consistent dataset creation:

from openlineage.client.event_v2 import Dataset
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming


def snowflake_dataset(schema: str, table: str) -> Dataset:
    """Create a Snowflake Dataset using the naming helper."""
    naming = SnowflakeDatasetNaming(
        account_identifier="mycompany",
        database="analytics",
        schema=schema,
        table=table,
    )
    return Dataset(namespace=naming.get_namespace(), name=naming.get_name())


# Usage
source = snowflake_dataset("raw", "orders")
target = snowflake_dataset("staging", "orders_clean")

Document Your Lineage

Add comments explaining the data flow:

transform = SqlOperator(
    task_id="transform_orders",
    sql="...",
    # Lineage: Reads raw orders, joins with customers, writes to staging
    inlets=[
        snowflake_dataset("raw", "orders"),
        snowflake_dataset("raw", "customers"),
    ],
    outlets=[
        snowflake_dataset("staging", "order_details"),
    ],
)

Keep Lineage Accurate

  • Update inlets/outlets when SQL queries change
  • Include all tables referenced in JOINs as inlets
  • Include all tables written to (including temp tables if relevant)
  • Outlet-only and inlet-only annotations are valid. One-sided annotations are encouraged for lineage visibility even without a corresponding inlet or outlet in another DAG.

Limitations

LimitationWorkaround
Table-level only (no column lineage)Use OpenLineage methods or custom extractor
Overridden by extractors/methodsOnly use for operators without extractors
Static at DAG parse timeSet dynamically in execute() or use OL methods
Deferrable operators lose dynamic lineageUse OL methods instead; attributes set in execute() are lost when deferring

Related Skills

  • creating-openlineage-extractors: For column-level lineage or complex extraction
  • tracing-upstream-lineage: Investigate where data comes from
  • tracing-downstream-lineage: Investigate what depends on data

astronomer의 다른 스킬

airflow
astronomer
Apache Airflow DAG, 실행, 작업 및 시스템 구성을 쿼리, 관리 및 문제 해결합니다. DAG 검사, 실행 관리, 작업 로깅, 구성 쿼리 및 직접 REST API 액세스에 걸쳐 30개 이상의 명령을 지원합니다. 지속적인 구성으로 여러 Airflow 인스턴스를 관리하고 로컬 및 Astro 배포를 자동으로 검색합니다. DAG 실행을 동기식(완료 대기) 또는 비동기식으로 트리거하고, 실패를 진단하고, 재시도를 위해 실행을 지우고, 재시도/맵 인덱스 필터링을 통해 작업 로그에 액세스합니다. 출력...
official
airflow-hitl
astronomer
인간 승인 게이트, 폼 입력, 그리고 지연 가능 연산자를 사용한 Airflow DAG 내 분기 처리. 네 가지 연산자 유형: 승인/거부 결정을 위한 ApprovalOperator, 폼을 통한 다중 옵션 선택을 위한 HITLOperator, 인간 주도 작업 라우팅을 위한 HITLBranchOperator, 폼 데이터 수집을 위한 HITLEntryOperator. 모든 연산자는 지연 가능하며, Airflow UI의 Required Actions 탭 또는 REST API를 통해 인간 응답을 기다리는 동안 작업자 슬롯을 해제합니다. 선택적 기능 지원 포함: 사용자 정의...
official
airflow-plugins
astronomer
Airflow 3.1+ 플러그인을 빌드하여 FastAPI 앱, 커스텀 UI 페이지, React 컴포넌트, 미들웨어, 매크로 및 연산자 링크를 Airflow UI에 직접 임베드합니다. 사용…
official
analyzing-data
astronomer
데이터 웨어하우스에 질의하여 캐시된 패턴과 개념 매핑을 통해 비즈니스 질문에 답변합니다. 반복되는 질문 유형에 대한 패턴 조회 및 캐싱을 지원하며, 결과 기록을 통해 향후 질의를 개선합니다. 개념-테이블 매핑 캐시와 INFORMATION_SCHEMA 또는 코드베이스 grep을 통한 테이블 스키마 탐색을 포함합니다. 분석을 위해 Polars 또는 Pandas DataFrame을 반환하는 run_sql() 및 run_sql_pandas() 커널 함수를 제공합니다. 개념, 패턴 및 테이블 캐시를 관리하기 위한 CLI 명령어와 추가 기능을 포함합니다.
official
authoring-dags
astronomer
Apache Airflow DAG 생성을 위한 안내 워크플로우로, 검증 및 테스트 통합을 포함합니다. 구조화된 6단계 접근 방식: 환경 및 기존 패턴 발견, DAG 구조 계획, 모범 사례에 따른 구현, af CLI 명령어로 검증, 사용자 동의 하에 테스트, 수정 반복. 발견을 위한 CLI 명령어(af config connections, af config providers, af dags list)와 검증을 위한 명령어(af dags errors, af dags get, af dags explore)는 DAG에 대한 즉각적인 피드백을 제공합니다...
official
blueprint
astronomer
Pydantic 검증을 통해 재사용 가능한 Airflow 태스크 그룹 템플릿을 정의하고 YAML로 DAG를 구성합니다. blueprint 템플릿을 생성하거나 DAG를 구성할 때 사용합니다.
official
checking-freshness
astronomer
테이블 타임스탬프와 업데이트 패턴을 확인하여 데이터 신선도를 검증하고, 부패 정도를 평가합니다. 일반적인 ETL 명명 패턴(_loaded_at, _updated_at, created_at 등)을 사용하여 타임스탬프 열을 식별하고, 최대값을 조회하여 데이터의 기간을 파악합니다. 데이터 신선도를 네 가지 상태로 분류합니다: 신선(4시간 미만), 부패(4~24시간), 매우 부패(24시간 초과), 또는 알 수 없음(타임스탬프 없음). 최근 며칠간의 마지막 업데이트 시간과 행 수 추세를 확인하기 위한 SQL 템플릿을 제공합니다.
official
cosmos-dbt-core
astronomer
dbt Core 프로젝트를 Astronomer Cosmos를 사용하여 Airflow DAG 또는 TaskGroup으로 변환합니다. 세 가지 어셈블리 패턴을 지원합니다: 독립형 DbtDag, 기존 DAG 내 DbtTaskGroup, 세밀한 제어를 위한 개별 Cosmos 연산자. 격리 및 성능 요구 사항에 따라 8가지 실행 모드(WATCHER, LOCAL, VIRTUALENV, KUBERNETES, AIRFLOW_ASYNC 등) 중에서 선택할 수 있습니다. 속도와 선택기 복잡성의 균형을 맞추기 위해 세 가지 파싱 전략(dbt_manifest, dbt_ls, dbt_ls_file, automatic)을 제공합니다...
official