tracing-upstream-lineage

Lacak lineage data hulu untuk mengidentifikasi sumber, DAG, dan dependensi yang memberi makan tabel atau kolom. Mendukung pelacakan tiga jenis target: tabel, kolom, dan DAG; menggunakan kode sumber DAG Airflow dan inspeksi tugas untuk menemukan pipeline yang memproduksi. Menangani sumber SQL (klausa FROM), sistem eksternal (S3, Postgres, Salesforce, HTTP API), dan sumber berbasis file; secara rekursif melacak rantai hulu. Mencakup pelacakan tingkat kolom melalui pemetaan langsung, transformasi, dan agregasi dalam kode DAG...

npx skills add https://github.com/astronomer/agents --skill tracing-upstream-lineage

Upstream Lineage: Sources

Trace the origins of data - answer "Where does this data come from?"

Lineage Investigation

Step 1: Identify the Target Type

Determine what we're tracing:

  • Table: Trace what populates this table
  • Column: Trace where this specific column comes from
  • DAG: Trace what data sources this DAG reads from

Step 2: Find the Producing DAG

Tables are typically populated by Airflow DAGs. Find the connection:

  1. Search DAGs by name: Use af dags list and look for DAG names matching the table name

    • load_customers -> customers table
    • etl_daily_orders -> orders table
  2. Explore DAG source code: Use af dags source <dag_id> to read the DAG definition

    • Look for INSERT, MERGE, CREATE TABLE statements
    • Find the target table in the code
  3. Check DAG tasks: Use af tasks list <dag_id> to see what operations the DAG performs

On Astro

If you're running on Astro, the Lineage tab in the Astro UI provides visual lineage exploration across DAGs and datasets. Use it to quickly trace upstream dependencies without manually searching DAG source code.

On OSS Airflow

Use DAG source code and task logs to trace lineage (no built-in cross-DAG UI).

Step 3: Trace Data Sources

From the DAG code, identify source tables and systems:

SQL Sources (look for FROM clauses):

# In DAG code:
SELECT * FROM source_schema.source_table  # <- This is an upstream source

External Sources (look for connection references):

  • S3Operator -> S3 bucket source
  • PostgresOperator -> Postgres database source
  • SalesforceOperator -> Salesforce API source
  • HttpOperator -> REST API source

File Sources:

  • CSV/Parquet files in object storage
  • SFTP drops
  • Local file paths

Step 4: Build the Lineage Chain

Recursively trace each source:

TARGET: analytics.orders_daily
    ^
    +-- DAG: etl_daily_orders
            ^
            +-- SOURCE: raw.orders (table)
            |       ^
            |       +-- DAG: ingest_orders
            |               ^
            |               +-- SOURCE: Salesforce API (external)
            |
            +-- SOURCE: dim.customers (table)
                    ^
                    +-- DAG: load_customers
                            ^
                            +-- SOURCE: PostgreSQL (external DB)

Step 5: Check Source Health

For each upstream source:

  • Tables: Check freshness with the checking-freshness skill
  • DAGs: Check recent run status with af dags stats
  • External systems: Note connection info from DAG code

Lineage for Columns

When tracing a specific column:

  1. Find the column in the target table schema
  2. Search DAG source code for references to that column name
  3. Trace through transformations:
    • Direct mappings: source.col AS target_col
    • Transformations: COALESCE(a.col, b.col) AS target_col
    • Aggregations: SUM(detail.amount) AS total_amount

Output: Lineage Report

Summary

One-line answer: "This table is populated by DAG X from sources Y and Z"

Lineage Diagram

[Salesforce] --> [raw.opportunities] --> [stg.opportunities] --> [fct.sales]
                        |                        |
                   DAG: ingest_sfdc         DAG: transform_sales

Source Details

SourceTypeConnectionFreshnessOwner
raw.ordersTableInternal2h agodata-team
SalesforceAPIsalesforce_connReal-timesales-ops

Transformation Chain

Describe how data flows and transforms:

  1. Raw data lands in raw.orders via Salesforce API sync
  2. DAG transform_orders cleans and dedupes into stg.orders
  3. DAG build_order_facts joins with dimensions into fct.orders

Data Quality Implications

  • Single points of failure?
  • Stale upstream sources?
  • Complex transformation chains that could break?

Related Skills

  • Check source freshness: checking-freshness skill
  • Debug source DAG: debugging-dags skill
  • Trace downstream impacts: tracing-downstream-lineage skill
  • Add manual lineage annotations: annotating-task-lineage skill
  • Build custom lineage extractors: creating-openlineage-extractors skill

Lebih banyak skill dari astronomer

airflow
astronomer
Kueri, kelola, dan pecahkan masalah DAG, proses, tugas, serta konfigurasi sistem Apache Airflow. Mendukung 30+ perintah untuk inspeksi DAG, manajemen proses, pencatatan tugas, kueri konfigurasi, dan akses langsung REST API. Kelola beberapa instance Airflow dengan konfigurasi persisten; temukan secara otomatis deployment lokal dan Astro. Jalankan proses DAG secara sinkron (tunggu hingga selesai) atau asinkron, diagnosis kegagalan, hapus proses untuk percobaan ulang, dan akses log tugas dengan filter percobaan ulang/indeks peta. Keluaran...
official
airflow-hitl
astronomer
Gerbang persetujuan manusia, input formulir, dan percabangan dalam DAG Airflow menggunakan operator yang dapat ditunda. Empat jenis operator: ApprovalOperator untuk keputusan setuju/tolak, HITLOperator untuk pemilihan multi-opsi dengan formulir, HITLBranchOperator untuk perutean tugas yang digerakkan manusia, dan HITLEntryOperator untuk pengumpulan data formulir. Semua operator dapat ditunda, membebaskan slot pekerja sambil menunggu respons manusia melalui tab Required Actions di UI Airflow atau REST API. Mendukung fitur opsional termasuk kustom...
official
airflow-plugins
astronomer
Bangun plugin Airflow 3.1+ yang menyematkan aplikasi FastAPI, halaman UI kustom, komponen React, middleware, makro, dan tautan operator langsung ke dalam UI Airflow. Gunakan…
official
analyzing-data
astronomer
Kueri gudang data Anda untuk menjawab pertanyaan bisnis dengan pola yang di-cache dan pemetaan konsep. Mendukung pencarian pola dan caching untuk jenis pertanyaan berulang, dengan pencatatan hasil untuk meningkatkan kueri di masa mendatang. Menyertakan cache pemetaan konsep-ke-tabel dan penemuan skema tabel melalui INFORMATION_SCHEMA atau grep basis kode. Menyediakan fungsi kernel run_sql() dan run_sql_pandas() yang mengembalikan DataFrame Polars atau Pandas untuk analisis. Perintah CLI untuk mengelola cache konsep, pola, dan tabel, plus...
official
annotating-task-lineage
astronomer
Anotasi tugas Airflow dengan lineage data menggunakan inlet dan outlet. Mendukung objek Dataset OpenLineage, Aset Airflow, dan Dataset Airflow untuk mendefinisikan input dan output di seluruh basis data, gudang data, dan penyimpanan cloud. Digunakan sebagai cadangan ketika operator tidak memiliki ekstraktor OpenLineage bawaan; mengikuti sistem prioritas empat tingkat di mana ekstraktor kustom dan metode OpenLineage diutamakan. Menyertakan pembantu penamaan dataset untuk Snowflake, BigQuery, S3, dan PostgreSQL guna memastikan konsistensi...
official
authoring-dags
astronomer
Panduan kerja untuk membuat DAG Apache Airflow dengan integrasi validasi dan pengujian. Pendekatan enam fase terstruktur: temukan lingkungan dan pola yang ada, rencanakan struktur DAG, implementasikan sesuai praktik terbaik, validasi dengan perintah CLI af, uji dengan persetujuan pengguna, dan lakukan iterasi perbaikan. Perintah CLI untuk penemuan (af config connections, af config providers, af dags list) dan validasi (af dags errors, af dags get, af dags explore) memberikan umpan balik langsung pada DAG...
official
blueprint
astronomer
Definisikan templat grup tugas Airflow yang dapat digunakan kembali dengan validasi Pydantic dan susun DAG dari YAML. Gunakan saat membuat templat blueprint, menyusun DAG dari…
official
checking-freshness
astronomer
Verifikasi kesegaran data dengan memeriksa timestamp tabel dan pola pembaruan terhadap skala ketidaksegaran. Mengidentifikasi kolom timestamp menggunakan pola penamaan ETL umum (_loaded_at, _updated_at, created_at, dll.) dan menanyakan nilai maksimumnya untuk menentukan usia. Mengklasifikasikan data ke dalam empat status kesegaran: Segar (< 4 jam), Agak Basi (4–24 jam), Sangat Basi (> 24 jam), atau Tidak Diketahui (tidak ada timestamp ditemukan). Menyediakan template SQL untuk memeriksa waktu pembaruan terakhir dan tren jumlah baris selama beberapa hari terakhir hingga...
official