MCPOmni Connect
A universal command-line interface (CLI) gateway to the MCP ecosystem, integrating multiple MCP servers, AI models, and transport protocols.
๐ฌ See It In Action
import asyncio
from omnicoreagent import OmniCoreAgent, MemoryRouter, ToolRegistry
# Create tools in seconds
tools = ToolRegistry()
@tools.register_tool("get_weather")
def get_weather(city: str) -> dict:
"""Get current weather for a city."""
return {"city": city, "temp": "22ยฐC", "condition": "Sunny"}
# Build a production-ready agent
agent = OmniCoreAgent(
name="assistant",
system_instruction="You are a helpful assistant with access to weather data.",
model_config={"provider": "openai", "model": "gpt-4o"},
local_tools=tools,
memory_router=MemoryRouter("redis"), # Start with Redis
agent_config={
"context_management": {"enabled": True}, # Auto-manage long conversations
"guardrail_config": {"strict_mode": True}, # Block prompt injections
}
)
async def main():
# Run the agent
result = await agent.run("What's the weather in Tokyo?")
print(result["response"])
# Switch to MongoDB at runtime โ no restart needed
await agent.switch_memory_store("mongodb")
# Keep running with a different backend
result = await agent.run("How about Paris?")
print(result["response"])
asyncio.run(main())
What just happened?
- โ Registered a custom tool with type hints
- โ Built an agent with memory persistence
- โ Enabled automatic context management
- โ Switched from Redis to MongoDB while running
โก Quick Start
pip install omnicoreagent
echo "LLM_API_KEY=your_api_key" > .env
from omnicoreagent import OmniCoreAgent
agent = OmniCoreAgent(
name="my_agent",
system_instruction="You are a helpful assistant.",
model_config={"provider": "openai", "model": "gpt-4o"}
)
result = await agent.run("Hello!")
print(result["response"])
That's it. You have an AI agent with session management, memory, and error handling.
๐ Want to learn more? Check out the Cookbook โ progressive examples from "Hello World" to production deployments.
๐ฏ What Makes OmniCoreAgent Different?
| Feature | What It Means For You |
|---|---|
| Runtime Backend Switching | Switch Redis โ MongoDB โ PostgreSQL without restarting |
| Cloud Workspace Storage | Agent files persist in AWS S3 or Cloudflare R2 โก NEW |
| Context Engineering | Session memory + agent loop context + tool offloading = no token exhaustion |
| Tool Response Offloading | Large tool outputs saved to files, 98% token savings |
| Built-in Guardrails | Prompt injection protection out of the box |
| MCP Native | Connect to any MCP server (stdio, SSE, HTTP with OAuth) |
| Background Agents | Schedule autonomous tasks that run on intervals |
| Workflow Orchestration | Sequential, Parallel, and Router agents for complex tasks |
| Production Observability | Metrics, tracing, and event streaming built in |
Getting Started: See It In Action โข Quick Start
Core Features: OmniCoreAgent โข Memory System โข Context Engineering โข Event System โข MCP Client โข Deep Agent โข Local Tools โข Agent Skills โข Workspace Memory (S3/R2/Local)
Multi-Agent: Sub-Agents โข Background Agents โข Workflows
Production: BM25 Tool Retrieval โข Observability โข Guardrails โข Model Support โข OmniServe
Reference: Examples โข Configuration โข Testing โข Contributing
๐ Architecture Overview
OmniCoreAgent follows a layered architecture that separates concerns while enabling rich integrations between components. Here's how it all fits together:
flowchart TB
%% Entry Points
subgraph Entry["๐ Entry Points"]
User([๐ค User / App])
Clock([โฐ Scheduler])
API([๐ External API])
end
%% Security Layer - First Line of Defense
subgraph Security["๐ก๏ธ Security Layer"]
Guard["Guardrails Engine<br/>โโโโโโโโโโโโโโ<br/>โข Pattern Matching<br/>โข Heuristic Analysis<br/>โข Entropy Detection<br/>โข Encoding Detection"]
end
%% Orchestration Layer
subgraph Orchestration["๐ผ Orchestration Layer"]
direction LR
WF["Workflow Engine"]
SEQ["Sequential<br/>Agent"]
PAR["Parallel<br/>Agent"]
RTR["Router<br/>Agent"]
BG["Background<br/>Agent"]
WF --> SEQ
WF --> PAR
WF --> RTR
end
%% Core Engine - The Heart
subgraph Core["๐ง OmniCore Engine"]
direction TB
OCA["OmniCoreAgent<br/>โโโโโโโโโโโโโโ<br/>โข System Instructions<br/>โข Model Config<br/>โข Agent Config"]
subgraph Processing["Processing Pipeline"]
direction LR
CTX["Context<br/>Manager"]
SUM["Conversation<br/>Summarizer"]
REACT["ReAct<br/>Loop"]
OFF["Tool Response<br/>Offloader"]
end
LLM["LLM Layer<br/>โโโโโโโโโโโโโโ<br/>OpenAI โข Anthropic<br/>Gemini โข Groq โข Ollama<br/>Mistral โข DeepSeek"]
end
%% Capabilities Layer
subgraph Capabilities["๐ ๏ธ Capabilities Layer"]
direction TB
TOOLS["Tool Orchestration<br/>โโโโโโโโโโโโโโ<br/>โข Validation<br/>โข Execution<br/>โข Error Handling"]
subgraph ToolTypes["Tool Types"]
direction LR
LOCAL["Local Tools<br/>(ToolRegistry)"]
MCP["MCP Client<br/>(stdio/http/sse)"]
BM25["BM25 RAG<br/>(Dynamic Discovery)"]
SKILLS["Agent Skills<br/>(Polyglot Scripts)"]
SUB["Sub-Agents<br/>(Delegation)"]
end
end
%% Infrastructure Layer
subgraph Infrastructure["๐พ Infrastructure Layer"]
direction TB
subgraph Routers["Routers (Hot-Swappable)"]
direction LR
MEM["Memory Router<br/>โโโโโโโโโโโโโโ<br/>Session State<br/>Conversation History"]
EVT["Event Router<br/>โโโโโโโโโโโโโโ<br/>Real-time Streaming<br/>Audit Trail"]
end
subgraph Storage["Storage Backends"]
direction LR
REDIS[("Redis")]
POSTGRES[("PostgreSQL")]
MONGO[("MongoDB")]
MEM_STORE[("In-Memory")]
FS[("File System")]
end
OBS["Observability<br/>โโโโโโโโโโโโโโ<br/>Metrics โข Tracing<br/>Token Usage โข Opik"]
end
%% Connections - Main Flow
User --> Guard
API --> Guard
Clock --> BG
Guard -->|"โ Safe"| OCA
Guard -->|"โ Blocked"| User
BG --> OCA
WF --> OCA
SEQ --> OCA
PAR --> OCA
RTR --> OCA
OCA --> CTX
CTX <--> SUM
CTX --> REACT
REACT <--> LLM
REACT --> TOOLS
TOOLS --> OFF
TOOLS --> LOCAL
TOOLS --> MCP
TOOLS --> BM25
TOOLS --> SKILLS
TOOLS --> SUB
SUB -.->|"Recursive"| OCA
%% Infrastructure Connections
OCA <-.->|"State"| MEM
OCA <-.->|"Events"| EVT
OFF <-.->|"Artifacts"| FS
MEM --> REDIS
MEM --> POSTGRES
MEM --> MONGO
MEM --> MEM_STORE
EVT --> REDIS
OCA <-.->|"Metrics"| OBS
%% Styling
style Entry fill:#1abc9c,stroke:#16a085,color:#fff
style Security fill:#e74c3c,stroke:#c0392b,color:#fff
style Orchestration fill:#d35400,stroke:#e67e22,color:#fff
style Core fill:#2c3e50,stroke:#34495e,color:#fff
style Processing fill:#34495e,stroke:#2c3e50,color:#fff
style Capabilities fill:#2980b9,stroke:#3498db,color:#fff
style ToolTypes fill:#3498db,stroke:#2980b9,color:#fff
style Infrastructure fill:#8e44ad,stroke:#9b59b6,color:#fff
style Routers fill:#9b59b6,stroke:#8e44ad,color:#fff
style Storage fill:#95a5a6,stroke:#7f8c8d,color:#fff
Layer Responsibilities
| Layer | Purpose | Key Components |
|---|---|---|
| ๐ Entry | Request sources | User Apps, Schedulers, External APIs |
| ๐ก๏ธ Security | Threat protection | Guardrails (injection detection, encoding checks) |
| ๐ผ Orchestration | Multi-agent coordination | Sequential, Parallel, Router, Background agents |
| ๐ง Core Engine | Agent execution | ReAct loop, Context Management, Summarization, LLM calls |
| ๐ ๏ธ Capabilities | Tool execution | Local tools, MCP, BM25 discovery, Skills, Sub-agents |
| ๐พ Infrastructure | Persistence & observability | Memory/Event routers, Storage backends, Metrics |
Data Flow Highlights
- Request Path: Entry โ Security (guardrails) โ Core โ Capabilities โ Response
- Memory Persistence: State flows bidirectionally between agent and storage backends
- Tool Offloading: Large responses saved to file system, only previews in context
- Event Streaming: Real-time events pushed to Redis Streams for monitoring
๐ฏ Core Features
1. ๐ค OmniCoreAgent โ The Heart of the Framework
from omnicoreagent import OmniCoreAgent, ToolRegistry, MemoryRouter, EventRouter
# Basic Agent
agent = OmniCoreAgent(
name="assistant",
system_instruction="You are a helpful assistant.",
model_config={"provider": "openai", "model": "gpt-4o"}
)
# Production Agent with All Features
agent = OmniCoreAgent(
name="production_agent",
system_instruction="You are a production agent.",
model_config={"provider": "openai", "model": "gpt-4o"},
local_tools=tool_registry,
mcp_tools=[...],
memory_router=MemoryRouter("redis"),
event_router=EventRouter("redis_stream"),
agent_config={
"max_steps": 20,
"enable_advanced_tool_use": True,
"enable_agent_skills": True,
"memory_tool_backend": "local",
# Memory with summarization
"memory_config": {
"mode": "sliding_window",
"value": 10,
"summary": {
"enabled": True,
"retention_policy": "summarize",
},
},
# Context management for long conversations
"context_management": {
"enabled": True,
"mode": "token_budget",
"value": 100000,
"threshold_percent": 75,
"strategy": "summarize_and_truncate",
"preserve_recent": 6,
},
# Prompt injection guardrails
"guardrail_config": {
"enabled": True,
"strict_mode": True,
},
},
)
# Key Methods
await agent.run(query) # Execute task
await agent.run(query, session_id="user_1") # With session context
await agent.connect_mcp_servers() # Connect MCP tools
await agent.list_all_available_tools() # List all tools
await agent.switch_memory_store("mongodb") # Switch backend at runtime!
await agent.get_session_history(session_id) # Retrieve conversation history
await agent.clear_session_history(session_id) # Clear history (session_id optional, clears all if None)
await agent.get_events(session_id) # Get event history
await agent.get_memory_store_type() # Get current memory router type
await agent.cleanup() # Clean up resources and remove the agent and the config
await agent.cleanup_mcp_servers() # Clean up MCP servers without removing the agent and the config
await agent.get_metrics() # Get cumulative usage (tokens, requests, time)
[!TIP] Each
agent.run()call now returns ametricfield containing fine-grained usage for that specific request.
๐ก When to Use: OmniCoreAgent is your go-to for any AI task โ from simple Q&A to complex multi-step workflows. Start here for any agent project.
2. ๐ง Multi-Tier Memory System (Plug & Play)
5 backends with runtime switching โ start with Redis, switch to MongoDB, then PostgreSQL โ all on the fly!
from omnicoreagent import OmniCoreAgent, MemoryRouter
# Start with Redis
agent = OmniCoreAgent(
name="my_agent",
memory_router=MemoryRouter("redis"),
model_config={"provider": "openai", "model": "gpt-4o"}
)
# Switch at runtime โ no restart needed!
agent.swith_memory_store("mongodb") # Switch to MongoDB
agent.swith_memory_store("database") # Switch to PostgreSQL/MySQL/SQLite
agent.swith_memory_store("in_memory") # Switch to in-memory
agent.swith_memory_store("redis") # Back to Redis
| Backend | Use Case | Environment Variable |
|---|---|---|
in_memory | Fast development | โ |
redis | Production persistence | REDIS_URL |
database | PostgreSQL/MySQL/SQLite | DATABASE_URL |
mongodb | Document storage | MONGODB_URI |
๐ง Conversation Summarization
OmniCoreAgent includes automatic conversation summarization to manage long conversation histories efficiently. When enabled, older messages are condensed into summaries, keeping context while reducing token usage.
from omnicoreagent import OmniCoreAgent, MemoryRouter
# Configure summarization with sliding window
memory_router = MemoryRouter(
store_type="redis",
memory_config={
"mode": "sliding_window", # or "token_budget"
"value": 10, # Keep last 10 messages (sliding_window) or max tokens (token_budget)
"summary": {
"enabled": True,
"retention_policy": "keep" # Options: "keep" or "delete"
}
}
)
agent = OmniCoreAgent(
name="summarizing_agent",
memory_router=memory_router,
model_config={"provider": "openai", "model": "gpt-4o"}
)
Summarization Modes:
| Mode | Description | Best For |
|---|---|---|
sliding_window | Keep last N messages, summarize older ones | Predictable memory size |
token_budget | Keep messages within token limit | Cost optimization |
Retention Policies:
| Policy | Behavior |
|---|---|
keep | Mark summarized messages as inactive (recoverable) |
delete | Permanently remove summarized messages |
How It Works:
- When conversation exceeds configured limit โ summarization triggers
- Older messages are sent to LLM for summary generation
- Summary replaces older messages in active context
- Original messages are retained (with
"keep") or deleted per policy
๐ก When to Use: Enable summarization for long-running conversations (support bots, research assistants) to maintain context while controlling costs. Use
sliding_windowfor predictable behavior,token_budgetfor strict cost control.
3. ๐ Context Engineering System
OmniCoreAgent implements state-of-the-art context engineering inspired by patterns from Anthropic and Cursor. This dual-layer approach ensures your agents never hit token limits โ even during marathon coding sessions or multi-step research tasks.
flowchart TB
subgraph Input["๐ฅ Incoming Context"]
MSG["Messages<br/>(User + Assistant + Tool)"]
TOOL_RESP["Tool Responses<br/>(Web Search, APIs, Files)"]
end
subgraph Layer1["๐ง Layer 1: Agent Loop Context Management"]
direction TB
MONITOR["Context Monitor<br/>โโโโโโโโโโโโโโ<br/>โข Token counting<br/>โข Message counting"]
subgraph Modes["Management Modes"]
direction LR
TOKEN["token_budget<br/>โโโโโโโโโโ<br/>Max total tokens"]
SLIDE["sliding_window<br/>โโโโโโโโโโ<br/>Max message count"]
end
subgraph Strategies["Overflow Strategies"]
direction LR
TRUNC["truncate<br/>โโโโโโโโ<br/>Drop oldest<br/>(fast)"]
SUMTRUNC["summarize_and_truncate<br/>โโโโโโโโโโโโโโ<br/>Condense โ Drop<br/>(preserves context)"]
end
RECENT["preserve_recent<br/>โโโโโโโโโโโโโโ<br/>Always keep last N<br/>messages protected"]
end
subgraph Layer2["๐พ Layer 2: Tool Response Offloading"]
direction TB
CHECK["Size Check<br/>โโโโโโโโโโโโโโ<br/>threshold_tokens: 500<br/>threshold_bytes: 2000"]
subgraph Offload["Offload Process"]
direction LR
SAVE["Save to File<br/>(.omnicoreagent_artifacts/)"]
PREVIEW["Generate Preview<br/>(first ~150 tokens)"]
end
subgraph Tools["Built-in Artifact Tools"]
direction LR
READ["read_artifact()"]
TAIL["tail_artifact()"]
SEARCH["search_artifact()"]
LIST["list_artifacts()"]
end
end
subgraph Output["๐ค Optimized Context"]
CLEAN["Lean Context<br/>โโโโโโโโโโโโโโ<br/>โข System prompt<br/>โข Recent messages<br/>โข Summaries<br/>โข Tool previews"]
end
%% Flow
MSG --> MONITOR
TOOL_RESP --> CHECK
MONITOR --> TOKEN
MONITOR --> SLIDE
TOKEN --> TRUNC
TOKEN --> SUMTRUNC
SLIDE --> TRUNC
SLIDE --> SUMTRUNC
TRUNC --> RECENT
SUMTRUNC --> RECENT
CHECK -->|"> threshold"| SAVE
CHECK -->|"โค threshold"| Output
SAVE --> PREVIEW
PREVIEW --> Tools
Tools -.->|"On demand"| Output
RECENT --> CLEAN
PREVIEW --> CLEAN
%% Styling
style Input fill:#3498db,stroke:#2980b9,color:#fff
style Layer1 fill:#2c3e50,stroke:#34495e,color:#fff
style Modes fill:#34495e,stroke:#2c3e50,color:#fff
style Strategies fill:#34495e,stroke:#2c3e50,color:#fff
style Layer2 fill:#8e44ad,stroke:#9b59b6,color:#fff
style Offload fill:#9b59b6,stroke:#8e44ad,color:#fff
style Tools fill:#9b59b6,stroke:#8e44ad,color:#fff
style Output fill:#27ae60,stroke:#2ecc71,color:#fff
How the Two Layers Work Together
| Layer | Scope | What It Manages | When It Triggers |
|---|---|---|---|
| Context Management | Agent loop messages | User/Assistant conversation, tool call history | When context exceeds threshold_percent of limit |
| Tool Offloading | Individual tool responses | Large API responses, file contents, search results | When response exceeds threshold_tokens |
3.1 Agent Loop Context Management
Prevent token exhaustion during long-running tasks with automatic context management. When enabled, the agent monitors context size and applies truncation or summarization when thresholds are exceeded.
agent_config = {
"context_management": {
"enabled": True,
"mode": "token_budget", # or "sliding_window"
"value": 100000, # Max tokens (token_budget) or max messages (sliding_window)
"threshold_percent": 75, # Trigger at 75% of limit
"strategy": "summarize_and_truncate", # or "truncate"
"preserve_recent": 4, # Always keep last N messages
}
}
Modes:
| Mode | Description | Best For |
|---|---|---|
token_budget | Manage by total token count | Cost control, API limits |
sliding_window | Manage by message count | Predictable context size |
Strategies:
| Strategy | Behavior | Trade-off |
|---|---|---|
truncate | Drop oldest messages | Fast, no extra LLM calls |
summarize_and_truncate | Summarize then drop | Preserves context, adds latency |
3.2 Tool Response Offloading
Large tool responses are automatically saved to files, with only a preview in context. The agent can retrieve full content on demand using built-in tools.
agent_config = {
"tool_offload": {
"enabled": True,
"threshold_tokens": 500, # Offload responses > 500 tokens
"max_preview_tokens": 150, # Show first 150 tokens in context
"storage_dir": ".omnicoreagent_artifacts"
}
}
Token Savings Example:
| Tool Response | Without Offloading | With Offloading | Savings |
|---|---|---|---|
| Web search (50 results) | ~10,000 tokens | ~200 tokens | 98% |
| Large API response | ~5,000 tokens | ~150 tokens | 97% |
| File read (1000 lines) | ~8,000 tokens | ~200 tokens | 97% |
Built-in Artifact Tools (automatically available when offloading is enabled):
| Tool | Purpose |
|---|---|
read_artifact(artifact_id) | Read full content when needed |
tail_artifact(artifact_id, lines) | Read last N lines (great for logs) |
search_artifact(artifact_id, query) | Search within large responses |
list_artifacts() | See all offloaded data in current session |
Combined Power
Enable both for maximum efficiency:
agent = OmniCoreAgent(
name="research_agent",
agent_config={
"context_management": {"enabled": True, "strategy": "summarize_and_truncate"},
"tool_offload": {"enabled": True, "threshold_tokens": 500}
}
)
# Result: Agents that can run indefinitely without token exhaustion
๐ก When to Use: Enable for long-running tasks (research, multi-step workflows) where context or tool responses can grow unbounded.
4. ๐ก Event System (Plug & Play)
Real-time event streaming with runtime switching:
from omnicoreagent import EventRouter
# Start with in-memory
agent = OmniCoreAgent(
event_router=EventRouter("in_memory"),
...
)
# Switch to Redis Streams for production
agent.switch_event_store("redis_stream")
agent.get_event_store_type() # Get current event router type
# Stream events in real-time
async for event in agent.stream_events(session_id):
print(f"{event.type}: {event.payload}")
Event Types: user_message, agent_message, tool_call_started, tool_call_result, final_answer, agent_thought, sub_agent_started, sub_agent_error, sub_agent_result
๐ก When to Use: Enable events when you need real-time monitoring, debugging, or building UIs that show agent progress. Essential for production observability.
5. ๐ Built-in MCP Client
Connect to any MCP-compatible service with support for multiple transport protocols and authentication methods.
Transport Types
1. stdio โ Local MCP servers (process communication)
{
"name": "filesystem",
"transport_type": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/home"]
}
2. streamable_http โ Remote servers with HTTP streaming
# With Bearer Token
{
"name": "github",
"transport_type": "streamable_http",
"url": "http://localhost:8080/mcp",
"headers": {
"Authorization": "Bearer your-token" # optional
},
"timeout": 60 # optional
}
# With OAuth 2.0 (auto-starts callback server on localhost:3000)
{
"name": "oauth_server",
"transport_type": "streamable_http",
"auth": {
"method": "oauth"
},
"url": "http://localhost:8000/mcp"
}
3. sse โ Server-Sent Events
{
"name": "sse_server",
"transport_type": "sse",
"url": "http://localhost:3000/sse",
"headers": {
"Authorization": "Bearer token" # optional
},
"timeout": 60, # optional
"sse_read_timeout": 120 # optional
}
Complete Example with All 3 Transport Types
agent = OmniCoreAgent(
name="multi_mcp_agent",
system_instruction="You have access to filesystem, GitHub, and live data.",
model_config={"provider": "openai", "model": "gpt-4o"},
mcp_tools=[
# 1. stdio - Local filesystem
{
"name": "filesystem",
"transport_type": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/home"]
},
# 2. streamable_http - Remote API (supports Bearer token or OAuth)
{
"name": "github",
"transport_type": "streamable_http",
"url": "http://localhost:8080/mcp",
"headers": {"Authorization": "Bearer github-token"},
"timeout": 60
},
# 3. sse - Real-time streaming
{
"name": "live_data",
"transport_type": "sse",
"url": "http://localhost:3000/sse",
"headers": {"Authorization": "Bearer token"},
"sse_read_timeout": 120
}
]
)
await agent.connect_mcp_servers()
tools = await agent.list_all_available_tools() # All MCP + local tools
result = await agent.run("List all Python files and get latest commits")
Transport Comparison
| Transport | Use Case | Auth Methods |
|---|---|---|
stdio | Local MCP servers, CLI tools | None (local process) |
streamable_http | Remote APIs, cloud services | Bearer token, OAuth 2.0 |
sse | Real-time data, streaming | Bearer token, custom headers |
๐ก When to Use: Use MCP when you need to connect to external tools and services. Choose
stdiofor local CLI tools,streamable_httpfor REST APIs, andssefor real-time streaming data.
6. ๐ง DeepAgent (Multi-Agent Orchestration)
DeepAgent = OmniCoreAgent + Multi-Agent Orchestration
DeepAgent automatically breaks down complex tasks and delegates them to specialized subagents running in parallel. The lead agent coordinates the work and synthesizes findings from memory.
โก Quick Start
from omnicoreagent import DeepAgent
# Create a DeepAgent for any domain
agent = DeepAgent(
name="research_coordinator",
system_instruction="You are a tech research coordinator.",
model_config={"provider": "openai", "model": "gpt-4o"},
)
await agent.initialize() # Required: Registers orchestration tools
# Run complex query โ automatically spawns subagents
result = await agent.run("""
Research the benefits of Rust vs Go for cloud-native applications.
Consider performance, developer experience, and ecosystem maturity.
""")
# DeepAgent spawns 3 parallel subagents:
# - Performance researcher
# - DevEx analyst
# - Ecosystem analyst
await agent.cleanup()
๐ง Configuration Options
| Config | Default | Description |
|---|---|---|
max_steps | 50 | Max reasoning steps (increase for complex orchestration) |
tool_call_timeout | 600 | Timeout per tool call in seconds (10 min for deep work) |
memory_tool_backend | "local" | Always "local" โ enforced for orchestration |
context_management.enabled | true | Auto-manage context window |
tool_offload.enabled | true | Offload large tool responses |
# Complete DeepAgent configuration
agent = DeepAgent(
name="ResearchCoordinator",
system_instruction="You are a strategic research analyst.",
model_config={"provider": "openai", "model": "gpt-4o"},
agent_config={
"max_steps": 100, # More steps for complex orchestration
"tool_call_timeout": 600, # 10 min timeout for subagents
# memory_tool_backend is ALWAYS "local" (enforced)
},
debug=True, # Enable debug logging
)
[!IMPORTANT]
memory_tool_backendis always"local"for DeepAgent. This cannot be overridden โ it's required for the memory-based orchestration workflow.
๐ ๏ธ Built-in Orchestration Tools
DeepAgent automatically provides these tools to the lead agent:
| Tool | Purpose | Parameters |
|---|---|---|
spawn_subagent | Spawn a single focused subagent | name, instruction, task |
spawn_parallel_subagents | Spawn multiple subagents in parallel | [{name, instruction, task}, ...] |
Subagents inherit from parent:
- Model configuration
- MCP and local tools
- Agent config (context management, tool offload, etc.)
Example: Manual subagent spawning (usually automatic):
# The lead agent can explicitly spawn subagents via tools:
# 1. Single subagent
spawn_subagent(
name="market_researcher",
instruction="You are a market research specialist.",
task="Research AI DevOps market size and growth trends"
)
# 2. Parallel subagents
spawn_parallel_subagents([
{"name": "tech_analyst", "instruction": "...", "task": "Analyze technology trends"},
{"name": "competitor_analyst", "instruction": "...", "task": "Map competitor landscape"},
{"name": "pricing_analyst", "instruction": "...", "task": "Research pricing models"},
])
๐ RPI+ Workflow (Advanced Orchestration)
For complex tasks, DeepAgent implements the RPI+ workflow:
1. Meta-Assessment โ Evaluate task complexity before choosing strategy
2. Research โ Broad landscape exploration
3. Plan โ Strategic decomposition with quality gates
4. Implement โ Parallel subagent execution
5. Verify โ Gap analysis + confidence scoring
6. Iterate โ Surgical refinement when thresholds not met
7. Synthesize โ Cross-cutting insights with source citations
Architecture Flow:
User Query โ Lead Agent โ Spawn Subagents (parallel)
โ
[Subagent A] โ Write to /memories/subtask_a/
[Subagent B] โ Write to /memories/subtask_b/
[Subagent C] โ Write to /memories/subtask_c/
โ
Lead Agent reads memory โ Synthesize โ Final Answer
Why Memory-First?
- Survives context resets
- Enables true parallel execution
- No context bloat from intermediate results
๐ DeepAgent with MCP Tools
from omnicoreagent import DeepAgent
# DeepAgent with Tavily web search
agent = DeepAgent(
name="MarketResearcher",
system_instruction="You are a strategic market research analyst.",
model_config={"provider": "gemini", "model": "gemini-2.5-pro"},
mcp_tools=[
{
"name": "tavily",
"transport_type": "stdio",
"command": "npx",
"args": ["-y", "mcp-remote", f"https://mcp.tavily.com/mcp/?tavilyApiKey={TAVILY_KEY}"],
}
],
agent_config={
"max_steps": 100,
},
)
await agent.initialize()
result = await agent.run("Comprehensive market analysis of AI DevOps tools in 2026")
await agent.cleanup()
DeepAgent vs OmniCoreAgent
| Feature | OmniCoreAgent | DeepAgent |
|---|---|---|
| Domain | User-defined | User-defined (same) |
| Tools | User-provided | User-provided + orchestration |
| Memory Backend | Optional | Always "local" (enforced) |
| Orchestration | No | Automatic subagent spawning |
| Best For | Single-agent tasks | Complex multi-step analysis |
When to use DeepAgent:
- โ Multi-domain research (tech + market + legal)
- โ Parallel analysis (compare multiple options)
- โ Complex synthesis (aggregate findings from multiple sources)
- โ Long-running investigations
When to use OmniCoreAgent:
- โ Simple Q&A
- โ Single-perspective tasks
- โ Direct tool execution
- โ Chat interfaces
๐ Learn More: See DeepAgent Cookbook for complete examples.
๐ก When to Use: Use DeepAgent when your tasks may benefit from multi-agent orchestration (parallel research, divide-and-conquer analysis, multi-domain expertise).
7. ๐ ๏ธ Local Tools System
Register any Python function as an AI tool:
from omnicoreagent import ToolRegistry
tools = ToolRegistry()
@tools.register_tool("get_weather")
def get_weather(city: str) -> str:
"""Get weather for a city."""
return f"Weather in {city}: Sunny, 25ยฐC"
@tools.register_tool("calculate_area")
def calculate_area(length: float, width: float) -> str:
"""Calculate rectangle area."""
return f"Area: {length * width} square units"
agent = OmniCoreAgent(
name="tool_agent",
local_tools=tools, # Your custom tools!
...
)
๐ก When to Use: Use Local Tools when you need custom business logic, internal APIs, or any Python functionality that isn't available via MCP servers.
8. ๐งฉ Agent Skills System (Packaged Capabilities)
OmniCoreAgent supports the Agent Skills specification โ self-contained capability packages that provide specialized knowledge, executable scripts, and documentation.
agent_config = {
"enable_agent_skills": True # Enable discovery and tools for skills
}
Key Concepts:
- Discovery: Agents automatically discover skills installed in
.agents/skills/[skill-name]. - Activation (
SKILL.md): Agents are instructed to read the "Activation Document" first to understand how to use the skill's specific capabilities. - Polyglot Execution: The
run_skill_scripttool handles scripts in Python, JavaScript/Node, TypeScript, Ruby, Perl, and Shell (bash/sh).
Directory Structure:
.agents/skills/my-skill-name/
โโโ SKILL.md # The "Activation" document (instructions + metadata)
โโโ scripts/ # Multi-language executable scripts
โโโ references/ # Deep-dive documentation
โโโ assets/ # Templates, examples, and resources
Skill Tools:
read_skill_file(skill_name, file_path): Access any file within a skill (start withSKILL.md).run_skill_script(skill_name, script_name, args?): Execute bundled scripts with automatic interpreter detection.
๐ Learn More: To learn how to create your own agent skills, visit agentskills.io.
9. ๐พ Workspace Memory โ Persistent File Storage for Agents
NEW: Cloud Storage Support! Your agents can now store files on AWS S3 or Cloudflare R2 for production-grade, distributed persistence.
A persistent file storage system that gives your agents a dedicated workspace to save, manage, and share files across sessions. Choose from local filesystem for development, or cloud storage (S3/R2) for production deployments where files need to persist across servers, scale globally, and survive restarts.
Storage Backends
| Backend | Use Case | Benefits |
|---|---|---|
local | Development, single-server | Zero config, instant setup |
s3 | Production, AWS infrastructure | Scalable, durable, global access |
r2 | Production, edge computing | Zero egress fees, Cloudflare ecosystem |
Quick Setup
# Local storage (development)
agent_config = {
"memory_tool_backend": "local"
}
# AWS S3 storage (production)
agent_config = {
"memory_tool_backend": "s3"
}
# Cloudflare R2 storage (production)
agent_config = {
"memory_tool_backend": "r2"
}
Environment Variables
For S3:
AWS_S3_BUCKET=my-agent-memories
AWS_ACCESS_KEY_ID=AKIA...
AWS_SECRET_ACCESS_KEY=your-secret
AWS_REGION=us-east-1 # optional, defaults to us-east-1
For R2:
R2_BUCKET_NAME=my-agent-memories
R2_ACCOUNT_ID=your-cloudflare-account-id
R2_ACCESS_KEY_ID=your-r2-key
R2_SECRET_ACCESS_KEY=your-r2-secret
Agent Memory Tools
When enabled, your agent automatically gets these tools:
| Tool | Purpose |
|---|---|
memory_view | View/list files in memory workspace |
memory_create_update | Create, append, or overwrite files |
memory_str_replace | Find and replace text within files |
memory_insert | Insert text at specific line numbers |
memory_delete | Delete files from workspace |
memory_rename | Rename or move files |
memory_clear_all | Clear entire workspace |
Production Features
| Feature | Local | S3 | R2 |
|---|---|---|---|
| Persistent across restarts | โ | โ | โ |
| Multi-server access | โ | โ | โ |
| Global CDN distribution | โ | โ | โ |
| Zero egress fees | N/A | โ | โ |
| Auto-retry on failure | โ | โ | โ |
| Concurrent access safety | โ | โ | โ |
Use Cases
| Use Case | Recommended Backend |
|---|---|
| Local development | local |
| Single-server production | local or s3 |
| Multi-server / Kubernetes | s3 or r2 |
| Edge computing / Workers | r2 |
| Cost-sensitive workloads | r2 (zero egress) |
Example: Research Agent with Cloud Storage
import os
# Set environment for S3
os.environ["AWS_S3_BUCKET"] = "research-agent-memories"
os.environ["AWS_ACCESS_KEY_ID"] = "AKIA..."
os.environ["AWS_SECRET_ACCESS_KEY"] = "..."
os.environ["AWS_REGION"] = "us-east-1"
os.environ["AWS_ENDPOINT_URL"] = "https://s3.amazonaws.com" # Optional
agent = OmniCoreAgent(
name="research_agent",
system_instruction="You are a research assistant. Save your findings to memory.",
model_config={"provider": "openai", "model": "gpt-4o"},
agent_config={
"memory_tool_backend": "s3", # Files persist in S3
"max_steps": 50,
}
)
# Agent can now save research notes that persist across:
# - Server restarts
# - Multiple instances
# - Different geographic locations
result = await agent.run(
"Research the latest AI developments and save a summary to /notes/ai_trends_2024.md"
)
๐ก When to Use: Use
localfor development. Uses3orr2when you need:
- Files to persist across server restarts
- Multiple agent instances accessing the same workspace
- Global teams accessing shared agent knowledge
- Production-grade durability and reliability
10. ๐ฅ Sub-Agents System
Delegate tasks to specialized child agents:
weather_agent = OmniCoreAgent(name="weather_agent", ...)
filesystem_agent = OmniCoreAgent(name="filesystem_agent", mcp_tools=MCP_TOOLS, ...)
parent_agent = OmniCoreAgent(
name="parent_agent",
sub_agents=[weather_agent, filesystem_agent],
...
)
๐ก When to Use: Use Sub-Agents when you have specialized agents (e.g., weather, code, data) and want a parent agent to delegate tasks intelligently. Great for building modular, reusable agent architectures.
11. ๐ฐ๏ธ Background Agents
Autonomous agents that run on varying schedules (Interval or Cron) or process tasks from a persistent queue.
from omnicoreagent import BackgroundAgentManager, MemoryRouter, EventRouter
# Initialize the manager
manager = BackgroundAgentManager(
memory_router=MemoryRouter("redis"),
event_router=EventRouter("redis_stream")
)
# Create a background agent
background_agent_config = {
"agent_id": "system_monitor",
"system_instruction": "Monitor system resources and report anomalies.",
"model_config": {"provider": "openai", "model": "gpt-4o-mini"},
"queue_size": 10, # Max pending tasks
# Schedule options:
"interval": 300, # Integer = seconds (every 5 mins)
# "interval": "* * * * *", # String = Cron expression (every minute)
"task_config": {
"query": "Check metrics and alert if CPU > 80%",
"timeout": 60, # Kill task if it hangs
"max_retries": 3,
"retry_delay": 10
}
}
await manager.create_agent(background_agent_config)
# Start the system
await manager.start()
# Trigger manually if needed
await manager.run_task_now("system_monitor", {"query": "Immediate check!"})
๐ Deep Dive: Check out the Background Agents Cookbook for full "Kitchen Sink" examples including lifecycle management (pause/resume/delete) and advanced configuration.
๐ ๏ธ Comprehensive API Reference
BackgroundAgentManager (Orchestrator)
Lifecycle Management
await manager.create_agent(config: Dict) -> Dict: Create, register, and schedule a new background agent.await manager.start(): Start the manager and all scheduled agents.await manager.shutdown(): Gracefully stop the manager and all agents.await manager.start_agent(agent_id): Start (schedule) a specific agent.await manager.stop_agent(agent_id): Stop (unschedule) a specific agent.await manager.pause_agent(agent_id): Pause an agent's schedule without stopping its worker.await manager.resume_agent(agent_id): Resume a paused agent's schedule.await manager.delete_agent(agent_id): Stop, cleanup, and remove an agent completely.
Task Management
await manager.register_task(agent_id, task_config): Register or update a task for an agent.await manager.run_task_now(agent_id, task_config): Trigger an immediate execution outside the schedule.await manager.register_and_run(agent_id, task_config): Register a task and run it immediately.await manager.update_task_config(agent_id, task_config): Update an existing task configuration.await manager.remove_task(agent_id): Remove a task configuration.await manager.list_tasks(): List all agents with registered tasks.
Status & Monitoring
await manager.get_agent_status(agent_id): Get comprehensive status (running, scheduled, last run, errors).await manager.get_manager_status(): Get overall system status (total agents, running count, resource usage).await manager.list_agents(): List all registered agent IDs.await manager.is_agent_running(agent_id): Check if an agent is currently executing a task.await manager.get_running_agents(): List all currently executing agents.await manager.get_agent_metrics(agent_id): Get performance metrics (run count, errors, timestamps).await manager.get_all_metrics(): Get metrics for all agents.
Configuration & Information
await manager.update_agent_config(agent_id, new_config): Update agent settings (including model/tools).await manager.get_task_config(agent_id): Retrieve current task configuration.await manager.get_agent(agent_id): Access the rawBackgroundOmniCoreAgentinstance.await manager.get_agent_event_info(agent_id): Get event stream connection details.await manager.get_all_event_info(): Get event info for all agents and shared stores.await manager.get_agent_session_id(agent_id): Get the persistent session ID.await manager.get_all_session_ids(): Get a map of all agent session IDs.
BackgroundOmniCoreAgent (The Workers)
Execution & Control
await agent.submit_task(task_config): Queue a task for reliable execution.await agent.run_task(task_config): Interface for scheduler/manual triggers.await agent.start_worker(): Start the background task processing loop.await agent.stop_worker(): Gracefully stop the background worker.property agent.is_worker_running: Check if the worker loop is active.await agent.connect_mcp_servers(): Establish connections to configured MCP tools.await agent.cleanup(): comprehensive cleanup of resources, connections, and tasks.
State & Visibility
await agent.get_status(): Get health, configuration, and execution state.await agent.get_session_id(): Get the persistent session ID.await agent.has_task(): Check if a valid task is registered.await agent.get_task_query(): Get the current query/instruction being executed.await agent.get_task_config(): Get the full task configuration dict.
Events & Streaming
await agent.stream_events(session_id): Real-time event generator.await agent.get_events(session_id): Retrieve past event history.await agent.get_event_stream_info(): Connection details for external consumers.await agent.update_config(new_config): Hot-reload agent configuration.
๐ก When to Use: Perfect for scheduled tasks like system monitoring, periodic reports, data syncing, or any automation that runs independently without user interaction.
12. ๐ Workflow Agents
Orchestrate multiple agents for complex tasks:
from omnicoreagent import SequentialAgent, ParallelAgent, RouterAgent
# Sequential: Chain agents step-by-step
seq_agent = SequentialAgent(sub_agents=[agent1, agent2, agent3])
result = await seq_agent.run(initial_task="Analyze and report")
# Parallel: Run agents concurrently
par_agent = ParallelAgent(sub_agents=[agent1, agent2, agent3])
results = await par_agent.run(agent_tasks={
"analyzer": "Analyze data",
"processor": "Process results"
})
# Router: Intelligent task routing
router = RouterAgent(
sub_agents=[code_agent, data_agent, research_agent],
model_config={"provider": "openai", "model": "gpt-4o"}
)
result = await router.run(task="Find and summarize AI research")
๐ก When to Use:
- SequentialAgent: When tasks depend on each other (output of one โ input of next)
- ParallelAgent: When tasks are independent and can run simultaneously for speed
- RouterAgent: When you need intelligent task routing to specialized agents
13. ๐ง Advanced Tool Use (BM25 Retrieval)
Automatically discover relevant tools at runtime using BM25 lexical search:
agent_config = {
"enable_advanced_tool_use": True # Enable BM25 retrieval
}
How It Works:
- All MCP tools loaded into in-memory registry
- BM25 index built over tool names, descriptions, parameters
- User task used as search query
- Top 5 relevant tools dynamically injected
Benefits: Scales to 1000+ tools, zero network I/O, deterministic, container-friendly.
๐ก When to Use: Enable when you have many MCP tools (10+) and want the agent to automatically discover the right tools for each task without manual selection.
14. ๐ Production Observability & Metrics
๐ Real-time Usage Metrics
OmniCoreAgent tracks every token, request, and millisecond. Each run() returns a metric object, and you can get cumulative stats anytime.
result = await agent.run("Analyze this data")
print(f"Request Tokens: {result['metric'].request_tokens}")
print(f"Time Taken: {result['metric'].total_time:.2f}s")
# Get aggregated metrics for the agent's lifecycle
stats = await agent.get_metrics()
print(f"Avg Response Time: {stats['average_time']:.2f}s")
๐ Opik Tracing
Monitor and optimize your agents with deep traces:
# Add to .env
OPIK_API_KEY=your_opik_api_key
OPIK_WORKSPACE=your_workspace
What's Tracked: LLM call performance, tool execution traces, memory operations, agent workflow, bottlenecks.
Agent Execution Trace:
โโโ agent_execution: 4.6s
โโโ tools_registry_retrieval: 0.02s โ
โโโ memory_retrieval_step: 0.08s โ
โโโ llm_call: 4.5s โ ๏ธ (bottleneck!)
โโโ action_execution: 0.03s โ
๐ก When to Use: Essential for production. Use Metrics for cost/performance monitoring, and Opik for identifying bottlenecks and debugging complex agent logic.
15. ๐ก๏ธ Prompt Injection Guardrails
Protect your agents against malicious inputs, jailbreaks, and instruction overrides before they reach the LLM.
agent_config = {
"guardrail_config": {
"strict_mode": True, # Block all suspicious inputs
"sensitivity": 0.85, # 0.0 to 1.0 (higher = more sensitive)
"enable_pattern_matching": True,
"enable_heuristic_analysis": True
}
}
agent = OmniCoreAgent(..., agent_config=agent_config)
# If a threat is detected:
# result['response'] -> "I'm sorry, but I cannot process this request due to safety concerns..."
# result['guardrail_result'] -> Full metadata about the detected threat
Key Protections:
- Instruction Overrides: "Ignore previous instructions..."
- Jailbreaks: DAN mode, roleplay escapes, etc.
- Toxicity & Abuse: Built-in pattern recognition.
- Payload Splitting: Detects fragmented attack attempts.
โ๏ธ Configuration Options
| Parameter | Type | Default | Description |
|---|---|---|---|
strict_mode | bool | False | When True, any detection (even low confidence) blocks the request. |
sensitivity | float | 1.0 | Scaling factor for threat scores (0.0 to 1.0). Higher = more sensitive. |
max_input_length | int | 10000 | Maximum allowed query length before blocking. |
enable_encoding_detection | bool | True | Detects base64, hex, and other obfuscation attempts. |
enable_heuristic_analysis | bool | True | Analyzes prompt structure for typical attack patterns. |
enable_sequential_analysis | bool | True | Checks for phased attacks across multiple tokens. |
enable_entropy_analysis | bool | True | Detects high-entropy payloads common in injections. |
allowlist_patterns | list | [] | List of regex patterns that bypass safety checks. |
blocklist_patterns | list | [] | Custom regex patterns to always block. |
๐ก When to Use: Always enable in user-facing applications to prevent prompt injection attacks and ensure agent reliability.
16. ๐ Universal Model Support
Model-agnostic through LiteLLM โ use any provider:
# OpenAI
model_config = {"provider": "openai", "model": "gpt-4o"}
# Anthropic
model_config = {"provider": "anthropic", "model": "claude-3-5-sonnet-20241022"}
# Groq (Ultra-fast)
model_config = {"provider": "groq", "model": "llama-3.1-8b-instant"}
# Ollama (Local)
model_config = {"provider": "ollama", "model": "llama3.1:8b", "ollama_host": "http://localhost:11434"}
# OpenRouter (200+ models)
model_config = {"provider": "openrouter", "model": "anthropic/claude-3.5-sonnet"}
#mistral ai
model_config = {"provider": "mistral", "model": "mistral-7b-instruct"}
#deepseek
model_config = {"provider": "deepseek", "model": "deepseek-chat"}
#google gemini
model_config = {"provider": "google", "model": "gemini-2.0-flash-exp"}
#azure openai
model_config = {"provider": "azure_openai", "model": "gpt-4o"}
Supported: OpenAI, Anthropic, Google Gemini, Groq, DeepSeek, Mistral, Azure OpenAI, OpenRouter, Ollama
๐ก When to Use: Switch providers based on your needs โ use cheaper models (Groq, DeepSeek) for simple tasks, powerful models (GPT-4o, Claude) for complex reasoning, and local models (Ollama) for privacy-sensitive applications.
17. ๐ OmniServe โ Production API Server
Turn any agent into a production-ready REST/SSE API with a single command.
๐ฆ Agent File Requirements
To use OmniServe with your agent, your Python file must define one of the following:
# Option 1: Define an `agent` variable
from omnicoreagent import OmniCoreAgent
agent = OmniCoreAgent(
name="MyAgent",
system_instruction="You are a helpful assistant.",
model_config={"provider": "gemini", "model": "gemini-2.0-flash"},
)
# Option 2: Define a `create_agent()` function
from omnicoreagent import OmniCoreAgent
def create_agent():
"""Factory function that returns an agent instance."""
return OmniCoreAgent(
name="MyAgent",
system_instruction="You are a helpful assistant.",
model_config={"provider": "gemini", "model": "gemini-2.0-flash"},
)
[!IMPORTANT] OmniServe looks for
agentvariable first, thencreate_agent()function. Your file must export one of these.
โก Quick Start (Step-by-Step)
Step 1: Create your agent file (my_agent.py)
from omnicoreagent import OmniCoreAgent, ToolRegistry
tools = ToolRegistry()
@tools.register_tool("greet")
def greet(name: str) -> str:
"""Greet someone by name."""
return f"Hello, {name}!"
@tools.register_tool("calculate")
def calculate(expression: str) -> dict:
"""Evaluate a math expression."""
import math
result = eval(expression, {"__builtins__": {}}, {"sqrt": math.sqrt, "pi": math.pi})
return {"expression": expression, "result": result}
agent = OmniCoreAgent(
name="MyAgent",
system_instruction="You are a helpful assistant with access to greeting and calculation tools.",
model_config={"provider": "gemini", "model": "gemini-2.0-flash"},
local_tools=tools,
)
Step 2: Set environment variables
echo "LLM_API_KEY=your_api_key_here" > .env
Step 3: Run the server
omniserve run --agent my_agent.py
Step 4: Test the API
# Health check
curl http://localhost:8000/health
# Run a query (sync)
curl -X POST http://localhost:8000/run/sync \
-H "Content-Type: application/json" \
-d '{"query": "Greet Alice and calculate 2+2"}'
# Run a query (streaming SSE)
curl -X POST http://localhost:8000/run \
-H "Content-Type: application/json" \
-d '{"query": "What is sqrt(144)?"}'
# Open interactive docs
open http://localhost:8000/docs
๐ฅ๏ธ CLI Commands
| Command | Description |
|---|---|
omniserve run | Run your agent file as API server |
omniserve quickstart | Zero-code server with defaults |
omniserve config | View or generate configuration |
omniserve generate-dockerfile | Generate production Dockerfile |
CLI Options: omniserve run
omniserve run \
--agent my_agent.py \ # Path to agent file (required)
--host 0.0.0.0 \ # Host to bind (default: 0.0.0.0)
--port 8000 \ # Port to bind (default: 8000)
--workers 1 \ # Worker processes (default: 1)
--auth-token YOUR_TOKEN \ # Enable Bearer token auth
--rate-limit 100 \ # Rate limit (requests per minute)
--cors-origins "*" \ # Comma-separated CORS origins
--no-docs \ # Disable Swagger UI
--reload # Enable hot reload (development)
Examples:
# Basic run
omniserve run --agent my_agent.py
# With authentication
omniserve run --agent my_agent.py --auth-token secret123
# With rate limiting
omniserve run --agent my_agent.py --rate-limit 100
# Production settings
omniserve run --agent my_agent.py \
--port 8000 \
--auth-token $AUTH_TOKEN \
--rate-limit 100 \
--cors-origins "https://myapp.com,https://api.myapp.com"
# Development with hot reload
omniserve run --agent my_agent.py --reload
CLI Options: omniserve quickstart
Start a server instantly without writing any code:
omniserve quickstart \
--provider openai \ # LLM provider (openai, gemini, anthropic)
--model gpt-4o \ # Model name
--name QuickAgent \ # Agent name (default: QuickAgent)
--instruction "You are..." \ # System instruction
--port 8000 # Port (default: 8000)
Examples:
# OpenAI
omniserve quickstart --provider openai --model gpt-4o
# Google Gemini
omniserve quickstart --provider gemini --model gemini-2.0-flash
# Anthropic Claude
omniserve quickstart --provider anthropic --model claude-3-5-sonnet-20241022
๐ API Endpoints
| Method | Endpoint | Auth | Description |
|---|---|---|---|
POST | /run | Yes* | SSE streaming response |
POST | /run/sync | Yes* | JSON response (blocking) |
GET | /health | No | Health check |
GET | /ready | No | Readiness check |
GET | /prometheus | No | Prometheus metrics |
GET | /tools | Yes* | List available tools |
GET | /metrics | Yes* | Agent usage metrics |
GET | /docs | No | Swagger UI |
GET | /redoc | No | ReDoc UI |
*Auth required only if --auth-token is set.
Request/Response Examples:
# Sync request (with auth)
curl -X POST http://localhost:8000/run/sync \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_TOKEN" \
-d '{"query": "What is 2+2?", "session_id": "user123"}'
# Response:
# {"response": "2+2 equals 4", "session_id": "user123", ...}
# Streaming SSE request
curl -X POST http://localhost:8000/run \
-H "Content-Type: application/json" \
-d '{"query": "Explain quantum computing"}'
# List tools
curl http://localhost:8000/tools \
-H "Authorization: Bearer YOUR_TOKEN"
๐ง Environment Variables
All settings via OMNISERVE_* prefix. Environment variables always override code values.
| Variable | Default | Description |
|---|---|---|
OMNISERVE_HOST | 0.0.0.0 | Server host |
OMNISERVE_PORT | 8000 | Server port |
OMNISERVE_WORKERS | 1 | Worker processes |
OMNISERVE_API_PREFIX | "" | API path prefix (e.g., /api/v1) |
OMNISERVE_ENABLE_DOCS | true | Swagger UI at /docs |
OMNISERVE_ENABLE_REDOC | true | ReDoc at /redoc |
OMNISERVE_CORS_ENABLED | true | Enable CORS |
OMNISERVE_CORS_ORIGINS | * | Allowed origins (comma-separated) |
OMNISERVE_CORS_CREDENTIALS | true | Allow credentials |
OMNISERVE_AUTH_ENABLED | false | Enable Bearer token auth |
OMNISERVE_AUTH_TOKEN | โ | Bearer token value |
OMNISERVE_RATE_LIMIT_ENABLED | false | Enable rate limiting |
OMNISERVE_RATE_LIMIT_REQUESTS | 100 | Requests per window |
OMNISERVE_RATE_LIMIT_WINDOW | 60 | Window in seconds |
OMNISERVE_REQUEST_LOGGING | true | Log requests |
OMNISERVE_LOG_LEVEL | INFO | Log level (DEBUG/INFO/WARNING/ERROR) |
OMNISERVE_REQUEST_TIMEOUT | 300 | Request timeout in seconds |
Example .env file:
# Required
LLM_API_KEY=your_api_key_here
# OmniServe settings
OMNISERVE_PORT=8000
OMNISERVE_AUTH_ENABLED=true
OMNISERVE_AUTH_TOKEN=my-secret-token
OMNISERVE_RATE_LIMIT_ENABLED=true
OMNISERVE_RATE_LIMIT_REQUESTS=100
OMNISERVE_CORS_ORIGINS=https://myapp.com,https://api.myapp.com
๐ณ Docker Deployment
Generate a Dockerfile:
omniserve generate-dockerfile --file my_agent.py
Build and run:
docker build -t omniserver .
docker run -p 8000:8000 -e LLM_API_KEY=$LLM_API_KEY omniserver
Smart Configuration โ The generator inspects your agent and configures storage automatically:
| Your Agent Uses | Dockerfile Sets |
|---|---|
| No memory tools | AGENT_PATH, OMNICOREAGENT_ARTIFACTS_DIR |
| Local memory | + OMNICOREAGENT_MEMORY_DIR=/tmp/memories |
| S3/R2 memory | Pass credentials at runtime with -e |
Cloud deployment examples:
# Local memory (ephemeral)
docker run -p 8000:8000 -e LLM_API_KEY=$LLM_API_KEY omniserver
# AWS S3 memory (persistent)
docker run -p 8000:8000 \
-e LLM_API_KEY=$LLM_API_KEY \
-e AWS_S3_BUCKET=my-bucket \
-e AWS_ACCESS_KEY_ID=... \
-e AWS_SECRET_ACCESS_KEY=... \
-e AWS_REGION=us-east-1 \
omniserver
# Cloudflare R2 memory (persistent)
docker run -p 8000:8000 \
-e LLM_API_KEY=$LLM_API_KEY \
-e R2_BUCKET_NAME=my-bucket \
-e R2_ACCOUNT_ID=... \
-e R2_ACCESS_KEY_ID=... \
-e R2_SECRET_ACCESS_KEY=... \
omniserver
๐ Python API (Programmatic Control)
For full programmatic control, use OmniServe directly in your Python script:
Create server.py:
from omnicoreagent import OmniCoreAgent, OmniServe, OmniServeConfig, ToolRegistry
tools = ToolRegistry()
@tools.register_tool("get_time")
def get_time() -> dict:
from datetime import datetime
return {"time": datetime.now().isoformat()}
agent = OmniCoreAgent(
name="MyAgent",
system_instruction="You are a helpful assistant.",
model_config={"provider": "gemini", "model": "gemini-2.0-flash"},
local_tools=tools,
)
config = OmniServeConfig(
host="0.0.0.0",
port=8000,
auth_enabled=True,
auth_token="my-secret-token",
rate_limit_enabled=True,
rate_limit_requests=100,
rate_limit_window=60,
cors_origins=["*"],
enable_docs=True,
)
if __name__ == "__main__":
server = OmniServe(agent, config=config)
server.start()
Run with Python directly:
# Set your API key
echo "LLM_API_KEY=your_api_key" > .env
# Run your server script
python server.py
[!IMPORTANT] CLI vs Python API:
omniserve run --agent my_agent.pyโ CLI loads your agent file and applies CLI flagspython server.pyโ You control everything programmatically viaOmniServeConfig
[!WARNING] Environment Variable Precedence:
.envvariables always override values set inOmniServeConfig. For example:# In code: config = OmniServeConfig(port=8000, auth_token="code-token")# In .env: OMNISERVE_PORT=9000 OMNISERVE_AUTH_TOKEN=env-tokenResult: Server runs on port 9000 with env-token (env wins!)
Import retry and circuit breaker for custom use:
from omnicoreagent import RetryConfig, CircuitBreaker, with_retry
@with_retry(RetryConfig(max_retries=5, strategy="exponential"))
async def call_external_api():
...
breaker = CircuitBreaker("api", failure_threshold=3, timeout=60)
async with breaker:
result = await risky_call()
๐ก When to Use: OmniServe is perfect for deploying agents as microservices, webhooks, chatbots, or any HTTP-accessible AI capability.
๐ Learn More: See OmniServe Cookbook for more examples.
๐ Examples & Cookbook
All examples are in the Cookbook โ organized by use case with progressive learning paths.
Quick Links
| Category | What You'll Build | Location |
|---|---|---|
| Getting Started | Your first agent, tools, memory, events | cookbook/getting_started |
| Workflows | Sequential, Parallel, Router agents | cookbook/workflows |
| Background Agents | Scheduled autonomous tasks | cookbook/background_agents |
| Production | Metrics, guardrails, observability | cookbook/production |
| ๐ Showcase | Full production applications | cookbook/showcase |
๐ Showcase: Full Production Applications
| Application | Description | Features |
|---|---|---|
| OmniAudit | Healthcare Claims Audit System | Multi-agent pipeline, ERISA compliance |
| DevOps Copilot | AI-Powered DevOps Automation | Docker, Prometheus, Grafana |
| Deep Code Agent | Code Analysis with Sandbox | Sandbox execution, session management |
Featured Examples
| Agent | Description | Location |
|---|---|---|
| E-commerce Shopper | Personal shopping with cart, preferences, recommendations | cookbook/advanced_agent |
| Flight Booking | Travel agent with search, booking, itineraries | cookbook/advanced_agent |
| AI Due Diligence | Investment research with web search, analysis | cookbook/advanced_agent/ai_due_diligence_agent |
# Start with the basics
python cookbook/getting_started/first_agent.py
# Or explore a full production application
cd cookbook/showcase/devops_copilot_agent && make up
โ๏ธ Configuration
Environment Variables
# Required
LLM_API_KEY=your_api_key
# Optional: Memory backends
REDIS_URL=redis://localhost:6379/0
DATABASE_URL=postgresql://user:pass@localhost:5432/db
MONGODB_URI=mongodb://localhost:27017/omnicoreagent
# Optional: Observability
OPIK_API_KEY=your_opik_key
OPIK_WORKSPACE=your_workspace
Agent Configuration
agent_config = {
"max_steps": 15, # Max reasoning steps
"tool_call_timeout": 30, # Tool timeout (seconds)
"request_limit": 0, # 0 = unlimited
"total_tokens_limit": 0, # 0 = unlimited
"memory_config": {"mode": "sliding_window", "value": 10000},
"enable_advanced_tool_use": True, # BM25 tool retrieval
"enable_agent_skills": True, # Specialized packaged skills
"memory_tool_backend": "local" # Persistent working memory
}
Model Configuration
model_config = {
"provider": "openai",
"model": "gpt-4o",
"temperature": 0.7,
"max_tokens": 2000,
"top_p": 0.95
}
# Azure OpenAI
model_config = {
"provider": "azureopenai",
"model": "gpt-4",
"azure_endpoint": "https://your-resource.openai.azure.com",
"azure_api_version": "2024-02-01"
}
# Ollama (Local)
model_config = {
"provider": "ollama",
"model": "llama3.1:8b",
"ollama_host": "http://localhost:11434"
}
๐งช Testing & Development
# Clone
git clone https://github.com/omnirexflora-labs/omnicoreagent.git
cd omnicoreagent
# Setup
uv venv && source .venv/bin/activate
uv sync --dev
# Test
pytest tests/ -v
pytest tests/ --cov=src --cov-report=term-missing
๐ Troubleshooting
| Error | Fix |
|---|---|
Invalid API key | Check .env: LLM_API_KEY=your_key |
ModuleNotFoundError | pip install omnicoreagent |
Redis connection failed | Start Redis or use MemoryRouter("in_memory") |
MCP connection refused | Ensure MCP server is running |
OAuth Server Starts: Normal when using "auth": {"method": "oauth"}. Remove if not needed.
Debug Mode: agent = OmniCoreAgent(..., debug=True)
OmniAgent โ OmniCoreAgent Migration: If you were using the old OmniAgent class, update your imports:
# Old (deprecated)
from omnicoreagent import OmniAgent
# New (recommended)
from omnicoreagent import OmniCoreAgent
The OmniAgent alias still works but will be removed in a future release.
Help: Check GitHub Issues
๐ค Contributing
# Fork & clone
git clone https://github.com/omnirexflora-labs/omnicoreagent.git
# Setup
uv venv && source .venv/bin/activate
uv sync --dev
pre-commit install
# Submit PR
See CONTRIBUTING.md for guidelines.
๐ License
MIT License โ see LICENSE
๐จโ๐ป Author & Credits
Created by Abiola Adeshina
- GitHub: @Abiorh001
- X (Twitter): @abiorhmangana
- Email: abiolaadedayo1993@gmail.com
๐ The OmniRexFlora Ecosystem
| Project | Description |
|---|---|
| ๐ง OmniMemory | Self-evolving memory for autonomous agents |
| ๐ค OmniCoreAgent | Production-ready AI agent framework (this project) |
| โก OmniDaemon | Event-driven runtime engine for AI agents |
๐ Acknowledgments
Built on: LiteLLM, FastAPI, Redis, Opik, Pydantic, APScheduler
Related Servers
Scout Monitoring MCP
sponsorPut performance and error data directly in the hands of your AI assistant.
Alpha Vantage MCP Server
sponsorAccess financial market data: realtime & historical stock, ETF, options, forex, crypto, commodities, fundamentals, technical indicators, & more
Credential Manager
A server for securely managing API credentials locally through the Model Context Protocol (MCP).
OpenRouter MCP Client for Cursor
An MCP client for Cursor that uses OpenRouter.ai to access multiple AI models. Requires an OpenRouter API key.
Nextflow Developer Tools
An MCP server for Nextflow development and testing, which requires a local clone of the Nextflow Git repository.
FluidMCP CLI
A command-line tool to run MCP servers from a single file, with support for automatic dependency resolution, environment setup, and package installation from local or S3 sources.
WordPress Community DEV Docs
Access WordPress development rules and best practices from the WordPress LLM Rules repository. It dynamically creates tools for each rule and caches content using Cloudflare Durable Objects.
Dev.to MCP Server
An MCP server for the Dev.to API to search, browse, read, and create content on the platform.
GraphQL MCP Server
A strongly-typed MCP server that provides seamless access to any GraphQL API.
PowerShell MCP Server
Automate Windows PowerShell tasks using Python. Execute scripts, manage the clipboard, and capture terminal output programmatically.
Talk to Figma MCP
A server for integrating with Figma, allowing you to interact with your design files.
Blockchain MCP Server
A server for blockchain interactions, offering Ethereum vanity address generation, 4byte lookup, ABI encoding, and multi-chain RPC calls.