tracing-upstream-lineage

Remonter la lignée des données en amont pour identifier les sources, les DAG et les dépendances alimentant une table ou une colonne. Prend en charge le traçage de trois types de cibles : tables, colonnes et DAG ; utilise le code source des DAG Airflow et l'inspection des tâches pour trouver les pipelines producteurs. Gère les sources SQL (clauses FROM), les systèmes externes (S3, Postgres, Salesforce, API HTTP) et les sources basées sur des fichiers ; remonte récursivement les chaînes en amont. Inclut le traçage au niveau des colonnes via des mappages directs, des transformations et des agrégations dans le code des DAG...

npx skills add https://github.com/astronomer/agents --skill tracing-upstream-lineage

Upstream Lineage: Sources

Trace the origins of data - answer "Where does this data come from?"

Lineage Investigation

Step 1: Identify the Target Type

Determine what we're tracing:

  • Table: Trace what populates this table
  • Column: Trace where this specific column comes from
  • DAG: Trace what data sources this DAG reads from

Step 2: Find the Producing DAG

Tables are typically populated by Airflow DAGs. Find the connection:

  1. Search DAGs by name: Use af dags list and look for DAG names matching the table name

    • load_customers -> customers table
    • etl_daily_orders -> orders table
  2. Explore DAG source code: Use af dags source <dag_id> to read the DAG definition

    • Look for INSERT, MERGE, CREATE TABLE statements
    • Find the target table in the code
  3. Check DAG tasks: Use af tasks list <dag_id> to see what operations the DAG performs

On Astro

If you're running on Astro, the Lineage tab in the Astro UI provides visual lineage exploration across DAGs and datasets. Use it to quickly trace upstream dependencies without manually searching DAG source code.

On OSS Airflow

Use DAG source code and task logs to trace lineage (no built-in cross-DAG UI).

Step 3: Trace Data Sources

From the DAG code, identify source tables and systems:

SQL Sources (look for FROM clauses):

# In DAG code:
SELECT * FROM source_schema.source_table  # <- This is an upstream source

External Sources (look for connection references):

  • S3Operator -> S3 bucket source
  • PostgresOperator -> Postgres database source
  • SalesforceOperator -> Salesforce API source
  • HttpOperator -> REST API source

File Sources:

  • CSV/Parquet files in object storage
  • SFTP drops
  • Local file paths

Step 4: Build the Lineage Chain

Recursively trace each source:

TARGET: analytics.orders_daily
    ^
    +-- DAG: etl_daily_orders
            ^
            +-- SOURCE: raw.orders (table)
            |       ^
            |       +-- DAG: ingest_orders
            |               ^
            |               +-- SOURCE: Salesforce API (external)
            |
            +-- SOURCE: dim.customers (table)
                    ^
                    +-- DAG: load_customers
                            ^
                            +-- SOURCE: PostgreSQL (external DB)

Step 5: Check Source Health

For each upstream source:

  • Tables: Check freshness with the checking-freshness skill
  • DAGs: Check recent run status with af dags stats
  • External systems: Note connection info from DAG code

Lineage for Columns

When tracing a specific column:

  1. Find the column in the target table schema
  2. Search DAG source code for references to that column name
  3. Trace through transformations:
    • Direct mappings: source.col AS target_col
    • Transformations: COALESCE(a.col, b.col) AS target_col
    • Aggregations: SUM(detail.amount) AS total_amount

Output: Lineage Report

Summary

One-line answer: "This table is populated by DAG X from sources Y and Z"

Lineage Diagram

[Salesforce] --> [raw.opportunities] --> [stg.opportunities] --> [fct.sales]
                        |                        |
                   DAG: ingest_sfdc         DAG: transform_sales

Source Details

SourceTypeConnectionFreshnessOwner
raw.ordersTableInternal2h agodata-team
SalesforceAPIsalesforce_connReal-timesales-ops

Transformation Chain

Describe how data flows and transforms:

  1. Raw data lands in raw.orders via Salesforce API sync
  2. DAG transform_orders cleans and dedupes into stg.orders
  3. DAG build_order_facts joins with dimensions into fct.orders

Data Quality Implications

  • Single points of failure?
  • Stale upstream sources?
  • Complex transformation chains that could break?

Related Skills

  • Check source freshness: checking-freshness skill
  • Debug source DAG: debugging-dags skill
  • Trace downstream impacts: tracing-downstream-lineage skill
  • Add manual lineage annotations: annotating-task-lineage skill
  • Build custom lineage extractors: creating-openlineage-extractors skill

Plus de skills de astronomer

airflow
astronomer
Interroger, gérer et dépanner les DAGs, exécutions, tâches et configurations système d'Apache Airflow. Prend en charge plus de 30 commandes pour l'inspection des DAGs, la gestion des exécutions, la journalisation des tâches, les requêtes de configuration et l'accès direct à l'API REST. Gérez plusieurs instances Airflow avec une configuration persistante ; découvrez automatiquement les déploiements locaux et Astro. Déclenchez des exécutions de DAG de manière synchrone (attente de fin) ou asynchrone, diagnostiquez les échecs, effacez les exécutions pour réessayer, et accédez aux journaux de tâches avec filtrage par tentative et index de carte. Sortie...
official
airflow-hitl
astronomer
Portes d'approbation humaine, entrées de formulaire et branchement dans les DAG Airflow à l'aide d'opérateurs différés. Quatre types d'opérateurs : ApprovalOperator pour les décisions d'approbation/rejet, HITLOperator pour la sélection multi-options avec formulaires, HITLBranchOperator pour le routage des tâches piloté par l'humain, et HITLEntryOperator pour la collecte de données de formulaire. Tous les opérateurs sont différés, libérant les emplacements de travail en attendant une réponse humaine via l'onglet Actions requises de l'interface utilisateur Airflow ou l'API REST. Prend en charge des fonctionnalités optionnelles, y compris personnalisées...
official
airflow-plugins
astronomer
Créez des plugins Airflow 3.1+ qui intègrent des applications FastAPI, des pages d'interface utilisateur personnalisées, des composants React, des intergiciels, des macros et des liens d'opérateur directement dans l'interface utilisateur d'Airflow. Utilisez…
official
analyzing-data
astronomer
Interrogez votre entrepôt de données pour répondre à des questions métier à l'aide de motifs mis en cache et de correspondances de concepts. Prend en charge la recherche de motifs et la mise en cache pour les types de questions récurrentes, avec enregistrement des résultats pour améliorer les requêtes futures. Inclut un cache de correspondance concept-table et la découverte de schémas de tables via INFORMATION_SCHEMA ou grep du code source. Fournit les fonctions noyau run_sql() et run_sql_pandas() renvoyant des DataFrames Polars ou Pandas pour l'analyse. Commandes CLI pour gérer les caches de concepts, motifs et tables, plus...
official
annotating-task-lineage
astronomer
Annotez les tâches Airflow avec la traçabilité des données à l'aide d'inlets et d'outlets. Prend en charge les objets Dataset OpenLineage, les Assets Airflow et les Datasets Airflow pour définir les entrées et sorties entre bases de données, entrepôts de données et stockage cloud. Utilisez-le comme solution de repli lorsque les opérateurs ne disposent pas d'extracteurs OpenLineage intégrés ; suit un système de priorité à quatre niveaux où les extracteurs personnalisés et les méthodes OpenLineage ont la priorité. Inclut des assistants de nommage de datasets pour Snowflake, BigQuery, S3 et PostgreSQL afin de garantir une cohérence...
official
authoring-dags
astronomer
Workflow guidé pour créer des DAGs Apache Airflow avec validation et intégration de tests. Approche structurée en six phases : découvrir l'environnement et les modèles existants, planifier la structure du DAG, implémenter en suivant les bonnes pratiques, valider avec les commandes CLI af, tester avec le consentement de l'utilisateur, et itérer sur les correctifs. Les commandes CLI pour la découverte (af config connections, af config providers, af dags list) et la validation (af dags errors, af dags get, af dags explore) fournissent un retour immédiat sur le DAG...
official
blueprint
astronomer
Définir des modèles réutilisables de groupes de tâches Airflow avec validation Pydantic et composer des DAGs à partir de YAML. Utiliser lors de la création de modèles blueprint, de la composition de DAGs à partir de…
official
checking-freshness
astronomer
Vérifier la fraîcheur des données en consultant les horodatages des tables et les modèles de mise à jour par rapport à une échelle d'obsolescence. Identifie les colonnes d'horodatage à l'aide de modèles de nommage ETL courants (_loaded_at, _updated_at, created_at, etc.) et interroge leurs valeurs maximales pour déterminer l'âge. Classe les données en quatre statuts de fraîcheur : Fraîches (< 4 heures), Obsolètes (4–24 heures), Très obsolètes (> 24 heures) ou Inconnues (aucun horodatage trouvé). Fournit des modèles SQL pour vérifier l'heure de la dernière mise à jour et les tendances du nombre de lignes sur les derniers jours afin de...
official