airflow by astronomer

Query, manage, and troubleshoot Apache Airflow DAGs, runs, tasks, and system configuration. Supports 30+ commands across DAG inspection, run management, task logging, configuration queries, and direct REST API access. Manage multiple Airflow instances with persistent configuration; auto-discover local and Astro deployments. Trigger DAG runs synchronously (wait for completion) or asynchronously, diagnose failures, clear runs for retry, and access task logs with retry/map-index filtering. Output...

npx skills add https://github.com/astronomer/agents --skill airflow

Airflow Operations

Use af commands to query, manage, and troubleshoot Airflow workflows.

Astro CLI

The Astro CLI is the recommended way to run Airflow locally and deploy to production. It provides a containerized Airflow environment that works out of the box:

# Initialize a new project
astro dev init

# Start local Airflow (webserver at http://localhost:8080)
astro dev start

# Parse DAGs to catch errors quickly (no need to start Airflow)
astro dev parse

# Run pytest against your DAGs
astro dev pytest

# Deploy to production
astro deploy            # Full deploy (image + DAGs)
astro deploy --dags     # DAG-only deploy (fast, no image build)

For more details:

  • New project? See the setting-up-astro-project skill
  • Local environment? See the managing-astro-local-env skill
  • Deploying? See the deploying-airflow skill

Running the CLI

These commands assume af is on PATH. Run via astro otto to get it automatically, or install standalone with uv tool install astro-airflow-mcp.

Instance Configuration

Manage multiple Airflow instances with persistent configuration:

# Add a new instance
af instance add prod --url https://airflow.example.com --token "$API_TOKEN"
af instance add staging --url https://staging.example.com --username admin --password admin

# List and switch instances
af instance list      # Shows all instances in a table
af instance use prod  # Switch to prod instance
af instance current   # Show current instance
af instance delete old-instance

# Auto-discover instances (use --dry-run to preview first)
af instance discover --dry-run        # Preview all discoverable instances
af instance discover                  # Discover from all backends (astro, local)
af instance discover astro            # Discover Astro deployments only
af instance discover astro --all-workspaces  # Include all accessible workspaces
af instance discover local            # Scan common local Airflow ports
af instance discover local --scan     # Deep scan all ports 1024-65535

# IMPORTANT: Always run with --dry-run first and ask for user consent before
# running discover without it. The non-dry-run mode creates API tokens in
# Astro Cloud, which is a sensitive action that requires explicit approval.

# Show where an instance came from (file path + scope)
af instance show prod

# Override instance for a single command via env vars
AIRFLOW_API_URL=https://staging.example.com AIRFLOW_AUTH_TOKEN=$STG af dags list

# Or switch persistently
af instance use staging

Config layout (mirrors git config system/global/local):

Scope          | File                            | Committed?
Global         | ~/.astro/config.yaml            | n/a (per-user)
Project shared | <root>/.astro/config.yaml       | yes
Project local  | <root>/.astro/config.local.yaml | no (gitignored)

<root> is found by walking up from cwd looking for .astro/. Default write routing inside a project: add/discover → project-shared, use → project-local. Override with --global / --project / --local. Set AF_CONFIG=<path> to bypass layering and use a single file.
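
The root-discovery rule above can be sketched in a few lines of shell. This is an illustration of the walk-up behavior, not af's actual code:

```shell
# Sketch (not af's implementation): find <root> by walking up from the
# current directory until a .astro/ directory is found.
find_astro_root() {
  dir=$(pwd)
  while [ "$dir" != "/" ]; do
    [ -d "$dir/.astro" ] && { printf '%s\n' "$dir"; return 0; }
    dir=$(dirname "$dir")
  done
  [ -d /.astro ] && { printf '/\n'; return 0; }
  return 1   # no project root found: only global config applies
}
```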

Migrate from the legacy ~/.af/config.yaml with af migrate (idempotent; renames the old file to .bak).

Tokens in config can reference environment variables using ${VAR} syntax:

instances:
- name: prod
  url: https://airflow.example.com
  auth:
    token: ${AIRFLOW_API_TOKEN}
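
The practical effect of the ${VAR} reference is indirection: rotating the token only requires updating the environment, never the config file. A minimal shell sketch of how such a reference can be resolved at load time (an assumption about af's internals, for illustration only):

```shell
# Resolve a ${NAME} reference against the environment (sketch).
export AIRFLOW_API_TOKEN=tok-123
ref='${AIRFLOW_API_TOKEN}'
name=${ref#'${'}                # strip leading "${"
name=${name%\}}                 # strip trailing "}" -> AIRFLOW_API_TOKEN
eval "resolved=\$$name"         # look the name up in the environment
echo "$resolved"                # prints: tok-123
```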

Or use environment variables directly (no config file needed):

export AIRFLOW_API_URL=http://localhost:8080
export AIRFLOW_AUTH_TOKEN=your-token-here
# Or username/password:
export AIRFLOW_USERNAME=admin
export AIRFLOW_PASSWORD=admin

Or CLI flags: af --airflow-url http://localhost:8080 --token "$TOKEN" <command>

Quick Reference

Command                                       | Description
af health                                     | System health check
af dags list                                  | List all DAGs
af dags get <dag_id>                          | Get DAG details
af dags explore <dag_id>                      | Full DAG investigation
af dags source <dag_id>                       | Get DAG source code
af dags pause <dag_id>                        | Pause DAG scheduling
af dags unpause <dag_id>                      | Resume DAG scheduling
af dags errors                                | List import errors
af dags warnings                              | List DAG warnings
af dags stats                                 | DAG run statistics
af runs list                                  | List DAG runs
af runs get <dag_id> <run_id>                 | Get run details
af runs trigger <dag_id>                      | Trigger a DAG run
af runs trigger-wait <dag_id>                 | Trigger and wait for completion
af runs delete <dag_id> <run_id>              | Permanently delete a DAG run
af runs clear <dag_id> <run_id>               | Clear a run for re-execution
af runs diagnose <dag_id> <run_id>            | Diagnose failed run
af tasks list <dag_id>                        | List tasks in DAG
af tasks get <dag_id> <task_id>               | Get task definition
af tasks instance <dag_id> <run_id> <task_id> | Get task instance
af tasks logs <dag_id> <run_id> <task_id>     | Get task logs
af config version                             | Airflow version
af config show                                | Full configuration
af config connections                         | List connections
af config variables                           | List variables
af config variable <key>                      | Get specific variable
af config pools                               | List pools
af config pool <name>                         | Get pool details
af config plugins                             | List plugins
af config providers                           | List providers
af config assets                              | List assets/datasets
af api <endpoint>                             | Direct REST API access
af api ls                                     | List available API endpoints
af api ls --filter X                          | List endpoints matching pattern
af registry providers                         | List providers in the Airflow Registry
af registry modules <provider>                | List operators/hooks/sensors/transfers in a provider
af registry parameters <provider>             | Constructor signatures (name, type, default, required) for a provider's classes
af registry connections <provider>            | Connection types a provider exposes

User Intent Patterns

Getting Started

  • "How do I run Airflow locally?" / "Set up Airflow" -> use the managing-astro-local-env skill (uses Astro CLI)
  • "Create a new Airflow project" / "Initialize project" -> use the setting-up-astro-project skill (uses Astro CLI)
  • "How do I install Airflow?" / "Get started with Airflow" -> use the setting-up-astro-project skill

DAG Operations

  • "What DAGs exist?" / "List all DAGs" -> af dags list
  • "Tell me about DAG X" / "What is DAG Y?" -> af dags explore <dag_id>
  • "What's the schedule for DAG X?" -> af dags get <dag_id>
  • "Show me the code for DAG X" -> af dags source <dag_id>
  • "Stop DAG X" / "Pause this workflow" -> af dags pause <dag_id>
  • "Resume DAG X" -> af dags unpause <dag_id>
  • "Are there any DAG errors?" -> af dags errors
  • "Create a new DAG" / "Write a pipeline" -> use the authoring-dags skill

Run Operations

  • "What runs have executed?" -> af runs list
  • "Run DAG X" / "Trigger the pipeline" -> af runs trigger <dag_id>
  • "Run DAG X and wait" -> af runs trigger-wait <dag_id>
  • "Why did this run fail?" -> af runs diagnose <dag_id> <run_id>
  • "Delete this run" / "Remove stuck run" -> af runs delete <dag_id> <run_id>
  • "Clear this run" / "Retry this run" / "Re-run this" -> af runs clear <dag_id> <run_id>
  • "Test this DAG and fix if it fails" -> use the testing-dags skill

Task Operations

  • "What tasks are in DAG X?" -> af tasks list <dag_id>
  • "Get task logs" / "Why did task fail?" -> af tasks logs <dag_id> <run_id> <task_id>
  • "Full root cause analysis" / "Diagnose and fix" -> use the debugging-dags skill

Data Operations

  • "Is the data fresh?" / "When was this table last updated?" -> use the checking-freshness skill
  • "Where does this data come from?" -> use the tracing-upstream-lineage skill
  • "What depends on this table?" / "What breaks if I change this?" -> use the tracing-downstream-lineage skill

Deployment Operations

  • "Deploy my DAGs" / "Push to production" -> use the deploying-airflow skill
  • "Set up CI/CD" / "Automate deploys" -> use the deploying-airflow skill
  • "Deploy to Kubernetes" / "Set up Helm" -> use the deploying-airflow skill
  • "astro deploy" / "DAG-only deploy" -> use the deploying-airflow skill

System Operations

  • "What version of Airflow?" -> af config version
  • "What connections exist?" -> af config connections
  • "Are pools full?" -> af config pools
  • "Is Airflow healthy?" -> af health

API Exploration

  • "What API endpoints are available?" -> af api ls
  • "Find variable endpoints" -> af api ls --filter variable
  • "Access XCom values" / "Get XCom" -> af api xcom-entries -F dag_id=X -F task_id=Y
  • "Get event logs" / "Audit trail" -> af api event-logs -F dag_id=X
  • "Create connection via API" -> af api connections -X POST --body '{...}'
  • "Create variable via API" -> af api variables -X POST -F key=name -f value=val

Registry Discovery

  • "What operators does provider X have?" -> af registry modules <provider>
  • "What are the constructor params for operator Y?" -> af registry parameters <provider>
  • "What providers exist?" / "Is there a provider for Z?" -> af registry providers
  • "What connection types does provider X expose?" -> af registry connections <provider>
  • "Writing a DAG with a specific operator" -> use registry to verify current signature before copying examples

Common Workflows

Validate DAGs Before Deploying

If you're using the Astro CLI, you can validate DAGs without a running Airflow instance:

# Parse DAGs to catch import errors and syntax issues
astro dev parse

# Run unit tests
astro dev pytest

Otherwise, validate against a running instance:

af dags errors     # Check for parse/import errors
af dags warnings   # Check for deprecation warnings

Discover Operator Signatures Before Writing Code

The Airflow Registry at airflow.apache.org/registry is the authoritative source for provider classes and their current constructor signatures. Prefer it over memory or stale documentation when authoring DAGs — the registry reflects the live provider release.

# List all providers and pick the one you need
af registry providers | jq '.providers[] | {id, name, version}'

# List every operator / hook / sensor in a provider (e.g. standard, amazon, google)
af registry modules standard \
  | jq '.modules[] | {name, type, import_path, docs_url}'

# Get the current constructor signature for a specific class
af registry parameters standard \
  | jq '.classes["airflow.providers.standard.operators.hitl.ApprovalOperator"].parameters'

# Filter modules by substring (useful when you know the concept but not the class)
af registry modules standard \
  | jq '.modules[] | select(.import_path | test("hitl"))'

Results are cached locally: 1 hour for the latest version, 30 days for pinned versions (which are immutable). Add --version X.Y.Z to any modules / parameters / connections call to target a specific release.

Investigate a Failed Run

# 1. List recent runs to find failure
af runs list --dag-id my_dag

# 2. Diagnose the specific run
af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00

# 3. Get logs for failed task (from diagnose output)
af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data

# 4. After fixing, clear the run to retry all tasks
af runs clear my_dag manual__2024-01-15T10:00:00+00:00

Morning Health Check

# 1. Overall system health
af health

# 2. Check for broken DAGs
af dags errors

# 3. Check pool utilization
af config pools

Understand a DAG

# Get comprehensive overview (metadata + tasks + source)
af dags explore my_dag

Check Why DAG Isn't Running

# Check if paused
af dags get my_dag

# Check for import errors
af dags errors

# Check recent runs
af runs list --dag-id my_dag

Trigger and Monitor

# Option 1: Trigger and wait (blocking)
af runs trigger-wait my_dag --timeout 1800

# Option 2: Trigger and check later
af runs trigger my_dag
af runs get my_dag <run_id>
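
Option 2 can be wrapped in a small polling loop. A sketch, assuming (as in Airflow's REST API, but verify against your version) that the `af runs get` JSON exposes a top-level "state" field and the trigger output a "dag_run_id" field:

```shell
# Poll until the run reaches a terminal state (sketch).
wait_for_state() {
  # $1: shell command that prints the run's current state
  # $2: poll interval in seconds
  while :; do
    state=$(eval "$1")
    case "$state" in
      success|failed) printf '%s\n' "$state"; return 0 ;;
    esac
    sleep "$2"
  done
}

# Usage against a live instance:
#   run_id=$(af runs trigger my_dag | jq -r '.dag_run_id')
#   wait_for_state "af runs get my_dag $run_id | jq -r '.state'" 30
```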

Output Format

All commands output JSON (except instance commands which use human-readable tables):

af dags list
# {
#   "total_dags": 5,
#   "returned_count": 5,
#   "dags": [...]
# }

Use jq for filtering:

# Find failed runs
af runs list | jq '.dag_runs[] | select(.state == "failed")'

# Get DAG IDs only
af dags list | jq '.dags[].dag_id'

# Find paused DAGs
af dags list | jq '[.dags[] | select(.is_paused == true)]'
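
jq can also aggregate locally. A sketch that counts runs per state, fed from an inline sample so it runs without a live instance; in practice, pipe `af runs list` into the same filter:

```shell
# Count runs per state; the sample mirrors the dag_runs array shape above.
jq 'reduce .dag_runs[] as $r ({}; .[$r.state] += 1)' <<'EOF'
{"dag_runs": [
  {"run_id": "a", "state": "success"},
  {"run_id": "b", "state": "failed"},
  {"run_id": "c", "state": "success"}
]}
EOF
# yields {"success": 2, "failed": 1}
```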

Task Logs Options

# Get logs for specific retry attempt
af tasks logs my_dag run_id task_id --try 2

# Get logs for mapped task index
af tasks logs my_dag run_id task_id --map-index 5

Direct API Access with af api

Use af api for endpoints not covered by high-level commands (XCom, event-logs, backfills, etc).

# Discover available endpoints
af api ls
af api ls --filter variable

# Basic usage
af api dags
af api dags -F limit=10 -F only_active=true
af api variables -X POST -F key=my_var -f value="my value"
af api variables/old_var -X DELETE

Field syntax: -F key=value auto-converts types, -f key=value keeps as string.

Full reference: See api-reference.md for all options, common endpoints (XCom, event-logs, backfills), and examples.

Related Skills

Skill                      | Use when...
authoring-dags             | Creating or editing DAG files with best practices
testing-dags               | Iterative test -> debug -> fix -> retest cycles
debugging-dags             | Deep root cause analysis and failure diagnosis
checking-freshness         | Checking if data is up to date or stale
tracing-upstream-lineage   | Finding where data comes from
tracing-downstream-lineage | Impact analysis -- what breaks if something changes
deploying-airflow          | Deploying DAGs to production (Astro, Docker Compose, Kubernetes)
migrating-airflow-2-to-3   | Upgrading DAGs from Airflow 2.x to 3.x
managing-astro-local-env   | Starting, stopping, or troubleshooting local Airflow
setting-up-astro-project   | Initializing a new Astro/Airflow project
