migrating-ai-sdk-to-common-ai

Di chuyển các dự án Airflow từ airflow-ai-sdk sang apache-airflow-providers-common-ai 0.1.0+. Sử dụng kỹ năng này khi người dùng muốn thay thế airflow-ai-sdk bằng…

npx skills add https://github.com/astronomer/agents --skill migrating-ai-sdk-to-common-ai

Migrate airflow-ai-sdk to apache-airflow-providers-common-ai

This skill migrates Airflow projects from airflow-ai-sdk to apache-airflow-providers-common-ai (target 0.4.0+), the official Airflow AI provider built on PydanticAI. It also covers upgrading projects already on common-ai 0.1.x, since several capabilities (multimodal prompts, toolsets, embedding operators, structured-output XCom behavior) changed between 0.1.0 and 0.4.0.

CRITICAL: The new provider requires Airflow 3.0+ and (for 0.4.0) pydantic-ai-slim >= 1.71.0. The API surface has changed: LLM configuration moves from code (model strings/objects) to Airflow connections (pydanticai type). There is no @task.embed in the new provider; embeddings move to the LlamaIndex integration or a plain @task (see Step 3).

Before starting

Use the Grep tool with the pattern below to inventory everything that needs to migrate:

airflow_ai_sdk|airflow-ai-sdk|ai_sdk|@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed

From the results, capture:

  1. All files importing airflow-ai-sdk / airflow_ai_sdk
  2. Which decorators are in use: @task.llm, @task.agent, @task.llm_branch, @task.embed
  3. The model configuration pattern (string names like "gpt-5", or OpenAIModel(...) objects)
  4. Any airflow_ai_sdk.BaseModel subclasses used as output_type

Use this inventory to drive the steps below.


Step 1: Update requirements.txt

Remove:

airflow-ai-sdk[openai]
# or any variant: airflow-ai-sdk[openai]==0.1.7, airflow-ai-sdk[anthropic], etc.

Add:

apache-airflow-providers-common-ai[openai]>=0.4.0

Use the latest available 0.x version unless the user has pinned a specific one. Available extras (0.4.0): [openai], [anthropic], [google], [bedrock], [llamaindex], [langchain], [mcp], plus file-format extras ([pdf], [docx], [parquet], [avro]) for DocumentLoaderOperator and [sql]/[common-sql] for the SQL operators. There are no [groq]/[mistral] extras; for those providers install the matching pydantic-ai-slim extra yourself.

Add [llamaindex] if the project migrates @task.embed to the LlamaIndexEmbeddingOperator (recommended, see Step 3). In that case sentence-transformers and torch can usually be removed, which shrinks the image considerably. Keep them only if the project stays on local sentence-transformers embeddings via plain @task.


Step 2: Create PydanticAI connection

The new provider uses an Airflow connection instead of model strings or objects in code.

Connection type: pydanticai Default connection ID: pydanticai_default

Via environment variable (.env)

AIRFLOW_CONN_PYDANTICAI_DEFAULT='{
    "conn_type": "pydanticai",
    "password": "<api-key>",
    "extra": {
        "model": "<provider>:<model-name>"
    }
}'

Model format

The model field uses provider:model format:

ProviderExample model value
OpenAIopenai:gpt-5
Anthropicanthropic:claude-sonnet-4-20250514
Googlegoogle:gemini-2.5-pro
Groqgroq:llama-3.3-70b-versatile
Mistralmistral:mistral-large-latest
Bedrockbedrock:us.anthropic.claude-sonnet-4-20250514-v1:0

Custom endpoints (Ollama, vLLM, Snowflake Cortex, etc.)

Set host to the base URL:

AIRFLOW_CONN_PYDANTICAI_CORTEX='{
    "conn_type": "pydanticai",
    "password": "<api-key>",
    "host": "https://my-endpoint.com/v1",
    "extra": {
        "model": "openai:<model-name>"
    }
}'

Use the openai: prefix for any OpenAI-compatible API, regardless of the actual provider.

Connection ID convention

The env var name determines the connection ID:

  • AIRFLOW_CONN_PYDANTICAI_DEFAULT creates pydanticai_default
  • AIRFLOW_CONN_PYDANTICAI_CORTEX creates pydanticai_cortex

Model resolution priority

  1. model_id parameter on the decorator/operator (highest)
  2. model in connection's extra JSON (fallback)

Other connection types (0.4.0)

Besides pydanticai, the provider registers vendor-specific connection types: pydanticai-azure (Azure OpenAI: host = endpoint, extra api_version), pydanticai-bedrock (AWS credentials/region in extra), and pydanticai-vertex (GCP project/location in extra). The LlamaIndex and LangChain hooks read API key/host/extra from whatever connection ID they are given, so a single pydanticai_default connection can serve LLM calls and embeddings: one API key entry for the whole project.


Step 3: Migrate decorators

@task.llm

# BEFORE (airflow-ai-sdk)
import airflow_ai_sdk as ai_sdk

class MyOutput(ai_sdk.BaseModel):
    field: str

@task.llm(
    model="gpt-5",                    # or model=OpenAIModel(...)
    system_prompt="You are helpful.",
    output_type=MyOutput,
)
def my_task(text: str) -> str:
    return text

# AFTER (apache-airflow-providers-common-ai)
from pydantic import BaseModel

class MyOutput(BaseModel):
    field: str

@task.llm(
    llm_conn_id="pydanticai_default",  # Airflow connection ID
    system_prompt="You are helpful.",
    output_type=MyOutput,
)
def my_task(text: str) -> str:
    return text

Parameter mapping:

airflow-ai-sdkcommon-ai providerNotes
model="gpt-5"llm_conn_id="pydanticai_default"Model specified in connection
model=OpenAIModel(...)llm_conn_id="pydanticai_default"Model + endpoint in connection
system_prompt="..."system_prompt="..."Unchanged
output_type=MyModeloutput_type=MyModelUnchanged
result_type=MyModeloutput_type=MyModelresult_type was already deprecated
(not available)model_id="openai:gpt-5"Override connection's model
(not available)require_approval=TrueBuilt-in HITL review
(not available)agent_params={...}Extra kwargs for pydantic-ai Agent
(not available)serialize_output=TrueForce dict shape for BaseModel output

Multimodal prompts (0.4.0+): the translation function may return a Sequence[UserContent] instead of a string, e.g. for vision:

@task.llm(llm_conn_id="pydanticai_default", system_prompt="...", output_type=ReviewAnalysis)
def analyze(text: str, image_path: str | None = None):
    if image_path:
        with open(image_path, "rb") as f:
            return [text, BinaryContent(data=f.read(), media_type="image/jpeg")]
    return text

This matches the old airflow-ai-sdk vision pattern, so vision code migrates unchanged. Note: common-ai 0.1.x only accepted strings — if a project disabled vision to migrate to 0.1.0, re-enable it when bumping to 0.4.0. Non-string prompts are incompatible with require_approval=True / enable_hitl_review=True (both render the prompt as text).

Structured output via XCom (0.4.0 behavior change): with output_type=<BaseModel subclass>, the model instance flows through XCom on Airflow cores whose task SDK has SUPPORTS_OPERATOR_DESERIALIZATION_WALKER (attribute access downstream); on older cores (including Astro Runtime 3.2 task SDK 1.2.x) the provider automatically dumps to a dict (subscript access). Check which shape arrives at runtime before choosing attribute vs dict access downstream, or set serialize_output=True to force the dict shape everywhere. The output_type class must be defined at module scope (nested classes cannot be deserialized from XCom).

@task.llm_branch

# BEFORE
@task.llm_branch(
    model="gpt-5",
    system_prompt="Choose a team...",
    allow_multiple_branches=False,
)
def route(text: str) -> str:
    return text

# AFTER
@task.llm_branch(
    llm_conn_id="pydanticai_default",
    system_prompt="Choose a team...",
    allow_multiple_branches=False,    # same parameter, unchanged
)
def route(text: str) -> str:
    return text

Only change: model= becomes llm_conn_id=.

@task.agent

This has the biggest API change. The Agent is no longer pre-built in user code.

# BEFORE (airflow-ai-sdk) - Agent built at module level
from pydantic_ai import Agent

my_agent = Agent(
    "gpt-5",
    system_prompt="You are a research assistant.",
    tools=[search_tool, lookup_tool],
)

@task.agent(agent=my_agent)
def research(question: str) -> str:
    return question

# AFTER (common-ai provider) - No Agent object, config via parameters
from pydantic_ai.toolsets import FunctionToolset

@task.agent(
    llm_conn_id="pydanticai_default",
    system_prompt="You are a research assistant.",
    toolsets=[FunctionToolset(tools=[search_tool, lookup_tool])],
)
def research(question: str) -> str:
    return question

Parameter mapping:

airflow-ai-sdkcommon-ai providerNotes
agent=Agent(model, ...)llm_conn_id="..."Model from connection
Agent's system_promptsystem_prompt="..."Now a decorator param
Agent's tools=[...]toolsets=[FunctionToolset(tools=[...])]Preferred: gets automatic tool-call logging
Agent's tools=[...]agent_params={"tools": [...]}Also works, but no tool-call logging
Agent's output_typeoutput_type=MyModelNow a decorator param
(not available)durable=TrueStep-level caching (needs [common.ai] durable_cache_path)
(not available)enable_hitl_review=TrueIterative human review loop (see below)

Key insight: Everything that was configured on the Agent() constructor now goes into either a top-level decorator parameter or agent_params. The agent_params dict is passed directly to pydantic-ai's Agent constructor. Prefer toolsets over agent_params["tools"]: the operator wraps each toolset in a LoggingToolset, so every tool call appears in the task log with timing.

enable_hitl_review behavior: the task generates a first draft, then blocks until a human acts. The reviewer uses the HITL Review tab/extra link on the task instance (chat UI from the provider's auto-registered hitl_review plugin) to request changes (agent regenerates with the feedback in its message history) or approve. Constraints: requires a string prompt, incompatible with durable=True, and the final (possibly regenerated) output is what flows to XCom. Warn users that the Dag run waits indefinitely at this task unless hitl_timeout is set. For headless testing, the plugin exposes REST endpoints under /hitl-review: GET /sessions/find, POST /sessions/feedback, POST /sessions/approve, POST /sessions/reject (query params dag_id, task_id, run_id, map_index).

@task.embed (NO EQUIVALENT — three replacement options)

The new provider does NOT include an embed decorator. Pick the replacement based on what the project needs:

Option A (recommended): LlamaIndexEmbeddingOperator (0.4.0, [llamaindex] extra). Connection-based, one task embeds the whole document list, and with persist_dir the resulting vector index is persisted for retrieval (pairs with LlamaIndexRetrievalOperator):

from airflow.providers.common.ai.operators.llamaindex_embedding import LlamaIndexEmbeddingOperator

_embeddings = LlamaIndexEmbeddingOperator(
    task_id="create_embeddings",
    documents=[{"text": "...", "metadata": {"id": 1}}, ...],  # templated, accepts XComArg
    llm_conn_id="pydanticai_default",   # reuses the same connection (API key only)
    embed_model="text-embedding-3-small",
    persist_dir=f"{AIRFLOW_HOME}/include/my_index",  # optional; local path or s3://, gs://, ...
)

The operator returns {"chunks": [{"text", "metadata", "vector"}], ...}. Put a stable key into each document's metadata — it round-trips through chunking, so vectors can be mapped back to source records.

Option B: LlamaIndexHook for raw vectors (no operator, no persisted index). Shortest path when vectors go straight to a database:

@task
def create_embeddings(rows):
    from airflow.providers.common.ai.hooks.llamaindex import LlamaIndexHook
    embed_model = LlamaIndexHook(
        llm_conn_id="pydanticai_default",
        embed_model="text-embedding-3-small",
    ).get_embedding_model()
    vectors = embed_model.get_text_embedding_batch([r["text"] for r in rows])
    return list(zip([r["id"] for r in rows], vectors))

Option C: plain @task with sentence-transformers (keeps the old local/offline behavior, no API cost; requires keeping sentence-transformers + torch in requirements):

@task
def embed_texts(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts, normalize_embeddings=True).tolist()

Note on dimensions: switching from all-MiniLM-L6-v2 (384) to text-embedding-3-small (1536) changes vector size — existing stored embeddings must be regenerated, and fixed-size vector columns (e.g. pgvector vector(384)) need a schema change. Embed all texts in one task/batch call rather than .expand() per text: batching is one API round-trip and avoids per-task model loading.


Step 4: Update imports

Old importNew import
import airflow_ai_sdk as ai_sdkRemove entirely
from airflow_ai_sdk import BaseModelfrom pydantic import BaseModel
from airflow_ai_sdk.models.base import BaseModelfrom pydantic import BaseModel
class Foo(ai_sdk.BaseModel):class Foo(BaseModel):
from pydantic_ai import AgentRemove if Agent was only used for @task.agent
from pydantic_ai.models.openai import OpenAIModelRemove (model config in connection now)
(new)from pydantic_ai.toolsets import FunctionToolset for @task.agent toolsets

The @task.llm, @task.agent, @task.llm_branch decorators are auto-registered by the provider. No explicit import needed beyond from airflow.sdk import task.

pydantic_ai imports for non-decorator usage (e.g., BinaryContent for multimodal) are still valid since the new provider depends on pydantic-ai-slim (>= 1.71.0 for provider 0.4.0).


Step 5: Update connections.yaml (if used for local testing)

pydanticai_default:
  conn_type: pydanticai
  password: <api-key>
  extra:
    model: "openai:gpt-5"

For custom endpoints:

pydanticai_cortex:
  conn_type: pydanticai
  password: <api-key>
  host: https://my-endpoint.com/v1
  extra:
    model: "openai:llama3.1-8b"

Step 6: Clean up env vars

The new provider reads model config from the pydanticai connection, so env vars that previously fed the model in code are usually redundant. Before removing any of them, grep the project (and any sibling scripts/services) to confirm nothing else still references them:

OPENAI_API_KEY|OPENAI_BASE_URL|ANTHROPIC_API_KEY|GOOGLE_API_KEY

Candidates for removal only if no other code references them:

  • OPENAI_API_KEY (now in the pydanticai connection's password field)
  • OPENAI_BASE_URL (now in the connection's host field)
  • Custom model name vars (now in the connection's extra.model)

If anything outside the migrated DAGs still uses them (other DAGs not yet migrated, helper scripts, non-Airflow services sharing the .env), leave them in place.

Keep AIRFLOW_CONN_* env vars for all connections.


Step 7: Verify

After migration, grep the codebase to confirm no stale references remain:

airflow_ai_sdk|airflow-ai-sdk|ai_sdk\.BaseModel|from pydantic_ai import Agent|from pydantic_ai.models

Verify:

  • No imports from airflow_ai_sdk
  • No Agent() objects created for @task.agent (unless used outside decorators)
  • No model= parameter on LLM decorators (should be llm_conn_id=)
  • All @task.embed replaced (LlamaIndex operator/hook or plain @task); stored embeddings regenerated if the model/dimensions changed
  • Vision translation functions return [text, BinaryContent(...)] again if they were string-only-restricted under common-ai 0.1.x
  • Downstream consumers of output_type=BaseModel results use the XCom shape that actually arrives (dict on older cores, instance on newer; serialize_output=True pins it)
  • pydanticai connection configured in .env or connections.yaml
  • requirements.txt has apache-airflow-providers-common-ai[...] instead of airflow-ai-sdk[...]; torch/sentence-transformers removed if no longer used
  • Run the Dags end-to-end: tasks with enable_hitl_review=True or require_approval=True wait for human input, so the test plan must include acting on them (UI tab or /hitl-review REST)

Quick reference: New features in common-ai provider

These features are available after migration but have no airflow-ai-sdk equivalent:

FeatureParameter / APISinceDescription
HITL approvalrequire_approval=True on @task.llm0.1.0Pause for human review before returning
HITL review loopenable_hitl_review=True on @task.agent0.1.0Iterative review with regeneration (chat UI via hitl_review plugin)
Durable executiondurable=True on @task.agent0.1.0Step-level caching for resilience
Tool loggingenable_tool_logging=True on @task.agent0.1.0INFO-level tool call logs (default: on; requires toolsets)
Model overridemodel_id="openai:gpt-5"0.1.0Override connection's model per-task
File analysis@task.llm_file_analysis0.1.0Analyze files/images via ObjectStoragePath
NL-to-SQL@task.llm_sql0.1.0Generate SQL from natural language
Multimodal promptsTranslation function returns Sequence[UserContent]0.4.0Vision and other binary content in @task.llm / @task.agent / @task.llm_branch
Pydantic instance via XComoutput_type=BaseModel (with serialize_output opt-out)0.4.0Instance flows through XCom on capable cores; dict fallback otherwise
EmbeddingsLlamaIndexEmbeddingOperator (+ persist_dir)0.4.0Connection-based embeddings + persisted vector index
RetrievalLlamaIndexRetrievalOperator0.4.0Top-k similarity search over a persisted index

Thêm skills từ astronomer

airflow
astronomer
Truy vấn, quản lý và khắc phục sự cố DAG, lần chạy, tác vụ và cấu hình hệ thống Apache Airflow. Hỗ trợ hơn 30 lệnh bao gồm kiểm tra DAG, quản lý lần chạy, ghi nhật ký tác vụ, truy vấn cấu hình và truy cập trực tiếp REST API. Quản lý nhiều phiên bản Airflow với cấu hình liên tục; tự động phát hiện triển khai cục bộ và Astro. Kích hoạt chạy DAG đồng bộ (chờ hoàn thành) hoặc không đồng bộ, chẩn đoán lỗi, xóa lần chạy để thử lại, và truy cập nhật ký tác vụ với bộ lọc thử lại/ch
official
airflow-hitl
astronomer
Cổng phê duyệt của con người, đầu vào biểu mẫu và phân nhánh trong DAG Airflow sử dụng các toán tử có thể trì hoãn. Bốn loại toán tử: ApprovalOperator cho quyết định phê duyệt/từ chối, HITLOperator cho lựa chọn nhiều tùy chọn với biểu mẫu, HITLBranchOperator cho định tuyến tác vụ do con người điều khiển và HITLEntryOperator cho thu thập dữ liệu biểu mẫu. Tất cả các toán tử đều có thể trì hoãn, giải phóng slot worker trong khi chờ phản hồi của con người qua tab Required Actions của giao diện Airflow hoặc REST API. Hỗ trợ các tính năng tùy chọn bao gồm tùy chỉnh...
official
airflow-plugins
astronomer
Xây dựng plugin Airflow 3.1+ nhúng ứng dụng FastAPI, trang UI tùy chỉnh, thành phần React, middleware, macro và liên kết toán tử trực tiếp vào giao diện Airflow. Sử dụng…
official
analyzing-data
astronomer
Truy vấn kho dữ liệu của bạn để trả lời các câu hỏi kinh doanh với các mẫu đã lưu trong bộ nhớ đệm và ánh xạ khái niệm. Hỗ trợ tra cứu mẫu và lưu vào bộ nhớ đệm cho các loại câu hỏi lặp lại, với ghi nhận kết quả để cải thiện các truy vấn trong tương lai. Bao gồm bộ nhớ đệm ánh xạ khái niệm sang bảng và khám phá lược đồ bảng qua INFORMATION_SCHEMA hoặc tìm kiếm trong mã nguồn. Cung cấp các hàm kernel run_sql() và run_sql_pandas() trả về DataFrame Polars hoặc Pandas để phân tích. Các lệnh CLI để quản lý bộ nhớ đệm khái
official
annotating-task-lineage
astronomer
Chú thích các tác vụ Airflow với dòng dữ liệu (data lineage) bằng cách sử dụng inlets và outlets. Hỗ trợ các đối tượng Dataset của OpenLineage, Assets của Airflow và Datasets của Airflow để xác định đầu vào và đầu ra trên các cơ sở dữ liệu, kho dữ liệu và lưu trữ đám mây. Sử dụng như phương án dự phòng khi các toán tử thiếu bộ trích xuất OpenLineage tích hợp sẵn; tuân theo hệ thống ưu tiên bốn cấp, trong đó các bộ trích xuất tùy chỉnh và phương thức OpenLineage được ưu tiên. Bao gồm các trình trợ giúp đặt tên dataset cho Snowflake, BigQuery, S3 và PostgreSQL để đảm bảo tính nhất quán...
official
authoring-dags
astronomer
Quy trình làm việc có hướng dẫn để tạo DAG Apache Airflow với tích hợp xác thực và kiểm thử. Phương pháp sáu giai đoạn có cấu trúc: khám phá môi trường và các mẫu hiện có, lập kế hoạch cấu trúc DAG, triển khai theo các phương pháp tốt nhất, xác thực bằng lệnh CLI af, kiểm thử với sự đồng ý của người dùng, và lặp lại các bước sửa lỗi. Các lệnh CLI để khám phá (af config connections, af config providers, af dags list) và xác thực (af dags errors, af dags get, af dags explore) cung cấp phản hồi tức thì về DAG...
official
blueprint
astronomer
Xác định các mẫu nhóm tác vụ Airflow có thể tái sử dụng với xác thực Pydantic và soạn DAG từ YAML. Sử dụng khi tạo mẫu blueprint, soạn DAG từ…
official
checking-freshness
astronomer
Kiểm tra độ tươi mới của dữ liệu bằng cách đối chiếu dấu thời gian bảng và mẫu cập nhật với thang đo độ cũ. Xác định các cột dấu thời gian sử dụng các mẫu đặt tên ETL phổ biến (_loaded_at, _updated_at, created_at, v.v.) và truy vấn giá trị tối đa của chúng để xác định tuổi. Phân loại dữ liệu thành bốn trạng thái độ tươi mới: Tươi (< 4 giờ), Cũ (4–24 giờ), Rất cũ (> 24 giờ) hoặc Không xác định (không tìm thấy dấu thời gian). Cung cấp các mẫu SQL để kiểm tra thời gian cập nhật cuối cùng và xu hướng số lượng hàng trong những ngày gần đây để...
official