mcp-backpressure

Backpressure and concurrency control middleware for FastMCP-based MCP servers. Prevents server overload from LLM tool-call storms with configurable limits and structured JSON-RPC errors.

Problem: LLMs can generate hundreds of parallel tool calls at once, exhausting resources and crashing servers, while clients receive no structured feedback telling them when to retry.

Solution: Middleware that limits concurrent executions, queues excess requests with a timeout, and returns structured JSON-RPC overload errors.

Quickstart

from fastmcp import FastMCP
from mcp_backpressure import BackpressureMiddleware

mcp = FastMCP("MyServer")
mcp.add_middleware(BackpressureMiddleware(
    max_concurrent=5,      # Max parallel executions
    queue_size=10,         # Bounded queue for waiting requests
    queue_timeout=30.0,    # Queue wait timeout (seconds)
))

Installation

pip install mcp-backpressure

Features

  • Concurrency limiting: Semaphore-based control of parallel executions
  • Bounded queue: Optional FIFO queue with configurable size
  • Queue timeout: Automatic timeout for queued requests with cleanup
  • Structured errors: JSON-RPC compliant overload errors with detailed metrics
  • Metrics: Real-time counters for active, queued, and rejected requests
  • Callback hook: Optional notification on each overload event
  • Zero dependencies: Only requires FastMCP and Python 3.10+

Usage

Basic Configuration

from mcp_backpressure import BackpressureMiddleware

mcp.add_middleware(BackpressureMiddleware(
    max_concurrent=5,           # Required: max parallel tool executions
    queue_size=10,              # Optional: bounded queue (0 = no queue)
    queue_timeout=30.0,         # Optional: seconds to wait in queue
    overload_error_code=-32001, # Optional: JSON-RPC error code
    on_overload=callback,       # Optional: called on each overload
))

Parameters

max_concurrent (int, required)
    Maximum number of concurrent tool executions. Must be >= 1.

queue_size (int, default 0)
    Maximum queue size for waiting requests. Set to 0 to reject immediately when the limit is reached.

queue_timeout (float, default 30.0)
    Maximum time in seconds a request can wait in the queue before timing out. Must be > 0.

overload_error_code (int, default -32001)
    JSON-RPC error code returned when the server is overloaded.

on_overload (Callable, default None)
    Optional callback (error: OverloadError) -> None invoked on each overload.
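
For example, to reject excess requests immediately once all execution slots are busy, disable the queue:

mcp.add_middleware(BackpressureMiddleware(
    max_concurrent=5,
    queue_size=0,   # no queue: excess requests are rejected with reason "concurrency_limit"
))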

Error Handling

When the server is overloaded, requests are rejected with a structured JSON-RPC error:

{
  "code": -32001,
  "message": "SERVER_OVERLOADED",
  "data": {
    "reason": "queue_full",
    "active": 5,
    "queued": 10,
    "max_concurrent": 5,
    "queue_size": 10,
    "queue_timeout_ms": 30000,
    "retry_after_ms": 1000
  }
}
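
Clients can use the data payload to back off and retry. The sketch below is illustrative only and assumes you already have the JSON-RPC error object as a Python dict; the handle_overload helper is not part of this library:

import time

def handle_overload(error: dict) -> None:
    """Back off for retry_after_ms before retrying the rejected call."""
    data = error.get("data", {})
    if error.get("message") == "SERVER_OVERLOADED":
        delay_s = data.get("retry_after_ms", 1000) / 1000.0
        print(f"Overloaded ({data.get('reason')}); retrying in {delay_s:.1f}s")
        time.sleep(delay_s)
        # ...retry the tool call here...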

Overload Reasons

concurrency_limit
    All execution slots are full and no queue is configured (queue_size=0).

queue_full
    All execution slots and all queue slots are full.

queue_timeout
    The request waited in the queue longer than queue_timeout.

Metrics

Get real-time metrics from the middleware:

# Keep a reference to the middleware instance when registering it
middleware = BackpressureMiddleware(max_concurrent=5, queue_size=10)
mcp.add_middleware(middleware)

metrics = middleware.get_metrics()  # Synchronous snapshot

print(f"Active: {metrics.active}")
print(f"Queued: {metrics.queued}")
print(f"Total rejected: {metrics.total_rejected}")
print(f"Rejected (concurrency): {metrics.rejected_concurrency_limit}")
print(f"Rejected (queue full): {metrics.rejected_queue_full}")
print(f"Rejected (timeout): {metrics.rejected_queue_timeout}")

For async contexts, use await middleware.get_metrics_async().
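
A minimal async sketch, assuming get_metrics_async() returns the same metrics object as get_metrics():

async def report_metrics(middleware: BackpressureMiddleware) -> None:
    metrics = await middleware.get_metrics_async()
    print(f"Active: {metrics.active}, Queued: {metrics.queued}, Rejected: {metrics.total_rejected}")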

Callback Hook

Register a callback to be notified of each overload event:

from mcp_backpressure import OverloadError

def on_overload(error: OverloadError):
    print(f"OVERLOAD: {error.reason} (active={error.active})")
    # Log to monitoring system, update metrics, etc.

middleware = BackpressureMiddleware(
    max_concurrent=5,
    queue_size=10,
    on_overload=on_overload,
)
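
If you want different handling per overload reason (see Overload Reasons above), the callback can branch on error.reason. A hedged sketch; only reason and active are attributes confirmed by this documentation:

import logging

logger = logging.getLogger("mcp_backpressure")

def on_overload(error: OverloadError):
    if error.reason == "queue_timeout":
        logger.warning("Request timed out in queue (active=%s)", error.active)
    elif error.reason == "queue_full":
        logger.warning("Queue is full; consider raising queue_size")
    else:  # concurrency_limit
        logger.warning("Concurrency limit hit and no queue is configured")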

Examples

Simple Server

See examples/simple_server.py for a minimal FastMCP server with backpressure.
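
A minimal sketch of what such a server might look like (not the actual contents of examples/simple_server.py; the slow_task tool is illustrative):

import asyncio

from fastmcp import FastMCP
from mcp_backpressure import BackpressureMiddleware

mcp = FastMCP("SimpleServer")
mcp.add_middleware(BackpressureMiddleware(max_concurrent=5, queue_size=10))

@mcp.tool()
async def slow_task(seconds: float) -> str:
    """Illustrative tool that holds an execution slot for a while."""
    await asyncio.sleep(seconds)
    return f"done after {seconds}s"

if __name__ == "__main__":
    mcp.run()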

Load Simulation

Run examples/load_simulation.py to see backpressure behavior under heavy concurrent load:

python examples/load_simulation.py

This simulates 30 concurrent requests against a server limited to 5 concurrent executions with a queue of 10, demonstrating how the middleware handles overload.
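
A hedged sketch of such a simulation (not the actual contents of examples/load_simulation.py), assuming FastMCP's in-memory Client and the slow_task tool from the sketch above:

import asyncio
from fastmcp import Client

async def main() -> None:
    async with Client(mcp) as client:          # in-memory connection to the server above
        async def one_call() -> str:
            try:
                await client.call_tool("slow_task", {"seconds": 1.0})
                return "ok"
            except Exception:                  # rejected calls surface as errors
                return "rejected"

        results = await asyncio.gather(*(one_call() for _ in range(30)))
        print(f"ok={results.count('ok')} rejected={results.count('rejected')}")

asyncio.run(main())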

How It Works

The middleware provides two-level limiting:

  1. Semaphore (max_concurrent): Controls active executions
  2. Bounded queue (queue_size): Holds waiting requests with timeout

Request flow:

  • If execution slot available → execute immediately
  • If execution slots full and queue not full → wait in queue with timeout
  • If queue full → reject with queue_full
  • If timeout in queue → reject with queue_timeout

Invariants (guaranteed under all conditions):

  • active <= max_concurrent ALWAYS
  • queued <= queue_size ALWAYS
  • Cancellation correctly frees slots and decrements counters
  • Queue timeout removes item from queue
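
A simplified conceptual sketch of this flow (illustrative only, not the library's implementation):

import asyncio

class TwoLevelLimiter:
    """Toy model of the semaphore + bounded queue flow described above."""

    def __init__(self, max_concurrent: int, queue_size: int, queue_timeout: float):
        self._slots = asyncio.Semaphore(max_concurrent)  # level 1: execution slots
        self._queue_size = queue_size                    # level 2: bounded waiting room
        self._queue_timeout = queue_timeout
        self._queued = 0

    async def run(self, func):
        if self._slots.locked() and self._queued >= self._queue_size:
            # no free slot and no room left to wait
            raise RuntimeError("queue_full" if self._queue_size else "concurrency_limit")
        self._queued += 1
        try:
            # wait for a slot, but never longer than queue_timeout
            await asyncio.wait_for(self._slots.acquire(), timeout=self._queue_timeout)
        except asyncio.TimeoutError:
            raise RuntimeError("queue_timeout")
        finally:
            self._queued -= 1   # timeout or success, the request leaves the queue
        try:
            return await func()
        finally:
            self._slots.release()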

Development

Running Tests

python -m pytest tests/ -v

Linting

ruff check src/ tests/

Design Rationale

This library emerged from python-sdk #1698 (closed as "not planned"). Key design decisions:

  • Global limits only (v0.1): Per-client and per-tool limits deferred to v0.2+
  • Simple counters: No Prometheus/OTEL dependencies by default
  • JSON-RPC errors: Follows MCP protocol conventions
  • Monotonic time: Queue timeouts use time.monotonic() for reliability
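
For illustration, the deadline pattern that motivates time.monotonic() looks like this (not the library's code); monotonic time is unaffected by system clock adjustments:

import time

deadline = time.monotonic() + 30.0       # queue_timeout seconds from now
# ... while the request waits in the queue ...
remaining = deadline - time.monotonic()
if remaining <= 0:
    print("queue_timeout")               # waited too long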

License

MIT

Contributing

Contributions welcome! Please open an issue before submitting PRs.

Changelog

See CHANGELOG.md
