airflow

Consulta, gestiona y soluciona problemas de DAGs, ejecuciones, tareas y configuración del sistema de Apache Airflow. Soporta más de 30 comandos para inspección de DAGs, gestión de ejecuciones, registro de tareas, consultas de configuración y acceso directo a la API REST. Administra múltiples instancias de Airflow con configuración persistente; descubre automáticamente implementaciones locales y de Astro. Ejecuta DAGs de forma síncrona (esperando su finalización) o asíncrona, diagnostica fallos, limpia ejecuciones para reintentos y accede a registros de tareas con filtros de reintento e índice de mapa. Salida...

npx skills add https://github.com/astronomer/agents --skill airflow

Airflow Operations

Use af commands to query, manage, and troubleshoot Airflow workflows.

Astro CLI

The Astro CLI is the recommended way to run Airflow locally and deploy to production. It provides a containerized Airflow environment that works out of the box:

# Initialize a new project
astro dev init

# Start local Airflow (webserver at http://localhost:8080)
astro dev start

# Parse DAGs to catch errors quickly (no need to start Airflow)
astro dev parse

# Run pytest against your DAGs
astro dev pytest

# Deploy to production
astro deploy            # Full deploy (image + DAGs)
astro deploy --dags     # DAG-only deploy (fast, no image build)

For more details:

  • New project? See the setting-up-astro-project skill
  • Local environment? See the managing-astro-local-env skill
  • Deploying? See the deploying-airflow skill

Running the CLI

These commands assume af is on PATH. Run via astro otto to get it automatically, or install standalone with uv tool install astro-airflow-mcp.

Instance Configuration

Manage multiple Airflow instances with persistent configuration:

# Add a new instance
af instance add prod --url https://airflow.example.com --token "$API_TOKEN"
af instance add staging --url https://staging.example.com --username admin --password admin

# List and switch instances
af instance list      # Shows all instances in a table
af instance use prod  # Switch to prod instance
af instance current   # Show current instance
af instance delete old-instance

# Auto-discover instances (use --dry-run to preview first)
af instance discover --dry-run        # Preview all discoverable instances
af instance discover                  # Discover from all backends (astro, local)
af instance discover astro            # Discover Astro deployments only
af instance discover astro --all-workspaces  # Include all accessible workspaces
af instance discover local            # Scan common local Airflow ports
af instance discover local --scan     # Deep scan all ports 1024-65535

# IMPORTANT: Always run with --dry-run first and ask for user consent before
# running discover without it. The non-dry-run mode creates API tokens in
# Astro Cloud, which is a sensitive action that requires explicit approval.

# Show where an instance came from (file path + scope)
af instance show prod

# Override instance for a single command via env vars
AIRFLOW_API_URL=https://staging.example.com AIRFLOW_AUTH_TOKEN=$STG af dags list

# Or switch persistently
af instance use staging

Config layout (mirrors git config system/global/local):

ScopeFileCommitted?
Global~/.astro/config.yamln/a (per-user)
Project shared<root>/.astro/config.yamlyes
Project local<root>/.astro/config.local.yamlno (gitignored)

<root> is found by walking up from cwd looking for .astro/. Default write routing inside a project: add/discover → project-shared, use → project-local. Override with --global / --project / --local. Set AF_CONFIG=<path> to bypass layering and use a single file.

Migrate from the legacy ~/.af/config.yaml with af migrate (idempotent; renames the old file to .bak).

Tokens in config can reference environment variables using ${VAR} syntax:

instances:
- name: prod
  url: https://airflow.example.com
  auth:
    token: ${AIRFLOW_API_TOKEN}

Or use environment variables directly (no config file needed):

export AIRFLOW_API_URL=http://localhost:8080
export AIRFLOW_AUTH_TOKEN=your-token-here
# Or username/password:
export AIRFLOW_USERNAME=admin
export AIRFLOW_PASSWORD=admin

Or CLI flags: af --airflow-url http://localhost:8080 --token "$TOKEN" <command>

Quick Reference

CommandDescription
af healthSystem health check
af dags listList all DAGs
af dags get <dag_id>Get DAG details
af dags explore <dag_id>Full DAG investigation
af dags source <dag_id>Get DAG source code
af dags pause <dag_id>Pause DAG scheduling
af dags unpause <dag_id>Resume DAG scheduling
af dags errorsList import errors
af dags warningsList DAG warnings
af dags statsDAG run statistics
af runs listList DAG runs
af runs get <dag_id> <run_id>Get run details
af runs trigger <dag_id>Trigger a DAG run
af runs trigger-wait <dag_id>Trigger and wait for completion
af runs delete <dag_id> <run_id>Permanently delete a DAG run
af runs clear <dag_id> <run_id>Clear a run for re-execution
af runs diagnose <dag_id> <run_id>Diagnose failed run
af tasks list <dag_id>List tasks in DAG
af tasks get <dag_id> <task_id>Get task definition
af tasks instance <dag_id> <run_id> <task_id>Get task instance
af tasks logs <dag_id> <run_id> <task_id>Get task logs
af config versionAirflow version
af config showFull configuration
af config connectionsList connections
af config variablesList variables
af config variable <key>Get specific variable
af config poolsList pools
af config pool <name>Get pool details
af config pluginsList plugins
af config providersList providers
af config assetsList assets/datasets
af api <endpoint>Direct REST API access
af api lsList available API endpoints
af api ls --filter XList endpoints matching pattern
af registry providersList providers in the Airflow Registry
af registry modules <provider>List operators/hooks/sensors/transfers in a provider
af registry parameters <provider>Constructor signatures (name, type, default, required) for a provider's classes
af registry connections <provider>Connection types a provider exposes

User Intent Patterns

Getting Started

  • "How do I run Airflow locally?" / "Set up Airflow" -> use the managing-astro-local-env skill (uses Astro CLI)
  • "Create a new Airflow project" / "Initialize project" -> use the setting-up-astro-project skill (uses Astro CLI)
  • "How do I install Airflow?" / "Get started with Airflow" -> use the setting-up-astro-project skill

DAG Operations

  • "What DAGs exist?" / "List all DAGs" -> af dags list
  • "Tell me about DAG X" / "What is DAG Y?" -> af dags explore <dag_id>
  • "What's the schedule for DAG X?" -> af dags get <dag_id>
  • "Show me the code for DAG X" -> af dags source <dag_id>
  • "Stop DAG X" / "Pause this workflow" -> af dags pause <dag_id>
  • "Resume DAG X" -> af dags unpause <dag_id>
  • "Are there any DAG errors?" -> af dags errors
  • "Create a new DAG" / "Write a pipeline" -> use the authoring-dags skill

Run Operations

  • "What runs have executed?" -> af runs list
  • "Run DAG X" / "Trigger the pipeline" -> af runs trigger <dag_id>
  • "Run DAG X and wait" -> af runs trigger-wait <dag_id>
  • "Why did this run fail?" -> af runs diagnose <dag_id> <run_id>
  • "Delete this run" / "Remove stuck run" -> af runs delete <dag_id> <run_id>
  • "Clear this run" / "Retry this run" / "Re-run this" -> af runs clear <dag_id> <run_id>
  • "Test this DAG and fix if it fails" -> use the testing-dags skill

Task Operations

  • "What tasks are in DAG X?" -> af tasks list <dag_id>
  • "Get task logs" / "Why did task fail?" -> af tasks logs <dag_id> <run_id> <task_id>
  • "Full root cause analysis" / "Diagnose and fix" -> use the debugging-dags skill

Data Operations

  • "Is the data fresh?" / "When was this table last updated?" -> use the checking-freshness skill
  • "Where does this data come from?" -> use the tracing-upstream-lineage skill
  • "What depends on this table?" / "What breaks if I change this?" -> use the tracing-downstream-lineage skill

Deployment Operations

  • "Deploy my DAGs" / "Push to production" -> use the deploying-airflow skill
  • "Set up CI/CD" / "Automate deploys" -> use the deploying-airflow skill
  • "Deploy to Kubernetes" / "Set up Helm" -> use the deploying-airflow skill
  • "astro deploy" / "DAG-only deploy" -> use the deploying-airflow skill

System Operations

  • "What version of Airflow?" -> af config version
  • "What connections exist?" -> af config connections
  • "Are pools full?" -> af config pools
  • "Is Airflow healthy?" -> af health

API Exploration

  • "What API endpoints are available?" -> af api ls
  • "Find variable endpoints" -> af api ls --filter variable
  • "Access XCom values" / "Get XCom" -> af api xcom-entries -F dag_id=X -F task_id=Y
  • "Get event logs" / "Audit trail" -> af api event-logs -F dag_id=X
  • "Create connection via API" -> af api connections -X POST --body '{...}'
  • "Create variable via API" -> af api variables -X POST -F key=name -f value=val

Registry Discovery

  • "What operators does provider X have?" -> af registry modules <provider>
  • "What are the constructor params for operator Y?" -> af registry parameters <provider>
  • "What providers exist?" / "Is there a provider for Z?" -> af registry providers
  • "What connection types does provider X expose?" -> af registry connections <provider>
  • "Writing a DAG with a specific operator" -> use registry to verify current signature before copying examples

Common Workflows

Validate DAGs Before Deploying

If you're using the Astro CLI, you can validate DAGs without a running Airflow instance:

# Parse DAGs to catch import errors and syntax issues
astro dev parse

# Run unit tests
astro dev pytest

Otherwise, validate against a running instance:

af dags errors     # Check for parse/import errors
af dags warnings   # Check for deprecation warnings

Discover Operator Signatures Before Writing Code

The Airflow Registry at airflow.apache.org/registry is the authoritative source for provider classes and their current constructor signatures. Prefer it over memory or stale documentation when authoring DAGs — the registry reflects the live provider release.

# List all providers and pick the one you need
af registry providers | jq '.providers[] | {id, name, version}'

# List every operator / hook / sensor in a provider (e.g. standard, amazon, google)
af registry modules standard \
  | jq '.modules[] | {name, type, import_path, docs_url}'

# Get the current constructor signature for a specific class
af registry parameters standard \
  | jq '.classes["airflow.providers.standard.operators.hitl.ApprovalOperator"].parameters'

# Filter modules by substring (useful when you know the concept but not the class)
af registry modules standard \
  | jq '.modules[] | select(.import_path | test("hitl"))'

Results are cached locally: 1 hour for the latest version, 30 days for pinned versions (which are immutable). Add --version X.Y.Z to any modules / parameters / connections call to target a specific release.

Investigate a Failed Run

# 1. List recent runs to find failure
af runs list --dag-id my_dag

# 2. Diagnose the specific run
af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00

# 3. Get logs for failed task (from diagnose output)
af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data

# 4. After fixing, clear the run to retry all tasks
af runs clear my_dag manual__2024-01-15T10:00:00+00:00

Morning Health Check

# 1. Overall system health
af health

# 2. Check for broken DAGs
af dags errors

# 3. Check pool utilization
af config pools

Understand a DAG

# Get comprehensive overview (metadata + tasks + source)
af dags explore my_dag

Check Why DAG Isn't Running

# Check if paused
af dags get my_dag

# Check for import errors
af dags errors

# Check recent runs
af runs list --dag-id my_dag

Trigger and Monitor

# Option 1: Trigger and wait (blocking)
af runs trigger-wait my_dag --timeout 1800

# Option 2: Trigger and check later
af runs trigger my_dag
af runs get my_dag <run_id>

Output Format

All commands output JSON (except instance commands which use human-readable tables):

af dags list
# {
#   "total_dags": 5,
#   "returned_count": 5,
#   "dags": [...]
# }

Use jq for filtering:

# Find failed runs
af runs list | jq '.dag_runs[] | select(.state == "failed")'

# Get DAG IDs only
af dags list | jq '.dags[].dag_id'

# Find paused DAGs
af dags list | jq '[.dags[] | select(.is_paused == true)]'

Task Logs Options

# Get logs for specific retry attempt
af tasks logs my_dag run_id task_id --try 2

# Get logs for mapped task index
af tasks logs my_dag run_id task_id --map-index 5

Direct API Access with af api

Use af api for endpoints not covered by high-level commands (XCom, event-logs, backfills, etc).

# Discover available endpoints
af api ls
af api ls --filter variable

# Basic usage
af api dags
af api dags -F limit=10 -F only_active=true
af api variables -X POST -F key=my_var -f value="my value"
af api variables/old_var -X DELETE

Field syntax: -F key=value auto-converts types, -f key=value keeps as string.

Full reference: See api-reference.md for all options, common endpoints (XCom, event-logs, backfills), and examples.

Related Skills

SkillUse when...
authoring-dagsCreating or editing DAG files with best practices
testing-dagsIterative test -> debug -> fix -> retest cycles
debugging-dagsDeep root cause analysis and failure diagnosis
checking-freshnessChecking if data is up to date or stale
tracing-upstream-lineageFinding where data comes from
tracing-downstream-lineageImpact analysis -- what breaks if something changes
deploying-airflowDeploying DAGs to production (Astro, Docker Compose, Kubernetes)
migrating-airflow-2-to-3Upgrading DAGs from Airflow 2.x to 3.x
managing-astro-local-envStarting, stopping, or troubleshooting local Airflow
setting-up-astro-projectInitializing a new Astro/Airflow project

Más skills de astronomer

airflow-hitl
astronomer
Compuertas de aprobación humana, entradas de formulario y ramificación en DAGs de Airflow utilizando operadores diferibles. Cuatro tipos de operadores: ApprovalOperator para decisiones de aprobar/rechazar, HITLOperator para selección de múltiples opciones con formularios, HITLBranchOperator para enrutamiento de tareas impulsado por humanos y HITLEntryOperator para recopilación de datos de formularios. Todos los operadores son diferibles, liberando espacios de trabajo mientras esperan respuesta humana a través de la pestaña de Acciones Requeridas de la interfaz de usuario de Airflow o la API REST. Soporta características opcionales que incluyen personalización...
official
airflow-plugins
astronomer
Crea plugins de Airflow 3.1+ que integren aplicaciones FastAPI, páginas de UI personalizadas, componentes React, middleware, macros y enlaces de operadores directamente en la interfaz de Airflow. Usa…
official
analyzing-data
astronomer
Consulta tu almacén de datos para responder preguntas de negocio con patrones en caché y mapeos de conceptos. Soporta búsqueda de patrones y almacenamiento en caché para tipos de preguntas repetidas, con registro de resultados para mejorar consultas futuras. Incluye caché de mapeo concepto-tabla y descubrimiento de esquemas de tablas mediante INFORMATION_SCHEMA o búsqueda en el código base. Proporciona funciones kernel run_sql() y run_sql_pandas() que devuelven DataFrames de Polars o Pandas para análisis. Comandos CLI para gestionar cachés de conceptos, patrones y tablas, además de...
official
annotating-task-lineage
astronomer
Anotar tareas de Airflow con linaje de datos utilizando inlets y outlets. Soporta objetos Dataset de OpenLineage, Assets de Airflow y Datasets de Airflow para definir entradas y salidas en bases de datos, almacenes de datos y almacenamiento en la nube. Úselo como alternativa cuando los operadores carezcan de extractores OpenLineage integrados; sigue un sistema de precedencia de cuatro niveles donde los extractores personalizados y los métodos OpenLineage tienen prioridad. Incluye ayudantes de nomenclatura de datasets para Snowflake, BigQuery, S3 y PostgreSQL para garantizar consistencia...
official
authoring-dags
astronomer
Flujo de trabajo guiado para crear DAGs de Apache Airflow con integración de validación y pruebas. Enfoque estructurado de seis fases: descubrir el entorno y patrones existentes, planificar la estructura del DAG, implementar siguiendo las mejores prácticas, validar con comandos de la CLI de af, probar con consentimiento del usuario, e iterar sobre correcciones. Los comandos de la CLI para descubrimiento (af config connections, af config providers, af dags list) y validación (af dags errors, af dags get, af dags explore) proporcionan retroalimentación inmediata sobre el DAG...
official
blueprint
astronomer
Define plantillas reutilizables de grupos de tareas de Airflow con validación Pydantic y componga DAGs desde YAML. Úselo al crear plantillas de blueprint, componer DAGs desde…
official
checking-freshness
astronomer
Verifica la frescura de los datos revisando las marcas de tiempo de las tablas y los patrones de actualización frente a una escala de obsolescencia. Identifica columnas de marca de tiempo usando patrones comunes de nomenclatura ETL (_loaded_at, _updated_at, created_at, etc.) y consulta sus valores máximos para determinar la antigüedad. Clasifica los datos en cuatro estados de frescura: Fresco (< 4 horas), Obsoleto (4–24 horas), Muy obsoleto (> 24 horas) o Desconocido (sin marca de tiempo encontrada). Proporciona plantillas SQL para verificar la última hora de actualización y las tendencias de recuento de filas en días recientes para...
official
cosmos-dbt-core
astronomer
Convierte proyectos de dbt Core en DAGs o TaskGroups de Airflow usando Astronomer Cosmos. Soporta tres patrones de ensamblaje: DbtDag independiente, DbtTaskGroup dentro de DAGs existentes y operadores individuales de Cosmos para control detallado. Elige entre ocho modos de ejecución (WATCHER, LOCAL, VIRTUALENV, KUBERNETES, AIRFLOW_ASYNC y otros) según las necesidades de aislamiento y rendimiento. Ofrece tres estrategias de análisis (dbt_manifest, dbt_ls, dbt_ls_file, automática) para equilibrar velocidad y complejidad de selectores...
official