migrating-ai-sdk-to-common-ai by astronomer

Migrates Airflow projects from airflow-ai-sdk to apache-airflow-providers-common-ai 0.1.0+. Use this skill when the user wants to replace airflow-ai-sdk with the common-ai provider.

npx skills add https://github.com/astronomer/agents --skill migrating-ai-sdk-to-common-ai

Migrate airflow-ai-sdk to apache-airflow-providers-common-ai

This skill migrates Airflow projects from airflow-ai-sdk to apache-airflow-providers-common-ai (0.1.0+), the official Airflow AI provider built on PydanticAI.

CRITICAL: The new provider requires Airflow 3.0+ and pydantic-ai-slim >= 1.34.0. The API surface has changed: LLM configuration moves from code (model strings/objects) to Airflow connections (pydanticai type). There is no @task.embed in the new provider.

Before starting

Use the Grep tool with the pattern below to inventory everything that needs to migrate:

airflow_ai_sdk|airflow-ai-sdk|ai_sdk|@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed

From the results, capture:

  1. All files importing airflow-ai-sdk / airflow_ai_sdk
  2. Which decorators are in use: @task.llm, @task.agent, @task.llm_branch, @task.embed
  3. The model configuration pattern (string names like "gpt-5", or OpenAIModel(...) objects)
  4. Any airflow_ai_sdk.BaseModel subclasses used as output_type

Use this inventory to drive the steps below.
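
If you want an equivalent standalone scan outside the Grep tool, a minimal Python sketch could look like the following (it assumes DAG files live under dags/; adjust the path to your project layout):

import re
from pathlib import Path

# Same pattern as above, applied to every .py file under dags/ (the dags/ path is an assumption)
PATTERN = re.compile(
    r"airflow_ai_sdk|airflow-ai-sdk|ai_sdk|@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed"
)

for path in Path("dags").rglob("*.py"):
    for lineno, line in enumerate(path.read_text().splitlines(), start=1):
        if PATTERN.search(line):
            print(f"{path}:{lineno}: {line.strip()}")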


Step 1: Update requirements.txt

Remove:

airflow-ai-sdk[openai]
# or any variant: airflow-ai-sdk[openai]==0.1.7, airflow-ai-sdk[anthropic], etc.

Add:

apache-airflow-providers-common-ai[openai]>=0.1.0

Use the latest available 0.x version unless the user has pinned a specific one. Available extras match the LLM provider: [openai], [anthropic], [google], [bedrock], [groq], [mistral], [mcp].

Keep sentence-transformers and torch if the project uses embeddings (they now run via plain @task instead of @task.embed).


Step 2: Create PydanticAI connection

The new provider uses an Airflow connection instead of model strings or objects in code.

Connection type: pydanticai
Default connection ID: pydanticai_default

Via environment variable (.env)

AIRFLOW_CONN_PYDANTICAI_DEFAULT='{
    "conn_type": "pydanticai",
    "password": "<api-key>",
    "extra": {
        "model": "<provider>:<model-name>"
    }
}'

Model format

The model field uses provider:model format:

  • OpenAI: openai:gpt-5
  • Anthropic: anthropic:claude-sonnet-4-20250514
  • Google: google:gemini-2.5-pro
  • Groq: groq:llama-3.3-70b-versatile
  • Mistral: mistral:mistral-large-latest
  • Bedrock: bedrock:us.anthropic.claude-sonnet-4-20250514-v1:0

Custom endpoints (Ollama, vLLM, Snowflake Cortex, etc.)

Set host to the base URL:

AIRFLOW_CONN_PYDANTICAI_CORTEX='{
    "conn_type": "pydanticai",
    "password": "<api-key>",
    "host": "https://my-endpoint.com/v1",
    "extra": {
        "model": "openai:<model-name>"
    }
}'

Use the openai: prefix for any OpenAI-compatible API, regardless of the actual provider.

Connection ID convention

The env var name determines the connection ID:

  • AIRFLOW_CONN_PYDANTICAI_DEFAULT creates pydanticai_default
  • AIRFLOW_CONN_PYDANTICAI_CORTEX creates pydanticai_cortex

Model resolution priority

  1. model_id parameter on the decorator/operator (highest)
  2. model in connection's extra JSON (fallback)

Step 3: Migrate decorators

@task.llm

# BEFORE (airflow-ai-sdk)
import airflow_ai_sdk as ai_sdk

class MyOutput(ai_sdk.BaseModel):
    field: str

@task.llm(
    model="gpt-5",                    # or model=OpenAIModel(...)
    system_prompt="You are helpful.",
    output_type=MyOutput,
)
def my_task(text: str) -> str:
    return text

# AFTER (apache-airflow-providers-common-ai)
from pydantic import BaseModel

class MyOutput(BaseModel):
    field: str

@task.llm(
    llm_conn_id="pydanticai_default",  # Airflow connection ID
    system_prompt="You are helpful.",
    output_type=MyOutput,
)
def my_task(text: str) -> str:
    return text

Parameter mapping (airflow-ai-sdk → common-ai provider):

  • model="gpt-5" → llm_conn_id="pydanticai_default" (model specified in connection)
  • model=OpenAIModel(...) → llm_conn_id="pydanticai_default" (model + endpoint in connection)
  • system_prompt="..." → system_prompt="..." (unchanged)
  • output_type=MyModel → output_type=MyModel (unchanged)
  • result_type=MyModel → output_type=MyModel (result_type was already deprecated)
  • (not available) → model_id="openai:gpt-5" (override connection's model)
  • (not available) → require_approval=True (built-in HITL review)
  • (not available) → agent_params={...} (extra kwargs for pydantic-ai Agent)
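
As a hedged illustration of the new-only parameters, the sketch below combines them on one task. The parameter names follow the mapping above; the model name and the retries value are only illustrative, and retries is assumed to be a valid pydantic-ai Agent keyword:

from airflow.sdk import task
from pydantic import BaseModel

class Summary(BaseModel):
    text: str

@task.llm(
    llm_conn_id="pydanticai_default",
    model_id="openai:gpt-5",            # overrides the model set in the connection
    require_approval=True,              # pause for human review before returning
    agent_params={"retries": 2},        # forwarded to pydantic-ai's Agent constructor
    system_prompt="Summarize the input in one sentence.",
    output_type=Summary,
)
def summarize(text: str) -> str:
    return text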

@task.llm_branch

# BEFORE
@task.llm_branch(
    model="gpt-5",
    system_prompt="Choose a team...",
    allow_multiple_branches=False,
)
def route(text: str) -> str:
    return text

# AFTER
@task.llm_branch(
    llm_conn_id="pydanticai_default",
    system_prompt="Choose a team...",
    allow_multiple_branches=False,    # same parameter, unchanged
)
def route(text: str) -> str:
    return text

Only change: model= becomes llm_conn_id=.
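
If it helps to see the branch wired into a DAG, here is a minimal sketch. The task names and prompt are illustrative; the idea (carried over from airflow-ai-sdk) is that the LLM's answer selects among the downstream task IDs:

from airflow.sdk import dag, task

@dag
def support_routing():
    @task.llm_branch(
        llm_conn_id="pydanticai_default",
        system_prompt="Route the ticket to 'handle_billing' or 'handle_technical'.",
        allow_multiple_branches=False,
    )
    def route(text: str) -> str:
        return text

    @task
    def handle_billing():
        print("billing path")

    @task
    def handle_technical():
        print("technical path")

    route("My invoice is wrong") >> [handle_billing(), handle_technical()]

support_routing()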

@task.agent

This has the biggest API change. The Agent is no longer pre-built in user code.

# BEFORE (airflow-ai-sdk) - Agent built at module level
from pydantic_ai import Agent

my_agent = Agent(
    "gpt-5",
    system_prompt="You are a research assistant.",
    tools=[search_tool, lookup_tool],
)

@task.agent(agent=my_agent)
def research(question: str) -> str:
    return question

# AFTER (common-ai provider) - No Agent object, config via parameters
@task.agent(
    llm_conn_id="pydanticai_default",
    system_prompt="You are a research assistant.",
    agent_params={"tools": [search_tool, lookup_tool]},
)
def research(question: str) -> str:
    return question

Parameter mapping (airflow-ai-sdk → common-ai provider):

  • agent=Agent(model, ...) → llm_conn_id="..." (model from connection)
  • Agent's system_prompt → system_prompt="..." (now a decorator param)
  • Agent's tools=[...] → agent_params={"tools": [...]} (tools via the agent_params dict)
  • Agent's output_type → output_type=MyModel (now a decorator param)
  • (not available) → toolsets=[...] (pydantic-ai 1.x Toolset objects)
  • (not available) → durable=True (step-level caching)
  • (not available) → enable_hitl_review=True (iterative human review loop)

Key insight: Everything that was configured on the Agent() constructor now goes into either a top-level decorator parameter or agent_params. The agent_params dict is passed directly to pydantic-ai's Agent constructor.
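
As a concrete (hedged) sketch: a plain function can serve as a tool and be handed to the Agent through agent_params. fetch_page here is a hypothetical helper, and passing plain callables in tools relies on pydantic-ai accepting functions as tools:

from airflow.sdk import task

def fetch_page(url: str) -> str:
    """Download a page and return its raw text (hypothetical helper)."""
    import urllib.request
    with urllib.request.urlopen(url) as resp:
        return resp.read().decode()

@task.agent(
    llm_conn_id="pydanticai_default",
    system_prompt="You are a research assistant. Use tools when needed.",
    agent_params={"tools": [fetch_page]},
)
def research(question: str) -> str:
    return question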

@task.embed (NO EQUIVALENT)

The new provider does NOT include an embed decorator. Replace with a plain @task:

# BEFORE (airflow-ai-sdk)
@task.embed(
    model_name="all-MiniLM-L6-v2",
    encode_kwargs={"normalize_embeddings": True},
    max_active_tis_per_dagrun=1,
)
def embed_text(text: str) -> str:
    return text

# AFTER (plain @task with sentence-transformers)
@task(max_active_tis_per_dagrun=1)
def embed_text(text: str) -> list[float]:
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(text, normalize_embeddings=True).tolist()

Note: The model is loaded on each task execution. For small workloads this is fine. For large batches, consider embedding all texts in a single task instead of using .expand().
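
For larger batches, a single task that embeds a list of texts avoids reloading the model per row. A sketch, assuming the same sentence-transformers model as above:

from airflow.sdk import task

@task
def embed_texts(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("all-MiniLM-L6-v2")   # loaded once for the whole batch
    return model.encode(texts, normalize_embeddings=True).tolist()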


Step 4: Update imports

Old import → new import:

  • import airflow_ai_sdk as ai_sdk → remove entirely
  • from airflow_ai_sdk import BaseModel → from pydantic import BaseModel
  • from airflow_ai_sdk.models.base import BaseModel → from pydantic import BaseModel
  • class Foo(ai_sdk.BaseModel): → class Foo(BaseModel):
  • from pydantic_ai import Agent → remove if Agent was only used for @task.agent
  • from pydantic_ai.models.openai import OpenAIModel → remove (model config lives in the connection now)

The @task.llm, @task.agent, @task.llm_branch decorators are auto-registered by the provider. No explicit import needed beyond from airflow.sdk import task.

pydantic_ai imports for non-decorator usage (e.g., BinaryContent for multimodal) are still valid since the new provider depends on pydantic-ai-slim>=1.34.0.
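
For instance, constructing a BinaryContent part for an image still uses the pydantic_ai import directly. This is only a sketch; the file path and media type are illustrative:

from pydantic_ai import BinaryContent

def load_image_part(path: str) -> BinaryContent:
    with open(path, "rb") as f:
        return BinaryContent(data=f.read(), media_type="image/png")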


Step 5: Update connections.yaml (if used for local testing)

pydanticai_default:
  conn_type: pydanticai
  password: <api-key>
  extra:
    model: "openai:gpt-5"

For custom endpoints:

pydanticai_cortex:
  conn_type: pydanticai
  password: <api-key>
  host: https://my-endpoint.com/v1
  extra:
    model: "openai:llama3.1-8b"

Step 6: Clean up env vars

The new provider reads model config from the pydanticai connection, so env vars that previously fed the model in code are usually redundant. Before removing any of them, grep the project (and any sibling scripts/services) to confirm nothing else still references them:

OPENAI_API_KEY|OPENAI_BASE_URL|ANTHROPIC_API_KEY|GOOGLE_API_KEY

Candidates for removal only if no other code references them:

  • OPENAI_API_KEY (now in the pydanticai connection's password field)
  • OPENAI_BASE_URL (now in the connection's host field)
  • Custom model name vars (now in the connection's extra.model)

If anything outside the migrated DAGs still uses them (other DAGs not yet migrated, helper scripts, non-Airflow services sharing the .env), leave them in place.

Keep AIRFLOW_CONN_* env vars for all connections.


Step 7: Verify

After migration, grep the codebase to confirm no stale references remain:

airflow_ai_sdk|airflow-ai-sdk|ai_sdk\.BaseModel|from pydantic_ai import Agent|from pydantic_ai.models

Verify:

  • No imports from airflow_ai_sdk
  • No Agent() objects created for @task.agent (unless used outside decorators)
  • No model= parameter on LLM decorators (should be llm_conn_id=)
  • All @task.embed replaced with plain @task
  • pydanticai connection configured in .env or connections.yaml
  • requirements.txt has apache-airflow-providers-common-ai[...] instead of airflow-ai-sdk[...]
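
A quick parse check also catches leftovers. A minimal pytest-style sketch, assuming DAGs live under dags/ and the test runs in an environment with Airflow installed:

from airflow.models import DagBag

def test_dags_import_cleanly():
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert dag_bag.import_errors == {}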

Quick reference: New features in common-ai provider

These features are available after migration but have no airflow-ai-sdk equivalent:

  • HITL approval: require_approval=True on @task.llm (pause for human review before returning)
  • HITL review loop: enable_hitl_review=True on @task.agent (iterative review with regeneration)
  • Durable execution: durable=True on @task.agent (step-level caching for resilience)
  • Tool logging: enable_tool_logging=True on @task.agent (INFO-level tool call logs, on by default)
  • Model override: model_id="openai:gpt-5" (override the connection's model per task)
  • File analysis: @task.llm_file_analysis (analyze files/images via ObjectStoragePath)
  • NL-to-SQL: @task.llm_sql (generate SQL from natural language)

Instal Ekstensi Chrome