creating-openlineage-extractors

作者: astronomer

為不支援的Airflow運算子及複雜血緣場景設計的自訂OpenLineage提取器。提供兩種方法:直接在你擁有的運算子中加入OpenLineage方法(建議做法),或為無法修改的第三方運算子建立自訂提取器。提取器在三個時間點攔截運算子執行:執行前取得靜態血緣、成功後取得執行階段決定的輸出、以及選擇性地在失敗後取得部分血緣。可透過airflow.cfg或環境變數註冊提取器...

npx skills add https://github.com/astronomer/agents --skill creating-openlineage-extractors

Creating OpenLineage Extractors

This skill guides you through creating custom OpenLineage extractors to capture lineage from Airflow operators that don't have built-in support.

Reference: See the OpenLineage provider developer guide for the latest patterns and list of supported operators/hooks.

When to Use Each Approach

ScenarioApproach
Operator you own/maintainOpenLineage Methods (recommended, simplest)
Third-party operator you can't modifyCustom Extractor
Need column-level lineageOpenLineage Methods or Custom Extractor
Complex extraction logicOpenLineage Methods or Custom Extractor
Simple table-level lineageInlets/Outlets (simplest, but lowest priority)

Important: Always prefer OpenLineage methods over custom extractors when possible. Extractors are harder to write, easier to diverge from operator behavior after changes, and harder to debug.

On Astro

Astro includes built-in OpenLineage integration — no additional transport configuration is needed. Lineage events are automatically collected and displayed in the Astro UI's Lineage tab. Custom extractors deployed to an Astro project are automatically picked up, so you only need to register them in airflow.cfg or via environment variable and deploy.


Two Approaches

1. OpenLineage Methods (Recommended)

Use when you can add methods directly to your custom operator. This is the go-to solution for operators you own.

2. Custom Extractors

Use when you need lineage from third-party or provider operators that you cannot modify.


Approach 1: OpenLineage Methods (Recommended)

When you own the operator, add OpenLineage methods directly:

from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    """Custom operator with built-in OpenLineage support."""

    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self._rows_processed = 0  # Set during execution

    def execute(self, context):
        # Do the actual work
        self._rows_processed = self._process_data()
        return self._rows_processed

    def get_openlineage_facets_on_start(self):
        """Called when task starts. Return known inputs/outputs."""
        # Import locally to avoid circular imports
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
        )

    def get_openlineage_facets_on_complete(self, task_instance):
        """Called after success. Add runtime metadata."""
        from openlineage.client.event_v2 import Dataset
        from openlineage.client.facet_v2 import output_statistics_output_dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[
                Dataset(
                    namespace="postgres://db",
                    name=self.target_table,
                    facets={
                        "outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
                            rowCount=self._rows_processed
                        )
                    },
                )
            ],
        )

    def get_openlineage_facets_on_failure(self, task_instance):
        """Called after failure. Optional - for partial lineage."""
        return None

OpenLineage Methods Reference

MethodWhen CalledRequired
get_openlineage_facets_on_start()Task enters RUNNINGNo
get_openlineage_facets_on_complete(ti)Task succeedsNo
get_openlineage_facets_on_failure(ti)Task failsNo

Implement only the methods you need. Unimplemented methods fall through to Hook-Level Lineage or inlets/outlets.


Approach 2: Custom Extractors

Use this approach only when you cannot modify the operator (e.g., third-party or provider operators).

Basic Structure

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class MyOperatorExtractor(BaseExtractor):
    """Extract lineage from MyCustomOperator."""

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Return operator class names this extractor handles."""
        return ["MyCustomOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        """Called BEFORE operator executes. Use for known inputs/outputs."""
        # Access operator properties via self.operator
        source_table = self.operator.source_table
        target_table = self.operator.target_table

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{source_table}",
                )
            ],
            outputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{target_table}",
                )
            ],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """Called AFTER operator executes. Use for runtime-determined lineage."""
        # Access properties set during execution
        # Useful for operators that determine outputs at runtime
        return None

OperatorLineage Structure

from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job

lineage = OperatorLineage(
    inputs=[Dataset(namespace="...", name="...")],      # Input datasets
    outputs=[Dataset(namespace="...", name="...")],     # Output datasets
    run_facets={"sql": sql_job.SQLJobFacet(query="SELECT...")},  # Run metadata
    job_facets={},                                      # Job metadata
)

Extraction Methods

MethodWhen CalledUse For
_execute_extraction()Before operator runsStatic/known lineage
extract_on_complete(task_instance)After successRuntime-determined lineage
extract_on_failure(task_instance)After failurePartial lineage on errors

Registering Extractors

Option 1: Configuration file (airflow.cfg)

[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor

Option 2: Environment variable

AIRFLOW__OPENLINEAGE__EXTRACTORS='mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor'

Important: The path must be importable from the Airflow worker. Place extractors in your DAGs folder or installed package.


Common Patterns

SQL Operator Extractor

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job


class MySqlOperatorExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["MySqlOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        sql = self.operator.sql
        conn_id = self.operator.conn_id

        # Parse SQL to find tables (simplified example)
        # In practice, use a SQL parser like sqlglot
        inputs, outputs = self._parse_sql(sql)

        namespace = f"postgres://{conn_id}"

        return OperatorLineage(
            inputs=[Dataset(namespace=namespace, name=t) for t in inputs],
            outputs=[Dataset(namespace=namespace, name=t) for t in outputs],
            job_facets={
                "sql": sql_job.SQLJobFacet(query=sql)
            },
        )

    def _parse_sql(self, sql: str) -> tuple[list[str], list[str]]:
        """Parse SQL to extract table names. Use sqlglot for real parsing."""
        # Simplified example - use proper SQL parser in production
        inputs = []
        outputs = []
        # ... parsing logic ...
        return inputs, outputs

File Transfer Extractor

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class S3ToSnowflakeExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["S3ToSnowflakeOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        s3_bucket = self.operator.s3_bucket
        s3_key = self.operator.s3_key
        table = self.operator.table
        schema = self.operator.schema

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace=f"s3://{s3_bucket}",
                    name=s3_key,
                )
            ],
            outputs=[
                Dataset(
                    namespace="snowflake://myaccount.snowflakecomputing.com",
                    name=f"{schema}.{table}",
                )
            ],
        )

Dynamic Lineage from Execution

from openlineage.client.event_v2 import Dataset


class DynamicOutputExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["DynamicOutputOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        # Only inputs known before execution
        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        # Outputs determined during execution
        # Access via operator properties set in execute()
        outputs = self.operator.created_tables  # Set during execute()

        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
            outputs=[Dataset(namespace="...", name=t) for t in outputs],
        )

Common Pitfalls

1. Circular Imports

Problem: Importing Airflow modules at the top level causes circular imports.

# ❌ BAD - can cause circular import issues
from airflow.models import TaskInstance
from openlineage.client.event_v2 import Dataset

class MyExtractor(BaseExtractor):
    ...
# ✅ GOOD - import inside methods
class MyExtractor(BaseExtractor):
    def _execute_extraction(self):
        from openlineage.client.event_v2 import Dataset
        # ...

2. Wrong Import Path

Problem: Extractor path doesn't match actual module location.

# ❌ Wrong - path doesn't exist
AIRFLOW__OPENLINEAGE__EXTRACTORS='extractors.MyExtractor'

# ✅ Correct - full importable path
AIRFLOW__OPENLINEAGE__EXTRACTORS='dags.extractors.my_extractor.MyExtractor'

3. Not Handling None

Problem: Extraction fails when operator properties are None.

# ✅ Handle optional properties
def _execute_extraction(self) -> OperatorLineage | None:
    if not self.operator.source_table:
        return None  # Skip extraction

    return OperatorLineage(...)

Testing Extractors

Unit Testing

import pytest
from unittest.mock import MagicMock
from mypackage.extractors import MyOperatorExtractor


def test_extractor():
    # Mock the operator
    operator = MagicMock()
    operator.source_table = "input_table"
    operator.target_table = "output_table"

    # Create extractor
    extractor = MyOperatorExtractor(operator)

    # Test extraction
    lineage = extractor._execute_extraction()

    assert len(lineage.inputs) == 1
    assert lineage.inputs[0].name == "input_table"
    assert len(lineage.outputs) == 1
    assert lineage.outputs[0].name == "output_table"

Precedence Rules

OpenLineage checks for lineage in this order:

  1. Custom Extractors (highest priority)
  2. OpenLineage Methods on operator
  3. Hook-Level Lineage (from HookLineageCollector)
  4. Inlets/Outlets (lowest priority)

If a custom extractor exists, it overrides built-in extraction and inlets/outlets.


Related Skills

  • annotating-task-lineage: For simple table-level lineage with inlets/outlets
  • tracing-upstream-lineage: Investigate data origins
  • tracing-downstream-lineage: Investigate data dependencies

來自 astronomer 的更多技能

airflow
astronomer
查詢、管理及疑難排解 Apache Airflow 的 DAG、執行、任務與系統設定。支援 30 多種指令,涵蓋 DAG 檢查、執行管理、任務日誌、設定查詢及直接 REST API 存取。可管理多個 Airflow 實例並保留設定;自動探索本機與 Astro 部署。同步(等待完成)或非同步觸發 DAG 執行、診斷失敗、清除執行以重試,並透過重試/映射索引篩選存取任務日誌。輸出...
official
airflow-hitl
astronomer
使用可延遲運算子,在 Airflow DAG 中實現人工審批關卡、表單輸入與分支流程。包含四種運算子類型:ApprovalOperator 用於核准/拒絕決策、HITLOperator 用於多選項表單選擇、HITLBranchOperator 用於人工驅動的任務路由,以及 HITLEntryOperator 用於表單資料收集。所有運算子皆為可延遲,在等待人工回應時釋放工作槽位,可透過 Airflow UI 的「必要操作」標籤或 REST API 進行回應。支援選用功能,包括自訂...
official
airflow-plugins
astronomer
構建 Airflow 3.1+ 插件,將 FastAPI 應用、自訂 UI 頁面、React 元件、中介軟體、巨集和運算子連結直接嵌入 Airflow UI。使用…
official
analyzing-data
astronomer
查詢您的資料倉儲,利用快取的模式與概念映射來回答商業問題。支援針對重複問題類型的模式查詢與快取,並記錄結果以改善未來查詢。包含概念到表格的映射快取,以及透過INFORMATION_SCHEMA或程式碼庫grep進行的表格結構探索。提供run_sql()與run_sql_pandas()核心函式,回傳Polars或Pandas DataFrame供分析使用。CLI指令可管理概念、模式與表格快取,以及...
official
annotating-task-lineage
astronomer
使用 inlets 和 outlets 為 Airflow 任務標註資料血緣。支援 OpenLineage Dataset 物件、Airflow Assets 與 Airflow Datasets,用於定義跨資料庫、資料倉儲及雲端儲存的輸入與輸出。當運算子缺乏內建 OpenLineage 提取器時,可作為備用方案;遵循四層優先級系統,其中自訂提取器與 OpenLineage 方法具有優先權。包含針對 Snowflake、BigQuery、S3 及 PostgreSQL 的資料集命名輔助工具,以確保一致性...
official
authoring-dags
astronomer
建立Apache Airflow DAG的引導式工作流程,包含驗證與測試整合。結構化六階段方法:探索環境與現有模式、規劃DAG結構、遵循最佳實踐進行實作、使用af CLI指令驗證、經使用者同意後測試,以及根據修正反覆迭代。用於探索的CLI指令(af config connections、af config providers、af dags list)與驗證指令(af dags errors、af dags get、af dags explore)可提供DAG的即時回饋。
official
blueprint
astronomer
使用 Pydantic 驗證定義可重複使用的 Airflow 任務組模板,並從 YAML 組合 DAG。適用於建立 blueprint 模板、從 YAML 組合 DAG 等場景。
official
checking-freshness
astronomer
透過檢查表格時間戳記及更新模式,並比對過時程度量表,驗證資料的新鮮度。利用常見的ETL命名模式(如 _loaded_at、_updated_at、created_at 等)識別時間戳記欄位,並查詢其最大值以判斷資料年齡。將資料分類為四種新鮮度狀態:新鮮(少於4小時)、過時(4–24小時)、非常過時(超過24小時)或未知(未找到時間戳記)。提供SQL範本,用於檢查最近幾天的上次更新時間與資料列數量趨勢。
official