authoring-dags

โดย astronomer

เวิร์กโฟลว์แบบมีคำแนะนำสำหรับสร้าง Apache Airflow DAGs พร้อมการตรวจสอบความถูกต้องและการผสานการทดสอบ แนวทางแบบหกขั้นตอน: ค้นพบสภาพแวดล้อมและรูปแบบที่มีอยู่ วางแผนโครงสร้าง DAG ดำเนินการตามแนวทางปฏิบัติที่ดีที่สุด ตรวจสอบความถูกต้องด้วยคำสั่ง CLI ของ af ทดสอบโดยได้รับความยินยอมจากผู้ใช้ และปรับปรุงแก้ไขซ้ำ คำสั่ง CLI สำหรับการค้นพบ (af config connections, af config providers, af dags list) และการตรวจสอบความถูกต้อง (af dags errors, af dags get, af dags explore) ให้ข้อเสนอแนะทันทีเกี่ยวกับ DAG...

npx skills add https://github.com/astronomer/agents --skill authoring-dags

DAG Authoring Skill

This skill guides you through creating and validating Airflow DAGs using best practices and af CLI commands.

For testing and debugging DAGs, see the testing-dags skill which covers the full test -> debug -> fix -> retest workflow.


Running the CLI

These commands assume af is on PATH. Run via astro otto to get it automatically, or install standalone with uv tool install astro-airflow-mcp.


Workflow Overview

+-----------------------------------------+
| 1. DISCOVER                             |
|    Understand codebase & environment    |
+-----------------------------------------+
                 |
+-----------------------------------------+
| 2. PLAN                                 |
|    Propose structure, get approval      |
+-----------------------------------------+
                 |
+-----------------------------------------+
| 3. IMPLEMENT                            |
|    Write DAG following patterns         |
+-----------------------------------------+
                 |
+-----------------------------------------+
| 4. VALIDATE                             |
|    Check import errors, warnings        |
+-----------------------------------------+
                 |
+-----------------------------------------+
| 5. TEST (with user consent)             |
|    Trigger, monitor, check logs         |
+-----------------------------------------+
                 |
+-----------------------------------------+
| 6. ITERATE                              |
|    Fix issues, re-validate              |
+-----------------------------------------+

Phase 1: Discover

Before writing code, understand the context.

Explore the Codebase

Use file tools to find existing patterns:

  • Glob for **/dags/**/*.py to find existing DAGs
  • Read similar DAGs to understand conventions
  • Check requirements.txt for available packages

Query the Airflow Environment

Use af CLI commands to understand what's available:

CommandPurpose
af config connectionsWhat external systems are configured
af config variablesWhat configuration values exist
af config providersWhat operator packages are installed
af config versionVersion constraints and features
af dags listExisting DAGs and naming conventions
af config poolsResource pools for concurrency

Example discovery questions:

  • "Is there a Snowflake connection?" -> af config connections
  • "What Airflow version?" -> af config version
  • "Are S3 operators available?" -> af config providers

Phase 2: Plan

Based on discovery, propose:

  1. DAG structure - Tasks, dependencies, schedule
  2. Operators to use - Based on available providers
  3. Connections needed - Existing or to be created
  4. Variables needed - Existing or to be created
  5. Packages needed - Additions to requirements.txt

Get user approval before implementing.


Phase 3: Implement

Write the DAG following best practices (see below). Key steps:

  1. Create DAG file in appropriate location
  2. Update requirements.txt if needed
  3. Save the file

Phase 4: Validate

Use af CLI as a feedback loop to validate your DAG.

Step 1: Check Import Errors

After saving, check for parse errors (Airflow will have already parsed the file):

af dags errors
  • If your file appears -> fix and retry
  • If no errors -> continue

Common causes: missing imports, syntax errors, missing packages.

Step 2: Verify DAG Exists

af dags get <dag_id>

Check: DAG exists, schedule correct, tags set, paused status.

Step 3: Check Warnings

af dags warnings

Look for deprecation warnings or configuration issues.

Step 4: Explore DAG Structure

af dags explore <dag_id>

Returns in one call: metadata, tasks, dependencies, source code.

On Astro

If you're running on Astro, you can also validate locally before deploying:

  • Parse check: Run astro dev parse to catch import errors and DAG-level issues without starting a full Airflow environment
  • DAG-only deploy: Once validated, use astro deploy --dags for fast DAG-only deploys that skip the Docker image build — ideal for iterating on DAG code

Phase 5: Test

See the testing-dags skill for comprehensive testing guidance.

Once validation passes, test the DAG using the workflow in the testing-dags skill:

  1. Get user consent -- Always ask before triggering
  2. Trigger and wait -- af runs trigger-wait <dag_id> --timeout 300
  3. Analyze results -- Check success/failure status
  4. Debug if needed -- af runs diagnose <dag_id> <run_id> and af tasks logs <dag_id> <run_id> <task_id>

Quick Test (Minimal)

# Ask user first, then:
af runs trigger-wait <dag_id> --timeout 300

For the full test -> debug -> fix -> retest loop, see testing-dags.


Phase 6: Iterate

If issues found:

  1. Fix the code
  2. Check for import errors: af dags errors
  3. Re-validate (Phase 4)
  4. Re-test using the testing-dags skill workflow (Phase 5)

CLI Quick Reference

PhaseCommandPurpose
Discoveraf config connectionsAvailable connections
Discoveraf config variablesConfiguration values
Discoveraf config providersInstalled operators
Discoveraf config versionVersion info
Validateaf dags errorsParse errors (check first!)
Validateaf dags get <dag_id>Verify DAG config
Validateaf dags warningsConfiguration warnings
Validateaf dags explore <dag_id>Full DAG inspection

Testing commands -- See the testing-dags skill for af runs trigger-wait, af runs diagnose, af tasks logs, etc.


Best Practices & Anti-Patterns

For code patterns and anti-patterns, see reference/best-practices.md.

Read this reference when writing new DAGs or reviewing existing ones. It covers what patterns are correct (including Airflow 3-specific behavior) and what to avoid.


Related Skills

  • testing-dags: For testing DAGs, debugging failures, and the test -> fix -> retest loop
  • debugging-dags: For troubleshooting failed DAGs
  • deploying-airflow: For deploying DAGs to production (Astro or open-source)
  • migrating-airflow-2-to-3: For migrating DAGs to Airflow 3

Skills เพิ่มเติมจาก astronomer

airflow
astronomer
สอบถาม จัดการ และแก้ไขปัญหา DAGs, การรัน, งาน และการกำหนดค่าระบบของ Apache Airflow รองรับคำสั่งมากกว่า 30 คำสั่งสำหรับการตรวจสอบ DAG, การจัดการการรัน, การบันทึกงาน, การสอบถามการกำหนดค่า และการเข้าถึง REST API โดยตรง จัดการอินสแตนซ์ Airflow หลายตัวพร้อมการกำหนดค่าถาวร ค้นหาการปรับใช้ในเครื่องและ Astro โดยอัตโนมัติ เรียกใช้ DAG แบบซิงโครนัส (รอให้เสร็จ) หรือแบบอะซิงโครนัส วินิจฉัยข้อผิดพลาด ล้างการรันเพื่อลองใหม่ และเข้าถึงบันทึกงานพร้อมการกรอง retry/map-index ผลลัพธ์...
official
airflow-hitl
astronomer
ประตูการอนุมัติของมนุษย์, การป้อนข้อมูลฟอร์ม, และการแตกกิ่งใน Airflow DAGs โดยใช้ตัวดำเนินการที่สามารถเลื่อนได้ ตัวดำเนินการสี่ประเภท: ApprovalOperator สำหรับการตัดสินใจอนุมัติ/ปฏิเสธ, HITLOperator สำหรับการเลือกหลายตัวเลือกพร้อมฟอร์ม, HITLBranchOperator สำหรับการกำหนดเส้นทางงานที่ขับเคลื่อนโดยมนุษย์, และ HITLEntryOperator สำหรับการรวบรวมข้อมูลฟอร์ม ตัวดำเนินการทั้งหมดสามารถเลื่อนได้ โดยปล่อยช่อง worker ขณะรอการตอบสนองจากมนุษย์ผ่านแท็บ Required Actions ของ Airflow UI หรือ REST API รองรับคุณสมบัติเสริมรวมถึงแบบกำหนดเอง...
official
airflow-plugins
astronomer
สร้างปลั๊กอิน Airflow 3.1+ ที่ฝังแอป FastAPI, หน้า UI แบบกำหนดเอง, คอมโพเนนต์ React, มิดเดิลแวร์, มาโคร และลิงก์โอเปอเรเตอร์ลงใน UI ของ Airflow โดยตรง ใช้…
official
analyzing-data
astronomer
สอบถามคลังข้อมูลของคุณเพื่อตอบคำถามทางธุรกิจด้วยรูปแบบที่แคชไว้และการแมปแนวคิด รองรับการค้นหารูปแบบและการแคชสำหรับประเภทคำถามที่เกิดซ้ำ พร้อมบันทึกผลลัพธ์เพื่อปรับปรุงการสอบถามในอนาคต รวมถึงแคชการแมปแนวคิดไปยังตารางและการค้นพบสคีมาตารางผ่าน INFORMATION_SCHEMA หรือการ grep โค้ดเบส มีฟังก์ชันเคอร์เนล run_sql() และ run_sql_pandas() ที่ส่งคืน Polars หรือ Pandas DataFrames สำหรับการวิเคราะห์ คำสั่ง CLI สำหรับจัดการแคชแนวคิด รูปแบบ และตาราง รวมถึง...
official
annotating-task-lineage
astronomer
ใส่คำอธิบาย Airflow tasks ด้วย data lineage โดยใช้ inlets และ outlets รองรับ OpenLineage Dataset objects, Airflow Assets และ Airflow Datasets สำหรับกำหนด inputs และ outputs ครอบคลุมฐานข้อมูล, data warehouses และ cloud storage ใช้เป็นทางเลือกสำรองเมื่อ operators ไม่มี OpenLineage extractors ในตัว; ทำงานตามระบบลำดับความสำคัญสี่ระดับที่ custom extractors และ OpenLineage methods มีสิทธิ์優先 รวมถึงตัวช่วยตั้งชื่อ dataset สำหรับ Snowflake, BigQuery, S3 และ PostgreSQL เพื่อให้มั่นใจถึงความสอดคล้อง...
official
blueprint
astronomer
กำหนดเทมเพลตกลุ่มงาน Airflow ที่ใช้ซ้ำได้พร้อมการตรวจสอบความถูกต้องด้วย Pydantic และประกอบ DAG จาก YAML ใช้เมื่อสร้างเทมเพลต blueprint หรือประกอบ DAG จาก…
official
checking-freshness
astronomer
ตรวจสอบความสดใหม่ของข้อมูลโดยการตรวจสอบเวลาปรับปรุงของตารางและรูปแบบการอัปเดตเทียบกับระดับความเก่า ระบุคอลัมน์เวลาปรับปรุงโดยใช้รูปแบบการตั้งชื่อ ETL ทั่วไป (เช่น _loaded_at, _updated_at, created_at) และสอบถามค่าสูงสุดเพื่อกำหนดอายุ จัดประเภทข้อมูลเป็นสถานะความสดใหม่สี่สถานะ: สดใหม่ (น้อยกว่า 4 ชั่วโมง), เก่า (4–24 ชั่วโมง), เก่ามาก (มากกว่า 24 ชั่วโมง) หรือไม่ทราบ (ไม่พบเวลาปรับปรุง) มีเทมเพลต SQL สำหรับตรวจสอบเวลาปรับปรุงล่าสุดและแนวโน้มจำนวนแถวในช่วงไม่กี่วันที่ผ่านมาเพื่อ...
official
cosmos-dbt-core
astronomer
แปลงโปรเจกต์ dbt Core เป็น Airflow DAGs หรือ TaskGroups โดยใช้ Astronomer Cosmos รองรับรูปแบบการประกอบสามแบบ: DbtDag แบบสแตนด์อโลน, DbtTaskGroup ภายใน DAG ที่มีอยู่ และตัวดำเนินการ Cosmos แต่ละตัวเพื่อการควบคุมแบบละเอียด เลือกจากโหมดการทำงานแปดโหมด (WATCHER, LOCAL, VIRTUALENV, KUBERNETES, AIRFLOW_ASYNC และอื่นๆ) ตามความต้องการด้านการแยกส่วนและประสิทธิภาพ มีกลยุทธ์การแยกวิเคราะห์สามแบบ (dbt_manifest, dbt_ls, dbt_ls_file, automatic) เพื่อปรับสมดุลความเร็วและความซับซ้อนของตัวเลือก...
official