authoring-dags

作者: astronomer

建立Apache Airflow DAG的引導式工作流程,包含驗證與測試整合。結構化六階段方法:探索環境與現有模式、規劃DAG結構、遵循最佳實踐進行實作、使用af CLI指令驗證、經使用者同意後測試,以及根據修正反覆迭代。用於探索的CLI指令(af config connections、af config providers、af dags list)與驗證指令(af dags errors、af dags get、af dags explore)可提供DAG的即時回饋。

npx skills add https://github.com/astronomer/agents --skill authoring-dags

DAG Authoring Skill

This skill guides you through creating and validating Airflow DAGs using best practices and af CLI commands.

For testing and debugging DAGs, see the testing-dags skill which covers the full test -> debug -> fix -> retest workflow.


Running the CLI

These commands assume af is on PATH. Run via astro otto to get it automatically, or install standalone with uv tool install astro-airflow-mcp.


Workflow Overview

+-----------------------------------------+
| 1. DISCOVER                             |
|    Understand codebase & environment    |
+-----------------------------------------+
                 |
+-----------------------------------------+
| 2. PLAN                                 |
|    Propose structure, get approval      |
+-----------------------------------------+
                 |
+-----------------------------------------+
| 3. IMPLEMENT                            |
|    Write DAG following patterns         |
+-----------------------------------------+
                 |
+-----------------------------------------+
| 4. VALIDATE                             |
|    Check import errors, warnings        |
+-----------------------------------------+
                 |
+-----------------------------------------+
| 5. TEST (with user consent)             |
|    Trigger, monitor, check logs         |
+-----------------------------------------+
                 |
+-----------------------------------------+
| 6. ITERATE                              |
|    Fix issues, re-validate              |
+-----------------------------------------+

Phase 1: Discover

Before writing code, understand the context.

Explore the Codebase

Use file tools to find existing patterns:

  • Glob for **/dags/**/*.py to find existing DAGs
  • Read similar DAGs to understand conventions
  • Check requirements.txt for available packages

Query the Airflow Environment

Use af CLI commands to understand what's available:

CommandPurpose
af config connectionsWhat external systems are configured
af config variablesWhat configuration values exist
af config providersWhat operator packages are installed
af config versionVersion constraints and features
af dags listExisting DAGs and naming conventions
af config poolsResource pools for concurrency

Example discovery questions:

  • "Is there a Snowflake connection?" -> af config connections
  • "What Airflow version?" -> af config version
  • "Are S3 operators available?" -> af config providers

Phase 2: Plan

Based on discovery, propose:

  1. DAG structure - Tasks, dependencies, schedule
  2. Operators to use - Based on available providers
  3. Connections needed - Existing or to be created
  4. Variables needed - Existing or to be created
  5. Packages needed - Additions to requirements.txt

Get user approval before implementing.


Phase 3: Implement

Write the DAG following best practices (see below). Key steps:

  1. Create DAG file in appropriate location
  2. Update requirements.txt if needed
  3. Save the file

Phase 4: Validate

Use af CLI as a feedback loop to validate your DAG.

Step 1: Check Import Errors

After saving, check for parse errors (Airflow will have already parsed the file):

af dags errors
  • If your file appears -> fix and retry
  • If no errors -> continue

Common causes: missing imports, syntax errors, missing packages.

Step 2: Verify DAG Exists

af dags get <dag_id>

Check: DAG exists, schedule correct, tags set, paused status.

Step 3: Check Warnings

af dags warnings

Look for deprecation warnings or configuration issues.

Step 4: Explore DAG Structure

af dags explore <dag_id>

Returns in one call: metadata, tasks, dependencies, source code.

On Astro

If you're running on Astro, you can also validate locally before deploying:

  • Parse check: Run astro dev parse to catch import errors and DAG-level issues without starting a full Airflow environment
  • DAG-only deploy: Once validated, use astro deploy --dags for fast DAG-only deploys that skip the Docker image build — ideal for iterating on DAG code

Phase 5: Test

See the testing-dags skill for comprehensive testing guidance.

Once validation passes, test the DAG using the workflow in the testing-dags skill:

  1. Get user consent -- Always ask before triggering
  2. Trigger and wait -- af runs trigger-wait <dag_id> --timeout 300
  3. Analyze results -- Check success/failure status
  4. Debug if needed -- af runs diagnose <dag_id> <run_id> and af tasks logs <dag_id> <run_id> <task_id>

Quick Test (Minimal)

# Ask user first, then:
af runs trigger-wait <dag_id> --timeout 300

For the full test -> debug -> fix -> retest loop, see testing-dags.


Phase 6: Iterate

If issues found:

  1. Fix the code
  2. Check for import errors: af dags errors
  3. Re-validate (Phase 4)
  4. Re-test using the testing-dags skill workflow (Phase 5)

CLI Quick Reference

PhaseCommandPurpose
Discoveraf config connectionsAvailable connections
Discoveraf config variablesConfiguration values
Discoveraf config providersInstalled operators
Discoveraf config versionVersion info
Validateaf dags errorsParse errors (check first!)
Validateaf dags get <dag_id>Verify DAG config
Validateaf dags warningsConfiguration warnings
Validateaf dags explore <dag_id>Full DAG inspection

Testing commands -- See the testing-dags skill for af runs trigger-wait, af runs diagnose, af tasks logs, etc.


Best Practices & Anti-Patterns

For code patterns and anti-patterns, see reference/best-practices.md.

Read this reference when writing new DAGs or reviewing existing ones. It covers what patterns are correct (including Airflow 3-specific behavior) and what to avoid.


Related Skills

  • testing-dags: For testing DAGs, debugging failures, and the test -> fix -> retest loop
  • debugging-dags: For troubleshooting failed DAGs
  • deploying-airflow: For deploying DAGs to production (Astro or open-source)
  • migrating-airflow-2-to-3: For migrating DAGs to Airflow 3

來自 astronomer 的更多技能

airflow
astronomer
查詢、管理及疑難排解 Apache Airflow 的 DAG、執行、任務與系統設定。支援 30 多種指令,涵蓋 DAG 檢查、執行管理、任務日誌、設定查詢及直接 REST API 存取。可管理多個 Airflow 實例並保留設定;自動探索本機與 Astro 部署。同步(等待完成)或非同步觸發 DAG 執行、診斷失敗、清除執行以重試,並透過重試/映射索引篩選存取任務日誌。輸出...
official
airflow-hitl
astronomer
使用可延遲運算子,在 Airflow DAG 中實現人工審批關卡、表單輸入與分支流程。包含四種運算子類型:ApprovalOperator 用於核准/拒絕決策、HITLOperator 用於多選項表單選擇、HITLBranchOperator 用於人工驅動的任務路由,以及 HITLEntryOperator 用於表單資料收集。所有運算子皆為可延遲,在等待人工回應時釋放工作槽位,可透過 Airflow UI 的「必要操作」標籤或 REST API 進行回應。支援選用功能,包括自訂...
official
airflow-plugins
astronomer
構建 Airflow 3.1+ 插件,將 FastAPI 應用、自訂 UI 頁面、React 元件、中介軟體、巨集和運算子連結直接嵌入 Airflow UI。使用…
official
analyzing-data
astronomer
查詢您的資料倉儲,利用快取的模式與概念映射來回答商業問題。支援針對重複問題類型的模式查詢與快取,並記錄結果以改善未來查詢。包含概念到表格的映射快取,以及透過INFORMATION_SCHEMA或程式碼庫grep進行的表格結構探索。提供run_sql()與run_sql_pandas()核心函式,回傳Polars或Pandas DataFrame供分析使用。CLI指令可管理概念、模式與表格快取,以及...
official
annotating-task-lineage
astronomer
使用 inlets 和 outlets 為 Airflow 任務標註資料血緣。支援 OpenLineage Dataset 物件、Airflow Assets 與 Airflow Datasets,用於定義跨資料庫、資料倉儲及雲端儲存的輸入與輸出。當運算子缺乏內建 OpenLineage 提取器時,可作為備用方案;遵循四層優先級系統,其中自訂提取器與 OpenLineage 方法具有優先權。包含針對 Snowflake、BigQuery、S3 及 PostgreSQL 的資料集命名輔助工具,以確保一致性...
official
blueprint
astronomer
使用 Pydantic 驗證定義可重複使用的 Airflow 任務組模板,並從 YAML 組合 DAG。適用於建立 blueprint 模板、從 YAML 組合 DAG 等場景。
official
checking-freshness
astronomer
透過檢查表格時間戳記及更新模式,並比對過時程度量表,驗證資料的新鮮度。利用常見的ETL命名模式(如 _loaded_at、_updated_at、created_at 等)識別時間戳記欄位,並查詢其最大值以判斷資料年齡。將資料分類為四種新鮮度狀態:新鮮(少於4小時)、過時(4–24小時)、非常過時(超過24小時)或未知(未找到時間戳記)。提供SQL範本,用於檢查最近幾天的上次更新時間與資料列數量趨勢。
official
cosmos-dbt-core
astronomer
使用 Astronomer Cosmos 將 dbt Core 專案轉換為 Airflow DAG 或 TaskGroup。支援三種組裝模式:獨立的 DbtDag、現有 DAG 中的 DbtTaskGroup,以及用於精細控制的獨立 Cosmos 運算子。根據隔離與效能需求,可從八種執行模式(WATCHER、LOCAL、VIRTUALENV、KUBERNETES、AIRFLOW_ASYNC 等)中選擇。提供三種解析策略(dbt_manifest、dbt_ls、dbt_ls_file、自動),以平衡速度與選擇器複雜度...
official