airflow

作成者: astronomer

Apache AirflowのDAG、実行、タスク、システム設定をクエリ、管理、トラブルシューティングします。DAG検査、実行管理、タスクログ、設定クエリ、REST API直接アクセスを含む30以上のコマンドをサポート。複数のAirflowインスタンスを永続的な設定で管理し、ローカルおよびAstroデプロイメントを自動検出。DAG実行を同期的(完了待機)または非同期的にトリガーし、障害を診断、再試行のために実行をクリア、リトライ/マップインデックスフィルタリング付きでタスクログにアクセス。出力...

npx skills add https://github.com/astronomer/agents --skill airflow

Airflow Operations

Use af commands to query, manage, and troubleshoot Airflow workflows.

Astro CLI

The Astro CLI is the recommended way to run Airflow locally and deploy to production. It provides a containerized Airflow environment that works out of the box:

# Initialize a new project
astro dev init

# Start local Airflow (webserver at http://localhost:8080)
astro dev start

# Parse DAGs to catch errors quickly (no need to start Airflow)
astro dev parse

# Run pytest against your DAGs
astro dev pytest

# Deploy to production
astro deploy            # Full deploy (image + DAGs)
astro deploy --dags     # DAG-only deploy (fast, no image build)

For more details:

  • New project? See the setting-up-astro-project skill
  • Local environment? See the managing-astro-local-env skill
  • Deploying? See the deploying-airflow skill

Running the CLI

These commands assume af is on PATH. Run via astro otto to get it automatically, or install standalone with uv tool install astro-airflow-mcp.

Instance Configuration

Manage multiple Airflow instances with persistent configuration:

# Add a new instance
af instance add prod --url https://airflow.example.com --token "$API_TOKEN"
af instance add staging --url https://staging.example.com --username admin --password admin

# List and switch instances
af instance list      # Shows all instances in a table
af instance use prod  # Switch to prod instance
af instance current   # Show current instance
af instance delete old-instance

# Auto-discover instances (use --dry-run to preview first)
af instance discover --dry-run        # Preview all discoverable instances
af instance discover                  # Discover from all backends (astro, local)
af instance discover astro            # Discover Astro deployments only
af instance discover astro --all-workspaces  # Include all accessible workspaces
af instance discover local            # Scan common local Airflow ports
af instance discover local --scan     # Deep scan all ports 1024-65535

# IMPORTANT: Always run with --dry-run first and ask for user consent before
# running discover without it. The non-dry-run mode creates API tokens in
# Astro Cloud, which is a sensitive action that requires explicit approval.

# Show where an instance came from (file path + scope)
af instance show prod

# Override instance for a single command via env vars
AIRFLOW_API_URL=https://staging.example.com AIRFLOW_AUTH_TOKEN=$STG af dags list

# Or switch persistently
af instance use staging

Config layout (mirrors git config system/global/local):

ScopeFileCommitted?
Global~/.astro/config.yamln/a (per-user)
Project shared<root>/.astro/config.yamlyes
Project local<root>/.astro/config.local.yamlno (gitignored)

<root> is found by walking up from cwd looking for .astro/. Default write routing inside a project: add/discover → project-shared, use → project-local. Override with --global / --project / --local. Set AF_CONFIG=<path> to bypass layering and use a single file.

Migrate from the legacy ~/.af/config.yaml with af migrate (idempotent; renames the old file to .bak).

Tokens in config can reference environment variables using ${VAR} syntax:

instances:
- name: prod
  url: https://airflow.example.com
  auth:
    token: ${AIRFLOW_API_TOKEN}

Or use environment variables directly (no config file needed):

export AIRFLOW_API_URL=http://localhost:8080
export AIRFLOW_AUTH_TOKEN=your-token-here
# Or username/password:
export AIRFLOW_USERNAME=admin
export AIRFLOW_PASSWORD=admin

Or CLI flags: af --airflow-url http://localhost:8080 --token "$TOKEN" <command>

Quick Reference

CommandDescription
af healthSystem health check
af dags listList all DAGs
af dags get <dag_id>Get DAG details
af dags explore <dag_id>Full DAG investigation
af dags source <dag_id>Get DAG source code
af dags pause <dag_id>Pause DAG scheduling
af dags unpause <dag_id>Resume DAG scheduling
af dags errorsList import errors
af dags warningsList DAG warnings
af dags statsDAG run statistics
af runs listList DAG runs
af runs get <dag_id> <run_id>Get run details
af runs trigger <dag_id>Trigger a DAG run
af runs trigger-wait <dag_id>Trigger and wait for completion
af runs delete <dag_id> <run_id>Permanently delete a DAG run
af runs clear <dag_id> <run_id>Clear a run for re-execution
af runs diagnose <dag_id> <run_id>Diagnose failed run
af tasks list <dag_id>List tasks in DAG
af tasks get <dag_id> <task_id>Get task definition
af tasks instance <dag_id> <run_id> <task_id>Get task instance
af tasks logs <dag_id> <run_id> <task_id>Get task logs
af config versionAirflow version
af config showFull configuration
af config connectionsList connections
af config variablesList variables
af config variable <key>Get specific variable
af config poolsList pools
af config pool <name>Get pool details
af config pluginsList plugins
af config providersList providers
af config assetsList assets/datasets
af api <endpoint>Direct REST API access
af api lsList available API endpoints
af api ls --filter XList endpoints matching pattern
af registry providersList providers in the Airflow Registry
af registry modules <provider>List operators/hooks/sensors/transfers in a provider
af registry parameters <provider>Constructor signatures (name, type, default, required) for a provider's classes
af registry connections <provider>Connection types a provider exposes

User Intent Patterns

Getting Started

  • "How do I run Airflow locally?" / "Set up Airflow" -> use the managing-astro-local-env skill (uses Astro CLI)
  • "Create a new Airflow project" / "Initialize project" -> use the setting-up-astro-project skill (uses Astro CLI)
  • "How do I install Airflow?" / "Get started with Airflow" -> use the setting-up-astro-project skill

DAG Operations

  • "What DAGs exist?" / "List all DAGs" -> af dags list
  • "Tell me about DAG X" / "What is DAG Y?" -> af dags explore <dag_id>
  • "What's the schedule for DAG X?" -> af dags get <dag_id>
  • "Show me the code for DAG X" -> af dags source <dag_id>
  • "Stop DAG X" / "Pause this workflow" -> af dags pause <dag_id>
  • "Resume DAG X" -> af dags unpause <dag_id>
  • "Are there any DAG errors?" -> af dags errors
  • "Create a new DAG" / "Write a pipeline" -> use the authoring-dags skill

Run Operations

  • "What runs have executed?" -> af runs list
  • "Run DAG X" / "Trigger the pipeline" -> af runs trigger <dag_id>
  • "Run DAG X and wait" -> af runs trigger-wait <dag_id>
  • "Why did this run fail?" -> af runs diagnose <dag_id> <run_id>
  • "Delete this run" / "Remove stuck run" -> af runs delete <dag_id> <run_id>
  • "Clear this run" / "Retry this run" / "Re-run this" -> af runs clear <dag_id> <run_id>
  • "Test this DAG and fix if it fails" -> use the testing-dags skill

Task Operations

  • "What tasks are in DAG X?" -> af tasks list <dag_id>
  • "Get task logs" / "Why did task fail?" -> af tasks logs <dag_id> <run_id> <task_id>
  • "Full root cause analysis" / "Diagnose and fix" -> use the debugging-dags skill

Data Operations

  • "Is the data fresh?" / "When was this table last updated?" -> use the checking-freshness skill
  • "Where does this data come from?" -> use the tracing-upstream-lineage skill
  • "What depends on this table?" / "What breaks if I change this?" -> use the tracing-downstream-lineage skill

Deployment Operations

  • "Deploy my DAGs" / "Push to production" -> use the deploying-airflow skill
  • "Set up CI/CD" / "Automate deploys" -> use the deploying-airflow skill
  • "Deploy to Kubernetes" / "Set up Helm" -> use the deploying-airflow skill
  • "astro deploy" / "DAG-only deploy" -> use the deploying-airflow skill

System Operations

  • "What version of Airflow?" -> af config version
  • "What connections exist?" -> af config connections
  • "Are pools full?" -> af config pools
  • "Is Airflow healthy?" -> af health

API Exploration

  • "What API endpoints are available?" -> af api ls
  • "Find variable endpoints" -> af api ls --filter variable
  • "Access XCom values" / "Get XCom" -> af api xcom-entries -F dag_id=X -F task_id=Y
  • "Get event logs" / "Audit trail" -> af api event-logs -F dag_id=X
  • "Create connection via API" -> af api connections -X POST --body '{...}'
  • "Create variable via API" -> af api variables -X POST -F key=name -f value=val

Registry Discovery

  • "What operators does provider X have?" -> af registry modules <provider>
  • "What are the constructor params for operator Y?" -> af registry parameters <provider>
  • "What providers exist?" / "Is there a provider for Z?" -> af registry providers
  • "What connection types does provider X expose?" -> af registry connections <provider>
  • "Writing a DAG with a specific operator" -> use registry to verify current signature before copying examples

Common Workflows

Validate DAGs Before Deploying

If you're using the Astro CLI, you can validate DAGs without a running Airflow instance:

# Parse DAGs to catch import errors and syntax issues
astro dev parse

# Run unit tests
astro dev pytest

Otherwise, validate against a running instance:

af dags errors     # Check for parse/import errors
af dags warnings   # Check for deprecation warnings

Discover Operator Signatures Before Writing Code

The Airflow Registry at airflow.apache.org/registry is the authoritative source for provider classes and their current constructor signatures. Prefer it over memory or stale documentation when authoring DAGs — the registry reflects the live provider release.

# List all providers and pick the one you need
af registry providers | jq '.providers[] | {id, name, version}'

# List every operator / hook / sensor in a provider (e.g. standard, amazon, google)
af registry modules standard \
  | jq '.modules[] | {name, type, import_path, docs_url}'

# Get the current constructor signature for a specific class
af registry parameters standard \
  | jq '.classes["airflow.providers.standard.operators.hitl.ApprovalOperator"].parameters'

# Filter modules by substring (useful when you know the concept but not the class)
af registry modules standard \
  | jq '.modules[] | select(.import_path | test("hitl"))'

Results are cached locally: 1 hour for the latest version, 30 days for pinned versions (which are immutable). Add --version X.Y.Z to any modules / parameters / connections call to target a specific release.

Investigate a Failed Run

# 1. List recent runs to find failure
af runs list --dag-id my_dag

# 2. Diagnose the specific run
af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00

# 3. Get logs for failed task (from diagnose output)
af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data

# 4. After fixing, clear the run to retry all tasks
af runs clear my_dag manual__2024-01-15T10:00:00+00:00

Morning Health Check

# 1. Overall system health
af health

# 2. Check for broken DAGs
af dags errors

# 3. Check pool utilization
af config pools

Understand a DAG

# Get comprehensive overview (metadata + tasks + source)
af dags explore my_dag

Check Why DAG Isn't Running

# Check if paused
af dags get my_dag

# Check for import errors
af dags errors

# Check recent runs
af runs list --dag-id my_dag

Trigger and Monitor

# Option 1: Trigger and wait (blocking)
af runs trigger-wait my_dag --timeout 1800

# Option 2: Trigger and check later
af runs trigger my_dag
af runs get my_dag <run_id>

Output Format

All commands output JSON (except instance commands which use human-readable tables):

af dags list
# {
#   "total_dags": 5,
#   "returned_count": 5,
#   "dags": [...]
# }

Use jq for filtering:

# Find failed runs
af runs list | jq '.dag_runs[] | select(.state == "failed")'

# Get DAG IDs only
af dags list | jq '.dags[].dag_id'

# Find paused DAGs
af dags list | jq '[.dags[] | select(.is_paused == true)]'

Task Logs Options

# Get logs for specific retry attempt
af tasks logs my_dag run_id task_id --try 2

# Get logs for mapped task index
af tasks logs my_dag run_id task_id --map-index 5

Direct API Access with af api

Use af api for endpoints not covered by high-level commands (XCom, event-logs, backfills, etc).

# Discover available endpoints
af api ls
af api ls --filter variable

# Basic usage
af api dags
af api dags -F limit=10 -F only_active=true
af api variables -X POST -F key=my_var -f value="my value"
af api variables/old_var -X DELETE

Field syntax: -F key=value auto-converts types, -f key=value keeps as string.

Full reference: See api-reference.md for all options, common endpoints (XCom, event-logs, backfills), and examples.

Related Skills

SkillUse when...
authoring-dagsCreating or editing DAG files with best practices
testing-dagsIterative test -> debug -> fix -> retest cycles
debugging-dagsDeep root cause analysis and failure diagnosis
checking-freshnessChecking if data is up to date or stale
tracing-upstream-lineageFinding where data comes from
tracing-downstream-lineageImpact analysis -- what breaks if something changes
deploying-airflowDeploying DAGs to production (Astro, Docker Compose, Kubernetes)
migrating-airflow-2-to-3Upgrading DAGs from Airflow 2.x to 3.x
managing-astro-local-envStarting, stopping, or troubleshooting local Airflow
setting-up-astro-projectInitializing a new Astro/Airflow project

astronomerのその他のスキル

airflow-hitl
astronomer
人間による承認ゲート、フォーム入力、およびAirflow DAG内での分岐を、遅延可能オペレーターを使用して実現。4種類のオペレーター:承認/却下の判断を行うApprovalOperator、フォームによる複数選択肢の選択を行うHITLOperator、人間主導のタスクルーティングを行うHITLBranchOperator、フォームデータ収集を行うHITLEntryOperator。すべてのオペレーターは遅延可能であり、Airflow UIのRequired ActionsタブまたはREST APIを介して人間の応答を待つ間、ワーカースロットを解放します。カスタム...を含むオプション機能をサポート。
official
airflow-plugins
astronomer
Airflow 3.1+のプラグインを構築し、FastAPIアプリ、カスタムUIページ、Reactコンポーネント、ミドルウェア、マクロ、オペレーターリンクをAirflow UIに直接埋め込みます。使用…
official
analyzing-data
astronomer
データウェアハウスにクエリを実行し、キャッシュされたパターンと概念マッピングを使用してビジネス上の質問に回答します。繰り返し発生する質問タイプのパターン検索とキャッシュをサポートし、結果を記録して将来のクエリを改善します。概念からテーブルへのマッピングキャッシュと、INFORMATION_SCHEMAまたはコードベースのgrepによるテーブルスキーマ検出を含みます。分析用にPolarsまたはPandas DataFrameを返すrun_sql()およびrun_sql_pandas()カーネル関数を提供します。概念、パターン、テーブルキャッシュを管理するCLIコマンド、さらに...
official
annotating-task-lineage
astronomer
Airflowタスクにデータ系列を注釈付けし、インレットとアウトレットを使用します。OpenLineage Datasetオブジェクト、Airflow Assets、Airflow Datasetsをサポートし、データベース、データウェアハウス、クラウドストレージ間での入出力を定義します。オペレーターに組み込みのOpenLineage抽出機能がない場合のフォールバックとして使用し、カスタム抽出機能とOpenLineageメソッドが優先される4段階の優先順位システムに従います。Snowflake、BigQuery、S3、PostgreSQL向けのデータセット命名ヘルパーを含み、一貫性を確保します。
official
authoring-dags
astronomer
Apache Airflow DAGを作成するためのガイド付きワークフローで、検証とテストの統合を備えています。構造化された6フェーズのアプローチ:環境と既存のパターンを発見し、DAG構造を計画し、ベストプラクティスに従って実装し、af CLIコマンドで検証し、ユーザーの同意を得てテストし、修正を繰り返します。発見用のCLIコマンド(af config connections、af config providers、af dags list)と検証用のCLIコマンド(af dags errors、af dags get、af dags explore)は、DAGに関する即時フィードバックを提供します。
official
blueprint
astronomer
Pydanticバリデーションを使用して再利用可能なAirflowタスクグループテンプレートを定義し、YAMLからDAGを構成します。ブループリントテンプレートの作成時や、DAGの構成時に使用します。
official
checking-freshness
astronomer
テーブルのタイムスタンプと更新パターンを陳腐化スケールに照らして確認し、データの鮮度を検証します。一般的なETL命名パターン(_loaded_at、_updated_at、created_atなど)を使用してタイムスタンプカラムを特定し、その最大値をクエリして経過時間を判定します。データを4つの鮮度ステータスに分類します:Fresh(4時間未満)、Stale(4~24時間)、Very Stale(24時間超)、またはUnknown(タイムスタンプなし)。最近の日数における最終更新時刻と行数トレンドを確認するためのSQLテンプレートを提供します...
official
cosmos-dbt-core
astronomer
Astronomer Cosmosを使用して、dbt CoreプロジェクトをAirflow DAGまたはTaskGroupに変換します。3つのアセンブリパターン(スタンドアロンのDbtDag、既存のDAG内のDbtTaskGroup、細かい制御が可能な個別のCosmosオペレーター)をサポートします。分離とパフォーマンスのニーズに基づいて、8つの実行モード(WATCHER、LOCAL、VIRTUALENV、KUBERNETES、AIRFLOW_ASYNCなど)から選択できます。速度とセレクターの複雑さのバランスを取るために、3つの解析戦略(dbt_manifest、dbt_ls、dbt_ls_file、automatic)を提供します...
official