testing-dags

作者: astronomer

針對Airflow DAG的反覆測試-除錯-修復循環,提供全面的失敗診斷。從af runs trigger-wait <dag_id>開始執行DAG並等待完成,無需預先檢查。失敗時,使用af runs diagnose獲取完整的失敗摘要,並透過af tasks logs檢查特定任務的錯誤細節。支援自訂配置、超時設定與重試機制;能處理成功、失敗及超時情境,並提供清晰的回應解讀。快速驗證功能亦已就緒...

npx skills add https://github.com/astronomer/agents --skill testing-dags

DAG Testing Skill

Use af commands to test, debug, and fix DAGs in iterative cycles.

Running the CLI

These commands assume af is on PATH. Run via astro otto to get it automatically, or install standalone with uv tool install astro-airflow-mcp.


Quick Validation with Astro CLI

If the user has the Astro CLI available, these commands provide fast feedback without needing a running Airflow instance:

# Parse DAGs to catch import errors, syntax issues, and DAG-level problems
astro dev parse

# Run pytest against DAGs (runs tests in tests/ directory)
astro dev pytest

Use these for quick validation during development. For full end-to-end testing against a live Airflow instance, continue to the trigger-and-wait workflow below.


FIRST ACTION: Just Trigger the DAG

When the user asks to test a DAG, your FIRST AND ONLY action should be:

af runs trigger-wait <dag_id>

DO NOT:

  • Call af dags list first
  • Call af dags get first
  • Call af dags errors first
  • Use grep or ls or any other bash command
  • Do any "pre-flight checks"

Just trigger the DAG. If it fails, THEN debug.


Testing Workflow Overview

┌─────────────────────────────────────┐
│ 1. TRIGGER AND WAIT                 │
│    Run DAG, wait for completion     │
└─────────────────────────────────────┘
                 ↓
        ┌───────┴───────┐
        ↓               ↓
   ┌─────────┐    ┌──────────┐
   │ SUCCESS │    │ FAILED   │
   │ Done!   │    │ Debug... │
   └─────────┘    └──────────┘
                       ↓
        ┌─────────────────────────────────────┐
        │ 2. DEBUG (only if failed)           │
        │    Get logs, identify root cause    │
        └─────────────────────────────────────┘
                       ↓
        ┌─────────────────────────────────────┐
        │ 3. FIX AND RETEST                   │
        │    Apply fix, restart from step 1   │
        └─────────────────────────────────────┘

Philosophy: Try first, debug on failure. Don't waste time on pre-flight checks — just run the DAG and diagnose if something goes wrong.


Phase 1: Trigger and Wait

Use af runs trigger-wait to test the DAG:

Primary Method: Trigger and Wait

af runs trigger-wait <dag_id> --timeout 300

Example:

af runs trigger-wait my_dag --timeout 300

Why this is the preferred method:

  • Single command handles trigger + monitoring
  • Returns immediately when DAG completes (success or failure)
  • Includes failed task details if run fails
  • No manual polling required

Response Interpretation

Success:

{
  "dag_run": {
    "dag_id": "my_dag",
    "dag_run_id": "manual__2025-01-14T...",
    "state": "success",
    "start_date": "...",
    "end_date": "..."
  },
  "timed_out": false,
  "elapsed_seconds": 45.2
}

Failure:

{
  "dag_run": {
    "state": "failed"
  },
  "timed_out": false,
  "elapsed_seconds": 30.1,
  "failed_tasks": [
    {
      "task_id": "extract_data",
      "state": "failed",
      "try_number": 2
    }
  ]
}

Timeout:

{
  "dag_id": "my_dag",
  "dag_run_id": "manual__...",
  "state": "running",
  "timed_out": true,
  "elapsed_seconds": 300.0,
  "message": "Timed out after 300 seconds. DAG run is still running."
}

Alternative: Trigger and Monitor Separately

Use this only when you need more control:

# Step 1: Trigger
af runs trigger my_dag
# Returns: {"dag_run_id": "manual__...", "state": "queued"}

# Step 2: Check status
af runs get my_dag manual__2025-01-14T...
# Returns current state

Handling Results

If Success

The DAG ran successfully. Summarize for the user:

  • Total elapsed time
  • Number of tasks completed
  • Any notable outputs (if visible in logs)

You're done!

If Timed Out

The DAG is still running. Options:

  1. Check current status: af runs get <dag_id> <dag_run_id>
  2. Ask user if they want to continue waiting
  3. Increase timeout and try again

If Failed

Move to Phase 2 (Debug) to identify the root cause.


Phase 2: Debug Failures (Only If Needed)

When a DAG run fails, use these commands to diagnose:

Get Comprehensive Diagnosis

af runs diagnose <dag_id> <dag_run_id>

Returns in one call:

  • Run metadata (state, timing)
  • All task instances with states
  • Summary of failed tasks
  • State counts (success, failed, skipped, etc.)

Get Task Logs

af tasks logs <dag_id> <dag_run_id> <task_id>

Example:

af tasks logs my_dag manual__2025-01-14T... extract_data

For specific retry attempt:

af tasks logs my_dag manual__2025-01-14T... extract_data --try 2

Look for:

  • Exception messages and stack traces
  • Connection errors (database, API, S3)
  • Permission errors
  • Timeout errors
  • Missing dependencies

Check Upstream Tasks

If a task shows upstream_failed, the root cause is in an upstream task. Use af runs diagnose to find which task actually failed.

Check Import Errors (If DAG Didn't Run)

If the trigger failed because the DAG doesn't exist:

af dags errors

This reveals syntax errors or missing dependencies that prevented the DAG from loading.


Phase 3: Fix and Retest

Once you identify the issue:

Common Fixes

IssueFix
Missing importAdd to DAG file
Missing packageAdd to requirements.txt
Connection errorCheck af config connections, verify credentials
Variable missingCheck af config variables, create if needed
TimeoutIncrease task timeout or optimize query
Permission errorCheck credentials in connection

After Fixing

  1. Save the file
  2. Retest: af runs trigger-wait <dag_id>

Repeat the test → debug → fix loop until the DAG succeeds.


CLI Quick Reference

PhaseCommandPurpose
Testaf runs trigger-wait <dag_id>Primary test method — start here
Testaf runs trigger <dag_id>Start run (alternative)
Testaf runs get <dag_id> <run_id>Check run status
Debugaf runs diagnose <dag_id> <run_id>Comprehensive failure diagnosis
Debugaf tasks logs <dag_id> <run_id> <task_id>Get task output/errors
Debugaf dags errorsCheck for parse errors (if DAG won't load)
Debugaf dags get <dag_id>Verify DAG config
Debugaf dags explore <dag_id>Full DAG inspection
Configaf config connectionsList connections
Configaf config variablesList variables

Testing Scenarios

Scenario 1: Test a DAG (Happy Path)

af runs trigger-wait my_dag
# Success! Done.

Scenario 2: Test a DAG (With Failure)

# 1. Run and wait
af runs trigger-wait my_dag
# Failed...

# 2. Find failed tasks
af runs diagnose my_dag manual__2025-01-14T...

# 3. Get error details
af tasks logs my_dag manual__2025-01-14T... extract_data

# 4. [Fix the issue in DAG code]

# 5. Retest
af runs trigger-wait my_dag

Scenario 3: DAG Doesn't Exist / Won't Load

# 1. Trigger fails - DAG not found
af runs trigger-wait my_dag
# Error: DAG not found

# 2. Find parse error
af dags errors

# 3. [Fix the issue in DAG code]

# 4. Retest
af runs trigger-wait my_dag

Scenario 4: Debug a Failed Scheduled Run

# 1. Get failure summary
af runs diagnose my_dag scheduled__2025-01-14T...

# 2. Get error from failed task
af tasks logs my_dag scheduled__2025-01-14T... failed_task_id

# 3. [Fix the issue]

# 4. Retest
af runs trigger-wait my_dag

Scenario 5: Test with Custom Configuration

af runs trigger-wait my_dag --conf '{"env": "staging", "batch_size": 100}' --timeout 600

Scenario 6: Long-Running DAG

# Wait up to 1 hour
af runs trigger-wait my_dag --timeout 3600

# If timed out, check current state
af runs get my_dag manual__2025-01-14T...

Debugging Tips

Common Error Patterns

Connection Refused / Timeout:

  • Check af config connections for correct host/port
  • Verify network connectivity to external system
  • Check if connection credentials are correct

ModuleNotFoundError:

  • Package missing from requirements.txt
  • After adding, may need environment restart

PermissionError:

  • Check IAM roles, database grants, API keys
  • Verify connection has correct credentials

Task Timeout:

  • Query or operation taking too long
  • Consider adding timeout parameter to task
  • Optimize underlying query/operation

Reading Task Logs

Task logs typically show:

  1. Task start timestamp
  2. Any print/log statements from task code
  3. Return value (for @task decorated functions)
  4. Exception + full stack trace (if failed)
  5. Task end timestamp and duration

Focus on the exception at the bottom of failed task logs.

On Astro

Astro deployments support environment promotion, which helps structure your testing workflow:

  • Dev deployment: Test DAGs freely with astro deploy --dags for fast iteration
  • Staging deployment: Run integration tests against production-like data
  • Production deployment: Deploy only after validation in lower environments
  • Use separate Astro deployments for each environment and promote code through them

Related Skills

  • authoring-dags: For creating new DAGs (includes validation before testing)
  • debugging-dags: For general Airflow troubleshooting
  • deploying-airflow: For deploying DAGs to production after testing

來自 astronomer 的更多技能

airflow
astronomer
查詢、管理及疑難排解 Apache Airflow 的 DAG、執行、任務與系統設定。支援 30 多種指令,涵蓋 DAG 檢查、執行管理、任務日誌、設定查詢及直接 REST API 存取。可管理多個 Airflow 實例並保留設定;自動探索本機與 Astro 部署。同步(等待完成)或非同步觸發 DAG 執行、診斷失敗、清除執行以重試,並透過重試/映射索引篩選存取任務日誌。輸出...
official
airflow-hitl
astronomer
使用可延遲運算子,在 Airflow DAG 中實現人工審批關卡、表單輸入與分支流程。包含四種運算子類型:ApprovalOperator 用於核准/拒絕決策、HITLOperator 用於多選項表單選擇、HITLBranchOperator 用於人工驅動的任務路由,以及 HITLEntryOperator 用於表單資料收集。所有運算子皆為可延遲,在等待人工回應時釋放工作槽位,可透過 Airflow UI 的「必要操作」標籤或 REST API 進行回應。支援選用功能,包括自訂...
official
airflow-plugins
astronomer
構建 Airflow 3.1+ 插件,將 FastAPI 應用、自訂 UI 頁面、React 元件、中介軟體、巨集和運算子連結直接嵌入 Airflow UI。使用…
official
analyzing-data
astronomer
查詢您的資料倉儲,利用快取的模式與概念映射來回答商業問題。支援針對重複問題類型的模式查詢與快取,並記錄結果以改善未來查詢。包含概念到表格的映射快取,以及透過INFORMATION_SCHEMA或程式碼庫grep進行的表格結構探索。提供run_sql()與run_sql_pandas()核心函式,回傳Polars或Pandas DataFrame供分析使用。CLI指令可管理概念、模式與表格快取,以及...
official
annotating-task-lineage
astronomer
使用 inlets 和 outlets 為 Airflow 任務標註資料血緣。支援 OpenLineage Dataset 物件、Airflow Assets 與 Airflow Datasets,用於定義跨資料庫、資料倉儲及雲端儲存的輸入與輸出。當運算子缺乏內建 OpenLineage 提取器時,可作為備用方案;遵循四層優先級系統,其中自訂提取器與 OpenLineage 方法具有優先權。包含針對 Snowflake、BigQuery、S3 及 PostgreSQL 的資料集命名輔助工具,以確保一致性...
official
authoring-dags
astronomer
建立Apache Airflow DAG的引導式工作流程,包含驗證與測試整合。結構化六階段方法:探索環境與現有模式、規劃DAG結構、遵循最佳實踐進行實作、使用af CLI指令驗證、經使用者同意後測試,以及根據修正反覆迭代。用於探索的CLI指令(af config connections、af config providers、af dags list)與驗證指令(af dags errors、af dags get、af dags explore)可提供DAG的即時回饋。
official
blueprint
astronomer
使用 Pydantic 驗證定義可重複使用的 Airflow 任務組模板,並從 YAML 組合 DAG。適用於建立 blueprint 模板、從 YAML 組合 DAG 等場景。
official
checking-freshness
astronomer
透過檢查表格時間戳記及更新模式,並比對過時程度量表,驗證資料的新鮮度。利用常見的ETL命名模式(如 _loaded_at、_updated_at、created_at 等)識別時間戳記欄位,並查詢其最大值以判斷資料年齡。將資料分類為四種新鮮度狀態:新鮮(少於4小時)、過時(4–24小時)、非常過時(超過24小時)或未知(未找到時間戳記)。提供SQL範本,用於檢查最近幾天的上次更新時間與資料列數量趨勢。
official