migrating-airflow-2-to-3

tarafından astronomer

We need to translate the given text from English to Turkish, preserving the specified name "migrating-airflow-2-to-3" only if it appears in the source text. The source text does not contain that name; it's only in the instruction. So we don't include it. We must preserve product names (Apache Airflow, Airflow 2.x, Airflow 3.x, Ruff, AIR30/AIR301 etc., Airflow Python client, REST API, ORM), protocol names, URLs (none), numbers, technical terms. Do not add any extra commentary, labels, etc. Just translate the text. The text: "Automated detection and code migration for upgrading Apache Airflow 2.x DAGs to Airflow 3.x. Provides Ruff-based auto-fix rules (AIR30/AIR301/AIR302/AIR31/AIR311/AIR312) to detect and resolve breaking changes in imports, operators, hooks, and context variables Covers critical architecture shifts: workers no longer access metadata DB directly; use the Airflow Python client

npx skills add https://github.com/astronomer/agents --skill migrating-airflow-2-to-3

Airflow 2 to 3 Migration

This skill helps migrate Airflow 2.x DAG code to Airflow 3.x, focusing on code changes (imports, operators, hooks, context, API usage).

Important: Before migrating to Airflow 3, strongly recommend upgrading to Airflow 2.11 first, then to at least Airflow 3.0.11 (ideally directly to 3.1). Other upgrade paths would make rollbacks impossible. See: https://www.astronomer.io/docs/astro/airflow3/upgrade-af3#upgrade-your-airflow-2-deployment-to-airflow-3. Additionally, early 3.0 versions have many bugs - 3.1 provides a much better experience.

Migration at a Glance

  1. Run Ruff's Airflow migration rules to auto-fix detectable issues (AIR30/AIR301/AIR302/AIR31/AIR311/AIR312).
    • ruff check --preview --select AIR --fix --unsafe-fixes .
  2. Scan for remaining issues using the manual search checklist in reference/migration-checklist.md.
    • Focus on: direct metadata DB access, legacy imports, scheduling/context keys, XCom pickling, datasets-to-assets, REST API/auth, plugins, and file paths.
    • Hard behavior/config gotchas to explicitly review:
      • Cron scheduling semantics: consider AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True if you need Airflow 2-style cron data intervals.
      • .airflowignore syntax changed from regexp to glob; set AIRFLOW__CORE__DAG_IGNORE_FILE_SYNTAX=regexp if you must keep regexp behavior.
      • OAuth callback URLs add an /auth/ prefix (e.g. /auth/oauth-authorized/google).
      • Shared utility imports: Bare imports like import common from dags/common/ no longer work on Astro. Use fully qualified imports: import dags.common.
  3. Plan changes per file and issue type:
    • Fix imports - update operators/hooks/providers - refactor metadata access to using the Airflow client instead of direct access - fix use of outdated context variables - fix scheduling logic.
  4. Implement changes incrementally, re-running Ruff and code searches after each major change.
  5. Explain changes to the user and caution them to test any updated logic such as refactored metadata, scheduling logic and use of the Airflow context.

Architecture & Metadata DB Access

Airflow 3 changes how components talk to the metadata database:

  • Workers no longer connect directly to the metadata DB.
  • Task code runs via the Task Execution API exposed by the API server.
  • The DAG processor runs as an independent process separate from the scheduler.
  • The Triggerer uses the task execution mechanism via an in-process API server.

Trigger implementation gotcha: If a trigger calls hooks synchronously inside the asyncio event loop, it may fail or block. Prefer calling hooks via sync_to_async(...) (or otherwise ensure hook calls are async-safe).

Key code impact: Task code can still import ORM sessions/models, but any attempt to use them to talk to the metadata DB will fail with:

RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.x

Patterns to search for

When scanning DAGs, custom operators, and @task functions, look for:

  • Session helpers: provide_session, create_session, @provide_session
  • Sessions from settings: from airflow.settings import Session
  • Engine access: from airflow.settings import engine
  • ORM usage with models: session.query(DagModel)..., session.query(DagRun)...

Replacement: Airflow Python client

Preferred for rich metadata access patterns. Add to requirements.txt:

apache-airflow-client==<your-airflow-runtime-version>

Example usage:

import os
from airflow.sdk import BaseOperator
import airflow_client.client
from airflow_client.client.api.dag_api import DAGApi

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")

class ListDagsOperator(BaseOperator):
    def execute(self, context):
        config = airflow_client.client.Configuration(host=_HOST, access_token=_TOKEN)
        with airflow_client.client.ApiClient(config) as api_client:
            dag_api = DAGApi(api_client)
            dags = dag_api.get_dags(limit=10)
            self.log.info("Found %d DAGs", len(dags.dags))

Replacement: Direct REST API calls

For simple cases, call the REST API directly using requests:

from airflow.sdk import task
import os
import requests

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")

@task
def list_dags_via_api() -> None:
    response = requests.get(
        f"{_HOST}/api/v2/dags",
        headers={"Accept": "application/json", "Authorization": f"Bearer {_TOKEN}"},
        params={"limit": 10}
    )
    response.raise_for_status()
    print(response.json())

Ruff Airflow Migration Rules

Use Ruff's Airflow rules to detect and fix many breaking changes automatically.

  • AIR30 / AIR301 / AIR302: Removed code and imports in Airflow 3 - must be fixed.
  • AIR31 / AIR311 / AIR312: Deprecated code and imports - still work but will be removed in future versions; should be fixed.

Commands to run (via uv) against the project root:

# Auto-fix all detectable Airflow issues (safe + unsafe)
ruff check --preview --select AIR --fix --unsafe-fixes .

# Check remaining Airflow issues without fixing
ruff check --preview --select AIR .

Reference Files

For detailed code examples and migration patterns, see:


Quick Reference Tables

Key Import Changes

Airflow 2.xAirflow 3
airflow.operators.dummy_operator.DummyOperatorairflow.providers.standard.operators.empty.EmptyOperator
airflow.operators.bash.BashOperatorairflow.providers.standard.operators.bash.BashOperator
airflow.operators.python.PythonOperatorairflow.providers.standard.operators.python.PythonOperator
airflow.decorators.dagairflow.sdk.dag
airflow.decorators.taskairflow.sdk.task
airflow.datasets.Datasetairflow.sdk.Asset

Context Key Changes

Removed KeyReplacement
execution_datecontext["dag_run"].logical_date
tomorrow_ds / yesterday_dsUse ds with date math: macros.ds_add(ds, 1) / macros.ds_add(ds, -1)
prev_ds / next_dsprev_start_date_success or timetable API
triggering_dataset_eventstriggering_asset_events
templates_dictcontext["params"]

Asset-triggered runs: logical_date may be None; use context["dag_run"].logical_date defensively.

Cannot trigger with future logical_date: Use logical_date=None and rely on run_id instead.

Cron note: for scheduled runs using cron, logical_date semantics differ under CronTriggerTimetable (aligning logical_date with run_after). If you need Airflow 2-style cron data intervals, consider AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True.

Default Behavior Changes

SettingAirflow 2 DefaultAirflow 3 Default
scheduletimedelta(days=1)None
catchupTrueFalse

Callback Behavior Changes

  • on_success_callback no longer runs on skip; use on_skipped_callback if needed.
  • @teardown with TriggerRule.ALWAYS not allowed; teardowns now execute even if DAG run terminated early.

Resources


Related Skills

  • testing-dags: For testing DAGs after migration
  • debugging-dags: For troubleshooting migration issues
  • deploying-airflow: For deploying migrated DAGs to production

astronomer tarafından daha fazla skill

airflow
astronomer
Apache Airflow DAG'larını, çalıştırmalarını, görevlerini ve sistem yapılandırmasını sorgulayın, yönetin ve sorun giderin. DAG inceleme, çalıştırma yönetimi, görev günlüğü, yapılandırma sorguları ve doğrudan REST API erişimi dahil olmak üzere 30'dan fazla komutu destekler. Kalıcı yapılandırma ile birden çok Airflow örneğini yönetin; yerel ve Astro dağıtımlarını otomatik olarak keşfedin. DAG çalıştırmalarını eşzamanlı (tamamlanmayı bekleme) veya eşzamansız olarak tetikleyin, hataları teşhis edin, yeniden deneme için çalıştırm
official
airflow-hitl
astronomer
İnsan onay kapıları, form girdileri ve ertelenebilir operatörler kullanarak Airflow DAG'lerinde dallanma. Dört operatör türü: onay/red kararları için ApprovalOperator, formlarla çok seçenekli seçim için HITLOperator, insan odaklı görev yönlendirmesi için HITLBranchOperator ve form verisi toplama için HITLEntryOperator. Tüm operatörler ertelenebilir olup, Airflow UI'nin Gerekli İşlemler sekmesi veya REST API aracılığıyla insan yanıtı beklenirken işçi slotlarını serbest bırakır. Özel... dahil isteğe bağlı özellikleri destekler.
official
airflow-plugins
astronomer
Airflow 3.1+ eklentileri oluşturun; FastAPI uygulamaları, özel UI sayfaları, React bileşenleri, middleware, makrolar ve operatör bağlantılarını doğrudan Airflow arayüzüne yerleştirin. Kullanın…
official
analyzing-data
astronomer
Veri ambarınıza sorgu yaparak, önbelleğe alınmış desenler ve kavram eşlemeleriyle iş sorularını yanıtlayın. Tekrarlanan soru türleri için desen arama ve önbelleğe alma desteği sunar, gelecekteki sorguları iyileştirmek için sonuç kaydı yapar. Kavram-tablo eşleme önbelleği ve INFORMATION_SCHEMA veya kod tabanı grep aracılığıyla tablo şeması keşfi içerir. Analiz için Polars veya Pandas DataFrame'leri döndüren run_sql() ve run_sql_pandas() çekirdek fonksiyonlarını sağlar. Kavram, desen ve tablo önbelleklerini yönetmek için CLI komutları ve ayrıca...
official
annotating-task-lineage
astronomer
Airflow görevlerini, giriş ve çıkış noktalarını kullanarak veri soy ağacı ile açıklayın. Veritabanları, veri ambarları ve bulut depolama arasında girdi ve çıktıları tanımlamak için OpenLineage Dataset nesnelerini, Airflow Varlıklarını ve Airflow Veri Kümelerini destekler. Operatörlerde yerleşik OpenLineage çıkarıcılar bulunmadığında yedek olarak kullanın; özel çıkarıcıların ve OpenLineage yöntemlerinin öncelikli olduğu dört katmanlı bir öncelik sistemini izler. Snowflake, BigQuery, S3 ve PostgreSQL için tutarlı veri kümesi adlandırma yardımcıları içerir...
official
authoring-dags
astronomer
Apache Airflow DAG'ları oluşturmak için doğrulama ve test entegrasyonu içeren rehberli iş akışı. Yapılandırılmış altı aşamalı yaklaşım: ortamı ve mevcut kalıpları keşfetme, DAG yapısını planlama, en iyi uygulamaları takip ederek uygulama, af CLI komutlarıyla doğrulama, kullanıcı onayıyla test etme ve düzeltmeler üzerinde yineleme. Keşif (af config connections, af config providers, af dags list) ve doğrulama (af dags errors, af dags get, af dags explore) için CLI komutları, DAG hakkında anında geri bildirim sağlar...
official
blueprint
astronomer
Pydantic doğrulaması ile yeniden kullanılabilir Airflow görev grubu şablonları tanımlayın ve YAML’dan DAG’ler oluşturun. Blueprint şablonları oluştururken, YAML’dan DAG’ler oluştururken kullanın…
official
checking-freshness
astronomer
Tablo zaman damgalarını ve güncelleme desenlerini bir bayatlık ölçeğine göre kontrol ederek veri tazeliğini doğrular. Yaygın ETL adlandırma desenlerini (_loaded_at, _updated_at, created_at vb.) kullanarak zaman damgası sütunlarını tanımlar ve yaşı belirlemek için maksimum değerlerini sorgular. Verileri dört tazelik durumuna ayırır: Taze (< 4 saat), Bayat (4–24 saat), Çok Bayat (> 24 saat) veya Bilinmiyor (zaman damgası bulunamadı). Son güncelleme zamanını ve son günlerdeki satır sayısı eğilimlerini kontrol etmek için SQL şablonları sağlar...
official