tracing-upstream-lineage

Rastreia a linhagem upstream de dados para identificar fontes, DAGs e dependências que alimentam uma tabela ou coluna. Suporta rastreamento de três tipos de destino: tabelas, colunas e DAGs; utiliza o código-fonte do DAG do Airflow e inspeção de tarefas para encontrar pipelines produtores. Lida com fontes SQL (cláusulas FROM), sistemas externos (S3, Postgres, Salesforce, APIs HTTP) e fontes baseadas em arquivos; rastreia recursivamente cadeias upstream. Inclui rastreamento em nível de coluna por meio de mapeamentos diretos, transformações e agregações no código do DAG...

npx skills add https://github.com/astronomer/agents --skill tracing-upstream-lineage

Upstream Lineage: Sources

Trace the origins of data - answer "Where does this data come from?"

Lineage Investigation

Step 1: Identify the Target Type

Determine what we're tracing:

  • Table: Trace what populates this table
  • Column: Trace where this specific column comes from
  • DAG: Trace what data sources this DAG reads from

Step 2: Find the Producing DAG

Tables are typically populated by Airflow DAGs. Find the connection:

  1. Search DAGs by name: Use af dags list and look for DAG names matching the table name

    • load_customers -> customers table
    • etl_daily_orders -> orders table
  2. Explore DAG source code: Use af dags source <dag_id> to read the DAG definition

    • Look for INSERT, MERGE, CREATE TABLE statements
    • Find the target table in the code
  3. Check DAG tasks: Use af tasks list <dag_id> to see what operations the DAG performs

On Astro

If you're running on Astro, the Lineage tab in the Astro UI provides visual lineage exploration across DAGs and datasets. Use it to quickly trace upstream dependencies without manually searching DAG source code.

On OSS Airflow

Use DAG source code and task logs to trace lineage (no built-in cross-DAG UI).

Step 3: Trace Data Sources

From the DAG code, identify source tables and systems:

SQL Sources (look for FROM clauses):

# In DAG code:
SELECT * FROM source_schema.source_table  # <- This is an upstream source

External Sources (look for connection references):

  • S3Operator -> S3 bucket source
  • PostgresOperator -> Postgres database source
  • SalesforceOperator -> Salesforce API source
  • HttpOperator -> REST API source

File Sources:

  • CSV/Parquet files in object storage
  • SFTP drops
  • Local file paths

Step 4: Build the Lineage Chain

Recursively trace each source:

TARGET: analytics.orders_daily
    ^
    +-- DAG: etl_daily_orders
            ^
            +-- SOURCE: raw.orders (table)
            |       ^
            |       +-- DAG: ingest_orders
            |               ^
            |               +-- SOURCE: Salesforce API (external)
            |
            +-- SOURCE: dim.customers (table)
                    ^
                    +-- DAG: load_customers
                            ^
                            +-- SOURCE: PostgreSQL (external DB)

Step 5: Check Source Health

For each upstream source:

  • Tables: Check freshness with the checking-freshness skill
  • DAGs: Check recent run status with af dags stats
  • External systems: Note connection info from DAG code

Lineage for Columns

When tracing a specific column:

  1. Find the column in the target table schema
  2. Search DAG source code for references to that column name
  3. Trace through transformations:
    • Direct mappings: source.col AS target_col
    • Transformations: COALESCE(a.col, b.col) AS target_col
    • Aggregations: SUM(detail.amount) AS total_amount

Output: Lineage Report

Summary

One-line answer: "This table is populated by DAG X from sources Y and Z"

Lineage Diagram

[Salesforce] --> [raw.opportunities] --> [stg.opportunities] --> [fct.sales]
                        |                        |
                   DAG: ingest_sfdc         DAG: transform_sales

Source Details

SourceTypeConnectionFreshnessOwner
raw.ordersTableInternal2h agodata-team
SalesforceAPIsalesforce_connReal-timesales-ops

Transformation Chain

Describe how data flows and transforms:

  1. Raw data lands in raw.orders via Salesforce API sync
  2. DAG transform_orders cleans and dedupes into stg.orders
  3. DAG build_order_facts joins with dimensions into fct.orders

Data Quality Implications

  • Single points of failure?
  • Stale upstream sources?
  • Complex transformation chains that could break?

Related Skills

  • Check source freshness: checking-freshness skill
  • Debug source DAG: debugging-dags skill
  • Trace downstream impacts: tracing-downstream-lineage skill
  • Add manual lineage annotations: annotating-task-lineage skill
  • Build custom lineage extractors: creating-openlineage-extractors skill

Mais skills de astronomer

airflow
astronomer
Consulte, gerencie e solucione problemas de DAGs, execuções, tarefas e configuração de sistema do Apache Airflow. Suporta mais de 30 comandos para inspeção de DAGs, gerenciamento de execuções, registro de tarefas, consultas de configuração e acesso direto à API REST. Gerencie múltiplas instâncias do Airflow com configuração persistente; descubra automaticamente implantações locais e Astro. Dispare execuções de DAG de forma síncrona (aguardando conclusão) ou assíncrona, diagnostique falhas, limpe execuções para repetição e acesse logs de tarefas com filtragem por repetição/índice de mapa. Saída...
official
airflow-hitl
astronomer
Portões de aprovação humana, entradas de formulário e ramificações em DAGs do Airflow usando operadores adiáveis. Quatro tipos de operadores: ApprovalOperator para decisões de aprovar/rejeitar, HITLOperator para seleção de múltiplas opções com formulários, HITLBranchOperator para roteamento de tarefas orientado por humanos e HITLEntryOperator para coleta de dados de formulário. Todos os operadores são adiáveis, liberando slots de worker enquanto aguardam resposta humana via a aba Ações Necessárias da interface do Airflow ou API REST. Suporta recursos opcionais incluindo personalização...
official
airflow-plugins
astronomer
Crie plugins do Airflow 3.1+ que incorporam aplicativos FastAPI, páginas de UI personalizadas, componentes React, middleware, macros e links de operador diretamente na interface do Airflow. Use…
official
analyzing-data
astronomer
Consulte seu data warehouse para responder perguntas de negócios com padrões em cache e mapeamentos de conceitos. Suporta busca de padrões e cache para tipos de perguntas repetidas, com registro de resultados para melhorar consultas futuras. Inclui cache de mapeamento conceito-tabela e descoberta de esquemas de tabela via INFORMATION_SCHEMA ou grep no código-fonte. Fornece funções de kernel run_sql() e run_sql_pandas() que retornam DataFrames Polars ou Pandas para análise. Comandos CLI para gerenciar caches de conceitos, padrões e tabelas, além de...
official
annotating-task-lineage
astronomer
Anotar tarefas do Airflow com linhagem de dados usando inlets e outlets. Suporta objetos OpenLineage Dataset, Assets do Airflow e Datasets do Airflow para definir entradas e saídas em bancos de dados, data warehouses e armazenamento em nuvem. Use como fallback quando operadores não possuem extratores OpenLineage integrados; segue um sistema de precedência de quatro níveis onde extratores personalizados e métodos OpenLineage têm prioridade. Inclui auxiliares de nomenclatura de datasets para Snowflake, BigQuery, S3 e PostgreSQL para garantir consistência...
official
authoring-dags
astronomer
Fluxo de trabalho guiado para criação de DAGs do Apache Airflow com integração de validação e testes. Abordagem estruturada em seis fases: descobrir o ambiente e padrões existentes, planejar a estrutura da DAG, implementar seguindo as melhores práticas, validar com comandos da CLI af, testar com consentimento do usuário e iterar em correções. Comandos da CLI para descoberta (af config connections, af config providers, af dags list) e validação (af dags errors, af dags get, af dags explore) fornecem feedback imediato sobre a DAG...
official
blueprint
astronomer
Defina modelos reutilizáveis de grupos de tarefas do Airflow com validação Pydantic e componha DAGs a partir de YAML. Use ao criar modelos de blueprint, compor DAGs a partir de…
official
checking-freshness
astronomer
Verifique a atualização dos dados analisando os timestamps das tabelas e os padrões de atualização em relação a uma escala de obsolescência. Identifica colunas de timestamp usando padrões comuns de nomenclatura ETL (_loaded_at, _updated_at, created_at, etc.) e consulta seus valores máximos para determinar a idade. Classifica os dados em quatro status de atualização: Atualizados (< 4 horas), Desatualizados (4–24 horas), Muito Desatualizados (> 24 horas) ou Desconhecido (nenhum timestamp encontrado). Fornece modelos SQL para verificar o horário da última atualização e as tendências de contagem de linhas nos dias recentes para...
official