debugging-dags

โดย astronomer

การวิเคราะห์สาเหตุที่แท้จริงอย่างเป็นระบบและการแก้ไขสำหรับ Airflow DAGs ที่ล้มเหลว พร้อมขั้นตอนการตรวจสอบที่มีโครงสร้าง ชี้แนะผ่านกระบวนการวินิจฉัยสี่ขั้นตอน: ระบุความล้มเหลว ดึงรายละเอียดข้อผิดพลาด รวบรวมข้อมูลบริบท และส่งมอบขั้นตอนการแก้ไขที่สามารถดำเนินการได้ จัดหมวดหมู่ความล้มเหลวออกเป็นสี่ประเภท (ข้อมูล โค้ด โครงสร้างพื้นฐาน การพึ่งพา) เพื่อมุ่งเน้นการตรวจสอบและแนะนำการแก้ไขที่เหมาะสม ให้คำสั่ง CLI ที่พร้อมใช้งานสำหรับการดึงข้อมูลบันทึก การเปรียบเทียบการรัน การล้างงาน และ DAG...

npx skills add https://github.com/astronomer/agents --skill debugging-dags

DAG Diagnosis

You are a data engineer debugging a failed Airflow DAG. Follow this systematic approach to identify the root cause and provide actionable remediation.

Running the CLI

These commands assume af is on PATH. Run via astro otto to get it automatically, or install standalone with uv tool install astro-airflow-mcp.


Step 1: Identify the Failure

If a specific DAG was mentioned:

  • Run af runs diagnose <dag_id> <dag_run_id> (if run_id is provided)
  • If no run_id specified, run af dags stats to find recent failures

If no DAG was specified:

  • Run af health to find recent failures across all DAGs
  • Check for import errors with af dags errors
  • Show DAGs with recent failures
  • Ask which DAG to investigate further

Step 2: Get the Error Details

Once you have identified a failed task:

  1. Get task logs using af tasks logs <dag_id> <dag_run_id> <task_id>
  2. Look for the actual exception - scroll past the Airflow boilerplate to find the real error
  3. Categorize the failure type:
    • Data issue: Missing data, schema change, null values, constraint violation
    • Code issue: Bug, syntax error, import failure, type error
    • Infrastructure issue: Connection timeout, resource exhaustion, permission denied
    • Dependency issue: Upstream failure, external API down, rate limiting

Step 3: Check Context

Gather additional context to understand WHY this happened:

  1. Recent changes: Was there a code deploy? Check git history if available
  2. Package version changes: Was a package upgraded — in the image, in a venv-style operator, or at the index? See Package version changes below.
  3. Data volume: Did data volume spike? Run a quick count on source tables
  4. Upstream health: Did upstream tasks succeed but produce unexpected data?
  5. Historical pattern: Is this a recurring failure? Check if same task failed before
  6. Timing: Did this fail at an unusual time? (resource contention, maintenance windows)

Use af runs get <dag_id> <dag_run_id> to compare the failed run against recent successful runs.

Package version changes

A common cause of failures with no git activity is dependency drift — the user's code didn't change, but a package they depend on did. Check in this order:

  1. Worker image diff (preferred when available). Every Astro deploy = new image tag, so the registry has a "before" and "after". Diff pip freeze between current and previous image — that's ground truth for what changed:

    docker run --rm <current_image> pip freeze > /tmp/now.txt
    docker run --rm <previous_image> pip freeze > /tmp/prev.txt
    diff /tmp/prev.txt /tmp/now.txt
    

    Also compare docker run --rm <image> python --version between the two — a Python minor-version bump (3.11 → 3.12, or even a patch) can break wheel compatibility even when pip freeze looks identical. af config providers lists currently installed provider versions, useful for cross-checking against modules named in the traceback.

  2. Venv-style operators bypass the worker image. @task.virtualenv, PythonVirtualenvOperator, ExternalPythonOperator, and KubernetesPodOperator build their environment per task run, so an image diff won't catch failures inside them. If the failed task is one of these, read its requirements / image / python_version / python args directly:

    • Unbounded specifier (e.g. pandas>=2.0.0 with no upper bound, or no specifier at all) → a new upstream release is the prime suspect.
    • image="foo:latest" or no tag → the image moved underneath you.
    • python_version="3.11" (on @task.virtualenv / PythonVirtualenvOperator) or a python path (on ExternalPythonOperator) resolving to a different interpreter than it used to — a Python minor-version change can break wheel compatibility for unchanged requirements. Same vector applies to the worker image itself if the base Python changed there.

    Fix is to pin: pandas>=2.0.0,<3.0.0, a lockfile, a specific image SHA, or a fully-qualified Python version (python_version="3.11.7" instead of "3.11").

  3. Index lookup when image diff isn't conclusive (no image history, or a venv-style operator). Identify the configured index first — it may not be PyPI:

    • Env vars: UV_INDEX_URL, PIP_INDEX_URL, PIP_EXTRA_INDEX_URL
    • pyproject.toml[[tool.uv.index]]
    • ~/.pip/pip.conf, /etc/pip.conf
    • Dockerfile --index-url flags

    Then query for releases of the suspect package since the first failure started. PyPI:

    curl -s https://pypi.org/pypi/<pkg>/json | jq '.releases | to_entries | map({version: .key, uploaded: .value[0].upload_time}) | sort_by(.uploaded) | reverse | .[:5]'
    

    Private indexes usually expose the same /pypi/<pkg>/json shape; fall back to the Simple API (/simple/<pkg>/) or ask the user if neither works.

A release timestamp landing between the last green run and the first red run, for a package named in the traceback, is the answer.

On Astro

If you're running on Astro, these additional tools can help with diagnosis:

  • Deployment activity log: Check the Astro UI for recent deploys — a failed deploy or recent code change is often the cause of sudden failures
  • Astro alerts: Configure alerts in the Astro UI for proactive failure monitoring (DAG failure, task duration, SLA miss)
  • Observability: Use the Astro observability dashboard to track DAG health trends and spot recurring issues

On OSS Airflow

  • Airflow UI: Use the DAGs page, Graph view, and task logs to inspect recent runs and failures

Step 4: Provide Actionable Output

Structure your diagnosis as:

Root Cause

What actually broke? Be specific - not "the task failed" but "the task failed because column X was null in 15% of rows when the code expected 0%".

Impact Assessment

  • What data is affected? Which tables didn't get updated?
  • What downstream processes are blocked?
  • Is this blocking production dashboards or reports?

Immediate Fix

Specific steps to resolve RIGHT NOW:

  1. If it's a data issue: SQL to fix or skip bad records
  2. If it's a code issue: The exact code change needed
  3. If it's infra: Who to contact or what to restart

Prevention

How to prevent this from happening again:

  • Add data quality checks?
  • Add better error handling?
  • Add alerting for edge cases?
  • Update documentation?
  • Pin dependencies (constraints file, lockfile, or upper-bound specifiers on venv/external/pod operators) to avoid silent upstream drift?

Quick Commands

Provide ready-to-use commands:

  • To clear and rerun the entire DAG run: af runs clear <dag_id> <run_id>
  • To clear and rerun specific failed tasks: af tasks clear <dag_id> <run_id> <task_ids> -D
  • To delete a stuck or unwanted run: af runs delete <dag_id> <run_id>

Skills เพิ่มเติมจาก astronomer

airflow
astronomer
สอบถาม จัดการ และแก้ไขปัญหา DAGs, การรัน, งาน และการกำหนดค่าระบบของ Apache Airflow รองรับคำสั่งมากกว่า 30 คำสั่งสำหรับการตรวจสอบ DAG, การจัดการการรัน, การบันทึกงาน, การสอบถามการกำหนดค่า และการเข้าถึง REST API โดยตรง จัดการอินสแตนซ์ Airflow หลายตัวพร้อมการกำหนดค่าถาวร ค้นหาการปรับใช้ในเครื่องและ Astro โดยอัตโนมัติ เรียกใช้ DAG แบบซิงโครนัส (รอให้เสร็จ) หรือแบบอะซิงโครนัส วินิจฉัยข้อผิดพลาด ล้างการรันเพื่อลองใหม่ และเข้าถึงบันทึกงานพร้อมการกรอง retry/map-index ผลลัพธ์...
official
airflow-hitl
astronomer
ประตูการอนุมัติของมนุษย์, การป้อนข้อมูลฟอร์ม, และการแตกกิ่งใน Airflow DAGs โดยใช้ตัวดำเนินการที่สามารถเลื่อนได้ ตัวดำเนินการสี่ประเภท: ApprovalOperator สำหรับการตัดสินใจอนุมัติ/ปฏิเสธ, HITLOperator สำหรับการเลือกหลายตัวเลือกพร้อมฟอร์ม, HITLBranchOperator สำหรับการกำหนดเส้นทางงานที่ขับเคลื่อนโดยมนุษย์, และ HITLEntryOperator สำหรับการรวบรวมข้อมูลฟอร์ม ตัวดำเนินการทั้งหมดสามารถเลื่อนได้ โดยปล่อยช่อง worker ขณะรอการตอบสนองจากมนุษย์ผ่านแท็บ Required Actions ของ Airflow UI หรือ REST API รองรับคุณสมบัติเสริมรวมถึงแบบกำหนดเอง...
official
airflow-plugins
astronomer
สร้างปลั๊กอิน Airflow 3.1+ ที่ฝังแอป FastAPI, หน้า UI แบบกำหนดเอง, คอมโพเนนต์ React, มิดเดิลแวร์, มาโคร และลิงก์โอเปอเรเตอร์ลงใน UI ของ Airflow โดยตรง ใช้…
official
analyzing-data
astronomer
สอบถามคลังข้อมูลของคุณเพื่อตอบคำถามทางธุรกิจด้วยรูปแบบที่แคชไว้และการแมปแนวคิด รองรับการค้นหารูปแบบและการแคชสำหรับประเภทคำถามที่เกิดซ้ำ พร้อมบันทึกผลลัพธ์เพื่อปรับปรุงการสอบถามในอนาคต รวมถึงแคชการแมปแนวคิดไปยังตารางและการค้นพบสคีมาตารางผ่าน INFORMATION_SCHEMA หรือการ grep โค้ดเบส มีฟังก์ชันเคอร์เนล run_sql() และ run_sql_pandas() ที่ส่งคืน Polars หรือ Pandas DataFrames สำหรับการวิเคราะห์ คำสั่ง CLI สำหรับจัดการแคชแนวคิด รูปแบบ และตาราง รวมถึง...
official
annotating-task-lineage
astronomer
ใส่คำอธิบาย Airflow tasks ด้วย data lineage โดยใช้ inlets และ outlets รองรับ OpenLineage Dataset objects, Airflow Assets และ Airflow Datasets สำหรับกำหนด inputs และ outputs ครอบคลุมฐานข้อมูล, data warehouses และ cloud storage ใช้เป็นทางเลือกสำรองเมื่อ operators ไม่มี OpenLineage extractors ในตัว; ทำงานตามระบบลำดับความสำคัญสี่ระดับที่ custom extractors และ OpenLineage methods มีสิทธิ์優先 รวมถึงตัวช่วยตั้งชื่อ dataset สำหรับ Snowflake, BigQuery, S3 และ PostgreSQL เพื่อให้มั่นใจถึงความสอดคล้อง...
official
authoring-dags
astronomer
เวิร์กโฟลว์แบบมีคำแนะนำสำหรับสร้าง Apache Airflow DAGs พร้อมการตรวจสอบความถูกต้องและการผสานการทดสอบ แนวทางแบบหกขั้นตอน: ค้นพบสภาพแวดล้อมและรูปแบบที่มีอยู่ วางแผนโครงสร้าง DAG ดำเนินการตามแนวทางปฏิบัติที่ดีที่สุด ตรวจสอบความถูกต้องด้วยคำสั่ง CLI ของ af ทดสอบโดยได้รับความยินยอมจากผู้ใช้ และปรับปรุงแก้ไขซ้ำ คำสั่ง CLI สำหรับการค้นพบ (af config connections, af config providers, af dags list) และการตรวจสอบความถูกต้อง (af dags errors, af dags get, af dags explore) ให้ข้อเสนอแนะทันทีเกี่ยวกับ DAG...
official
blueprint
astronomer
กำหนดเทมเพลตกลุ่มงาน Airflow ที่ใช้ซ้ำได้พร้อมการตรวจสอบความถูกต้องด้วย Pydantic และประกอบ DAG จาก YAML ใช้เมื่อสร้างเทมเพลต blueprint หรือประกอบ DAG จาก…
official
checking-freshness
astronomer
ตรวจสอบความสดใหม่ของข้อมูลโดยการตรวจสอบเวลาปรับปรุงของตารางและรูปแบบการอัปเดตเทียบกับระดับความเก่า ระบุคอลัมน์เวลาปรับปรุงโดยใช้รูปแบบการตั้งชื่อ ETL ทั่วไป (เช่น _loaded_at, _updated_at, created_at) และสอบถามค่าสูงสุดเพื่อกำหนดอายุ จัดประเภทข้อมูลเป็นสถานะความสดใหม่สี่สถานะ: สดใหม่ (น้อยกว่า 4 ชั่วโมง), เก่า (4–24 ชั่วโมง), เก่ามาก (มากกว่า 24 ชั่วโมง) หรือไม่ทราบ (ไม่พบเวลาปรับปรุง) มีเทมเพลต SQL สำหรับตรวจสอบเวลาปรับปรุงล่าสุดและแนวโน้มจำนวนแถวในช่วงไม่กี่วันที่ผ่านมาเพื่อ...
official