airflow

Truy vấn, quản lý và khắc phục sự cố DAG, lần chạy, tác vụ và cấu hình hệ thống Apache Airflow. Hỗ trợ hơn 30 lệnh bao gồm kiểm tra DAG, quản lý lần chạy, ghi nhật ký tác vụ, truy vấn cấu hình và truy cập trực tiếp REST API. Quản lý nhiều phiên bản Airflow với cấu hình liên tục; tự động phát hiện triển khai cục bộ và Astro. Kích hoạt chạy DAG đồng bộ (chờ hoàn thành) hoặc không đồng bộ, chẩn đoán lỗi, xóa lần chạy để thử lại, và truy cập nhật ký tác vụ với bộ lọc thử lại/ch

npx skills add https://github.com/astronomer/agents --skill airflow

Airflow Operations

Use af commands to query, manage, and troubleshoot Airflow workflows.

Astro CLI

The Astro CLI is the recommended way to run Airflow locally and deploy to production. It provides a containerized Airflow environment that works out of the box:

# Initialize a new project
astro dev init

# Start local Airflow (webserver at http://localhost:8080)
astro dev start

# Parse DAGs to catch errors quickly (no need to start Airflow)
astro dev parse

# Run pytest against your DAGs
astro dev pytest

# Deploy to production
astro deploy            # Full deploy (image + DAGs)
astro deploy --dags     # DAG-only deploy (fast, no image build)

For more details:

  • New project? See the setting-up-astro-project skill
  • Local environment? See the managing-astro-local-env skill
  • Deploying? See the deploying-airflow skill

Running the CLI

These commands assume af is on PATH. Run via astro otto to get it automatically, or install standalone with uv tool install astro-airflow-mcp.

Instance Configuration

Manage multiple Airflow instances with persistent configuration:

# Add a new instance
af instance add prod --url https://airflow.example.com --token "$API_TOKEN"
af instance add staging --url https://staging.example.com --username admin --password admin

# List and switch instances
af instance list      # Shows all instances in a table
af instance use prod  # Switch to prod instance
af instance current   # Show current instance
af instance delete old-instance

# Auto-discover instances (use --dry-run to preview first)
af instance discover --dry-run        # Preview all discoverable instances
af instance discover                  # Discover from all backends (astro, local)
af instance discover astro            # Discover Astro deployments only
af instance discover astro --all-workspaces  # Include all accessible workspaces
af instance discover local            # Scan common local Airflow ports
af instance discover local --scan     # Deep scan all ports 1024-65535

# IMPORTANT: Always run with --dry-run first and ask for user consent before
# running discover without it. The non-dry-run mode creates API tokens in
# Astro Cloud, which is a sensitive action that requires explicit approval.

# Show where an instance came from (file path + scope)
af instance show prod

# Override instance for a single command via env vars
AIRFLOW_API_URL=https://staging.example.com AIRFLOW_AUTH_TOKEN=$STG af dags list

# Or switch persistently
af instance use staging

Config layout (mirrors git config system/global/local):

ScopeFileCommitted?
Global~/.astro/config.yamln/a (per-user)
Project shared<root>/.astro/config.yamlyes
Project local<root>/.astro/config.local.yamlno (gitignored)

<root> is found by walking up from cwd looking for .astro/. Default write routing inside a project: add/discover → project-shared, use → project-local. Override with --global / --project / --local. Set AF_CONFIG=<path> to bypass layering and use a single file.

Migrate from the legacy ~/.af/config.yaml with af migrate (idempotent; renames the old file to .bak).

Tokens in config can reference environment variables using ${VAR} syntax:

instances:
- name: prod
  url: https://airflow.example.com
  auth:
    token: ${AIRFLOW_API_TOKEN}

Or use environment variables directly (no config file needed):

export AIRFLOW_API_URL=http://localhost:8080
export AIRFLOW_AUTH_TOKEN=your-token-here
# Or username/password:
export AIRFLOW_USERNAME=admin
export AIRFLOW_PASSWORD=admin

Or CLI flags: af --airflow-url http://localhost:8080 --token "$TOKEN" <command>

Quick Reference

CommandDescription
af healthSystem health check
af dags listList all DAGs
af dags get <dag_id>Get DAG details
af dags explore <dag_id>Full DAG investigation
af dags source <dag_id>Get DAG source code
af dags pause <dag_id>Pause DAG scheduling
af dags unpause <dag_id>Resume DAG scheduling
af dags errorsList import errors
af dags warningsList DAG warnings
af dags statsDAG run statistics
af runs listList DAG runs
af runs get <dag_id> <run_id>Get run details
af runs trigger <dag_id>Trigger a DAG run
af runs trigger-wait <dag_id>Trigger and wait for completion
af runs delete <dag_id> <run_id>Permanently delete a DAG run
af runs clear <dag_id> <run_id>Clear a run for re-execution
af runs diagnose <dag_id> <run_id>Diagnose failed run
af tasks list <dag_id>List tasks in DAG
af tasks get <dag_id> <task_id>Get task definition
af tasks instance <dag_id> <run_id> <task_id>Get task instance
af tasks logs <dag_id> <run_id> <task_id>Get task logs
af config versionAirflow version
af config showFull configuration
af config connectionsList connections
af config variablesList variables
af config variable <key>Get specific variable
af config poolsList pools
af config pool <name>Get pool details
af config pluginsList plugins
af config providersList providers
af config assetsList assets/datasets
af api <endpoint>Direct REST API access
af api lsList available API endpoints
af api ls --filter XList endpoints matching pattern
af registry providersList providers in the Airflow Registry
af registry modules <provider>List operators/hooks/sensors/transfers in a provider
af registry parameters <provider>Constructor signatures (name, type, default, required) for a provider's classes
af registry connections <provider>Connection types a provider exposes

User Intent Patterns

Getting Started

  • "How do I run Airflow locally?" / "Set up Airflow" -> use the managing-astro-local-env skill (uses Astro CLI)
  • "Create a new Airflow project" / "Initialize project" -> use the setting-up-astro-project skill (uses Astro CLI)
  • "How do I install Airflow?" / "Get started with Airflow" -> use the setting-up-astro-project skill

DAG Operations

  • "What DAGs exist?" / "List all DAGs" -> af dags list
  • "Tell me about DAG X" / "What is DAG Y?" -> af dags explore <dag_id>
  • "What's the schedule for DAG X?" -> af dags get <dag_id>
  • "Show me the code for DAG X" -> af dags source <dag_id>
  • "Stop DAG X" / "Pause this workflow" -> af dags pause <dag_id>
  • "Resume DAG X" -> af dags unpause <dag_id>
  • "Are there any DAG errors?" -> af dags errors
  • "Create a new DAG" / "Write a pipeline" -> use the authoring-dags skill

Run Operations

  • "What runs have executed?" -> af runs list
  • "Run DAG X" / "Trigger the pipeline" -> af runs trigger <dag_id>
  • "Run DAG X and wait" -> af runs trigger-wait <dag_id>
  • "Why did this run fail?" -> af runs diagnose <dag_id> <run_id>
  • "Delete this run" / "Remove stuck run" -> af runs delete <dag_id> <run_id>
  • "Clear this run" / "Retry this run" / "Re-run this" -> af runs clear <dag_id> <run_id>
  • "Test this DAG and fix if it fails" -> use the testing-dags skill

Task Operations

  • "What tasks are in DAG X?" -> af tasks list <dag_id>
  • "Get task logs" / "Why did task fail?" -> af tasks logs <dag_id> <run_id> <task_id>
  • "Full root cause analysis" / "Diagnose and fix" -> use the debugging-dags skill

Data Operations

  • "Is the data fresh?" / "When was this table last updated?" -> use the checking-freshness skill
  • "Where does this data come from?" -> use the tracing-upstream-lineage skill
  • "What depends on this table?" / "What breaks if I change this?" -> use the tracing-downstream-lineage skill

Deployment Operations

  • "Deploy my DAGs" / "Push to production" -> use the deploying-airflow skill
  • "Set up CI/CD" / "Automate deploys" -> use the deploying-airflow skill
  • "Deploy to Kubernetes" / "Set up Helm" -> use the deploying-airflow skill
  • "astro deploy" / "DAG-only deploy" -> use the deploying-airflow skill

System Operations

  • "What version of Airflow?" -> af config version
  • "What connections exist?" -> af config connections
  • "Are pools full?" -> af config pools
  • "Is Airflow healthy?" -> af health

API Exploration

  • "What API endpoints are available?" -> af api ls
  • "Find variable endpoints" -> af api ls --filter variable
  • "Access XCom values" / "Get XCom" -> af api xcom-entries -F dag_id=X -F task_id=Y
  • "Get event logs" / "Audit trail" -> af api event-logs -F dag_id=X
  • "Create connection via API" -> af api connections -X POST --body '{...}'
  • "Create variable via API" -> af api variables -X POST -F key=name -f value=val

Registry Discovery

  • "What operators does provider X have?" -> af registry modules <provider>
  • "What are the constructor params for operator Y?" -> af registry parameters <provider>
  • "What providers exist?" / "Is there a provider for Z?" -> af registry providers
  • "What connection types does provider X expose?" -> af registry connections <provider>
  • "Writing a DAG with a specific operator" -> use registry to verify current signature before copying examples

Common Workflows

Validate DAGs Before Deploying

If you're using the Astro CLI, you can validate DAGs without a running Airflow instance:

# Parse DAGs to catch import errors and syntax issues
astro dev parse

# Run unit tests
astro dev pytest

Otherwise, validate against a running instance:

af dags errors     # Check for parse/import errors
af dags warnings   # Check for deprecation warnings

Discover Operator Signatures Before Writing Code

The Airflow Registry at airflow.apache.org/registry is the authoritative source for provider classes and their current constructor signatures. Prefer it over memory or stale documentation when authoring DAGs — the registry reflects the live provider release.

# List all providers and pick the one you need
af registry providers | jq '.providers[] | {id, name, version}'

# List every operator / hook / sensor in a provider (e.g. standard, amazon, google)
af registry modules standard \
  | jq '.modules[] | {name, type, import_path, docs_url}'

# Get the current constructor signature for a specific class
af registry parameters standard \
  | jq '.classes["airflow.providers.standard.operators.hitl.ApprovalOperator"].parameters'

# Filter modules by substring (useful when you know the concept but not the class)
af registry modules standard \
  | jq '.modules[] | select(.import_path | test("hitl"))'

Results are cached locally: 1 hour for the latest version, 30 days for pinned versions (which are immutable). Add --version X.Y.Z to any modules / parameters / connections call to target a specific release.

Investigate a Failed Run

# 1. List recent runs to find failure
af runs list --dag-id my_dag

# 2. Diagnose the specific run
af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00

# 3. Get logs for failed task (from diagnose output)
af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data

# 4. After fixing, clear the run to retry all tasks
af runs clear my_dag manual__2024-01-15T10:00:00+00:00

Morning Health Check

# 1. Overall system health
af health

# 2. Check for broken DAGs
af dags errors

# 3. Check pool utilization
af config pools

Understand a DAG

# Get comprehensive overview (metadata + tasks + source)
af dags explore my_dag

Check Why DAG Isn't Running

# Check if paused
af dags get my_dag

# Check for import errors
af dags errors

# Check recent runs
af runs list --dag-id my_dag

Trigger and Monitor

# Option 1: Trigger and wait (blocking)
af runs trigger-wait my_dag --timeout 1800

# Option 2: Trigger and check later
af runs trigger my_dag
af runs get my_dag <run_id>

Output Format

All commands output JSON (except instance commands which use human-readable tables):

af dags list
# {
#   "total_dags": 5,
#   "returned_count": 5,
#   "dags": [...]
# }

Use jq for filtering:

# Find failed runs
af runs list | jq '.dag_runs[] | select(.state == "failed")'

# Get DAG IDs only
af dags list | jq '.dags[].dag_id'

# Find paused DAGs
af dags list | jq '[.dags[] | select(.is_paused == true)]'

Task Logs Options

# Get logs for specific retry attempt
af tasks logs my_dag run_id task_id --try 2

# Get logs for mapped task index
af tasks logs my_dag run_id task_id --map-index 5

Direct API Access with af api

Use af api for endpoints not covered by high-level commands (XCom, event-logs, backfills, etc).

# Discover available endpoints
af api ls
af api ls --filter variable

# Basic usage
af api dags
af api dags -F limit=10 -F only_active=true
af api variables -X POST -F key=my_var -f value="my value"
af api variables/old_var -X DELETE

Field syntax: -F key=value auto-converts types, -f key=value keeps as string.

Full reference: See api-reference.md for all options, common endpoints (XCom, event-logs, backfills), and examples.

Related Skills

SkillUse when...
authoring-dagsCreating or editing DAG files with best practices
testing-dagsIterative test -> debug -> fix -> retest cycles
debugging-dagsDeep root cause analysis and failure diagnosis
checking-freshnessChecking if data is up to date or stale
tracing-upstream-lineageFinding where data comes from
tracing-downstream-lineageImpact analysis -- what breaks if something changes
deploying-airflowDeploying DAGs to production (Astro, Docker Compose, Kubernetes)
migrating-airflow-2-to-3Upgrading DAGs from Airflow 2.x to 3.x
managing-astro-local-envStarting, stopping, or troubleshooting local Airflow
setting-up-astro-projectInitializing a new Astro/Airflow project

Thêm skills từ astronomer

airflow-hitl
astronomer
Cổng phê duyệt của con người, đầu vào biểu mẫu và phân nhánh trong DAG Airflow sử dụng các toán tử có thể trì hoãn. Bốn loại toán tử: ApprovalOperator cho quyết định phê duyệt/từ chối, HITLOperator cho lựa chọn nhiều tùy chọn với biểu mẫu, HITLBranchOperator cho định tuyến tác vụ do con người điều khiển và HITLEntryOperator cho thu thập dữ liệu biểu mẫu. Tất cả các toán tử đều có thể trì hoãn, giải phóng slot worker trong khi chờ phản hồi của con người qua tab Required Actions của giao diện Airflow hoặc REST API. Hỗ trợ các tính năng tùy chọn bao gồm tùy chỉnh...
official
airflow-plugins
astronomer
Xây dựng plugin Airflow 3.1+ nhúng ứng dụng FastAPI, trang UI tùy chỉnh, thành phần React, middleware, macro và liên kết toán tử trực tiếp vào giao diện Airflow. Sử dụng…
official
analyzing-data
astronomer
Truy vấn kho dữ liệu của bạn để trả lời các câu hỏi kinh doanh với các mẫu đã lưu trong bộ nhớ đệm và ánh xạ khái niệm. Hỗ trợ tra cứu mẫu và lưu vào bộ nhớ đệm cho các loại câu hỏi lặp lại, với ghi nhận kết quả để cải thiện các truy vấn trong tương lai. Bao gồm bộ nhớ đệm ánh xạ khái niệm sang bảng và khám phá lược đồ bảng qua INFORMATION_SCHEMA hoặc tìm kiếm trong mã nguồn. Cung cấp các hàm kernel run_sql() và run_sql_pandas() trả về DataFrame Polars hoặc Pandas để phân tích. Các lệnh CLI để quản lý bộ nhớ đệm khái
official
annotating-task-lineage
astronomer
Chú thích các tác vụ Airflow với dòng dữ liệu (data lineage) bằng cách sử dụng inlets và outlets. Hỗ trợ các đối tượng Dataset của OpenLineage, Assets của Airflow và Datasets của Airflow để xác định đầu vào và đầu ra trên các cơ sở dữ liệu, kho dữ liệu và lưu trữ đám mây. Sử dụng như phương án dự phòng khi các toán tử thiếu bộ trích xuất OpenLineage tích hợp sẵn; tuân theo hệ thống ưu tiên bốn cấp, trong đó các bộ trích xuất tùy chỉnh và phương thức OpenLineage được ưu tiên. Bao gồm các trình trợ giúp đặt tên dataset cho Snowflake, BigQuery, S3 và PostgreSQL để đảm bảo tính nhất quán...
official
authoring-dags
astronomer
Quy trình làm việc có hướng dẫn để tạo DAG Apache Airflow với tích hợp xác thực và kiểm thử. Phương pháp sáu giai đoạn có cấu trúc: khám phá môi trường và các mẫu hiện có, lập kế hoạch cấu trúc DAG, triển khai theo các phương pháp tốt nhất, xác thực bằng lệnh CLI af, kiểm thử với sự đồng ý của người dùng, và lặp lại các bước sửa lỗi. Các lệnh CLI để khám phá (af config connections, af config providers, af dags list) và xác thực (af dags errors, af dags get, af dags explore) cung cấp phản hồi tức thì về DAG...
official
blueprint
astronomer
Xác định các mẫu nhóm tác vụ Airflow có thể tái sử dụng với xác thực Pydantic và soạn DAG từ YAML. Sử dụng khi tạo mẫu blueprint, soạn DAG từ…
official
checking-freshness
astronomer
Kiểm tra độ tươi mới của dữ liệu bằng cách đối chiếu dấu thời gian bảng và mẫu cập nhật với thang đo độ cũ. Xác định các cột dấu thời gian sử dụng các mẫu đặt tên ETL phổ biến (_loaded_at, _updated_at, created_at, v.v.) và truy vấn giá trị tối đa của chúng để xác định tuổi. Phân loại dữ liệu thành bốn trạng thái độ tươi mới: Tươi (< 4 giờ), Cũ (4–24 giờ), Rất cũ (> 24 giờ) hoặc Không xác định (không tìm thấy dấu thời gian). Cung cấp các mẫu SQL để kiểm tra thời gian cập nhật cuối cùng và xu hướng số lượng hàng trong những ngày gần đây để...
official
cosmos-dbt-core
astronomer
Chuyển đổi các dự án dbt Core thành DAGs hoặc TaskGroups của Airflow bằng Astronomer Cosmos. Hỗ trợ ba mẫu lắp ráp: DbtDag độc lập, DbtTaskGroup trong các DAG hiện có và các toán tử Cosmos riêng lẻ để kiểm soát chi tiết. Chọn từ tám chế độ thực thi (WATCHER, LOCAL, VIRTUALENV, KUBERNETES, AIRFLOW_ASYNC và các chế độ khác) dựa trên nhu cầu cách ly và hiệu suất. Cung cấp ba chiến lược phân tích cú pháp (dbt_manifest, dbt_ls, dbt_ls_file, tự động) để cân bằng giữa tốc độ và độ phức tạp của bộ chọn...
official