migrating-airflow-2-to-3

โดย astronomer

การตรวจจับอัตโนมัติและการย้ายโค้ดสำหรับอัปเกรด DAG ของ Apache Airflow 2.x เป็น Airflow 3.x มีกฎการแก้ไขอัตโนมัติแบบ Ruff (AIR30/AIR301/AIR302/AIR31/AIR311/AIR312) เพื่อตรวจจับและแก้ไขการเปลี่ยนแปลงที่ทำให้เกิดข้อผิดพลาดในการนำเข้า โอเปอเรเตอร์ ฮุค และตัวแปรบริบท ครอบคลุมการเปลี่ยนแปลงสถาปัตยกรรมที่สำคัญ: เวิร์กเกอร์ไม่สามารถเข้าถึงฐานข้อมูลเมตาดาต้าโดยตรงอีกต่อไป ใช้ Airflow Python client หรือ REST API แทนการสอบถาม ORM session รวมถึงรายการตรวจสอบการย้ายด้วยตนเองสำหรับปัญหาที่ Ruff ไม่สามารถแก้ไขอัตโนมัติ: cron...

npx skills add https://github.com/astronomer/agents --skill migrating-airflow-2-to-3

Airflow 2 to 3 Migration

This skill helps migrate Airflow 2.x DAG code to Airflow 3.x, focusing on code changes (imports, operators, hooks, context, API usage).

Important: Before migrating to Airflow 3, strongly recommend upgrading to Airflow 2.11 first, then to at least Airflow 3.0.11 (ideally directly to 3.1). Other upgrade paths would make rollbacks impossible. See: https://www.astronomer.io/docs/astro/airflow3/upgrade-af3#upgrade-your-airflow-2-deployment-to-airflow-3. Additionally, early 3.0 versions have many bugs - 3.1 provides a much better experience.

Migration at a Glance

  1. Run Ruff's Airflow migration rules to auto-fix detectable issues (AIR30/AIR301/AIR302/AIR31/AIR311/AIR312).
    • ruff check --preview --select AIR --fix --unsafe-fixes .
  2. Scan for remaining issues using the manual search checklist in reference/migration-checklist.md.
    • Focus on: direct metadata DB access, legacy imports, scheduling/context keys, XCom pickling, datasets-to-assets, REST API/auth, plugins, and file paths.
    • Hard behavior/config gotchas to explicitly review:
      • Cron scheduling semantics: consider AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True if you need Airflow 2-style cron data intervals.
      • .airflowignore syntax changed from regexp to glob; set AIRFLOW__CORE__DAG_IGNORE_FILE_SYNTAX=regexp if you must keep regexp behavior.
      • OAuth callback URLs add an /auth/ prefix (e.g. /auth/oauth-authorized/google).
      • Shared utility imports: Bare imports like import common from dags/common/ no longer work on Astro. Use fully qualified imports: import dags.common.
  3. Plan changes per file and issue type:
    • Fix imports - update operators/hooks/providers - refactor metadata access to using the Airflow client instead of direct access - fix use of outdated context variables - fix scheduling logic.
  4. Implement changes incrementally, re-running Ruff and code searches after each major change.
  5. Explain changes to the user and caution them to test any updated logic such as refactored metadata, scheduling logic and use of the Airflow context.

Architecture & Metadata DB Access

Airflow 3 changes how components talk to the metadata database:

  • Workers no longer connect directly to the metadata DB.
  • Task code runs via the Task Execution API exposed by the API server.
  • The DAG processor runs as an independent process separate from the scheduler.
  • The Triggerer uses the task execution mechanism via an in-process API server.

Trigger implementation gotcha: If a trigger calls hooks synchronously inside the asyncio event loop, it may fail or block. Prefer calling hooks via sync_to_async(...) (or otherwise ensure hook calls are async-safe).

Key code impact: Task code can still import ORM sessions/models, but any attempt to use them to talk to the metadata DB will fail with:

RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.x

Patterns to search for

When scanning DAGs, custom operators, and @task functions, look for:

  • Session helpers: provide_session, create_session, @provide_session
  • Sessions from settings: from airflow.settings import Session
  • Engine access: from airflow.settings import engine
  • ORM usage with models: session.query(DagModel)..., session.query(DagRun)...

Replacement: Airflow Python client

Preferred for rich metadata access patterns. Add to requirements.txt:

apache-airflow-client==<your-airflow-runtime-version>

Example usage:

import os
from airflow.sdk import BaseOperator
import airflow_client.client
from airflow_client.client.api.dag_api import DAGApi

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")

class ListDagsOperator(BaseOperator):
    def execute(self, context):
        config = airflow_client.client.Configuration(host=_HOST, access_token=_TOKEN)
        with airflow_client.client.ApiClient(config) as api_client:
            dag_api = DAGApi(api_client)
            dags = dag_api.get_dags(limit=10)
            self.log.info("Found %d DAGs", len(dags.dags))

Replacement: Direct REST API calls

For simple cases, call the REST API directly using requests:

from airflow.sdk import task
import os
import requests

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")

@task
def list_dags_via_api() -> None:
    response = requests.get(
        f"{_HOST}/api/v2/dags",
        headers={"Accept": "application/json", "Authorization": f"Bearer {_TOKEN}"},
        params={"limit": 10}
    )
    response.raise_for_status()
    print(response.json())

Ruff Airflow Migration Rules

Use Ruff's Airflow rules to detect and fix many breaking changes automatically.

  • AIR30 / AIR301 / AIR302: Removed code and imports in Airflow 3 - must be fixed.
  • AIR31 / AIR311 / AIR312: Deprecated code and imports - still work but will be removed in future versions; should be fixed.

Commands to run (via uv) against the project root:

# Auto-fix all detectable Airflow issues (safe + unsafe)
ruff check --preview --select AIR --fix --unsafe-fixes .

# Check remaining Airflow issues without fixing
ruff check --preview --select AIR .

Reference Files

For detailed code examples and migration patterns, see:


Quick Reference Tables

Key Import Changes

Airflow 2.xAirflow 3
airflow.operators.dummy_operator.DummyOperatorairflow.providers.standard.operators.empty.EmptyOperator
airflow.operators.bash.BashOperatorairflow.providers.standard.operators.bash.BashOperator
airflow.operators.python.PythonOperatorairflow.providers.standard.operators.python.PythonOperator
airflow.decorators.dagairflow.sdk.dag
airflow.decorators.taskairflow.sdk.task
airflow.datasets.Datasetairflow.sdk.Asset

Context Key Changes

Removed KeyReplacement
execution_datecontext["dag_run"].logical_date
tomorrow_ds / yesterday_dsUse ds with date math: macros.ds_add(ds, 1) / macros.ds_add(ds, -1)
prev_ds / next_dsprev_start_date_success or timetable API
triggering_dataset_eventstriggering_asset_events
templates_dictcontext["params"]

Asset-triggered runs: logical_date may be None; use context["dag_run"].logical_date defensively.

Cannot trigger with future logical_date: Use logical_date=None and rely on run_id instead.

Cron note: for scheduled runs using cron, logical_date semantics differ under CronTriggerTimetable (aligning logical_date with run_after). If you need Airflow 2-style cron data intervals, consider AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True.

Default Behavior Changes

SettingAirflow 2 DefaultAirflow 3 Default
scheduletimedelta(days=1)None
catchupTrueFalse

Callback Behavior Changes

  • on_success_callback no longer runs on skip; use on_skipped_callback if needed.
  • @teardown with TriggerRule.ALWAYS not allowed; teardowns now execute even if DAG run terminated early.

Resources


Related Skills

  • testing-dags: For testing DAGs after migration
  • debugging-dags: For troubleshooting migration issues
  • deploying-airflow: For deploying migrated DAGs to production

Skills เพิ่มเติมจาก astronomer

airflow
astronomer
สอบถาม จัดการ และแก้ไขปัญหา DAGs, การรัน, งาน และการกำหนดค่าระบบของ Apache Airflow รองรับคำสั่งมากกว่า 30 คำสั่งสำหรับการตรวจสอบ DAG, การจัดการการรัน, การบันทึกงาน, การสอบถามการกำหนดค่า และการเข้าถึง REST API โดยตรง จัดการอินสแตนซ์ Airflow หลายตัวพร้อมการกำหนดค่าถาวร ค้นหาการปรับใช้ในเครื่องและ Astro โดยอัตโนมัติ เรียกใช้ DAG แบบซิงโครนัส (รอให้เสร็จ) หรือแบบอะซิงโครนัส วินิจฉัยข้อผิดพลาด ล้างการรันเพื่อลองใหม่ และเข้าถึงบันทึกงานพร้อมการกรอง retry/map-index ผลลัพธ์...
official
airflow-hitl
astronomer
ประตูการอนุมัติของมนุษย์, การป้อนข้อมูลฟอร์ม, และการแตกกิ่งใน Airflow DAGs โดยใช้ตัวดำเนินการที่สามารถเลื่อนได้ ตัวดำเนินการสี่ประเภท: ApprovalOperator สำหรับการตัดสินใจอนุมัติ/ปฏิเสธ, HITLOperator สำหรับการเลือกหลายตัวเลือกพร้อมฟอร์ม, HITLBranchOperator สำหรับการกำหนดเส้นทางงานที่ขับเคลื่อนโดยมนุษย์, และ HITLEntryOperator สำหรับการรวบรวมข้อมูลฟอร์ม ตัวดำเนินการทั้งหมดสามารถเลื่อนได้ โดยปล่อยช่อง worker ขณะรอการตอบสนองจากมนุษย์ผ่านแท็บ Required Actions ของ Airflow UI หรือ REST API รองรับคุณสมบัติเสริมรวมถึงแบบกำหนดเอง...
official
airflow-plugins
astronomer
สร้างปลั๊กอิน Airflow 3.1+ ที่ฝังแอป FastAPI, หน้า UI แบบกำหนดเอง, คอมโพเนนต์ React, มิดเดิลแวร์, มาโคร และลิงก์โอเปอเรเตอร์ลงใน UI ของ Airflow โดยตรง ใช้…
official
analyzing-data
astronomer
สอบถามคลังข้อมูลของคุณเพื่อตอบคำถามทางธุรกิจด้วยรูปแบบที่แคชไว้และการแมปแนวคิด รองรับการค้นหารูปแบบและการแคชสำหรับประเภทคำถามที่เกิดซ้ำ พร้อมบันทึกผลลัพธ์เพื่อปรับปรุงการสอบถามในอนาคต รวมถึงแคชการแมปแนวคิดไปยังตารางและการค้นพบสคีมาตารางผ่าน INFORMATION_SCHEMA หรือการ grep โค้ดเบส มีฟังก์ชันเคอร์เนล run_sql() และ run_sql_pandas() ที่ส่งคืน Polars หรือ Pandas DataFrames สำหรับการวิเคราะห์ คำสั่ง CLI สำหรับจัดการแคชแนวคิด รูปแบบ และตาราง รวมถึง...
official
annotating-task-lineage
astronomer
ใส่คำอธิบาย Airflow tasks ด้วย data lineage โดยใช้ inlets และ outlets รองรับ OpenLineage Dataset objects, Airflow Assets และ Airflow Datasets สำหรับกำหนด inputs และ outputs ครอบคลุมฐานข้อมูล, data warehouses และ cloud storage ใช้เป็นทางเลือกสำรองเมื่อ operators ไม่มี OpenLineage extractors ในตัว; ทำงานตามระบบลำดับความสำคัญสี่ระดับที่ custom extractors และ OpenLineage methods มีสิทธิ์優先 รวมถึงตัวช่วยตั้งชื่อ dataset สำหรับ Snowflake, BigQuery, S3 และ PostgreSQL เพื่อให้มั่นใจถึงความสอดคล้อง...
official
authoring-dags
astronomer
เวิร์กโฟลว์แบบมีคำแนะนำสำหรับสร้าง Apache Airflow DAGs พร้อมการตรวจสอบความถูกต้องและการผสานการทดสอบ แนวทางแบบหกขั้นตอน: ค้นพบสภาพแวดล้อมและรูปแบบที่มีอยู่ วางแผนโครงสร้าง DAG ดำเนินการตามแนวทางปฏิบัติที่ดีที่สุด ตรวจสอบความถูกต้องด้วยคำสั่ง CLI ของ af ทดสอบโดยได้รับความยินยอมจากผู้ใช้ และปรับปรุงแก้ไขซ้ำ คำสั่ง CLI สำหรับการค้นพบ (af config connections, af config providers, af dags list) และการตรวจสอบความถูกต้อง (af dags errors, af dags get, af dags explore) ให้ข้อเสนอแนะทันทีเกี่ยวกับ DAG...
official
blueprint
astronomer
กำหนดเทมเพลตกลุ่มงาน Airflow ที่ใช้ซ้ำได้พร้อมการตรวจสอบความถูกต้องด้วย Pydantic และประกอบ DAG จาก YAML ใช้เมื่อสร้างเทมเพลต blueprint หรือประกอบ DAG จาก…
official
checking-freshness
astronomer
ตรวจสอบความสดใหม่ของข้อมูลโดยการตรวจสอบเวลาปรับปรุงของตารางและรูปแบบการอัปเดตเทียบกับระดับความเก่า ระบุคอลัมน์เวลาปรับปรุงโดยใช้รูปแบบการตั้งชื่อ ETL ทั่วไป (เช่น _loaded_at, _updated_at, created_at) และสอบถามค่าสูงสุดเพื่อกำหนดอายุ จัดประเภทข้อมูลเป็นสถานะความสดใหม่สี่สถานะ: สดใหม่ (น้อยกว่า 4 ชั่วโมง), เก่า (4–24 ชั่วโมง), เก่ามาก (มากกว่า 24 ชั่วโมง) หรือไม่ทราบ (ไม่พบเวลาปรับปรุง) มีเทมเพลต SQL สำหรับตรวจสอบเวลาปรับปรุงล่าสุดและแนวโน้มจำนวนแถวในช่วงไม่กี่วันที่ผ่านมาเพื่อ...
official