creating-openlineage-extractors

作成者: astronomer

カスタムOpenLineage抽出器。未対応のAirflowオペレーターや複雑な系列シナリオ向け。2つのアプローチ:所有するオペレーターに直接OpenLineageメソッドを追加する方法(推奨)、または変更できないサードパーティ製オペレーター用にカスタム抽出器を作成する方法。抽出器は3つの時点でオペレーターの実行をインターセプトします:静的な系列のための実行前、実行時に決定される出力のための成功後、およびオプションで部分的な系列のための失敗後。抽出器はairflow.cfgまたは環境変数経由で登録...

npx skills add https://github.com/astronomer/agents --skill creating-openlineage-extractors

Creating OpenLineage Extractors

This skill guides you through creating custom OpenLineage extractors to capture lineage from Airflow operators that don't have built-in support.

Reference: See the OpenLineage provider developer guide for the latest patterns and list of supported operators/hooks.

When to Use Each Approach

ScenarioApproach
Operator you own/maintainOpenLineage Methods (recommended, simplest)
Third-party operator you can't modifyCustom Extractor
Need column-level lineageOpenLineage Methods or Custom Extractor
Complex extraction logicOpenLineage Methods or Custom Extractor
Simple table-level lineageInlets/Outlets (simplest, but lowest priority)

Important: Always prefer OpenLineage methods over custom extractors when possible. Extractors are harder to write, easier to diverge from operator behavior after changes, and harder to debug.

On Astro

Astro includes built-in OpenLineage integration — no additional transport configuration is needed. Lineage events are automatically collected and displayed in the Astro UI's Lineage tab. Custom extractors deployed to an Astro project are automatically picked up, so you only need to register them in airflow.cfg or via environment variable and deploy.


Two Approaches

1. OpenLineage Methods (Recommended)

Use when you can add methods directly to your custom operator. This is the go-to solution for operators you own.

2. Custom Extractors

Use when you need lineage from third-party or provider operators that you cannot modify.


Approach 1: OpenLineage Methods (Recommended)

When you own the operator, add OpenLineage methods directly:

from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    """Custom operator with built-in OpenLineage support."""

    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self._rows_processed = 0  # Set during execution

    def execute(self, context):
        # Do the actual work
        self._rows_processed = self._process_data()
        return self._rows_processed

    def get_openlineage_facets_on_start(self):
        """Called when task starts. Return known inputs/outputs."""
        # Import locally to avoid circular imports
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
        )

    def get_openlineage_facets_on_complete(self, task_instance):
        """Called after success. Add runtime metadata."""
        from openlineage.client.event_v2 import Dataset
        from openlineage.client.facet_v2 import output_statistics_output_dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[
                Dataset(
                    namespace="postgres://db",
                    name=self.target_table,
                    facets={
                        "outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
                            rowCount=self._rows_processed
                        )
                    },
                )
            ],
        )

    def get_openlineage_facets_on_failure(self, task_instance):
        """Called after failure. Optional - for partial lineage."""
        return None

OpenLineage Methods Reference

MethodWhen CalledRequired
get_openlineage_facets_on_start()Task enters RUNNINGNo
get_openlineage_facets_on_complete(ti)Task succeedsNo
get_openlineage_facets_on_failure(ti)Task failsNo

Implement only the methods you need. Unimplemented methods fall through to Hook-Level Lineage or inlets/outlets.


Approach 2: Custom Extractors

Use this approach only when you cannot modify the operator (e.g., third-party or provider operators).

Basic Structure

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class MyOperatorExtractor(BaseExtractor):
    """Extract lineage from MyCustomOperator."""

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Return operator class names this extractor handles."""
        return ["MyCustomOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        """Called BEFORE operator executes. Use for known inputs/outputs."""
        # Access operator properties via self.operator
        source_table = self.operator.source_table
        target_table = self.operator.target_table

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{source_table}",
                )
            ],
            outputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{target_table}",
                )
            ],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """Called AFTER operator executes. Use for runtime-determined lineage."""
        # Access properties set during execution
        # Useful for operators that determine outputs at runtime
        return None

OperatorLineage Structure

from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job

lineage = OperatorLineage(
    inputs=[Dataset(namespace="...", name="...")],      # Input datasets
    outputs=[Dataset(namespace="...", name="...")],     # Output datasets
    run_facets={"sql": sql_job.SQLJobFacet(query="SELECT...")},  # Run metadata
    job_facets={},                                      # Job metadata
)

Extraction Methods

MethodWhen CalledUse For
_execute_extraction()Before operator runsStatic/known lineage
extract_on_complete(task_instance)After successRuntime-determined lineage
extract_on_failure(task_instance)After failurePartial lineage on errors

Registering Extractors

Option 1: Configuration file (airflow.cfg)

[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor

Option 2: Environment variable

AIRFLOW__OPENLINEAGE__EXTRACTORS='mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor'

Important: The path must be importable from the Airflow worker. Place extractors in your DAGs folder or installed package.


Common Patterns

SQL Operator Extractor

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job


class MySqlOperatorExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["MySqlOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        sql = self.operator.sql
        conn_id = self.operator.conn_id

        # Parse SQL to find tables (simplified example)
        # In practice, use a SQL parser like sqlglot
        inputs, outputs = self._parse_sql(sql)

        namespace = f"postgres://{conn_id}"

        return OperatorLineage(
            inputs=[Dataset(namespace=namespace, name=t) for t in inputs],
            outputs=[Dataset(namespace=namespace, name=t) for t in outputs],
            job_facets={
                "sql": sql_job.SQLJobFacet(query=sql)
            },
        )

    def _parse_sql(self, sql: str) -> tuple[list[str], list[str]]:
        """Parse SQL to extract table names. Use sqlglot for real parsing."""
        # Simplified example - use proper SQL parser in production
        inputs = []
        outputs = []
        # ... parsing logic ...
        return inputs, outputs

File Transfer Extractor

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class S3ToSnowflakeExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["S3ToSnowflakeOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        s3_bucket = self.operator.s3_bucket
        s3_key = self.operator.s3_key
        table = self.operator.table
        schema = self.operator.schema

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace=f"s3://{s3_bucket}",
                    name=s3_key,
                )
            ],
            outputs=[
                Dataset(
                    namespace="snowflake://myaccount.snowflakecomputing.com",
                    name=f"{schema}.{table}",
                )
            ],
        )

Dynamic Lineage from Execution

from openlineage.client.event_v2 import Dataset


class DynamicOutputExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["DynamicOutputOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        # Only inputs known before execution
        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        # Outputs determined during execution
        # Access via operator properties set in execute()
        outputs = self.operator.created_tables  # Set during execute()

        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
            outputs=[Dataset(namespace="...", name=t) for t in outputs],
        )

Common Pitfalls

1. Circular Imports

Problem: Importing Airflow modules at the top level causes circular imports.

# ❌ BAD - can cause circular import issues
from airflow.models import TaskInstance
from openlineage.client.event_v2 import Dataset

class MyExtractor(BaseExtractor):
    ...
# ✅ GOOD - import inside methods
class MyExtractor(BaseExtractor):
    def _execute_extraction(self):
        from openlineage.client.event_v2 import Dataset
        # ...

2. Wrong Import Path

Problem: Extractor path doesn't match actual module location.

# ❌ Wrong - path doesn't exist
AIRFLOW__OPENLINEAGE__EXTRACTORS='extractors.MyExtractor'

# ✅ Correct - full importable path
AIRFLOW__OPENLINEAGE__EXTRACTORS='dags.extractors.my_extractor.MyExtractor'

3. Not Handling None

Problem: Extraction fails when operator properties are None.

# ✅ Handle optional properties
def _execute_extraction(self) -> OperatorLineage | None:
    if not self.operator.source_table:
        return None  # Skip extraction

    return OperatorLineage(...)

Testing Extractors

Unit Testing

import pytest
from unittest.mock import MagicMock
from mypackage.extractors import MyOperatorExtractor


def test_extractor():
    # Mock the operator
    operator = MagicMock()
    operator.source_table = "input_table"
    operator.target_table = "output_table"

    # Create extractor
    extractor = MyOperatorExtractor(operator)

    # Test extraction
    lineage = extractor._execute_extraction()

    assert len(lineage.inputs) == 1
    assert lineage.inputs[0].name == "input_table"
    assert len(lineage.outputs) == 1
    assert lineage.outputs[0].name == "output_table"

Precedence Rules

OpenLineage checks for lineage in this order:

  1. Custom Extractors (highest priority)
  2. OpenLineage Methods on operator
  3. Hook-Level Lineage (from HookLineageCollector)
  4. Inlets/Outlets (lowest priority)

If a custom extractor exists, it overrides built-in extraction and inlets/outlets.


Related Skills

  • annotating-task-lineage: For simple table-level lineage with inlets/outlets
  • tracing-upstream-lineage: Investigate data origins
  • tracing-downstream-lineage: Investigate data dependencies

astronomerのその他のスキル

airflow
astronomer
Apache AirflowのDAG、実行、タスク、システム設定をクエリ、管理、トラブルシューティングします。DAG検査、実行管理、タスクログ、設定クエリ、REST API直接アクセスを含む30以上のコマンドをサポート。複数のAirflowインスタンスを永続的な設定で管理し、ローカルおよびAstroデプロイメントを自動検出。DAG実行を同期的(完了待機)または非同期的にトリガーし、障害を診断、再試行のために実行をクリア、リトライ/マップインデックスフィルタリング付きでタスクログにアクセス。出力...
official
airflow-hitl
astronomer
人間による承認ゲート、フォーム入力、およびAirflow DAG内での分岐を、遅延可能オペレーターを使用して実現。4種類のオペレーター:承認/却下の判断を行うApprovalOperator、フォームによる複数選択肢の選択を行うHITLOperator、人間主導のタスクルーティングを行うHITLBranchOperator、フォームデータ収集を行うHITLEntryOperator。すべてのオペレーターは遅延可能であり、Airflow UIのRequired ActionsタブまたはREST APIを介して人間の応答を待つ間、ワーカースロットを解放します。カスタム...を含むオプション機能をサポート。
official
airflow-plugins
astronomer
Airflow 3.1+のプラグインを構築し、FastAPIアプリ、カスタムUIページ、Reactコンポーネント、ミドルウェア、マクロ、オペレーターリンクをAirflow UIに直接埋め込みます。使用…
official
analyzing-data
astronomer
データウェアハウスにクエリを実行し、キャッシュされたパターンと概念マッピングを使用してビジネス上の質問に回答します。繰り返し発生する質問タイプのパターン検索とキャッシュをサポートし、結果を記録して将来のクエリを改善します。概念からテーブルへのマッピングキャッシュと、INFORMATION_SCHEMAまたはコードベースのgrepによるテーブルスキーマ検出を含みます。分析用にPolarsまたはPandas DataFrameを返すrun_sql()およびrun_sql_pandas()カーネル関数を提供します。概念、パターン、テーブルキャッシュを管理するCLIコマンド、さらに...
official
annotating-task-lineage
astronomer
Airflowタスクにデータ系列を注釈付けし、インレットとアウトレットを使用します。OpenLineage Datasetオブジェクト、Airflow Assets、Airflow Datasetsをサポートし、データベース、データウェアハウス、クラウドストレージ間での入出力を定義します。オペレーターに組み込みのOpenLineage抽出機能がない場合のフォールバックとして使用し、カスタム抽出機能とOpenLineageメソッドが優先される4段階の優先順位システムに従います。Snowflake、BigQuery、S3、PostgreSQL向けのデータセット命名ヘルパーを含み、一貫性を確保します。
official
authoring-dags
astronomer
Apache Airflow DAGを作成するためのガイド付きワークフローで、検証とテストの統合を備えています。構造化された6フェーズのアプローチ:環境と既存のパターンを発見し、DAG構造を計画し、ベストプラクティスに従って実装し、af CLIコマンドで検証し、ユーザーの同意を得てテストし、修正を繰り返します。発見用のCLIコマンド(af config connections、af config providers、af dags list)と検証用のCLIコマンド(af dags errors、af dags get、af dags explore)は、DAGに関する即時フィードバックを提供します。
official
blueprint
astronomer
Pydanticバリデーションを使用して再利用可能なAirflowタスクグループテンプレートを定義し、YAMLからDAGを構成します。ブループリントテンプレートの作成時や、DAGの構成時に使用します。
official
checking-freshness
astronomer
テーブルのタイムスタンプと更新パターンを陳腐化スケールに照らして確認し、データの鮮度を検証します。一般的なETL命名パターン(_loaded_at、_updated_at、created_atなど)を使用してタイムスタンプカラムを特定し、その最大値をクエリして経過時間を判定します。データを4つの鮮度ステータスに分類します:Fresh(4時間未満)、Stale(4~24時間)、Very Stale(24時間超)、またはUnknown(タイムスタンプなし)。最近の日数における最終更新時刻と行数トレンドを確認するためのSQLテンプレートを提供します...
official