tracing-upstream-lineage

Rastrea el linaje de datos ascendente para identificar fuentes, DAGs y dependencias que alimentan una tabla o columna. Admite el rastreo de tres tipos de destino: tablas, columnas y DAGs; utiliza el código fuente del DAG de Airflow y la inspección de tareas para encontrar pipelines productores. Maneja fuentes SQL (cláusulas FROM), sistemas externos (S3, Postgres, Salesforce, APIs HTTP) y fuentes basadas en archivos; rastrea cadenas ascendentes de forma recursiva. Incluye rastreo a nivel de columna mediante asignaciones directas, transformaciones y agregaciones en el código del DAG...

npx skills add https://github.com/astronomer/agents --skill tracing-upstream-lineage

Upstream Lineage: Sources

Trace the origins of data - answer "Where does this data come from?"

Lineage Investigation

Step 1: Identify the Target Type

Determine what we're tracing:

  • Table: Trace what populates this table
  • Column: Trace where this specific column comes from
  • DAG: Trace what data sources this DAG reads from

Step 2: Find the Producing DAG

Tables are typically populated by Airflow DAGs. Find the connection:

  1. Search DAGs by name: Use af dags list and look for DAG names matching the table name

    • load_customers -> customers table
    • etl_daily_orders -> orders table
  2. Explore DAG source code: Use af dags source <dag_id> to read the DAG definition

    • Look for INSERT, MERGE, CREATE TABLE statements
    • Find the target table in the code
  3. Check DAG tasks: Use af tasks list <dag_id> to see what operations the DAG performs

On Astro

If you're running on Astro, the Lineage tab in the Astro UI provides visual lineage exploration across DAGs and datasets. Use it to quickly trace upstream dependencies without manually searching DAG source code.

On OSS Airflow

Use DAG source code and task logs to trace lineage (no built-in cross-DAG UI).

Step 3: Trace Data Sources

From the DAG code, identify source tables and systems:

SQL Sources (look for FROM clauses):

# In DAG code:
SELECT * FROM source_schema.source_table  # <- This is an upstream source

External Sources (look for connection references):

  • S3Operator -> S3 bucket source
  • PostgresOperator -> Postgres database source
  • SalesforceOperator -> Salesforce API source
  • HttpOperator -> REST API source

File Sources:

  • CSV/Parquet files in object storage
  • SFTP drops
  • Local file paths

Step 4: Build the Lineage Chain

Recursively trace each source:

TARGET: analytics.orders_daily
    ^
    +-- DAG: etl_daily_orders
            ^
            +-- SOURCE: raw.orders (table)
            |       ^
            |       +-- DAG: ingest_orders
            |               ^
            |               +-- SOURCE: Salesforce API (external)
            |
            +-- SOURCE: dim.customers (table)
                    ^
                    +-- DAG: load_customers
                            ^
                            +-- SOURCE: PostgreSQL (external DB)

Step 5: Check Source Health

For each upstream source:

  • Tables: Check freshness with the checking-freshness skill
  • DAGs: Check recent run status with af dags stats
  • External systems: Note connection info from DAG code

Lineage for Columns

When tracing a specific column:

  1. Find the column in the target table schema
  2. Search DAG source code for references to that column name
  3. Trace through transformations:
    • Direct mappings: source.col AS target_col
    • Transformations: COALESCE(a.col, b.col) AS target_col
    • Aggregations: SUM(detail.amount) AS total_amount

Output: Lineage Report

Summary

One-line answer: "This table is populated by DAG X from sources Y and Z"

Lineage Diagram

[Salesforce] --> [raw.opportunities] --> [stg.opportunities] --> [fct.sales]
                        |                        |
                   DAG: ingest_sfdc         DAG: transform_sales

Source Details

SourceTypeConnectionFreshnessOwner
raw.ordersTableInternal2h agodata-team
SalesforceAPIsalesforce_connReal-timesales-ops

Transformation Chain

Describe how data flows and transforms:

  1. Raw data lands in raw.orders via Salesforce API sync
  2. DAG transform_orders cleans and dedupes into stg.orders
  3. DAG build_order_facts joins with dimensions into fct.orders

Data Quality Implications

  • Single points of failure?
  • Stale upstream sources?
  • Complex transformation chains that could break?

Related Skills

  • Check source freshness: checking-freshness skill
  • Debug source DAG: debugging-dags skill
  • Trace downstream impacts: tracing-downstream-lineage skill
  • Add manual lineage annotations: annotating-task-lineage skill
  • Build custom lineage extractors: creating-openlineage-extractors skill

Más skills de astronomer

airflow
astronomer
Consulta, gestiona y soluciona problemas de DAGs, ejecuciones, tareas y configuración del sistema de Apache Airflow. Soporta más de 30 comandos para inspección de DAGs, gestión de ejecuciones, registro de tareas, consultas de configuración y acceso directo a la API REST. Administra múltiples instancias de Airflow con configuración persistente; descubre automáticamente implementaciones locales y de Astro. Ejecuta DAGs de forma síncrona (esperando su finalización) o asíncrona, diagnostica fallos, limpia ejecuciones para reintentos y accede a registros de tareas con filtros de reintento e índice de mapa. Salida...
official
airflow-hitl
astronomer
Compuertas de aprobación humana, entradas de formulario y ramificación en DAGs de Airflow utilizando operadores diferibles. Cuatro tipos de operadores: ApprovalOperator para decisiones de aprobar/rechazar, HITLOperator para selección de múltiples opciones con formularios, HITLBranchOperator para enrutamiento de tareas impulsado por humanos y HITLEntryOperator para recopilación de datos de formularios. Todos los operadores son diferibles, liberando espacios de trabajo mientras esperan respuesta humana a través de la pestaña de Acciones Requeridas de la interfaz de usuario de Airflow o la API REST. Soporta características opcionales que incluyen personalización...
official
airflow-plugins
astronomer
Crea plugins de Airflow 3.1+ que integren aplicaciones FastAPI, páginas de UI personalizadas, componentes React, middleware, macros y enlaces de operadores directamente en la interfaz de Airflow. Usa…
official
analyzing-data
astronomer
Consulta tu almacén de datos para responder preguntas de negocio con patrones en caché y mapeos de conceptos. Soporta búsqueda de patrones y almacenamiento en caché para tipos de preguntas repetidas, con registro de resultados para mejorar consultas futuras. Incluye caché de mapeo concepto-tabla y descubrimiento de esquemas de tablas mediante INFORMATION_SCHEMA o búsqueda en el código base. Proporciona funciones kernel run_sql() y run_sql_pandas() que devuelven DataFrames de Polars o Pandas para análisis. Comandos CLI para gestionar cachés de conceptos, patrones y tablas, además de...
official
annotating-task-lineage
astronomer
Anotar tareas de Airflow con linaje de datos utilizando inlets y outlets. Soporta objetos Dataset de OpenLineage, Assets de Airflow y Datasets de Airflow para definir entradas y salidas en bases de datos, almacenes de datos y almacenamiento en la nube. Úselo como alternativa cuando los operadores carezcan de extractores OpenLineage integrados; sigue un sistema de precedencia de cuatro niveles donde los extractores personalizados y los métodos OpenLineage tienen prioridad. Incluye ayudantes de nomenclatura de datasets para Snowflake, BigQuery, S3 y PostgreSQL para garantizar consistencia...
official
authoring-dags
astronomer
Flujo de trabajo guiado para crear DAGs de Apache Airflow con integración de validación y pruebas. Enfoque estructurado de seis fases: descubrir el entorno y patrones existentes, planificar la estructura del DAG, implementar siguiendo las mejores prácticas, validar con comandos de la CLI de af, probar con consentimiento del usuario, e iterar sobre correcciones. Los comandos de la CLI para descubrimiento (af config connections, af config providers, af dags list) y validación (af dags errors, af dags get, af dags explore) proporcionan retroalimentación inmediata sobre el DAG...
official
blueprint
astronomer
Define plantillas reutilizables de grupos de tareas de Airflow con validación Pydantic y componga DAGs desde YAML. Úselo al crear plantillas de blueprint, componer DAGs desde…
official
checking-freshness
astronomer
Verifica la frescura de los datos revisando las marcas de tiempo de las tablas y los patrones de actualización frente a una escala de obsolescencia. Identifica columnas de marca de tiempo usando patrones comunes de nomenclatura ETL (_loaded_at, _updated_at, created_at, etc.) y consulta sus valores máximos para determinar la antigüedad. Clasifica los datos en cuatro estados de frescura: Fresco (< 4 horas), Obsoleto (4–24 horas), Muy obsoleto (> 24 horas) o Desconocido (sin marca de tiempo encontrada). Proporciona plantillas SQL para verificar la última hora de actualización y las tendencias de recuento de filas en días recientes para...
official