tracing-upstream-lineage

Verfolgen Sie die Upstream-Datenherkunft, um Quellen, DAGs und Abhängigkeiten zu identifizieren, die eine Tabelle oder Spalte speisen. Unterstützt die Verfolgung von drei Zieltypen: Tabellen, Spalten und DAGs; nutzt Airflow-DAG-Quellcode und Aufgabeninspektion, um erzeugende Pipelines zu finden. Behandelt SQL-Quellen (FROM-Klauseln), externe Systeme (S3, Postgres, Salesforce, HTTP-APIs) und dateibasierte Quellen; verfolgt rekursiv Upstream-Ketten. Enthält Spaltenebenen-Verfolgung durch direkte Zuordnungen, Transformationen und Aggregationen im DAG-Code...

npx skills add https://github.com/astronomer/agents --skill tracing-upstream-lineage

Upstream Lineage: Sources

Trace the origins of data - answer "Where does this data come from?"

Lineage Investigation

Step 1: Identify the Target Type

Determine what we're tracing:

  • Table: Trace what populates this table
  • Column: Trace where this specific column comes from
  • DAG: Trace what data sources this DAG reads from

Step 2: Find the Producing DAG

Tables are typically populated by Airflow DAGs. Find the connection:

  1. Search DAGs by name: Use af dags list and look for DAG names matching the table name

    • load_customers -> customers table
    • etl_daily_orders -> orders table
  2. Explore DAG source code: Use af dags source <dag_id> to read the DAG definition

    • Look for INSERT, MERGE, CREATE TABLE statements
    • Find the target table in the code
  3. Check DAG tasks: Use af tasks list <dag_id> to see what operations the DAG performs

On Astro

If you're running on Astro, the Lineage tab in the Astro UI provides visual lineage exploration across DAGs and datasets. Use it to quickly trace upstream dependencies without manually searching DAG source code.

On OSS Airflow

Use DAG source code and task logs to trace lineage (no built-in cross-DAG UI).

Step 3: Trace Data Sources

From the DAG code, identify source tables and systems:

SQL Sources (look for FROM clauses):

# In DAG code:
SELECT * FROM source_schema.source_table  # <- This is an upstream source

External Sources (look for connection references):

  • S3Operator -> S3 bucket source
  • PostgresOperator -> Postgres database source
  • SalesforceOperator -> Salesforce API source
  • HttpOperator -> REST API source

File Sources:

  • CSV/Parquet files in object storage
  • SFTP drops
  • Local file paths

Step 4: Build the Lineage Chain

Recursively trace each source:

TARGET: analytics.orders_daily
    ^
    +-- DAG: etl_daily_orders
            ^
            +-- SOURCE: raw.orders (table)
            |       ^
            |       +-- DAG: ingest_orders
            |               ^
            |               +-- SOURCE: Salesforce API (external)
            |
            +-- SOURCE: dim.customers (table)
                    ^
                    +-- DAG: load_customers
                            ^
                            +-- SOURCE: PostgreSQL (external DB)

Step 5: Check Source Health

For each upstream source:

  • Tables: Check freshness with the checking-freshness skill
  • DAGs: Check recent run status with af dags stats
  • External systems: Note connection info from DAG code

Lineage for Columns

When tracing a specific column:

  1. Find the column in the target table schema
  2. Search DAG source code for references to that column name
  3. Trace through transformations:
    • Direct mappings: source.col AS target_col
    • Transformations: COALESCE(a.col, b.col) AS target_col
    • Aggregations: SUM(detail.amount) AS total_amount

Output: Lineage Report

Summary

One-line answer: "This table is populated by DAG X from sources Y and Z"

Lineage Diagram

[Salesforce] --> [raw.opportunities] --> [stg.opportunities] --> [fct.sales]
                        |                        |
                   DAG: ingest_sfdc         DAG: transform_sales

Source Details

SourceTypeConnectionFreshnessOwner
raw.ordersTableInternal2h agodata-team
SalesforceAPIsalesforce_connReal-timesales-ops

Transformation Chain

Describe how data flows and transforms:

  1. Raw data lands in raw.orders via Salesforce API sync
  2. DAG transform_orders cleans and dedupes into stg.orders
  3. DAG build_order_facts joins with dimensions into fct.orders

Data Quality Implications

  • Single points of failure?
  • Stale upstream sources?
  • Complex transformation chains that could break?

Related Skills

  • Check source freshness: checking-freshness skill
  • Debug source DAG: debugging-dags skill
  • Trace downstream impacts: tracing-downstream-lineage skill
  • Add manual lineage annotations: annotating-task-lineage skill
  • Build custom lineage extractors: creating-openlineage-extractors skill

Mehr Skills von astronomer

airflow
astronomer
Apache Airflow-DAGs, Ausführungen, Aufgaben und Systemkonfiguration abfragen, verwalten und Fehler beheben. Unterstützt über 30 Befehle für DAG-Inspektion, Ausführungsverwaltung, Aufgabenprotokollierung, Konfigurationsabfragen und direkten REST-API-Zugriff. Mehrere Airflow-Instanzen mit persistenter Konfiguration verwalten; lokale und Astro-Bereitstellungen automatisch erkennen. DAG-Ausführungen synchron (warten auf Abschluss) oder asynchron auslösen, Fehler diagnostizieren, Ausführungen für Wiederholungen löschen und Aufgabenprotokolle mit Wiederholungs-/Kartenindex-Filterung abrufen. Ausgabe...
official
airflow-hitl
astronomer
Menschliche Genehmigungstore, Formulareingaben und Verzweigungen in Airflow-DAGs unter Verwendung von aufschiebbaren Operatoren. Vier Operatortypen: ApprovalOperator für Genehmigen/Ablehnen-Entscheidungen, HITLOperator für Mehrfachauswahl mit Formularen, HITLBranchOperator für menschlich gesteuerte Aufgabenweiterleitung und HITLEntryOperator für Formulardatenerfassung. Alle Operatoren sind aufschiebbar und geben Worker-Slots frei, während sie auf menschliche Antworten über den Bereich "Erforderliche Aktionen" der Airflow-Benutzeroberfläche oder die REST-API warten. Unterstützt optionale Funktionen einschließlich benutzerdefinierter...
official
airflow-plugins
astronomer
Erstellen Sie Airflow 3.1+-Plugins, die FastAPI-Apps, benutzerdefinierte UI-Seiten, React-Komponenten, Middleware, Makros und Operator-Links direkt in die Airflow-Oberfläche einbetten. Verwenden Sie…
official
analyzing-data
astronomer
Fragen Sie Ihr Data Warehouse, um Geschäftsfragen mit zwischengespeicherten Mustern und Konzeptzuordnungen zu beantworten. Unterstützt Mustersuche und Zwischenspeicherung für wiederkehrende Fragetypen, mit Aufzeichnung der Ergebnisse zur Verbesserung zukünftiger Abfragen. Enthält eine Konzept-zu-Tabelle-Zuordnungs-Cache und Tabellenschema-Erkennung über INFORMATION_SCHEMA oder Codebase-Grep. Bietet run_sql()- und run_sql_pandas()-Kernel-Funktionen, die Polars- oder Pandas-DataFrames für Analysen zurückgeben. CLI-Befehle zur Verwaltung von Konzept-, Muster- und Tabellen-Caches, plus...
official
annotating-task-lineage
astronomer
Annotieren von Airflow-Tasks mit Data Lineage mithilfe von Inlets und Outlets. Unterstützt OpenLineage-Dataset-Objekte, Airflow-Assets und Airflow-Datasets zur Definition von Ein- und Ausgaben über Datenbanken, Data Warehouses und Cloud-Speicher hinweg. Verwenden Sie es als Fallback, wenn Operatoren keine integrierten OpenLineage-Extraktoren besitzen; folgt einem vierstufigen Prioritätssystem, bei dem benutzerdefinierte Extraktoren und OpenLineage-Methoden Vorrang haben. Enthält Dataset-Namenshilfen für Snowflake, BigQuery, S3 und PostgreSQL, um eine konsistente...
official
authoring-dags
astronomer
Geführter Workflow zur Erstellung von Apache Airflow DAGs mit Validierungs- und Testintegration. Strukturierter Sechs-Phasen-Ansatz: Umgebung und bestehende Muster erkunden, DAG-Struktur planen, Implementierung nach Best Practices, Validierung mit af CLI-Befehlen, Testen mit Benutzereinwilligung und Iteration über Fehlerbehebungen. CLI-Befehle zur Erkundung (af config connections, af config providers, af dags list) und Validierung (af dags errors, af dags get, af dags explore) bieten sofortiges Feedback zu DAG...
official
blueprint
astronomer
Wiederverwendbare Airflow-Task-Gruppen-Vorlagen mit Pydantic-Validierung definieren und DAGs aus YAML zusammenstellen. Verwenden beim Erstellen von Blueprint-Vorlagen, Zusammenstellen von DAGs aus…
official
checking-freshness
astronomer
Überprüft die Datenaktualität durch Abgleich von Tabellenzeitstempeln und Aktualisierungsmustern mit einer Veraltungsskala. Identifiziert Zeitstempelspalten anhand gängiger ETL-Benennungsmuster (_loaded_at, _updated_at, created_at usw.) und fragt deren Maximalwerte ab, um das Alter zu bestimmen. Klassifiziert Daten in vier Aktualitätsstatus: Frisch (< 4 Stunden), Veraltet (4–24 Stunden), Stark veraltet (> 24 Stunden) oder Unbekannt (kein Zeitstempel gefunden). Stellt SQL-Vorlagen zur Überprüfung der letzten Aktualisierungszeit und der Zeilenanzahltrends der letzten Tage bereit, um...
official