migrating-ai-sdk-to-common-aiby astronomer
Migrates Airflow projects from airflow-ai-sdk to apache-airflow-providers-common-ai 0.1.0+. Use this skill when the user wants to replace airflow-ai-sdk with…
npx skills add https://github.com/astronomer/agents --skill migrating-ai-sdk-to-common-aiMigrate airflow-ai-sdk to apache-airflow-providers-common-ai
This skill migrates Airflow projects from airflow-ai-sdk to apache-airflow-providers-common-ai (0.1.0+), the official Airflow AI provider built on PydanticAI.
CRITICAL: The new provider requires Airflow 3.0+ and pydantic-ai-slim >= 1.34.0. The API surface has changed: LLM configuration moves from code (model strings/objects) to Airflow connections (
pydanticaitype). There is no@task.embedin the new provider.
Before starting
Use the Grep tool with the pattern below to inventory everything that needs to migrate:
airflow_ai_sdk|airflow-ai-sdk|ai_sdk|@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed
From the results, capture:
- All files importing
airflow-ai-sdk/airflow_ai_sdk - Which decorators are in use:
@task.llm,@task.agent,@task.llm_branch,@task.embed - The model configuration pattern (string names like
"gpt-5", orOpenAIModel(...)objects) - Any
airflow_ai_sdk.BaseModelsubclasses used asoutput_type
Use this inventory to drive the steps below.
Step 1: Update requirements.txt
Remove:
airflow-ai-sdk[openai]
# or any variant: airflow-ai-sdk[openai]==0.1.7, airflow-ai-sdk[anthropic], etc.
Add:
apache-airflow-providers-common-ai[openai]>=0.1.0
Use the latest available 0.x version unless the user has pinned a specific one. Available extras match the LLM provider: [openai], [anthropic], [google], [bedrock], [groq], [mistral], [mcp].
Keep sentence-transformers and torch if the project uses embeddings (they now run via plain @task instead of @task.embed).
Step 2: Create PydanticAI connection
The new provider uses an Airflow connection instead of model strings or objects in code.
Connection type: pydanticai
Default connection ID: pydanticai_default
Via environment variable (.env)
AIRFLOW_CONN_PYDANTICAI_DEFAULT='{
"conn_type": "pydanticai",
"password": "<api-key>",
"extra": {
"model": "<provider>:<model-name>"
}
}'
Model format
The model field uses provider:model format:
| Provider | Example model value |
|---|---|
| OpenAI | openai:gpt-5 |
| Anthropic | anthropic:claude-sonnet-4-20250514 |
google:gemini-2.5-pro | |
| Groq | groq:llama-3.3-70b-versatile |
| Mistral | mistral:mistral-large-latest |
| Bedrock | bedrock:us.anthropic.claude-sonnet-4-20250514-v1:0 |
Custom endpoints (Ollama, vLLM, Snowflake Cortex, etc.)
Set host to the base URL:
AIRFLOW_CONN_PYDANTICAI_CORTEX='{
"conn_type": "pydanticai",
"password": "<api-key>",
"host": "https://my-endpoint.com/v1",
"extra": {
"model": "openai:<model-name>"
}
}'
Use the openai: prefix for any OpenAI-compatible API, regardless of the actual provider.
Connection ID convention
The env var name determines the connection ID:
AIRFLOW_CONN_PYDANTICAI_DEFAULTcreatespydanticai_defaultAIRFLOW_CONN_PYDANTICAI_CORTEXcreatespydanticai_cortex
Model resolution priority
model_idparameter on the decorator/operator (highest)modelin connection's extra JSON (fallback)
Step 3: Migrate decorators
@task.llm
# BEFORE (airflow-ai-sdk)
import airflow_ai_sdk as ai_sdk
class MyOutput(ai_sdk.BaseModel):
field: str
@task.llm(
model="gpt-5", # or model=OpenAIModel(...)
system_prompt="You are helpful.",
output_type=MyOutput,
)
def my_task(text: str) -> str:
return text
# AFTER (apache-airflow-providers-common-ai)
from pydantic import BaseModel
class MyOutput(BaseModel):
field: str
@task.llm(
llm_conn_id="pydanticai_default", # Airflow connection ID
system_prompt="You are helpful.",
output_type=MyOutput,
)
def my_task(text: str) -> str:
return text
Parameter mapping:
| airflow-ai-sdk | common-ai provider | Notes |
|---|---|---|
model="gpt-5" | llm_conn_id="pydanticai_default" | Model specified in connection |
model=OpenAIModel(...) | llm_conn_id="pydanticai_default" | Model + endpoint in connection |
system_prompt="..." | system_prompt="..." | Unchanged |
output_type=MyModel | output_type=MyModel | Unchanged |
result_type=MyModel | output_type=MyModel | result_type was already deprecated |
| (not available) | model_id="openai:gpt-5" | Override connection's model |
| (not available) | require_approval=True | Built-in HITL review |
| (not available) | agent_params={...} | Extra kwargs for pydantic-ai Agent |
@task.llm_branch
# BEFORE
@task.llm_branch(
model="gpt-5",
system_prompt="Choose a team...",
allow_multiple_branches=False,
)
def route(text: str) -> str:
return text
# AFTER
@task.llm_branch(
llm_conn_id="pydanticai_default",
system_prompt="Choose a team...",
allow_multiple_branches=False, # same parameter, unchanged
)
def route(text: str) -> str:
return text
Only change: model= becomes llm_conn_id=.
@task.agent
This has the biggest API change. The Agent is no longer pre-built in user code.
# BEFORE (airflow-ai-sdk) - Agent built at module level
from pydantic_ai import Agent
my_agent = Agent(
"gpt-5",
system_prompt="You are a research assistant.",
tools=[search_tool, lookup_tool],
)
@task.agent(agent=my_agent)
def research(question: str) -> str:
return question
# AFTER (common-ai provider) - No Agent object, config via parameters
@task.agent(
llm_conn_id="pydanticai_default",
system_prompt="You are a research assistant.",
agent_params={"tools": [search_tool, lookup_tool]},
)
def research(question: str) -> str:
return question
Parameter mapping:
| airflow-ai-sdk | common-ai provider | Notes |
|---|---|---|
agent=Agent(model, ...) | llm_conn_id="..." | Model from connection |
Agent's system_prompt | system_prompt="..." | Now a decorator param |
Agent's tools=[...] | agent_params={"tools": [...]} | Tools via agent_params dict |
Agent's output_type | output_type=MyModel | Now a decorator param |
| (not available) | toolsets=[...] | pydantic-ai 1.x Toolset objects |
| (not available) | durable=True | Step-level caching |
| (not available) | enable_hitl_review=True | Iterative human review loop |
Key insight: Everything that was configured on the Agent() constructor now goes into either a top-level decorator parameter or agent_params. The agent_params dict is passed directly to pydantic-ai's Agent constructor.
@task.embed (NO EQUIVALENT)
The new provider does NOT include an embed decorator. Replace with a plain @task:
# BEFORE (airflow-ai-sdk)
@task.embed(
model_name="all-MiniLM-L6-v2",
encode_kwargs={"normalize_embeddings": True},
max_active_tis_per_dagrun=1,
)
def embed_text(text: str) -> str:
return text
# AFTER (plain @task with sentence-transformers)
@task(max_active_tis_per_dagrun=1)
def embed_text(text: str) -> list[float]:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2")
return model.encode(text, normalize_embeddings=True).tolist()
Note: The model is loaded on each task execution. For small workloads this is fine. For large batches, consider embedding all texts in a single task instead of using .expand().
Step 4: Update imports
| Old import | New import |
|---|---|
import airflow_ai_sdk as ai_sdk | Remove entirely |
from airflow_ai_sdk import BaseModel | from pydantic import BaseModel |
from airflow_ai_sdk.models.base import BaseModel | from pydantic import BaseModel |
class Foo(ai_sdk.BaseModel): | class Foo(BaseModel): |
from pydantic_ai import Agent | Remove if Agent was only used for @task.agent |
from pydantic_ai.models.openai import OpenAIModel | Remove (model config in connection now) |
The @task.llm, @task.agent, @task.llm_branch decorators are auto-registered by the provider. No explicit import needed beyond from airflow.sdk import task.
pydantic_ai imports for non-decorator usage (e.g., BinaryContent for multimodal) are still valid since the new provider depends on pydantic-ai-slim>=1.34.0.
Step 5: Update connections.yaml (if used for local testing)
pydanticai_default:
conn_type: pydanticai
password: <api-key>
extra:
model: "openai:gpt-5"
For custom endpoints:
pydanticai_cortex:
conn_type: pydanticai
password: <api-key>
host: https://my-endpoint.com/v1
extra:
model: "openai:llama3.1-8b"
Step 6: Clean up env vars
The new provider reads model config from the pydanticai connection, so env vars that previously fed the model in code are usually redundant. Before removing any of them, grep the project (and any sibling scripts/services) to confirm nothing else still references them:
OPENAI_API_KEY|OPENAI_BASE_URL|ANTHROPIC_API_KEY|GOOGLE_API_KEY
Candidates for removal only if no other code references them:
OPENAI_API_KEY(now in the pydanticai connection's password field)OPENAI_BASE_URL(now in the connection's host field)- Custom model name vars (now in the connection's extra.model)
If anything outside the migrated DAGs still uses them (other DAGs not yet migrated, helper scripts, non-Airflow services sharing the .env), leave them in place.
Keep AIRFLOW_CONN_* env vars for all connections.
Step 7: Verify
After migration, grep the codebase to confirm no stale references remain:
airflow_ai_sdk|airflow-ai-sdk|ai_sdk\.BaseModel|from pydantic_ai import Agent|from pydantic_ai.models
Verify:
- No imports from
airflow_ai_sdk - No
Agent()objects created for@task.agent(unless used outside decorators) - No
model=parameter on LLM decorators (should bellm_conn_id=) - All
@task.embedreplaced with plain@task -
pydanticaiconnection configured in.envor connections.yaml -
requirements.txthasapache-airflow-providers-common-ai[...]instead ofairflow-ai-sdk[...]
Quick reference: New features in common-ai provider
These features are available after migration but have no airflow-ai-sdk equivalent:
| Feature | Parameter | Description |
|---|---|---|
| HITL approval | require_approval=True on @task.llm | Pause for human review before returning |
| HITL review loop | enable_hitl_review=True on @task.agent | Iterative review with regeneration |
| Durable execution | durable=True on @task.agent | Step-level caching for resilience |
| Tool logging | enable_tool_logging=True on @task.agent | INFO-level tool call logs (default: on) |
| Model override | model_id="openai:gpt-5" | Override connection's model per-task |
| File analysis | @task.llm_file_analysis | Analyze files/images via ObjectStoragePath |
| NL-to-SQL | @task.llm_sql | Generate SQL from natural language |