migrating-airflow-2-to-3

作者: astronomer

针对Apache Airflow 2.x DAG升级至Airflow 3.x的自动化检测与代码迁移工具。提供基于Ruff的自动修复规则(AIR30/AIR301/AIR302/AIR31/AIR311/AIR312),用于检测并解决导入、运算符、钩子及上下文变量中的破坏性变更。涵盖关键架构迁移:工作节点不再直接访问元数据库;需改用Airflow Python客户端或REST API替代ORM会话查询。包含针对Ruff无法自动修复问题的手动迁移清单:cron...

npx skills add https://github.com/astronomer/agents --skill migrating-airflow-2-to-3

Airflow 2 to 3 Migration

This skill helps migrate Airflow 2.x DAG code to Airflow 3.x, focusing on code changes (imports, operators, hooks, context, API usage).

Important: Before migrating to Airflow 3, strongly recommend upgrading to Airflow 2.11 first, then to at least Airflow 3.0.11 (ideally directly to 3.1). Other upgrade paths would make rollbacks impossible. See: https://www.astronomer.io/docs/astro/airflow3/upgrade-af3#upgrade-your-airflow-2-deployment-to-airflow-3. Additionally, early 3.0 versions have many bugs - 3.1 provides a much better experience.

Migration at a Glance

  1. Run Ruff's Airflow migration rules to auto-fix detectable issues (AIR30/AIR301/AIR302/AIR31/AIR311/AIR312).
    • ruff check --preview --select AIR --fix --unsafe-fixes .
  2. Scan for remaining issues using the manual search checklist in reference/migration-checklist.md.
    • Focus on: direct metadata DB access, legacy imports, scheduling/context keys, XCom pickling, datasets-to-assets, REST API/auth, plugins, and file paths.
    • Hard behavior/config gotchas to explicitly review:
      • Cron scheduling semantics: consider AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True if you need Airflow 2-style cron data intervals.
      • .airflowignore syntax changed from regexp to glob; set AIRFLOW__CORE__DAG_IGNORE_FILE_SYNTAX=regexp if you must keep regexp behavior.
      • OAuth callback URLs add an /auth/ prefix (e.g. /auth/oauth-authorized/google).
      • Shared utility imports: Bare imports like import common from dags/common/ no longer work on Astro. Use fully qualified imports: import dags.common.
  3. Plan changes per file and issue type:
    • Fix imports - update operators/hooks/providers - refactor metadata access to using the Airflow client instead of direct access - fix use of outdated context variables - fix scheduling logic.
  4. Implement changes incrementally, re-running Ruff and code searches after each major change.
  5. Explain changes to the user and caution them to test any updated logic such as refactored metadata, scheduling logic and use of the Airflow context.

Architecture & Metadata DB Access

Airflow 3 changes how components talk to the metadata database:

  • Workers no longer connect directly to the metadata DB.
  • Task code runs via the Task Execution API exposed by the API server.
  • The DAG processor runs as an independent process separate from the scheduler.
  • The Triggerer uses the task execution mechanism via an in-process API server.

Trigger implementation gotcha: If a trigger calls hooks synchronously inside the asyncio event loop, it may fail or block. Prefer calling hooks via sync_to_async(...) (or otherwise ensure hook calls are async-safe).

Key code impact: Task code can still import ORM sessions/models, but any attempt to use them to talk to the metadata DB will fail with:

RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.x

Patterns to search for

When scanning DAGs, custom operators, and @task functions, look for:

  • Session helpers: provide_session, create_session, @provide_session
  • Sessions from settings: from airflow.settings import Session
  • Engine access: from airflow.settings import engine
  • ORM usage with models: session.query(DagModel)..., session.query(DagRun)...

Replacement: Airflow Python client

Preferred for rich metadata access patterns. Add to requirements.txt:

apache-airflow-client==<your-airflow-runtime-version>

Example usage:

import os
from airflow.sdk import BaseOperator
import airflow_client.client
from airflow_client.client.api.dag_api import DAGApi

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")

class ListDagsOperator(BaseOperator):
    def execute(self, context):
        config = airflow_client.client.Configuration(host=_HOST, access_token=_TOKEN)
        with airflow_client.client.ApiClient(config) as api_client:
            dag_api = DAGApi(api_client)
            dags = dag_api.get_dags(limit=10)
            self.log.info("Found %d DAGs", len(dags.dags))

Replacement: Direct REST API calls

For simple cases, call the REST API directly using requests:

from airflow.sdk import task
import os
import requests

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")

@task
def list_dags_via_api() -> None:
    response = requests.get(
        f"{_HOST}/api/v2/dags",
        headers={"Accept": "application/json", "Authorization": f"Bearer {_TOKEN}"},
        params={"limit": 10}
    )
    response.raise_for_status()
    print(response.json())

Ruff Airflow Migration Rules

Use Ruff's Airflow rules to detect and fix many breaking changes automatically.

  • AIR30 / AIR301 / AIR302: Removed code and imports in Airflow 3 - must be fixed.
  • AIR31 / AIR311 / AIR312: Deprecated code and imports - still work but will be removed in future versions; should be fixed.

Commands to run (via uv) against the project root:

# Auto-fix all detectable Airflow issues (safe + unsafe)
ruff check --preview --select AIR --fix --unsafe-fixes .

# Check remaining Airflow issues without fixing
ruff check --preview --select AIR .

Reference Files

For detailed code examples and migration patterns, see:


Quick Reference Tables

Key Import Changes

Airflow 2.xAirflow 3
airflow.operators.dummy_operator.DummyOperatorairflow.providers.standard.operators.empty.EmptyOperator
airflow.operators.bash.BashOperatorairflow.providers.standard.operators.bash.BashOperator
airflow.operators.python.PythonOperatorairflow.providers.standard.operators.python.PythonOperator
airflow.decorators.dagairflow.sdk.dag
airflow.decorators.taskairflow.sdk.task
airflow.datasets.Datasetairflow.sdk.Asset

Context Key Changes

Removed KeyReplacement
execution_datecontext["dag_run"].logical_date
tomorrow_ds / yesterday_dsUse ds with date math: macros.ds_add(ds, 1) / macros.ds_add(ds, -1)
prev_ds / next_dsprev_start_date_success or timetable API
triggering_dataset_eventstriggering_asset_events
templates_dictcontext["params"]

Asset-triggered runs: logical_date may be None; use context["dag_run"].logical_date defensively.

Cannot trigger with future logical_date: Use logical_date=None and rely on run_id instead.

Cron note: for scheduled runs using cron, logical_date semantics differ under CronTriggerTimetable (aligning logical_date with run_after). If you need Airflow 2-style cron data intervals, consider AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True.

Default Behavior Changes

SettingAirflow 2 DefaultAirflow 3 Default
scheduletimedelta(days=1)None
catchupTrueFalse

Callback Behavior Changes

  • on_success_callback no longer runs on skip; use on_skipped_callback if needed.
  • @teardown with TriggerRule.ALWAYS not allowed; teardowns now execute even if DAG run terminated early.

Resources


Related Skills

  • testing-dags: For testing DAGs after migration
  • debugging-dags: For troubleshooting migration issues
  • deploying-airflow: For deploying migrated DAGs to production

来自 astronomer 的更多技能

airflow
astronomer
查询、管理和排查Apache Airflow的DAG、运行记录、任务及系统配置。支持30多种命令,涵盖DAG检查、运行管理、任务日志、配置查询及直接REST API访问。通过持久化配置管理多个Airflow实例;自动发现本地和Astro部署。同步(等待完成)或异步触发DAG运行,诊断故障,清除运行记录以重试,并通过重试/映射索引过滤访问任务日志。输出...
official
airflow-hitl
astronomer
在Airflow DAG中使用可延迟操作符实现人工审批关卡、表单输入和分支。四种操作符类型:用于批准/拒绝决策的ApprovalOperator、带表单的多选项选择HITLOperator、人工驱动的任务路由HITLBranchOperator,以及表单数据收集HITLEntryOperator。所有操作符均为可延迟设计,在通过Airflow UI的"必需操作"标签页或REST API等待人工响应时释放工作槽位。支持包括自定义在内的可选功能...
official
airflow-plugins
astronomer
构建嵌入FastAPI应用、自定义UI页面、React组件、中间件、宏和操作符链接的Airflow 3.1+插件,直接集成到Airflow UI中。使用…
official
analyzing-data
astronomer
查询数据仓库,利用缓存的模式和概念映射来回答业务问题。支持对重复问题类型进行模式查找和缓存,并通过记录结果来改进后续查询。包含概念到表的映射缓存,以及通过INFORMATION_SCHEMA或代码库grep进行表结构发现。提供run_sql()和run_sql_pandas()内核函数,返回Polars或Pandas DataFrame用于分析。提供CLI命令用于管理概念、模式和表缓存,以及...
official
annotating-task-lineage
astronomer
使用入口和出口为Airflow任务标注数据血缘。支持使用OpenLineage Dataset对象、Airflow Assets和Airflow Datasets定义跨数据库、数据仓库及云存储的输入输出。当运算符缺少内置OpenLineage提取器时作为备用方案;遵循四级优先级系统,其中自定义提取器和OpenLineage方法优先。包含针对Snowflake、BigQuery、S3和PostgreSQL的数据集命名辅助工具,以确保一致性...
official
authoring-dags
astronomer
创建Apache Airflow DAG的引导式工作流,集成验证与测试。采用六阶段结构化方法:发现环境与现有模式、规划DAG结构、遵循最佳实践实现、通过af CLI命令验证、经用户同意测试、迭代修复。用于发现(af config connections、af config providers、af dags list)和验证(af dags errors、af dags get、af dags explore)的CLI命令可提供DAG的即时反馈...
official
blueprint
astronomer
使用Pydantic验证定义可复用的Airflow任务组模板,并从YAML组合DAG。适用于创建blueprint模板、从YAML组合DAG等场景。
official
checking-freshness
astronomer
通过检查表时间戳和更新模式与陈旧度标尺对比,验证数据新鲜度。使用常见ETL命名模式(如_loaded_at、_updated_at、created_at等)识别时间戳列,并查询其最大值以确定数据时效。将数据分为四种新鲜度状态:新鲜(<4小时)、陈旧(4–24小时)、非常陈旧(>24小时)或未知(未找到时间戳)。提供SQL模板,用于检查最近几天的最后更新时间及行数变化趋势。
official