migrating-airflow-2-to-3

作成者: astronomer

Apache Airflow 2.xのDAGをAirflow 3.xにアップグレードするための自動検出とコード移行。Ruffベースの自動修正ルール(AIR30/AIR301/AIR302/AIR31/AIR311/AIR312)を提供し、インポート、オペレーター、フック、コンテキスト変数における破壊的変更を検出・解決します。重要なアーキテクチャの変更に対応:ワーカーはメタデータDBに直接アクセスしなくなり、ORMセッションクエリの代わりにAirflow PythonクライアントまたはREST APIを使用します。Ruffが自動修正できない問題に対する手動移行チェックリストを含む:cron...

npx skills add https://github.com/astronomer/agents --skill migrating-airflow-2-to-3

Airflow 2 to 3 Migration

This skill helps migrate Airflow 2.x DAG code to Airflow 3.x, focusing on code changes (imports, operators, hooks, context, API usage).

Important: Before migrating to Airflow 3, strongly recommend upgrading to Airflow 2.11 first, then to at least Airflow 3.0.11 (ideally directly to 3.1). Other upgrade paths would make rollbacks impossible. See: https://www.astronomer.io/docs/astro/airflow3/upgrade-af3#upgrade-your-airflow-2-deployment-to-airflow-3. Additionally, early 3.0 versions have many bugs - 3.1 provides a much better experience.

Migration at a Glance

  1. Run Ruff's Airflow migration rules to auto-fix detectable issues (AIR30/AIR301/AIR302/AIR31/AIR311/AIR312).
    • ruff check --preview --select AIR --fix --unsafe-fixes .
  2. Scan for remaining issues using the manual search checklist in reference/migration-checklist.md.
    • Focus on: direct metadata DB access, legacy imports, scheduling/context keys, XCom pickling, datasets-to-assets, REST API/auth, plugins, and file paths.
    • Hard behavior/config gotchas to explicitly review:
      • Cron scheduling semantics: consider AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True if you need Airflow 2-style cron data intervals.
      • .airflowignore syntax changed from regexp to glob; set AIRFLOW__CORE__DAG_IGNORE_FILE_SYNTAX=regexp if you must keep regexp behavior.
      • OAuth callback URLs add an /auth/ prefix (e.g. /auth/oauth-authorized/google).
      • Shared utility imports: Bare imports like import common from dags/common/ no longer work on Astro. Use fully qualified imports: import dags.common.
  3. Plan changes per file and issue type:
    • Fix imports - update operators/hooks/providers - refactor metadata access to using the Airflow client instead of direct access - fix use of outdated context variables - fix scheduling logic.
  4. Implement changes incrementally, re-running Ruff and code searches after each major change.
  5. Explain changes to the user and caution them to test any updated logic such as refactored metadata, scheduling logic and use of the Airflow context.

Architecture & Metadata DB Access

Airflow 3 changes how components talk to the metadata database:

  • Workers no longer connect directly to the metadata DB.
  • Task code runs via the Task Execution API exposed by the API server.
  • The DAG processor runs as an independent process separate from the scheduler.
  • The Triggerer uses the task execution mechanism via an in-process API server.

Trigger implementation gotcha: If a trigger calls hooks synchronously inside the asyncio event loop, it may fail or block. Prefer calling hooks via sync_to_async(...) (or otherwise ensure hook calls are async-safe).

Key code impact: Task code can still import ORM sessions/models, but any attempt to use them to talk to the metadata DB will fail with:

RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.x

Patterns to search for

When scanning DAGs, custom operators, and @task functions, look for:

  • Session helpers: provide_session, create_session, @provide_session
  • Sessions from settings: from airflow.settings import Session
  • Engine access: from airflow.settings import engine
  • ORM usage with models: session.query(DagModel)..., session.query(DagRun)...

Replacement: Airflow Python client

Preferred for rich metadata access patterns. Add to requirements.txt:

apache-airflow-client==<your-airflow-runtime-version>

Example usage:

import os
from airflow.sdk import BaseOperator
import airflow_client.client
from airflow_client.client.api.dag_api import DAGApi

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")

class ListDagsOperator(BaseOperator):
    def execute(self, context):
        config = airflow_client.client.Configuration(host=_HOST, access_token=_TOKEN)
        with airflow_client.client.ApiClient(config) as api_client:
            dag_api = DAGApi(api_client)
            dags = dag_api.get_dags(limit=10)
            self.log.info("Found %d DAGs", len(dags.dags))

Replacement: Direct REST API calls

For simple cases, call the REST API directly using requests:

from airflow.sdk import task
import os
import requests

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")

@task
def list_dags_via_api() -> None:
    response = requests.get(
        f"{_HOST}/api/v2/dags",
        headers={"Accept": "application/json", "Authorization": f"Bearer {_TOKEN}"},
        params={"limit": 10}
    )
    response.raise_for_status()
    print(response.json())

Ruff Airflow Migration Rules

Use Ruff's Airflow rules to detect and fix many breaking changes automatically.

  • AIR30 / AIR301 / AIR302: Removed code and imports in Airflow 3 - must be fixed.
  • AIR31 / AIR311 / AIR312: Deprecated code and imports - still work but will be removed in future versions; should be fixed.

Commands to run (via uv) against the project root:

# Auto-fix all detectable Airflow issues (safe + unsafe)
ruff check --preview --select AIR --fix --unsafe-fixes .

# Check remaining Airflow issues without fixing
ruff check --preview --select AIR .

Reference Files

For detailed code examples and migration patterns, see:


Quick Reference Tables

Key Import Changes

Airflow 2.xAirflow 3
airflow.operators.dummy_operator.DummyOperatorairflow.providers.standard.operators.empty.EmptyOperator
airflow.operators.bash.BashOperatorairflow.providers.standard.operators.bash.BashOperator
airflow.operators.python.PythonOperatorairflow.providers.standard.operators.python.PythonOperator
airflow.decorators.dagairflow.sdk.dag
airflow.decorators.taskairflow.sdk.task
airflow.datasets.Datasetairflow.sdk.Asset

Context Key Changes

Removed KeyReplacement
execution_datecontext["dag_run"].logical_date
tomorrow_ds / yesterday_dsUse ds with date math: macros.ds_add(ds, 1) / macros.ds_add(ds, -1)
prev_ds / next_dsprev_start_date_success or timetable API
triggering_dataset_eventstriggering_asset_events
templates_dictcontext["params"]

Asset-triggered runs: logical_date may be None; use context["dag_run"].logical_date defensively.

Cannot trigger with future logical_date: Use logical_date=None and rely on run_id instead.

Cron note: for scheduled runs using cron, logical_date semantics differ under CronTriggerTimetable (aligning logical_date with run_after). If you need Airflow 2-style cron data intervals, consider AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True.

Default Behavior Changes

SettingAirflow 2 DefaultAirflow 3 Default
scheduletimedelta(days=1)None
catchupTrueFalse

Callback Behavior Changes

  • on_success_callback no longer runs on skip; use on_skipped_callback if needed.
  • @teardown with TriggerRule.ALWAYS not allowed; teardowns now execute even if DAG run terminated early.

Resources


Related Skills

  • testing-dags: For testing DAGs after migration
  • debugging-dags: For troubleshooting migration issues
  • deploying-airflow: For deploying migrated DAGs to production

astronomerのその他のスキル

airflow
astronomer
Apache AirflowのDAG、実行、タスク、システム設定をクエリ、管理、トラブルシューティングします。DAG検査、実行管理、タスクログ、設定クエリ、REST API直接アクセスを含む30以上のコマンドをサポート。複数のAirflowインスタンスを永続的な設定で管理し、ローカルおよびAstroデプロイメントを自動検出。DAG実行を同期的(完了待機)または非同期的にトリガーし、障害を診断、再試行のために実行をクリア、リトライ/マップインデックスフィルタリング付きでタスクログにアクセス。出力...
official
airflow-hitl
astronomer
人間による承認ゲート、フォーム入力、およびAirflow DAG内での分岐を、遅延可能オペレーターを使用して実現。4種類のオペレーター:承認/却下の判断を行うApprovalOperator、フォームによる複数選択肢の選択を行うHITLOperator、人間主導のタスクルーティングを行うHITLBranchOperator、フォームデータ収集を行うHITLEntryOperator。すべてのオペレーターは遅延可能であり、Airflow UIのRequired ActionsタブまたはREST APIを介して人間の応答を待つ間、ワーカースロットを解放します。カスタム...を含むオプション機能をサポート。
official
airflow-plugins
astronomer
Airflow 3.1+のプラグインを構築し、FastAPIアプリ、カスタムUIページ、Reactコンポーネント、ミドルウェア、マクロ、オペレーターリンクをAirflow UIに直接埋め込みます。使用…
official
analyzing-data
astronomer
データウェアハウスにクエリを実行し、キャッシュされたパターンと概念マッピングを使用してビジネス上の質問に回答します。繰り返し発生する質問タイプのパターン検索とキャッシュをサポートし、結果を記録して将来のクエリを改善します。概念からテーブルへのマッピングキャッシュと、INFORMATION_SCHEMAまたはコードベースのgrepによるテーブルスキーマ検出を含みます。分析用にPolarsまたはPandas DataFrameを返すrun_sql()およびrun_sql_pandas()カーネル関数を提供します。概念、パターン、テーブルキャッシュを管理するCLIコマンド、さらに...
official
annotating-task-lineage
astronomer
Airflowタスクにデータ系列を注釈付けし、インレットとアウトレットを使用します。OpenLineage Datasetオブジェクト、Airflow Assets、Airflow Datasetsをサポートし、データベース、データウェアハウス、クラウドストレージ間での入出力を定義します。オペレーターに組み込みのOpenLineage抽出機能がない場合のフォールバックとして使用し、カスタム抽出機能とOpenLineageメソッドが優先される4段階の優先順位システムに従います。Snowflake、BigQuery、S3、PostgreSQL向けのデータセット命名ヘルパーを含み、一貫性を確保します。
official
authoring-dags
astronomer
Apache Airflow DAGを作成するためのガイド付きワークフローで、検証とテストの統合を備えています。構造化された6フェーズのアプローチ:環境と既存のパターンを発見し、DAG構造を計画し、ベストプラクティスに従って実装し、af CLIコマンドで検証し、ユーザーの同意を得てテストし、修正を繰り返します。発見用のCLIコマンド(af config connections、af config providers、af dags list)と検証用のCLIコマンド(af dags errors、af dags get、af dags explore)は、DAGに関する即時フィードバックを提供します。
official
blueprint
astronomer
Pydanticバリデーションを使用して再利用可能なAirflowタスクグループテンプレートを定義し、YAMLからDAGを構成します。ブループリントテンプレートの作成時や、DAGの構成時に使用します。
official
checking-freshness
astronomer
テーブルのタイムスタンプと更新パターンを陳腐化スケールに照らして確認し、データの鮮度を検証します。一般的なETL命名パターン(_loaded_at、_updated_at、created_atなど)を使用してタイムスタンプカラムを特定し、その最大値をクエリして経過時間を判定します。データを4つの鮮度ステータスに分類します:Fresh(4時間未満)、Stale(4~24時間)、Very Stale(24時間超)、またはUnknown(タイムスタンプなし)。最近の日数における最終更新時刻と行数トレンドを確認するためのSQLテンプレートを提供します...
official