creating-openlineage-extractors

작성자: astronomer

지원되지 않는 Airflow 연산자와 복잡한 계보 시나리오를 위한 맞춤형 OpenLineage 추출기. 두 가지 접근 방식: 소유한 연산자에 직접 OpenLineage 메서드를 추가(권장)하거나, 수정할 수 없는 타사 연산자를 위한 맞춤형 추출기를 생성합니다. 추출기는 세 지점에서 연산자 실행을 가로챕니다: 정적 계보를 위한 실행 전, 런타임에 결정된 출력을 위한 성공 후, 그리고 선택적으로 부분 계보를 위한 실패 후. airflow.cfg 또는 환경을 통해 추출기를 등록합니다...

npx skills add https://github.com/astronomer/agents --skill creating-openlineage-extractors

Creating OpenLineage Extractors

This skill guides you through creating custom OpenLineage extractors to capture lineage from Airflow operators that don't have built-in support.

Reference: See the OpenLineage provider developer guide for the latest patterns and list of supported operators/hooks.

When to Use Each Approach

ScenarioApproach
Operator you own/maintainOpenLineage Methods (recommended, simplest)
Third-party operator you can't modifyCustom Extractor
Need column-level lineageOpenLineage Methods or Custom Extractor
Complex extraction logicOpenLineage Methods or Custom Extractor
Simple table-level lineageInlets/Outlets (simplest, but lowest priority)

Important: Always prefer OpenLineage methods over custom extractors when possible. Extractors are harder to write, easier to diverge from operator behavior after changes, and harder to debug.

On Astro

Astro includes built-in OpenLineage integration — no additional transport configuration is needed. Lineage events are automatically collected and displayed in the Astro UI's Lineage tab. Custom extractors deployed to an Astro project are automatically picked up, so you only need to register them in airflow.cfg or via environment variable and deploy.


Two Approaches

1. OpenLineage Methods (Recommended)

Use when you can add methods directly to your custom operator. This is the go-to solution for operators you own.

2. Custom Extractors

Use when you need lineage from third-party or provider operators that you cannot modify.


Approach 1: OpenLineage Methods (Recommended)

When you own the operator, add OpenLineage methods directly:

from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    """Custom operator with built-in OpenLineage support."""

    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self._rows_processed = 0  # Set during execution

    def execute(self, context):
        # Do the actual work
        self._rows_processed = self._process_data()
        return self._rows_processed

    def get_openlineage_facets_on_start(self):
        """Called when task starts. Return known inputs/outputs."""
        # Import locally to avoid circular imports
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
        )

    def get_openlineage_facets_on_complete(self, task_instance):
        """Called after success. Add runtime metadata."""
        from openlineage.client.event_v2 import Dataset
        from openlineage.client.facet_v2 import output_statistics_output_dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[
                Dataset(
                    namespace="postgres://db",
                    name=self.target_table,
                    facets={
                        "outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
                            rowCount=self._rows_processed
                        )
                    },
                )
            ],
        )

    def get_openlineage_facets_on_failure(self, task_instance):
        """Called after failure. Optional - for partial lineage."""
        return None

OpenLineage Methods Reference

MethodWhen CalledRequired
get_openlineage_facets_on_start()Task enters RUNNINGNo
get_openlineage_facets_on_complete(ti)Task succeedsNo
get_openlineage_facets_on_failure(ti)Task failsNo

Implement only the methods you need. Unimplemented methods fall through to Hook-Level Lineage or inlets/outlets.


Approach 2: Custom Extractors

Use this approach only when you cannot modify the operator (e.g., third-party or provider operators).

Basic Structure

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class MyOperatorExtractor(BaseExtractor):
    """Extract lineage from MyCustomOperator."""

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Return operator class names this extractor handles."""
        return ["MyCustomOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        """Called BEFORE operator executes. Use for known inputs/outputs."""
        # Access operator properties via self.operator
        source_table = self.operator.source_table
        target_table = self.operator.target_table

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{source_table}",
                )
            ],
            outputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{target_table}",
                )
            ],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """Called AFTER operator executes. Use for runtime-determined lineage."""
        # Access properties set during execution
        # Useful for operators that determine outputs at runtime
        return None

OperatorLineage Structure

from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job

lineage = OperatorLineage(
    inputs=[Dataset(namespace="...", name="...")],      # Input datasets
    outputs=[Dataset(namespace="...", name="...")],     # Output datasets
    run_facets={"sql": sql_job.SQLJobFacet(query="SELECT...")},  # Run metadata
    job_facets={},                                      # Job metadata
)

Extraction Methods

MethodWhen CalledUse For
_execute_extraction()Before operator runsStatic/known lineage
extract_on_complete(task_instance)After successRuntime-determined lineage
extract_on_failure(task_instance)After failurePartial lineage on errors

Registering Extractors

Option 1: Configuration file (airflow.cfg)

[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor

Option 2: Environment variable

AIRFLOW__OPENLINEAGE__EXTRACTORS='mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor'

Important: The path must be importable from the Airflow worker. Place extractors in your DAGs folder or installed package.


Common Patterns

SQL Operator Extractor

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job


class MySqlOperatorExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["MySqlOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        sql = self.operator.sql
        conn_id = self.operator.conn_id

        # Parse SQL to find tables (simplified example)
        # In practice, use a SQL parser like sqlglot
        inputs, outputs = self._parse_sql(sql)

        namespace = f"postgres://{conn_id}"

        return OperatorLineage(
            inputs=[Dataset(namespace=namespace, name=t) for t in inputs],
            outputs=[Dataset(namespace=namespace, name=t) for t in outputs],
            job_facets={
                "sql": sql_job.SQLJobFacet(query=sql)
            },
        )

    def _parse_sql(self, sql: str) -> tuple[list[str], list[str]]:
        """Parse SQL to extract table names. Use sqlglot for real parsing."""
        # Simplified example - use proper SQL parser in production
        inputs = []
        outputs = []
        # ... parsing logic ...
        return inputs, outputs

File Transfer Extractor

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class S3ToSnowflakeExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["S3ToSnowflakeOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        s3_bucket = self.operator.s3_bucket
        s3_key = self.operator.s3_key
        table = self.operator.table
        schema = self.operator.schema

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace=f"s3://{s3_bucket}",
                    name=s3_key,
                )
            ],
            outputs=[
                Dataset(
                    namespace="snowflake://myaccount.snowflakecomputing.com",
                    name=f"{schema}.{table}",
                )
            ],
        )

Dynamic Lineage from Execution

from openlineage.client.event_v2 import Dataset


class DynamicOutputExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["DynamicOutputOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        # Only inputs known before execution
        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        # Outputs determined during execution
        # Access via operator properties set in execute()
        outputs = self.operator.created_tables  # Set during execute()

        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
            outputs=[Dataset(namespace="...", name=t) for t in outputs],
        )

Common Pitfalls

1. Circular Imports

Problem: Importing Airflow modules at the top level causes circular imports.

# ❌ BAD - can cause circular import issues
from airflow.models import TaskInstance
from openlineage.client.event_v2 import Dataset

class MyExtractor(BaseExtractor):
    ...
# ✅ GOOD - import inside methods
class MyExtractor(BaseExtractor):
    def _execute_extraction(self):
        from openlineage.client.event_v2 import Dataset
        # ...

2. Wrong Import Path

Problem: Extractor path doesn't match actual module location.

# ❌ Wrong - path doesn't exist
AIRFLOW__OPENLINEAGE__EXTRACTORS='extractors.MyExtractor'

# ✅ Correct - full importable path
AIRFLOW__OPENLINEAGE__EXTRACTORS='dags.extractors.my_extractor.MyExtractor'

3. Not Handling None

Problem: Extraction fails when operator properties are None.

# ✅ Handle optional properties
def _execute_extraction(self) -> OperatorLineage | None:
    if not self.operator.source_table:
        return None  # Skip extraction

    return OperatorLineage(...)

Testing Extractors

Unit Testing

import pytest
from unittest.mock import MagicMock
from mypackage.extractors import MyOperatorExtractor


def test_extractor():
    # Mock the operator
    operator = MagicMock()
    operator.source_table = "input_table"
    operator.target_table = "output_table"

    # Create extractor
    extractor = MyOperatorExtractor(operator)

    # Test extraction
    lineage = extractor._execute_extraction()

    assert len(lineage.inputs) == 1
    assert lineage.inputs[0].name == "input_table"
    assert len(lineage.outputs) == 1
    assert lineage.outputs[0].name == "output_table"

Precedence Rules

OpenLineage checks for lineage in this order:

  1. Custom Extractors (highest priority)
  2. OpenLineage Methods on operator
  3. Hook-Level Lineage (from HookLineageCollector)
  4. Inlets/Outlets (lowest priority)

If a custom extractor exists, it overrides built-in extraction and inlets/outlets.


Related Skills

  • annotating-task-lineage: For simple table-level lineage with inlets/outlets
  • tracing-upstream-lineage: Investigate data origins
  • tracing-downstream-lineage: Investigate data dependencies

astronomer의 다른 스킬

airflow
astronomer
Apache Airflow DAG, 실행, 작업 및 시스템 구성을 쿼리, 관리 및 문제 해결합니다. DAG 검사, 실행 관리, 작업 로깅, 구성 쿼리 및 직접 REST API 액세스에 걸쳐 30개 이상의 명령을 지원합니다. 지속적인 구성으로 여러 Airflow 인스턴스를 관리하고 로컬 및 Astro 배포를 자동으로 검색합니다. DAG 실행을 동기식(완료 대기) 또는 비동기식으로 트리거하고, 실패를 진단하고, 재시도를 위해 실행을 지우고, 재시도/맵 인덱스 필터링을 통해 작업 로그에 액세스합니다. 출력...
official
airflow-hitl
astronomer
인간 승인 게이트, 폼 입력, 그리고 지연 가능 연산자를 사용한 Airflow DAG 내 분기 처리. 네 가지 연산자 유형: 승인/거부 결정을 위한 ApprovalOperator, 폼을 통한 다중 옵션 선택을 위한 HITLOperator, 인간 주도 작업 라우팅을 위한 HITLBranchOperator, 폼 데이터 수집을 위한 HITLEntryOperator. 모든 연산자는 지연 가능하며, Airflow UI의 Required Actions 탭 또는 REST API를 통해 인간 응답을 기다리는 동안 작업자 슬롯을 해제합니다. 선택적 기능 지원 포함: 사용자 정의...
official
airflow-plugins
astronomer
Airflow 3.1+ 플러그인을 빌드하여 FastAPI 앱, 커스텀 UI 페이지, React 컴포넌트, 미들웨어, 매크로 및 연산자 링크를 Airflow UI에 직접 임베드합니다. 사용…
official
analyzing-data
astronomer
데이터 웨어하우스에 질의하여 캐시된 패턴과 개념 매핑을 통해 비즈니스 질문에 답변합니다. 반복되는 질문 유형에 대한 패턴 조회 및 캐싱을 지원하며, 결과 기록을 통해 향후 질의를 개선합니다. 개념-테이블 매핑 캐시와 INFORMATION_SCHEMA 또는 코드베이스 grep을 통한 테이블 스키마 탐색을 포함합니다. 분석을 위해 Polars 또는 Pandas DataFrame을 반환하는 run_sql() 및 run_sql_pandas() 커널 함수를 제공합니다. 개념, 패턴 및 테이블 캐시를 관리하기 위한 CLI 명령어와 추가 기능을 포함합니다.
official
annotating-task-lineage
astronomer
Airflow 태스크에 인렛과 아웃렛을 사용하여 데이터 계보를 주석 처리합니다. 입력 및 출력을 데이터베이스, 데이터 웨어하우스, 클라우드 스토리지 전반에 걸쳐 정의하기 위해 OpenLineage Dataset 객체, Airflow Assets 및 Airflow Datasets를 지원합니다. 운영자에 내장된 OpenLineage 추출기가 없는 경우 대체 수단으로 사용되며, 사용자 정의 추출기와 OpenLineage 메서드가 우선 적용되는 4단계 우선순위 시스템을 따릅니다. Snowflake, BigQuery, S3 및 PostgreSQL에 대한 일관된 명명을 보장하는 데이터셋 명명 헬퍼를 포함합니다.
official
authoring-dags
astronomer
Apache Airflow DAG 생성을 위한 안내 워크플로우로, 검증 및 테스트 통합을 포함합니다. 구조화된 6단계 접근 방식: 환경 및 기존 패턴 발견, DAG 구조 계획, 모범 사례에 따른 구현, af CLI 명령어로 검증, 사용자 동의 하에 테스트, 수정 반복. 발견을 위한 CLI 명령어(af config connections, af config providers, af dags list)와 검증을 위한 명령어(af dags errors, af dags get, af dags explore)는 DAG에 대한 즉각적인 피드백을 제공합니다...
official
blueprint
astronomer
Pydantic 검증을 통해 재사용 가능한 Airflow 태스크 그룹 템플릿을 정의하고 YAML로 DAG를 구성합니다. blueprint 템플릿을 생성하거나 DAG를 구성할 때 사용합니다.
official
checking-freshness
astronomer
테이블 타임스탬프와 업데이트 패턴을 확인하여 데이터 신선도를 검증하고, 부패 정도를 평가합니다. 일반적인 ETL 명명 패턴(_loaded_at, _updated_at, created_at 등)을 사용하여 타임스탬프 열을 식별하고, 최대값을 조회하여 데이터의 기간을 파악합니다. 데이터 신선도를 네 가지 상태로 분류합니다: 신선(4시간 미만), 부패(4~24시간), 매우 부패(24시간 초과), 또는 알 수 없음(타임스탬프 없음). 최근 며칠간의 마지막 업데이트 시간과 행 수 추세를 확인하기 위한 SQL 템플릿을 제공합니다.
official