ai by Vercel

Python `ai` module — models, agents, hooks, middleware, MCP, structured output

npx skills add https://github.com/vercel-labs/py-ai --skill ai

ai

Use this skill when working with the Python ai SDK.

uv add ai

Direct OpenAI-compatible and Anthropic-compatible providers require optional extras: uv add "ai[openai]" or uv add "ai[anthropic]". AI Gateway works with the base package.

import ai

Quick start

import asyncio
import ai


@ai.tool
async def get_weather(city: str) -> str:
    """Get current weather for a city."""
    return f"Sunny, 72F in {city}"


async def main() -> None:
    model = ai.get_model("gateway:anthropic/claude-sonnet-4")
    agent = ai.agent(tools=[get_weather])

    messages = [
        ai.system_message("You are a helpful weather assistant."),
        ai.user_message("What's the weather in Tokyo?"),
    ]

    async with agent.run(model, messages) as stream:
        async for event in stream:
            if isinstance(event, ai.events.TextDelta):
                print(event.chunk, end="", flush=True)

    print(stream.output)


if __name__ == "__main__":
    asyncio.run(main())

ai.stream(...) and agent.run(...) are async context managers. Iterate events inside the context. After iteration, read final state from the stream object.
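The control flow described above can be sketched without the SDK. `ToyStream` below is a hypothetical stand-in, not part of `ai`; it only illustrates the shape of the pattern: enter the context, iterate events inside it, then read the accumulated state from the same object afterwards.

```python
import asyncio


class ToyStream:
    """Hypothetical stand-in for a stream object; not part of the ai SDK."""

    def __init__(self, chunks: list[str]) -> None:
        self._chunks = chunks
        self.text = ""  # final state, filled in while events are consumed

    async def __aenter__(self) -> "ToyStream":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        return None  # a real stream would release its connection here

    def __aiter__(self):
        return self._events()

    async def _events(self):
        for chunk in self._chunks:
            await asyncio.sleep(0)  # yield control, as a network read would
            self.text += chunk
            yield chunk


async def consume(chunks: list[str]) -> tuple[list[str], str]:
    seen: list[str] = []
    async with ToyStream(chunks) as stream:
        async for event in stream:  # iterate inside the context
            seen.append(event)
    return seen, stream.text  # read final state after iteration


if __name__ == "__main__":
    print(asyncio.run(consume(["Sunny, ", "72F"])))
```

The final state is only complete once every event has been consumed, which is why the reads come after the loop.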

Models and providers

model = ai.get_model()  # reads AI_SDK_DEFAULT_MODEL
model = ai.get_model("anthropic/claude-sonnet-4")  # unprefixed: gateway route
model = ai.get_model("gateway:anthropic/claude-sonnet-4")
model = ai.get_model("openai:gpt-5.4")  # direct provider route
model = ai.get_model("anthropic:claude-sonnet-4-6")

  • Gateway credentials use AI_GATEWAY_API_KEY.
  • Direct providers use provider-specific env vars such as OPENAI_API_KEY and ANTHROPIC_API_KEY.
  • Use ai.get_provider(...) when you need a custom base URL, API key, headers, or client.
  • Use await ai.probe(model) to check credentials and model availability.

provider = ai.get_provider(
    "openai",
    base_url="http://localhost:1234/v1",
    api_key="your_access_token_here",
)
model = ai.Model("local-model", provider=provider)

models = await ai.get_provider("anthropic").list_models()
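The comments on the get_model calls above imply a naming convention: a `route:` prefix selects the gateway or a direct provider, and an unprefixed ID goes through the gateway. The sketch below shows one way such a string could be split. `split_model_id` is a hypothetical helper written for illustration; the SDK's actual parsing is not shown in this document and may differ.

```python
def split_model_id(model_id: str) -> tuple[str, str]:
    """Split "route:name" into (route, name); hypothetical helper, not SDK code.

    Assumption: an ID without a "route:" prefix is treated as a gateway route,
    as the comment on the unprefixed get_model call suggests.
    """
    route, sep, name = model_id.partition(":")
    if not sep:
        return "gateway", model_id
    return route, name


if __name__ == "__main__":
    for model_id in (
        "anthropic/claude-sonnet-4",
        "gateway:anthropic/claude-sonnet-4",
        "openai:gpt-5.4",
    ):
        print(model_id, "->", split_model_id(model_id))
```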

Request-scoped provider options go through params:

params = {
    "providerOptions": {
        "gateway": {"sort": "cost"},
        "anthropic": {"speed": "fast"},
    }
}

async with ai.stream(model, messages, params=params) as stream:
    async for event in stream:
        ...

Messages and events

Messages are Pydantic models with typed parts. Use builders for common roles and parts:

ai.system_message("Be concise.")
ai.user_message("Describe this image:", ai.file_part(image_bytes, media_type="image/png"))
ai.assistant_message(ai.thinking("scratchpad"), "Final answer")
ai.tool_result_part("tc-1", result={"temp": 72}, tool_name="get_weather")
ai.tool_message(tool_call_id="tc-1", result=72, tool_name="get_weather")

Common message properties:

  • message.text, message.reasoning.
  • message.tool_calls, message.tool_results.
  • message.builtin_tool_calls, message.builtin_tool_returns.
  • message.files, message.images, message.videos.
  • message.get_output() or message.get_output(MyModel).

Streams and agents yield event objects from ai.events:

async with ai.stream(model, messages, tools=tools) as stream:
    async for event in stream:
        if isinstance(event, ai.events.TextDelta):
            print(event.chunk, end="", flush=True)
        elif isinstance(event, ai.events.ToolEnd):
            print(event.tool_call.tool_name, event.tool_call.tool_args)
        elif isinstance(event, ai.events.ToolCallResult):
            for result in event.results:
                print(result.tool_name, result.result)
        elif isinstance(event, ai.events.HookEvent):
            print(event.hook.hook_id, event.hook.status)
        elif isinstance(event, ai.events.PartialToolCallResult):
            print(event.label, event.value)

After iteration:

stream.message      # final assistant message for ai.stream
stream.messages     # updated agent history for agent.run
stream.text         # text output for ai.stream
stream.output       # text or parsed Pydantic output
stream.tool_calls   # function tool calls from ai.stream
stream.usage        # latest reported usage

Serialize and restore history with Pydantic JSON:

encoded = [message.model_dump(mode="json") for message in stream.messages]
restored = [ai.messages.Message.model_validate(item) for item in encoded]
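Because `model_dump(mode="json")` returns JSON-safe dictionaries, the encoded list can be stored with the standard library alone. The sketch below writes and reads such a list; `save_history` and `load_history` are hypothetical helper names, and the placeholder dictionaries stand in for real dumped messages, whose fields are not reproduced here.

```python
import json
import tempfile
from pathlib import Path


def save_history(path: Path, encoded: list[dict]) -> None:
    """Write JSON-safe message dicts to disk."""
    path.write_text(json.dumps(encoded), encoding="utf-8")


def load_history(path: Path) -> list[dict]:
    """Read message dicts back; validate them into messages afterwards."""
    return json.loads(path.read_text(encoding="utf-8"))


if __name__ == "__main__":
    # Placeholder dicts: the real fields come from message.model_dump(mode="json").
    encoded = [{"role": "user", "text": "hi"}, {"role": "assistant", "text": "hello"}]
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "history.json"
        save_history(path, encoded)
        print(load_history(path) == encoded)
```

The loaded dictionaries would then go through `model_validate` as in the restore line above.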

Direct streaming

Use ai.stream when you want one model response and will handle any function tool calls yourself:

async with ai.stream(model, messages, tools=[get_weather.tool]) as stream:
    async for event in stream:
        if isinstance(event, ai.events.TextDelta):
            print(event.chunk, end="", flush=True)

for call in stream.tool_calls:
    print(call.tool_name, call.tool_args)

Use structured output with a Pydantic model:

import pydantic


class Forecast(pydantic.BaseModel):
    city: str
    temperature: float


async with ai.stream(model, messages, output_type=Forecast) as stream:
    async for event in stream:
        ...

forecast = stream.output

Tools

A function tool is an async Python function decorated with @ai.tool. The function name becomes the tool name, the docstring becomes the description, and the signature becomes a Pydantic-validated JSON schema.

@ai.tool
async def scan_sector(sector: str, depth: int = 1) -> str:
    """Scan a sector at the requested depth."""
    return f"{sector}: clear at depth {depth}"
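To make the name, docstring, and signature mapping concrete, here is a rough standard-library sketch of deriving a tool description from a function. It is an illustration only: `describe_tool` and its type table are hypothetical, and the SDK itself is described as using Pydantic validation rather than this simplified lookup.

```python
import inspect

# Minimal mapping from Python annotations to JSON Schema type names.
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def describe_tool(fn) -> dict:
    """Build a tool description from a function; illustrative, not SDK code."""
    properties: dict[str, dict] = {}
    required: list[str] = []
    for name, param in inspect.signature(fn).parameters.items():
        properties[name] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default means the argument is required
    return {
        "name": fn.__name__,
        "description": inspect.getdoc(fn) or "",
        "params": {"type": "object", "properties": properties, "required": required},
    }


async def scan_sector(sector: str, depth: int = 1) -> str:
    """Scan a sector at the requested depth."""
    return f"{sector}: clear at depth {depth}"


if __name__ == "__main__":
    print(describe_tool(scan_sector))
```

Parameters with defaults, such as `depth`, are left out of `required`, so the model may omit them.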

Use schema-only tools with ai.stream when the SDK should not execute them:

tool = ai.Tool(
    kind="function",
    name="get_weather",
    args=ai.tools.FunctionToolArgs(
        description="Get current weather for a city.",
        params={
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    ),
)

Provider-executed tools run outside your process:

tools = [ai.providers.anthropic.tools.web_search(max_uses=3)]

async with ai.stream(model, messages, tools=tools) as stream:
    async for event in stream:
        if isinstance(event, ai.events.BuiltinToolResult):
            print(event.result.tool_name, event.result.result)

Tool validation failures and exceptions become ToolCallResult events with error result parts. The original exception is on event.exception for logging.

if isinstance(event, ai.events.ToolCallResult) and event.exception:
    log_exception(event.exception)

Streaming tools

Async-generator tools yield partial values while they run. An aggregator turns those values into the final tool result the model sees.

@ai.tool
async def draft_reply(topic: str) -> ai.StreamingTextTool:
    """Draft a reply."""
    yield "Checking "
    yield f"records for {topic}."


@ai.tool
async def fetch(url: str) -> ai.StreamingStatusTool[str]:
    """Fetch a URL with status updates."""
    yield "connecting"
    yield "downloading"
    body = f"<contents of {url}>"  # placeholder; a real tool would download the URL here
    yield body  # last yield is the tool result


@ai.tool
async def research(topic: str) -> ai.SubAgentTool:
    """Research a topic with a subagent."""
    subagent = ai.agent(tools=[...])
    async with subagent.run(model, [ai.user_message(topic)]) as stream:
        async for event in stream:
            yield event

For custom aggregation, annotate an async-generator return type with Annotated[AsyncGenerator[T], ai.agents.Aggregate(...)]. Built-in aggregators: ai.agents.ConcatAggregator, ai.agents.LastAggregator, and ai.agents.MessageAggregator.

Agents

Use an agent when the SDK should execute Python tools, append tool results, and continue until the assistant returns a final answer.

agent = ai.agent(tools=[get_weather])

async with agent.run(model, messages) as stream:
    async for event in stream:
        if isinstance(event, ai.events.TextDelta):
            print(event.chunk, end="", flush=True)

history = stream.messages
answer = stream.output

Pass structured output and provider params through agent.run:

async with agent.run(
    model,
    [ai.user_message("Return a JSON forecast.")],
    output_type=Forecast,
    params={"temperature": 0},
) as stream:
    async for event in stream:
        ...

forecast = stream.output

Custom agent loops

Subclass ai.Agent and override loop for custom scheduling, routing, logging, persistence, or approval logic.

from collections.abc import AsyncGenerator


class CustomAgent(ai.Agent):
    async def loop(self, context: ai.Context) -> AsyncGenerator[ai.events.AgentEvent]:
        while context.keep_running():
            async with (
                ai.stream(context=context) as stream,
                ai.ToolRunner() as tool_runner,
            ):
                async for event in ai.util.merge(stream, tool_runner.events()):
                    yield event

                    if isinstance(event, ai.events.ToolEnd):
                        tool_call = context.resolve(event.tool_call)
                        tool_runner.schedule(tool_call)

                context.add(stream.message)
                context.add(tool_runner.get_tool_message())

Loop helpers: context.model, context.messages, context.tools, context.output_type, context.params, context.resolve(...), context.keep_running(), and context.add(...).
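The custom loop above relies on merging two event sources into one stream. A minimal sketch of that fan-in idea, using an asyncio queue, is shown below. This `merge` is written for illustration and is not the implementation of ai.util.merge, whose ordering and error semantics are not described in this document.

```python
import asyncio
from collections.abc import AsyncIterator

_DONE = object()  # sentinel marking one source as exhausted


async def merge(*sources: AsyncIterator) -> AsyncIterator:
    """Yield items from all sources as they arrive; illustrative, not SDK code."""
    queue: asyncio.Queue = asyncio.Queue()

    async def pump(source: AsyncIterator) -> None:
        try:
            async for item in source:
                await queue.put(item)
        finally:
            await queue.put(_DONE)

    tasks = [asyncio.create_task(pump(source)) for source in sources]
    try:
        remaining = len(tasks)
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item
        await asyncio.gather(*tasks)  # re-raise any error from a source
    finally:
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished


async def ticks(name: str, count: int) -> AsyncIterator[str]:
    for index in range(count):
        await asyncio.sleep(0)
        yield f"{name}{index}"


async def collect() -> list[str]:
    return [item async for item in merge(ticks("a", 2), ticks("b", 3))]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

Items from a single source keep their relative order; the interleaving between sources depends on scheduling.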

Multi-agent

Use ai.SubAgentTool for agent-as-tool workflows. Use ai.yield_from(...) inside custom loops to fan out streams and forward nested events as PartialToolCallResult values with labels.

async with (
    researcher.run(model, research_messages) as research_stream,
    analyst.run(model, analyst_messages) as analyst_stream,
):
    research_text, analyst_text = await asyncio.gather(
        ai.yield_from(
            research_stream,
            label="researcher",
            aggregator=ai.agents.MessageAggregator,
        ),
        ai.yield_from(
            analyst_stream,
            label="analyst",
            aggregator=ai.agents.MessageAggregator,
        ),
    )

Route labels in the consumer:

if isinstance(event, ai.events.PartialToolCallResult):
    if event.label == "researcher":
        route_research(event.value)

Hooks

Hooks are runtime suspension points. Tool approvals are the built-in workflow.

@ai.tool(require_approval=True)
async def delete_file(path: str) -> str:
    """Delete a file."""
    ...

The default loop gates each call behind an approval hook with label approve_{tool_call_id} and payload ai.tools.ToolApproval.

async with agent.run(model, messages) as stream:
    async for event in stream:
        if isinstance(event, ai.events.HookEvent) and event.hook.status == "pending":
            ai.resolve_hook(
                event.hook.hook_id,
                ai.tools.ToolApproval(granted=True, reason="approved"),
            )

Resolve with granted=False to deny the call and return an error tool result.

Manual hooks block until resolved in live flows:

approval = await ai.hook(
    "approve_send_email",
    payload=ai.tools.ToolApproval,
    metadata={"tool": "send_email"},
)

Resolve or cancel from another task, request handler, or UI callback:

ai.resolve_hook("approve_send_email", {"granted": True, "reason": "approved"})
await ai.cancel_hook("approve_send_email", reason="client disconnected")

Hooks emit HookEvent objects. Their messages use role="internal" and contain HookPart values.

Serverless resume flow:

async with agent.run(model, messages) as stream:
    async for event in stream:
        if isinstance(event, ai.events.HookEvent) and event.hook.status == "pending":
            ai.abort_pending_hook(event.hook)
        yield event

persist(stream.messages)

# Later, restore messages, pre-register the resolution, and rerun.
ai.resolve_hook(hook_id, ai.tools.ToolApproval(granted=True, reason="approved"))

MCP

MCP adapters return AgentTool objects usable in ai.agent(...).

tools = await ai.mcp.get_http_tools(
    "https://mcp.example.com/mcp",
    headers={"Authorization": "Bearer token"},
    tool_prefix="docs",
)

tools = await ai.mcp.get_stdio_tools(
    "npx",
    "-y",
    "@anthropic/mcp-server-filesystem",
    "/tmp",
    tool_prefix="fs",
)

agent = ai.agent(tools=tools)

AI SDK UI adapter

Use ai.agents.ui.ai_sdk to convert between AI SDK UI messages and Python runtime messages/events.

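import ai
import fastapi
import pydantic

app = fastapi.FastAPI()

# chat_agent and model are assumed to be defined elsewhere,
# for example with ai.agent(...) and ai.get_model(...).


class ChatRequest(pydantic.BaseModel):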
    messages: list[ai.agents.ui.ai_sdk.UIMessage]


@app.post("/chat")
async def chat(request: ChatRequest):
    messages, approvals = ai.agents.ui.ai_sdk.to_messages(request.messages)
    ai.agents.ui.ai_sdk.apply_approvals(approvals)

    async def stream_response():
        async with chat_agent.run(model, messages) as stream:
            async for chunk in ai.agents.ui.ai_sdk.to_sse(stream):
                yield chunk

    return fastapi.responses.StreamingResponse(
        stream_response(),
        headers=ai.agents.ui.ai_sdk.UI_MESSAGE_STREAM_HEADERS,
    )

Use ai.agents.ui.ai_sdk.to_ui_messages(messages) to rebuild UI history from stored runtime messages.

For serverless approvals, monitor HookEvent before passing events to to_sse and call ai.abort_pending_hook(event.hook) on pending hooks.

Media generation

Use ai.generate for dedicated image and video models:

image_message = await ai.generate(
    ai.get_model("gateway:google/imagen-4.0-generate-001"),
    [ai.user_message("A watercolor mothership over a quiet city.")],
    ai.ImageParams(n=1, aspect_ratio="16:9"),
)

image = image_message.images[0]

For video generation, pass ai.VideoParams(...) and read message.videos.
