testing-dags

Các chu trình kiểm tra-gỡ lỗi-sửa lỗi lặp đi lặp lại cho Airflow DAG với chẩn đoán lỗi toàn diện. Bắt đầu bằng af runs trigger-wait <dag_id> để chạy một DAG và chờ hoàn tất; không cần kiểm tra trước khi chạy. Khi gặp lỗi, sử dụng af runs diagnose để có bản tóm tắt lỗi toàn diện và af tasks logs để kiểm tra chi tiết lỗi từ các tác vụ cụ thể. Hỗ trợ cấu hình tùy chỉnh, thời gian chờ và số lần thử lại; xử lý các tình huống thành công, thất bại và hết thời gian chờ với diễn giải phản hồi rõ ràng. Có sẵn tính năng xác th

npx skills add https://github.com/astronomer/agents --skill testing-dags

DAG Testing Skill

Use af commands to test, debug, and fix DAGs in iterative cycles.

Running the CLI

These commands assume af is on PATH. Run via astro otto to get it automatically, or install standalone with uv tool install astro-airflow-mcp.


Quick Validation with Astro CLI

If the user has the Astro CLI available, these commands provide fast feedback without needing a running Airflow instance:

# Parse DAGs to catch import errors, syntax issues, and DAG-level problems
astro dev parse

# Run pytest against DAGs (runs tests in tests/ directory)
astro dev pytest

Use these for quick validation during development. For full end-to-end testing against a live Airflow instance, continue to the trigger-and-wait workflow below.


FIRST ACTION: Just Trigger the DAG

When the user asks to test a DAG, your FIRST AND ONLY action should be:

af runs trigger-wait <dag_id>

DO NOT:

  • Call af dags list first
  • Call af dags get first
  • Call af dags errors first
  • Use grep or ls or any other bash command
  • Do any "pre-flight checks"

Just trigger the DAG. If it fails, THEN debug.


Testing Workflow Overview

┌─────────────────────────────────────┐
│ 1. TRIGGER AND WAIT                 │
│    Run DAG, wait for completion     │
└─────────────────────────────────────┘
                 ↓
        ┌───────┴───────┐
        ↓               ↓
   ┌─────────┐    ┌──────────┐
   │ SUCCESS │    │ FAILED   │
   │ Done!   │    │ Debug... │
   └─────────┘    └──────────┘
                       ↓
        ┌─────────────────────────────────────┐
        │ 2. DEBUG (only if failed)           │
        │    Get logs, identify root cause    │
        └─────────────────────────────────────┘
                       ↓
        ┌─────────────────────────────────────┐
        │ 3. FIX AND RETEST                   │
        │    Apply fix, restart from step 1   │
        └─────────────────────────────────────┘

Philosophy: Try first, debug on failure. Don't waste time on pre-flight checks — just run the DAG and diagnose if something goes wrong.


Phase 1: Trigger and Wait

Use af runs trigger-wait to test the DAG:

Primary Method: Trigger and Wait

af runs trigger-wait <dag_id> --timeout 300

Example:

af runs trigger-wait my_dag --timeout 300

Why this is the preferred method:

  • Single command handles trigger + monitoring
  • Returns immediately when DAG completes (success or failure)
  • Includes failed task details if run fails
  • No manual polling required

Response Interpretation

Success:

{
  "dag_run": {
    "dag_id": "my_dag",
    "dag_run_id": "manual__2025-01-14T...",
    "state": "success",
    "start_date": "...",
    "end_date": "..."
  },
  "timed_out": false,
  "elapsed_seconds": 45.2
}

Failure:

{
  "dag_run": {
    "state": "failed"
  },
  "timed_out": false,
  "elapsed_seconds": 30.1,
  "failed_tasks": [
    {
      "task_id": "extract_data",
      "state": "failed",
      "try_number": 2
    }
  ]
}

Timeout:

{
  "dag_id": "my_dag",
  "dag_run_id": "manual__...",
  "state": "running",
  "timed_out": true,
  "elapsed_seconds": 300.0,
  "message": "Timed out after 300 seconds. DAG run is still running."
}

Alternative: Trigger and Monitor Separately

Use this only when you need more control:

# Step 1: Trigger
af runs trigger my_dag
# Returns: {"dag_run_id": "manual__...", "state": "queued"}

# Step 2: Check status
af runs get my_dag manual__2025-01-14T...
# Returns current state

Handling Results

If Success

The DAG ran successfully. Summarize for the user:

  • Total elapsed time
  • Number of tasks completed
  • Any notable outputs (if visible in logs)

You're done!

If Timed Out

The DAG is still running. Options:

  1. Check current status: af runs get <dag_id> <dag_run_id>
  2. Ask user if they want to continue waiting
  3. Increase timeout and try again

If Failed

Move to Phase 2 (Debug) to identify the root cause.


Phase 2: Debug Failures (Only If Needed)

When a DAG run fails, use these commands to diagnose:

Get Comprehensive Diagnosis

af runs diagnose <dag_id> <dag_run_id>

Returns in one call:

  • Run metadata (state, timing)
  • All task instances with states
  • Summary of failed tasks
  • State counts (success, failed, skipped, etc.)

Get Task Logs

af tasks logs <dag_id> <dag_run_id> <task_id>

Example:

af tasks logs my_dag manual__2025-01-14T... extract_data

For specific retry attempt:

af tasks logs my_dag manual__2025-01-14T... extract_data --try 2

Look for:

  • Exception messages and stack traces
  • Connection errors (database, API, S3)
  • Permission errors
  • Timeout errors
  • Missing dependencies

Check Upstream Tasks

If a task shows upstream_failed, the root cause is in an upstream task. Use af runs diagnose to find which task actually failed.

Check Import Errors (If DAG Didn't Run)

If the trigger failed because the DAG doesn't exist:

af dags errors

This reveals syntax errors or missing dependencies that prevented the DAG from loading.


Phase 3: Fix and Retest

Once you identify the issue:

Common Fixes

IssueFix
Missing importAdd to DAG file
Missing packageAdd to requirements.txt
Connection errorCheck af config connections, verify credentials
Variable missingCheck af config variables, create if needed
TimeoutIncrease task timeout or optimize query
Permission errorCheck credentials in connection

After Fixing

  1. Save the file
  2. Retest: af runs trigger-wait <dag_id>

Repeat the test → debug → fix loop until the DAG succeeds.


CLI Quick Reference

PhaseCommandPurpose
Testaf runs trigger-wait <dag_id>Primary test method — start here
Testaf runs trigger <dag_id>Start run (alternative)
Testaf runs get <dag_id> <run_id>Check run status
Debugaf runs diagnose <dag_id> <run_id>Comprehensive failure diagnosis
Debugaf tasks logs <dag_id> <run_id> <task_id>Get task output/errors
Debugaf dags errorsCheck for parse errors (if DAG won't load)
Debugaf dags get <dag_id>Verify DAG config
Debugaf dags explore <dag_id>Full DAG inspection
Configaf config connectionsList connections
Configaf config variablesList variables

Testing Scenarios

Scenario 1: Test a DAG (Happy Path)

af runs trigger-wait my_dag
# Success! Done.

Scenario 2: Test a DAG (With Failure)

# 1. Run and wait
af runs trigger-wait my_dag
# Failed...

# 2. Find failed tasks
af runs diagnose my_dag manual__2025-01-14T...

# 3. Get error details
af tasks logs my_dag manual__2025-01-14T... extract_data

# 4. [Fix the issue in DAG code]

# 5. Retest
af runs trigger-wait my_dag

Scenario 3: DAG Doesn't Exist / Won't Load

# 1. Trigger fails - DAG not found
af runs trigger-wait my_dag
# Error: DAG not found

# 2. Find parse error
af dags errors

# 3. [Fix the issue in DAG code]

# 4. Retest
af runs trigger-wait my_dag

Scenario 4: Debug a Failed Scheduled Run

# 1. Get failure summary
af runs diagnose my_dag scheduled__2025-01-14T...

# 2. Get error from failed task
af tasks logs my_dag scheduled__2025-01-14T... failed_task_id

# 3. [Fix the issue]

# 4. Retest
af runs trigger-wait my_dag

Scenario 5: Test with Custom Configuration

af runs trigger-wait my_dag --conf '{"env": "staging", "batch_size": 100}' --timeout 600

Scenario 6: Long-Running DAG

# Wait up to 1 hour
af runs trigger-wait my_dag --timeout 3600

# If timed out, check current state
af runs get my_dag manual__2025-01-14T...

Debugging Tips

Common Error Patterns

Connection Refused / Timeout:

  • Check af config connections for correct host/port
  • Verify network connectivity to external system
  • Check if connection credentials are correct

ModuleNotFoundError:

  • Package missing from requirements.txt
  • After adding, may need environment restart

PermissionError:

  • Check IAM roles, database grants, API keys
  • Verify connection has correct credentials

Task Timeout:

  • Query or operation taking too long
  • Consider adding timeout parameter to task
  • Optimize underlying query/operation

Reading Task Logs

Task logs typically show:

  1. Task start timestamp
  2. Any print/log statements from task code
  3. Return value (for @task decorated functions)
  4. Exception + full stack trace (if failed)
  5. Task end timestamp and duration

Focus on the exception at the bottom of failed task logs.

On Astro

Astro deployments support environment promotion, which helps structure your testing workflow:

  • Dev deployment: Test DAGs freely with astro deploy --dags for fast iteration
  • Staging deployment: Run integration tests against production-like data
  • Production deployment: Deploy only after validation in lower environments
  • Use separate Astro deployments for each environment and promote code through them

Related Skills

  • authoring-dags: For creating new DAGs (includes validation before testing)
  • debugging-dags: For general Airflow troubleshooting
  • deploying-airflow: For deploying DAGs to production after testing

Thêm skills từ astronomer

airflow
astronomer
Truy vấn, quản lý và khắc phục sự cố DAG, lần chạy, tác vụ và cấu hình hệ thống Apache Airflow. Hỗ trợ hơn 30 lệnh bao gồm kiểm tra DAG, quản lý lần chạy, ghi nhật ký tác vụ, truy vấn cấu hình và truy cập trực tiếp REST API. Quản lý nhiều phiên bản Airflow với cấu hình liên tục; tự động phát hiện triển khai cục bộ và Astro. Kích hoạt chạy DAG đồng bộ (chờ hoàn thành) hoặc không đồng bộ, chẩn đoán lỗi, xóa lần chạy để thử lại, và truy cập nhật ký tác vụ với bộ lọc thử lại/ch
official
airflow-hitl
astronomer
Cổng phê duyệt của con người, đầu vào biểu mẫu và phân nhánh trong DAG Airflow sử dụng các toán tử có thể trì hoãn. Bốn loại toán tử: ApprovalOperator cho quyết định phê duyệt/từ chối, HITLOperator cho lựa chọn nhiều tùy chọn với biểu mẫu, HITLBranchOperator cho định tuyến tác vụ do con người điều khiển và HITLEntryOperator cho thu thập dữ liệu biểu mẫu. Tất cả các toán tử đều có thể trì hoãn, giải phóng slot worker trong khi chờ phản hồi của con người qua tab Required Actions của giao diện Airflow hoặc REST API. Hỗ trợ các tính năng tùy chọn bao gồm tùy chỉnh...
official
airflow-plugins
astronomer
Xây dựng plugin Airflow 3.1+ nhúng ứng dụng FastAPI, trang UI tùy chỉnh, thành phần React, middleware, macro và liên kết toán tử trực tiếp vào giao diện Airflow. Sử dụng…
official
analyzing-data
astronomer
Truy vấn kho dữ liệu của bạn để trả lời các câu hỏi kinh doanh với các mẫu đã lưu trong bộ nhớ đệm và ánh xạ khái niệm. Hỗ trợ tra cứu mẫu và lưu vào bộ nhớ đệm cho các loại câu hỏi lặp lại, với ghi nhận kết quả để cải thiện các truy vấn trong tương lai. Bao gồm bộ nhớ đệm ánh xạ khái niệm sang bảng và khám phá lược đồ bảng qua INFORMATION_SCHEMA hoặc tìm kiếm trong mã nguồn. Cung cấp các hàm kernel run_sql() và run_sql_pandas() trả về DataFrame Polars hoặc Pandas để phân tích. Các lệnh CLI để quản lý bộ nhớ đệm khái
official
annotating-task-lineage
astronomer
Chú thích các tác vụ Airflow với dòng dữ liệu (data lineage) bằng cách sử dụng inlets và outlets. Hỗ trợ các đối tượng Dataset của OpenLineage, Assets của Airflow và Datasets của Airflow để xác định đầu vào và đầu ra trên các cơ sở dữ liệu, kho dữ liệu và lưu trữ đám mây. Sử dụng như phương án dự phòng khi các toán tử thiếu bộ trích xuất OpenLineage tích hợp sẵn; tuân theo hệ thống ưu tiên bốn cấp, trong đó các bộ trích xuất tùy chỉnh và phương thức OpenLineage được ưu tiên. Bao gồm các trình trợ giúp đặt tên dataset cho Snowflake, BigQuery, S3 và PostgreSQL để đảm bảo tính nhất quán...
official
authoring-dags
astronomer
Quy trình làm việc có hướng dẫn để tạo DAG Apache Airflow với tích hợp xác thực và kiểm thử. Phương pháp sáu giai đoạn có cấu trúc: khám phá môi trường và các mẫu hiện có, lập kế hoạch cấu trúc DAG, triển khai theo các phương pháp tốt nhất, xác thực bằng lệnh CLI af, kiểm thử với sự đồng ý của người dùng, và lặp lại các bước sửa lỗi. Các lệnh CLI để khám phá (af config connections, af config providers, af dags list) và xác thực (af dags errors, af dags get, af dags explore) cung cấp phản hồi tức thì về DAG...
official
blueprint
astronomer
Xác định các mẫu nhóm tác vụ Airflow có thể tái sử dụng với xác thực Pydantic và soạn DAG từ YAML. Sử dụng khi tạo mẫu blueprint, soạn DAG từ…
official
checking-freshness
astronomer
Kiểm tra độ tươi mới của dữ liệu bằng cách đối chiếu dấu thời gian bảng và mẫu cập nhật với thang đo độ cũ. Xác định các cột dấu thời gian sử dụng các mẫu đặt tên ETL phổ biến (_loaded_at, _updated_at, created_at, v.v.) và truy vấn giá trị tối đa của chúng để xác định tuổi. Phân loại dữ liệu thành bốn trạng thái độ tươi mới: Tươi (< 4 giờ), Cũ (4–24 giờ), Rất cũ (> 24 giờ) hoặc Không xác định (không tìm thấy dấu thời gian). Cung cấp các mẫu SQL để kiểm tra thời gian cập nhật cuối cùng và xu hướng số lượng hàng trong những ngày gần đây để...
official