testing-dags

作者: astronomer

针对Airflow DAG的迭代式测试-调试-修复循环,提供全面的故障诊断。首先使用af runs trigger-wait <dag_id>运行DAG并等待完成,无需预检。失败时,使用af runs diagnose获取全面的故障摘要,并通过af tasks logs查看特定任务的错误详情。支持自定义配置、超时和重试次数;处理成功、失败和超时场景,并给出清晰的响应解读。提供快速验证功能...

npx skills add https://github.com/astronomer/agents --skill testing-dags

DAG Testing Skill

Use af commands to test, debug, and fix DAGs in iterative cycles.

Running the CLI

These commands assume af is on PATH. Run via astro otto to get it automatically, or install standalone with uv tool install astro-airflow-mcp.


Quick Validation with Astro CLI

If the user has the Astro CLI available, these commands provide fast feedback without needing a running Airflow instance:

# Parse DAGs to catch import errors, syntax issues, and DAG-level problems
astro dev parse

# Run pytest against DAGs (runs tests in tests/ directory)
astro dev pytest

Use these for quick validation during development. For full end-to-end testing against a live Airflow instance, continue to the trigger-and-wait workflow below.


FIRST ACTION: Just Trigger the DAG

When the user asks to test a DAG, your FIRST AND ONLY action should be:

af runs trigger-wait <dag_id>

DO NOT:

  • Call af dags list first
  • Call af dags get first
  • Call af dags errors first
  • Use grep or ls or any other bash command
  • Do any "pre-flight checks"

Just trigger the DAG. If it fails, THEN debug.


Testing Workflow Overview

┌─────────────────────────────────────┐
│ 1. TRIGGER AND WAIT                 │
│    Run DAG, wait for completion     │
└─────────────────────────────────────┘
                 ↓
        ┌───────┴───────┐
        ↓               ↓
   ┌─────────┐    ┌──────────┐
   │ SUCCESS │    │ FAILED   │
   │ Done!   │    │ Debug... │
   └─────────┘    └──────────┘
                       ↓
        ┌─────────────────────────────────────┐
        │ 2. DEBUG (only if failed)           │
        │    Get logs, identify root cause    │
        └─────────────────────────────────────┘
                       ↓
        ┌─────────────────────────────────────┐
        │ 3. FIX AND RETEST                   │
        │    Apply fix, restart from step 1   │
        └─────────────────────────────────────┘

Philosophy: Try first, debug on failure. Don't waste time on pre-flight checks — just run the DAG and diagnose if something goes wrong.


Phase 1: Trigger and Wait

Use af runs trigger-wait to test the DAG:

Primary Method: Trigger and Wait

af runs trigger-wait <dag_id> --timeout 300

Example:

af runs trigger-wait my_dag --timeout 300

Why this is the preferred method:

  • Single command handles trigger + monitoring
  • Returns immediately when DAG completes (success or failure)
  • Includes failed task details if run fails
  • No manual polling required

Response Interpretation

Success:

{
  "dag_run": {
    "dag_id": "my_dag",
    "dag_run_id": "manual__2025-01-14T...",
    "state": "success",
    "start_date": "...",
    "end_date": "..."
  },
  "timed_out": false,
  "elapsed_seconds": 45.2
}

Failure:

{
  "dag_run": {
    "state": "failed"
  },
  "timed_out": false,
  "elapsed_seconds": 30.1,
  "failed_tasks": [
    {
      "task_id": "extract_data",
      "state": "failed",
      "try_number": 2
    }
  ]
}

Timeout:

{
  "dag_id": "my_dag",
  "dag_run_id": "manual__...",
  "state": "running",
  "timed_out": true,
  "elapsed_seconds": 300.0,
  "message": "Timed out after 300 seconds. DAG run is still running."
}

Alternative: Trigger and Monitor Separately

Use this only when you need more control:

# Step 1: Trigger
af runs trigger my_dag
# Returns: {"dag_run_id": "manual__...", "state": "queued"}

# Step 2: Check status
af runs get my_dag manual__2025-01-14T...
# Returns current state

Handling Results

If Success

The DAG ran successfully. Summarize for the user:

  • Total elapsed time
  • Number of tasks completed
  • Any notable outputs (if visible in logs)

You're done!

If Timed Out

The DAG is still running. Options:

  1. Check current status: af runs get <dag_id> <dag_run_id>
  2. Ask user if they want to continue waiting
  3. Increase timeout and try again

If Failed

Move to Phase 2 (Debug) to identify the root cause.


Phase 2: Debug Failures (Only If Needed)

When a DAG run fails, use these commands to diagnose:

Get Comprehensive Diagnosis

af runs diagnose <dag_id> <dag_run_id>

Returns in one call:

  • Run metadata (state, timing)
  • All task instances with states
  • Summary of failed tasks
  • State counts (success, failed, skipped, etc.)

Get Task Logs

af tasks logs <dag_id> <dag_run_id> <task_id>

Example:

af tasks logs my_dag manual__2025-01-14T... extract_data

For specific retry attempt:

af tasks logs my_dag manual__2025-01-14T... extract_data --try 2

Look for:

  • Exception messages and stack traces
  • Connection errors (database, API, S3)
  • Permission errors
  • Timeout errors
  • Missing dependencies

Check Upstream Tasks

If a task shows upstream_failed, the root cause is in an upstream task. Use af runs diagnose to find which task actually failed.

Check Import Errors (If DAG Didn't Run)

If the trigger failed because the DAG doesn't exist:

af dags errors

This reveals syntax errors or missing dependencies that prevented the DAG from loading.


Phase 3: Fix and Retest

Once you identify the issue:

Common Fixes

IssueFix
Missing importAdd to DAG file
Missing packageAdd to requirements.txt
Connection errorCheck af config connections, verify credentials
Variable missingCheck af config variables, create if needed
TimeoutIncrease task timeout or optimize query
Permission errorCheck credentials in connection

After Fixing

  1. Save the file
  2. Retest: af runs trigger-wait <dag_id>

Repeat the test → debug → fix loop until the DAG succeeds.


CLI Quick Reference

PhaseCommandPurpose
Testaf runs trigger-wait <dag_id>Primary test method — start here
Testaf runs trigger <dag_id>Start run (alternative)
Testaf runs get <dag_id> <run_id>Check run status
Debugaf runs diagnose <dag_id> <run_id>Comprehensive failure diagnosis
Debugaf tasks logs <dag_id> <run_id> <task_id>Get task output/errors
Debugaf dags errorsCheck for parse errors (if DAG won't load)
Debugaf dags get <dag_id>Verify DAG config
Debugaf dags explore <dag_id>Full DAG inspection
Configaf config connectionsList connections
Configaf config variablesList variables

Testing Scenarios

Scenario 1: Test a DAG (Happy Path)

af runs trigger-wait my_dag
# Success! Done.

Scenario 2: Test a DAG (With Failure)

# 1. Run and wait
af runs trigger-wait my_dag
# Failed...

# 2. Find failed tasks
af runs diagnose my_dag manual__2025-01-14T...

# 3. Get error details
af tasks logs my_dag manual__2025-01-14T... extract_data

# 4. [Fix the issue in DAG code]

# 5. Retest
af runs trigger-wait my_dag

Scenario 3: DAG Doesn't Exist / Won't Load

# 1. Trigger fails - DAG not found
af runs trigger-wait my_dag
# Error: DAG not found

# 2. Find parse error
af dags errors

# 3. [Fix the issue in DAG code]

# 4. Retest
af runs trigger-wait my_dag

Scenario 4: Debug a Failed Scheduled Run

# 1. Get failure summary
af runs diagnose my_dag scheduled__2025-01-14T...

# 2. Get error from failed task
af tasks logs my_dag scheduled__2025-01-14T... failed_task_id

# 3. [Fix the issue]

# 4. Retest
af runs trigger-wait my_dag

Scenario 5: Test with Custom Configuration

af runs trigger-wait my_dag --conf '{"env": "staging", "batch_size": 100}' --timeout 600

Scenario 6: Long-Running DAG

# Wait up to 1 hour
af runs trigger-wait my_dag --timeout 3600

# If timed out, check current state
af runs get my_dag manual__2025-01-14T...

Debugging Tips

Common Error Patterns

Connection Refused / Timeout:

  • Check af config connections for correct host/port
  • Verify network connectivity to external system
  • Check if connection credentials are correct

ModuleNotFoundError:

  • Package missing from requirements.txt
  • After adding, may need environment restart

PermissionError:

  • Check IAM roles, database grants, API keys
  • Verify connection has correct credentials

Task Timeout:

  • Query or operation taking too long
  • Consider adding timeout parameter to task
  • Optimize underlying query/operation

Reading Task Logs

Task logs typically show:

  1. Task start timestamp
  2. Any print/log statements from task code
  3. Return value (for @task decorated functions)
  4. Exception + full stack trace (if failed)
  5. Task end timestamp and duration

Focus on the exception at the bottom of failed task logs.

On Astro

Astro deployments support environment promotion, which helps structure your testing workflow:

  • Dev deployment: Test DAGs freely with astro deploy --dags for fast iteration
  • Staging deployment: Run integration tests against production-like data
  • Production deployment: Deploy only after validation in lower environments
  • Use separate Astro deployments for each environment and promote code through them

Related Skills

  • authoring-dags: For creating new DAGs (includes validation before testing)
  • debugging-dags: For general Airflow troubleshooting
  • deploying-airflow: For deploying DAGs to production after testing

来自 astronomer 的更多技能

airflow
astronomer
查询、管理和排查Apache Airflow的DAG、运行记录、任务及系统配置。支持30多种命令,涵盖DAG检查、运行管理、任务日志、配置查询及直接REST API访问。通过持久化配置管理多个Airflow实例;自动发现本地和Astro部署。同步(等待完成)或异步触发DAG运行,诊断故障,清除运行记录以重试,并通过重试/映射索引过滤访问任务日志。输出...
official
airflow-hitl
astronomer
在Airflow DAG中使用可延迟操作符实现人工审批关卡、表单输入和分支。四种操作符类型:用于批准/拒绝决策的ApprovalOperator、带表单的多选项选择HITLOperator、人工驱动的任务路由HITLBranchOperator,以及表单数据收集HITLEntryOperator。所有操作符均为可延迟设计,在通过Airflow UI的"必需操作"标签页或REST API等待人工响应时释放工作槽位。支持包括自定义在内的可选功能...
official
airflow-plugins
astronomer
构建嵌入FastAPI应用、自定义UI页面、React组件、中间件、宏和操作符链接的Airflow 3.1+插件,直接集成到Airflow UI中。使用…
official
analyzing-data
astronomer
查询数据仓库,利用缓存的模式和概念映射来回答业务问题。支持对重复问题类型进行模式查找和缓存,并通过记录结果来改进后续查询。包含概念到表的映射缓存,以及通过INFORMATION_SCHEMA或代码库grep进行表结构发现。提供run_sql()和run_sql_pandas()内核函数,返回Polars或Pandas DataFrame用于分析。提供CLI命令用于管理概念、模式和表缓存,以及...
official
annotating-task-lineage
astronomer
使用入口和出口为Airflow任务标注数据血缘。支持使用OpenLineage Dataset对象、Airflow Assets和Airflow Datasets定义跨数据库、数据仓库及云存储的输入输出。当运算符缺少内置OpenLineage提取器时作为备用方案;遵循四级优先级系统,其中自定义提取器和OpenLineage方法优先。包含针对Snowflake、BigQuery、S3和PostgreSQL的数据集命名辅助工具,以确保一致性...
official
authoring-dags
astronomer
创建Apache Airflow DAG的引导式工作流,集成验证与测试。采用六阶段结构化方法:发现环境与现有模式、规划DAG结构、遵循最佳实践实现、通过af CLI命令验证、经用户同意测试、迭代修复。用于发现(af config connections、af config providers、af dags list)和验证(af dags errors、af dags get、af dags explore)的CLI命令可提供DAG的即时反馈...
official
blueprint
astronomer
使用Pydantic验证定义可复用的Airflow任务组模板,并从YAML组合DAG。适用于创建blueprint模板、从YAML组合DAG等场景。
official
checking-freshness
astronomer
通过检查表时间戳和更新模式与陈旧度标尺对比,验证数据新鲜度。使用常见ETL命名模式(如_loaded_at、_updated_at、created_at等)识别时间戳列,并查询其最大值以确定数据时效。将数据分为四种新鲜度状态:新鲜(<4小时)、陈旧(4–24小时)、非常陈旧(>24小时)或未知(未找到时间戳)。提供SQL模板,用于检查最近几天的最后更新时间及行数变化趋势。
official