migrating-ai-sdk-to-common-ai

Author: astronomer

Migrates Airflow projects from airflow-ai-sdk to apache-airflow-providers-common-ai 0.1.0+. Use this skill when the user wants to replace airflow-ai-sdk…

npx skills add https://github.com/astronomer/agents --skill migrating-ai-sdk-to-common-ai

Migrate airflow-ai-sdk to apache-airflow-providers-common-ai

This skill migrates Airflow projects from airflow-ai-sdk to apache-airflow-providers-common-ai (target 0.4.0+), the official Airflow AI provider built on PydanticAI. It also covers upgrading projects already on common-ai 0.1.x, since several capabilities (multimodal prompts, toolsets, embedding operators, structured-output XCom behavior) changed between 0.1.0 and 0.4.0.

CRITICAL: The new provider requires Airflow 3.0+ and (for 0.4.0) pydantic-ai-slim >= 1.71.0. The API surface has changed: LLM configuration moves from code (model strings/objects) to Airflow connections (pydanticai type). There is no @task.embed in the new provider; embeddings move to the LlamaIndex integration or a plain @task (see Step 3).

Before starting

Use the Grep tool with the pattern below to inventory everything that needs to migrate:

airflow_ai_sdk|airflow-ai-sdk|ai_sdk|@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed

From the results, capture:

  1. All files importing airflow-ai-sdk / airflow_ai_sdk
  2. Which decorators are in use: @task.llm, @task.agent, @task.llm_branch, @task.embed
  3. The model configuration pattern (string names like "gpt-5", or OpenAIModel(...) objects)
  4. Any airflow_ai_sdk.BaseModel subclasses used as output_type

Use this inventory to drive the steps below.
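
Where the Grep tool is not available, the same inventory can be approximated with a short script. This is a minimal sketch, not part of the skill itself: the function name `inventory` and the choice of file suffixes to scan are assumptions made for illustration.

```python
import re
from pathlib import Path

# Same alternation as the Grep pattern above.
PATTERN = re.compile(
    r"airflow_ai_sdk|airflow-ai-sdk|ai_sdk|@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed"
)


def inventory(root: str, suffixes=(".py", ".txt", ".toml")) -> dict[str, list[tuple[int, str]]]:
    """Return {file path: [(line number, stripped line), ...]} for every match under root."""
    hits: dict[str, list[tuple[int, str]]] = {}
    for path in sorted(Path(root).rglob("*")):
        # Only scan regular files with one of the (assumed) relevant suffixes.
        if not path.is_file() or path.suffix not in suffixes:
            continue
        text = path.read_text(errors="ignore")
        for lineno, line in enumerate(text.splitlines(), start=1):
            if PATTERN.search(line):
                hits.setdefault(str(path), []).append((lineno, line.strip()))
    return hits
```

Calling `inventory("dags")` would list every file and line that still mentions the old SDK or one of its decorators, which maps directly onto the four items captured above.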


Step 1: Update requirements.txt

Remove:

airflow-ai-sdk[openai]
# or any variant: airflow-ai-sdk[openai]==0.1.7, airflow-ai-sdk[anthropic], etc.

Add:

apache-airflow-providers-common-ai[openai]>=0.4.0

Use the latest available 0.x version unless the user has pinned a specific one. Available extras (0.4.0): [openai], [anthropic], [google], [bedrock], [llamaindex], [langchain], [mcp], plus file-format extras ([pdf], [docx], [parquet], [avro]) for DocumentLoaderOperator and [sql]/[common-sql] for the SQL operators. There are no [groq]/[mistral] extras; for those providers install the matching pydantic-ai-slim extra yourself.

Add [llamaindex] if the project migrates @task.embed to the LlamaIndexEmbeddingOperator (recommended, see Step 3). In that case sentence-transformers and torch can usually be removed, which shrinks the image considerably. Keep them only if the project stays on local sentence-transformers embeddings via plain @task.
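
The requirements swap can be sketched as a small text transform. This is an illustrative helper under stated assumptions, not something the provider ships: `rewrite_requirements` and `SUPPORTED_EXTRAS` are hypothetical names, and the extras set only covers the model and framework extras listed above.

```python
import re

# Model/framework extras listed for common-ai 0.4.0 in this guide.
SUPPORTED_EXTRAS = {"openai", "anthropic", "google", "bedrock", "llamaindex", "langchain", "mcp"}

# Matches a requirement line for airflow-ai-sdk, with optional [extras] and any version pin.
_SDK_LINE = re.compile(
    r"^\s*airflow-ai-sdk(?![\w.-])(?:\[(?P<extras>[^\]]*)\])?.*$", re.IGNORECASE
)


def rewrite_requirements(text: str, min_version: str = "0.4.0") -> tuple[str, list[str]]:
    """Swap airflow-ai-sdk lines for the common-ai provider.

    Returns the new file text plus the extras that have no common-ai
    equivalent (for example groq or mistral), so they can be handled by hand.
    """
    out: list[str] = []
    dropped: list[str] = []
    for line in text.splitlines():
        match = _SDK_LINE.match(line)
        if not match:
            out.append(line)  # unrelated requirement or comment: keep as-is
            continue
        extras = [e.strip().lower() for e in (match.group("extras") or "").split(",") if e.strip()]
        kept = [e for e in extras if e in SUPPORTED_EXTRAS]
        dropped += [e for e in extras if e not in SUPPORTED_EXTRAS]
        suffix = f"[{','.join(kept)}]" if kept else ""
        out.append(f"apache-airflow-providers-common-ai{suffix}>={min_version}")
    return "\n".join(out) + "\n", dropped
```

Any extras returned in the second element need the matching pydantic-ai-slim extra installed separately, as noted above.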


Step 2: Create PydanticAI connection

The new provider uses an Airflow connection instead of model strings or objects in code.

Connection type: pydanticai
Default connection ID: pydanticai_default

Via environment variable (.env)

AIRFLOW_CONN_PYDANTICAI_DEFAULT='{
    "conn_type": "pydanticai",
    "password": "<api-key>",
    "extra": {
        "model": "<provider>:<model-name>"
    }
}'

Model format

The model field uses provider:model format:

| Provider | Example model value |
| --- | --- |
| OpenAI | openai:gpt-5 |
| Anthropic | anthropic:claude-sonnet-4-20250514 |
| Google | google:gemini-2.5-pro |
| Groq | groq:llama-3.3-70b-versatile |
| Mistral | mistral:mistral-large-latest |
| Bedrock | bedrock:us.anthropic.claude-sonnet-4-20250514-v1:0 |

Custom endpoints (Ollama, vLLM, Snowflake Cortex, etc.)

Set host to the base URL:

AIRFLOW_CONN_PYDANTICAI_CORTEX='{
    "conn_type": "pydanticai",
    "password": "<api-key>",
    "host": "https://my-endpoint.com/v1",
    "extra": {
        "model": "openai:<model-name>"
    }
}'

Use the openai: prefix for any OpenAI-compatible API, regardless of the actual provider.

Connection ID convention

The env var name determines the connection ID:

  • AIRFLOW_CONN_PYDANTICAI_DEFAULT creates pydanticai_default
  • AIRFLOW_CONN_PYDANTICAI_CORTEX creates pydanticai_cortex
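
To make the JSON shape and the naming convention concrete, here is a minimal sketch that builds the environment variable for a given suffix. The helper name `pydanticai_conn_env` is hypothetical; the fields it emits are the ones shown in the examples above.

```python
import json
from typing import Optional


def pydanticai_conn_env(
    suffix: str, api_key: str, model: str, host: Optional[str] = None
) -> tuple[str, str, str]:
    """Return (env var name, JSON value, resulting connection ID)."""
    conn = {"conn_type": "pydanticai", "password": api_key, "extra": {"model": model}}
    if host:
        # Only custom (OpenAI-compatible) endpoints need a host.
        conn["host"] = host
    name = f"AIRFLOW_CONN_PYDANTICAI_{suffix.upper()}"
    # Airflow derives the connection ID from the part after AIRFLOW_CONN_, lowercased.
    conn_id = name[len("AIRFLOW_CONN_"):].lower()
    return name, json.dumps(conn), conn_id
```

For example, `pydanticai_conn_env("cortex", "<api-key>", "openai:<model-name>", host="https://my-endpoint.com/v1")` yields the variable name `AIRFLOW_CONN_PYDANTICAI_CORTEX` and the connection ID `pydanticai_cortex`.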

Model resolution priority

  1. model_id parameter on the decorator/operator (highest)
  2. model in connection's extra JSON (fallback)
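
The priority order can be pictured as the following function. It is a sketch of the rule as described here, not the provider's actual code; `resolve_model` is a hypothetical name, and raising an error when neither source supplies a model is an assumption.

```python
from typing import Optional


def resolve_model(model_id: Optional[str], conn_extra: dict) -> str:
    """Pick the model the way the priority list describes."""
    if model_id:
        # 1. An explicit model_id on the decorator/operator wins.
        return model_id
    model = conn_extra.get("model")
    if model:
        # 2. Otherwise fall back to the connection's extra JSON.
        return model
    # Assumed behavior for this sketch: fail loudly when nothing is configured.
    raise ValueError("No model_id given and no 'model' in the connection's extra")
```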

Other connection types (0.4.0)

Besides pydanticai, the provider registers vendor-specific connection types: pydanticai-azure (Azure OpenAI: host = endpoint, extra api_version), pydanticai-bedrock (AWS credentials/region in extra), and pydanticai-vertex (GCP project/location in extra). The LlamaIndex and LangChain hooks read API key/host/extra from whatever connection ID they are given, so a single pydanticai_default connection can serve LLM calls and embeddings: one API key entry for the whole project.


Step 3: Migrate decorators

@task.llm

# BEFORE (airflow-ai-sdk)
import airflow_ai_sdk as ai_sdk

class MyOutput(ai_sdk.BaseModel):
    field: str

@task.llm(
    model="gpt-5",                    # or model=OpenAIModel(...)
    system_prompt="You are helpful.",
    output_type=MyOutput,
)
def my_task(text: str) -> str:
    return text

# AFTER (apache-airflow-providers-common-ai)
from pydantic import BaseModel

class MyOutput(BaseModel):
    field: str

@task.llm(
    llm_conn_id="pydanticai_default",  # Airflow connection ID
    system_prompt="You are helpful.",
    output_type=MyOutput,
)
def my_task(text: str) -> str:
    return text

Parameter mapping:

| airflow-ai-sdk | common-ai provider | Notes |
| --- | --- | --- |
| model="gpt-5" | llm_conn_id="pydanticai_default" | Model specified in connection |
| model=OpenAIModel(...) | llm_conn_id="pydanticai_default" | Model + endpoint in connection |
| system_prompt="..." | system_prompt="..." | Unchanged |
| output_type=MyModel | output_type=MyModel | Unchanged |
| result_type=MyModel | output_type=MyModel | result_type was already deprecated |
| (not available) | model_id="openai:gpt-5" | Override connection's model |
| (not available) | require_approval=True | Built-in HITL review |
| (not available) | agent_params={...} | Extra kwargs for pydantic-ai Agent |
| (not available) | serialize_output=True | Force dict shape for BaseModel output |

Multimodal prompts (0.4.0+): the translation function may return a Sequence[UserContent] instead of a string, e.g. for vision:

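from pydantic_ai import BinaryContent

# ReviewAnalysis is the project's own output_type model, defined at module scope.
@task.llm(llm_conn_id="pydanticai_default", system_prompt="...", output_type=ReviewAnalysis)
def analyze(text: str, image_path: str | None = None):
    if image_path:
        # Multimodal prompt: text plus the raw image bytes.
        with open(image_path, "rb") as f:
            return [text, BinaryContent(data=f.read(), media_type="image/jpeg")]
    return text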

This matches the old airflow-ai-sdk vision pattern, so vision code migrates unchanged. Note: common-ai 0.1.x only accepted strings — if a project disabled vision to migrate to 0.1.0, re-enable it when bumping to 0.4.0. Non-string prompts are incompatible with require_approval=True / enable_hitl_review=True (both render the prompt as text).

Structured output via XCom (0.4.0 behavior change): with output_type=<BaseModel subclass>, the model instance flows through XCom on Airflow cores whose task SDK has SUPPORTS_OPERATOR_DESERIALIZATION_WALKER (attribute access downstream); on older cores (including Astro Runtime 3.2 task SDK 1.2.x) the provider automatically dumps to a dict (subscript access). Check which shape arrives at runtime before choosing attribute vs dict access downstream, or set serialize_output=True to force the dict shape everywhere. The output_type class must be defined at module scope (nested classes cannot be deserialized from XCom).
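
When a downstream task has to run on both kinds of cores, one option is a small accessor that tolerates either shape. This is a minimal sketch; `read_field` is a hypothetical helper, not part of the provider.

```python
from typing import Any


def read_field(result: Any, name: str) -> Any:
    """Read a field from an upstream result in either XCom shape."""
    if isinstance(result, dict):
        # Older cores (or serialize_output=True): the model arrives as a dict.
        return result[name]
    # Newer cores: the BaseModel instance arrives intact.
    return getattr(result, name)
```

A downstream task would then call `read_field(upstream_result, "field")` instead of committing to `upstream_result.field` or `upstream_result["field"]`. Setting serialize_output=True avoids the need for this by pinning the dict shape.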

@task.llm_branch

# BEFORE
@task.llm_branch(
    model="gpt-5",
    system_prompt="Choose a team...",
    allow_multiple_branches=False,
)
def route(text: str) -> str:
    return text

# AFTER
@task.llm_branch(
    llm_conn_id="pydanticai_default",
    system_prompt="Choose a team...",
    allow_multiple_branches=False,    # same parameter, unchanged
)
def route(text: str) -> str:
    return text

Only change: model= becomes llm_conn_id=.

@task.agent

This has the biggest API change. The Agent is no longer pre-built in user code.

# BEFORE (airflow-ai-sdk) - Agent built at module level
from pydantic_ai import Agent

my_agent = Agent(
    "gpt-5",
    system_prompt="You are a research assistant.",
    tools=[search_tool, lookup_tool],
)

@task.agent(agent=my_agent)
def research(question: str) -> str:
    return question

# AFTER (common-ai provider) - No Agent object, config via parameters
from pydantic_ai.toolsets import FunctionToolset

@task.agent(
    llm_conn_id="pydanticai_default",
    system_prompt="You are a research assistant.",
    toolsets=[FunctionToolset(tools=[search_tool, lookup_tool])],
)
def research(question: str) -> str:
    return question

Parameter mapping:

| airflow-ai-sdk | common-ai provider | Notes |
| --- | --- | --- |
| agent=Agent(model, ...) | llm_conn_id="..." | Model from connection |
| Agent's system_prompt | system_prompt="..." | Now a decorator param |
| Agent's tools=[...] | toolsets=[FunctionToolset(tools=[...])] | Preferred: gets automatic tool-call logging |
| Agent's tools=[...] | agent_params={"tools": [...]} | Also works, but no tool-call logging |
| Agent's output_type | output_type=MyModel | Now a decorator param |
| (not available) | durable=True | Step-level caching (needs [common.ai] durable_cache_path) |
| (not available) | enable_hitl_review=True | Iterative human review loop (see below) |

Key insight: Everything that was configured on the Agent() constructor now goes into either a top-level decorator parameter or agent_params. The agent_params dict is passed directly to pydantic-ai's Agent constructor. Prefer toolsets over agent_params["tools"]: the operator wraps each toolset in a LoggingToolset, so every tool call appears in the task log with timing.

enable_hitl_review behavior: the task generates a first draft, then blocks until a human acts. The reviewer uses the HITL Review tab/extra link on the task instance (chat UI from the provider's auto-registered hitl_review plugin) to request changes (agent regenerates with the feedback in its message history) or approve. Constraints: requires a string prompt, incompatible with durable=True, and the final (possibly regenerated) output is what flows to XCom. Warn users that the Dag run waits indefinitely at this task unless hitl_timeout is set. For headless testing, the plugin exposes REST endpoints under /hitl-review: GET /sessions/find, POST /sessions/feedback, POST /sessions/approve, POST /sessions/reject (query params dag_id, task_id, run_id, map_index).
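
For headless tests, the endpoint paths and query parameters listed above can be assembled as follows. This sketch only builds the HTTP method and URL. The base URL, authentication, any request body (for example the feedback text), and the `map_index=-1` default for unmapped tasks are assumptions that must be checked against the running deployment; `hitl_review_request` is a hypothetical helper.

```python
from urllib.parse import urlencode

# HTTP method per action, as listed for the /hitl-review endpoints.
METHODS = {"find": "GET", "feedback": "POST", "approve": "POST", "reject": "POST"}


def hitl_review_request(
    base_url: str, action: str, dag_id: str, task_id: str, run_id: str, map_index: int = -1
) -> tuple[str, str]:
    """Return (HTTP method, URL) for one HITL review session call."""
    if action not in METHODS:
        raise ValueError(f"Unknown action {action!r}; expected one of {sorted(METHODS)}")
    query = urlencode(
        {"dag_id": dag_id, "task_id": task_id, "run_id": run_id, "map_index": map_index}
    )
    url = f"{base_url.rstrip('/')}/hitl-review/sessions/{action}?{query}"
    return METHODS[action], url
```

A test would typically poll the `find` URL until a session exists, then send the `approve` request so the Dag run can continue.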

@task.embed (NO EQUIVALENT — three replacement options)

The new provider does NOT include an embed decorator. Pick the replacement based on what the project needs:

Option A (recommended): LlamaIndexEmbeddingOperator (0.4.0, [llamaindex] extra). Connection-based, one task embeds the whole document list, and with persist_dir the resulting vector index is persisted for retrieval (pairs with LlamaIndexRetrievalOperator):

import os

from airflow.providers.common.ai.operators.llamaindex_embedding import LlamaIndexEmbeddingOperator

# Assumes AIRFLOW_HOME is set in the environment.
AIRFLOW_HOME = os.environ["AIRFLOW_HOME"]

_embeddings = LlamaIndexEmbeddingOperator(
    task_id="create_embeddings",
    documents=[{"text": "...", "metadata": {"id": 1}}, ...],  # templated, accepts XComArg
    llm_conn_id="pydanticai_default",   # reuses the same connection (API key only)
    embed_model="text-embedding-3-small",
    persist_dir=f"{AIRFLOW_HOME}/include/my_index",  # optional; local path or s3://, gs://, ...
)

The operator returns {"chunks": [{"text", "metadata", "vector"}], ...}. Put a stable key into each document's metadata — it round-trips through chunking, so vectors can be mapped back to source records.

Option B: LlamaIndexHook for raw vectors (no operator, no persisted index). Shortest path when vectors go straight to a database:

@task
def create_embeddings(rows):
    from airflow.providers.common.ai.hooks.llamaindex import LlamaIndexHook
    embed_model = LlamaIndexHook(
        llm_conn_id="pydanticai_default",
        embed_model="text-embedding-3-small",
    ).get_embedding_model()
    vectors = embed_model.get_text_embedding_batch([r["text"] for r in rows])
    return list(zip([r["id"] for r in rows], vectors))

Option C: plain @task with sentence-transformers (keeps the old local/offline behavior, no API cost; requires keeping sentence-transformers + torch in requirements):

@task
def embed_texts(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts, normalize_embeddings=True).tolist()

Note on dimensions: switching from all-MiniLM-L6-v2 (384) to text-embedding-3-small (1536) changes vector size — existing stored embeddings must be regenerated, and fixed-size vector columns (e.g. pgvector vector(384)) need a schema change. Embed all texts in one task/batch call rather than .expand() per text: batching is one API round-trip and avoids per-task model loading.
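
A cheap guard against mixing old and new vectors is to check dimensions before writing to the store. This is a minimal sketch with a hypothetical helper name, `check_dimensions`; the expected size comes from the target column or index, not from the provider.

```python
from typing import Sequence


def check_dimensions(vectors: Sequence[Sequence[float]], expected_dim: int) -> None:
    """Raise if any vector does not match the size the vector store expects."""
    bad = [i for i, vec in enumerate(vectors) if len(vec) != expected_dim]
    if bad:
        raise ValueError(
            f"{len(bad)} vector(s) do not have dimension {expected_dim}; "
            f"first offending index: {bad[0]}"
        )
```

Calling `check_dimensions(vectors, 1536)` right after the batch embedding call surfaces a leftover 384-dimensional vector before it reaches a `vector(1536)` column.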


Step 4: Update imports

| Old import | New import |
| --- | --- |
| import airflow_ai_sdk as ai_sdk | Remove entirely |
| from airflow_ai_sdk import BaseModel | from pydantic import BaseModel |
| from airflow_ai_sdk.models.base import BaseModel | from pydantic import BaseModel |
| class Foo(ai_sdk.BaseModel): | class Foo(BaseModel): |
| from pydantic_ai import Agent | Remove if Agent was only used for @task.agent |
| from pydantic_ai.models.openai import OpenAIModel | Remove (model config in connection now) |
| (new) | from pydantic_ai.toolsets import FunctionToolset for @task.agent toolsets |

The @task.llm, @task.agent, @task.llm_branch decorators are auto-registered by the provider. No explicit import needed beyond from airflow.sdk import task.

pydantic_ai imports for non-decorator usage (e.g., BinaryContent for multimodal) are still valid since the new provider depends on pydantic-ai-slim (>= 1.71.0 for provider 0.4.0).


Step 5: Update connections.yaml (if used for local testing)

pydanticai_default:
  conn_type: pydanticai
  password: <api-key>
  extra:
    model: "openai:gpt-5"

For custom endpoints:

pydanticai_cortex:
  conn_type: pydanticai
  password: <api-key>
  host: https://my-endpoint.com/v1
  extra:
    model: "openai:llama3.1-8b"

Step 6: Clean up env vars

The new provider reads model config from the pydanticai connection, so env vars that previously fed the model in code are usually redundant. Before removing any of them, grep the project (and any sibling scripts/services) to confirm nothing else still references them:

OPENAI_API_KEY|OPENAI_BASE_URL|ANTHROPIC_API_KEY|GOOGLE_API_KEY

Candidates for removal only if no other code references them:

  • OPENAI_API_KEY (now in the pydanticai connection's password field)
  • OPENAI_BASE_URL (now in the connection's host field)
  • Custom model name vars (now in the connection's extra.model)

If anything outside the migrated DAGs still uses them (other DAGs not yet migrated, helper scripts, non-Airflow services sharing the .env), leave them in place.

Keep AIRFLOW_CONN_* env vars for all connections.


Step 7: Verify

After migration, grep the codebase to confirm no stale references remain:

airflow_ai_sdk|airflow-ai-sdk|ai_sdk\.BaseModel|from pydantic_ai import Agent|from pydantic_ai\.models

Verify:

  • No imports from airflow_ai_sdk
  • No Agent() objects created for @task.agent (unless used outside decorators)
  • No model= parameter on LLM decorators (should be llm_conn_id=)
  • All @task.embed replaced (LlamaIndex operator/hook or plain @task); stored embeddings regenerated if the model/dimensions changed
  • Vision translation functions return [text, BinaryContent(...)] again if they were string-only-restricted under common-ai 0.1.x
  • Downstream consumers of output_type=BaseModel results use the XCom shape that actually arrives (dict on older cores, instance on newer; serialize_output=True pins it)
  • pydanticai connection configured in .env or connections.yaml
  • requirements.txt has apache-airflow-providers-common-ai[...] instead of airflow-ai-sdk[...]; torch/sentence-transformers removed if no longer used
  • Run the Dags end-to-end: tasks with enable_hitl_review=True or require_approval=True wait for human input, so the test plan must include acting on them (UI tab or /hitl-review REST)

Quick reference: New features in common-ai provider

These features are available after migration but have no airflow-ai-sdk equivalent:

| Feature | Parameter / API | Since | Description |
| --- | --- | --- | --- |
| HITL approval | require_approval=True on @task.llm | 0.1.0 | Pause for human review before returning |
| HITL review loop | enable_hitl_review=True on @task.agent | 0.1.0 | Iterative review with regeneration (chat UI via hitl_review plugin) |
| Durable execution | durable=True on @task.agent | 0.1.0 | Step-level caching for resilience |
| Tool logging | enable_tool_logging=True on @task.agent | 0.1.0 | INFO-level tool call logs (default: on; requires toolsets) |
| Model override | model_id="openai:gpt-5" | 0.1.0 | Override connection's model per-task |
| File analysis | @task.llm_file_analysis | 0.1.0 | Analyze files/images via ObjectStoragePath |
| NL-to-SQL | @task.llm_sql | 0.1.0 | Generate SQL from natural language |
| Multimodal prompts | Translation function returns Sequence[UserContent] | 0.4.0 | Vision and other binary content in @task.llm / @task.agent / @task.llm_branch |
| Pydantic instance via XCom | output_type=BaseModel (with serialize_output opt-out) | 0.4.0 | Instance flows through XCom on capable cores; dict fallback otherwise |
| Embeddings | LlamaIndexEmbeddingOperator (+ persist_dir) | 0.4.0 | Connection-based embeddings + persisted vector index |
| Retrieval | LlamaIndexRetrievalOperator | 0.4.0 | Top-k similarity search over a persisted index |
