🌸 celery-flower-mcp

MCP server for Celery Flower — monitor workers, manage tasks and queues from any AI assistant.

CI codecov PyPI Python 3.14+ MCP Ruff uv License: MIT

Give your AI assistant full control over Celery — monitor workers, manage tasks, inspect queues.

Features · Quick Start · Configuration · Tools · Development · Contributing


What is this?

celery-flower-mcp is a Model Context Protocol server that exposes the full Celery Flower REST API as MCP tools. Point it at your Flower instance and your AI assistant (Claude, Cursor, Windsurf, etc.) can:

  • Monitor workers, tasks, and queues in real time
  • Control worker pools — grow, shrink, autoscale, restart, shut down
  • Manage tasks — apply, revoke, abort, set timeouts and rate limits
  • Inspect queues — check depths, add/remove consumers

All 21 Flower API endpoints are covered.

Features

  • Full API coverage — every Flower REST endpoint exposed as an MCP tool
  • Dependency injection via dishka — clean, testable architecture
  • Pydantic Settings — typed configuration with .env file support
  • Async throughout — built on httpx + FastMCP
  • 65 tests — 49 unit tests (99% coverage) + 16 integration tests against a real Flower instance
  • Strict typing โ€” mypy strict mode, fully annotated

Quick Start

Install via uvx

FLOWER_URL=http://localhost:5555 uvx celery-flower-mcp

Install from source

git clone https://github.com/Darius1223/celery-flower-mcp
cd celery-flower-mcp
uv sync
uv run python -m source.main

Claude Desktop

Add to ~/Library/Application Support/Claude/claude_desktop_config.json:

{
  "mcpServers": {
    "celery-flower": {
      "command": "uvx",
      "args": ["celery-flower-mcp"],
      "env": {
        "FLOWER_URL": "http://localhost:5555"
      }
    }
  }
}

Configuration

Configuration is read from environment variables or a .env file in the project root. Copy .env.example to get started:

cp .env.example .env

Variable            Default                  Description
FLOWER_URL          http://localhost:5555    Base URL of your Flower instance
FLOWER_USERNAME     —                        Basic auth username
FLOWER_PASSWORD     —                        Basic auth password
FLOWER_API_TOKEN    —                        Bearer token (takes priority over basic auth)
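
For reference, a minimal .env might look like this (values are illustrative; set either the basic-auth pair or the token, and note that when both are present the bearer token wins):

```shell
# Flower connection (illustrative values)
FLOWER_URL=http://localhost:5555

# Basic auth credentials (optional)
FLOWER_USERNAME=admin
FLOWER_PASSWORD=secret

# Bearer token; when set, it takes priority over basic auth
FLOWER_API_TOKEN=my-flower-token
```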

Available Tools

Workers (8 tools)

Tool                     Description
list_workers             List all workers — optionally filter by name, refresh live stats, or get status only
shutdown_worker          Gracefully shut down a worker
restart_worker_pool      Restart a worker's process pool
grow_worker_pool         Add N processes to a worker's pool
shrink_worker_pool       Remove N processes from a worker's pool
autoscale_worker_pool    Configure autoscale min/max bounds
add_queue_consumer       Make a worker start consuming from a queue
cancel_queue_consumer    Make a worker stop consuming from a queue

Tasks (11 tools)

Tool                   Description
list_tasks             List tasks with filters: state, worker, name, date range, search, pagination
list_task_types        List all registered task types across workers
get_task_info          Get full details for a task by UUID
get_task_result        Retrieve a task's result (with optional timeout)
apply_task             Execute a task synchronously and wait for the result
async_apply_task       Dispatch a task asynchronously; returns the task UUID
send_task              Send a task by name — no registration required on the worker side
abort_task             Abort a running task
revoke_task            Revoke a task; optionally terminate with a signal
set_task_timeout       Set soft and/or hard time limits for a task on a worker
set_task_rate_limit    Set a rate limit for a task on a worker (e.g. 100/m)

Queues & Health (2 tools)

Tool                 Description
get_queue_lengths    Get the current depth of all configured queues
healthcheck          Check whether the Flower instance is reachable and healthy
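
Under the hood, each tool corresponds to one Flower REST endpoint. As a rough illustration (a sketch, not the server's actual routing table; the paths follow Flower's documented HTTP API, but verify them against your Flower version), a few of the mappings could be expressed as:

```python
# Sketch: mapping a few MCP tool names to Flower REST endpoints.
# Paths follow Flower's documented HTTP API; verify against your version.
ENDPOINTS: dict[str, tuple[str, str]] = {
    "list_workers":      ("GET",  "/api/workers"),
    "shutdown_worker":   ("POST", "/api/worker/shutdown/{name}"),
    "grow_worker_pool":  ("POST", "/api/worker/pool/grow/{name}"),
    "get_task_info":     ("GET",  "/api/task/info/{task_id}"),
    "apply_task":        ("POST", "/api/task/apply/{task_name}"),
    "revoke_task":       ("POST", "/api/task/revoke/{task_id}"),
    "get_queue_lengths": ("GET",  "/api/queues/length"),
    "healthcheck":       ("GET",  "/healthcheck"),
}

def build_url(base: str, tool: str, **params: str) -> tuple[str, str]:
    """Return (HTTP method, full URL) for a given tool call."""
    method, path = ENDPOINTS[tool]
    return method, base.rstrip("/") + path.format(**params)
```

For example, `build_url("http://localhost:5555", "shutdown_worker", name="celery@host")` yields a POST to `/api/worker/shutdown/celery@host`.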

Architecture

source/
├── main.py        # FastMCP server entry point + dishka container wiring
├── settings.py    # Pydantic Settings — typed config from env / .env
├── client.py      # Async HTTP client wrapping the Flower REST API
├── providers.py   # dishka Provider — manages FlowerClient lifecycle
└── tools/
    ├── workers.py # 8 worker management tools
    ├── tasks.py   # 11 task management tools
    └── queues.py  # 2 queue / health tools

dishka manages the FlowerClient lifecycle: created once at startup, closed cleanly on shutdown via an async generator provider.

Development

make fmt        # auto-format with ruff
make lint       # lint with ruff
make typecheck  # type-check with mypy (strict)
make test       # run 49 unit tests
make cov        # unit tests + coverage report
make all        # fmt + lint + typecheck

Testing

The test suite is split into two layers:

Unit tests (tests/) — fast, no external dependencies, use pytest-httpx to mock HTTP calls:

make test
# or
uv run pytest tests/ -m "not integration"

Integration tests (tests/integration/) — run against a real Flower instance backed by Redis and a live Celery worker, all managed by Docker Compose:

make integration

This command:

  1. Builds and starts the Docker Compose stack (docker-compose.test.yml) — Redis → Celery worker → Flower
  2. Waits for Flower's /healthcheck endpoint to return OK
  3. Runs the 16 integration tests against http://localhost:5555
  4. Tears down the stack when done

The stack is defined in docker-compose.test.yml. The worker and Flower images are built from tests/integration/Dockerfile.worker and tests/integration/Dockerfile.flower.

To start the stack manually for exploratory testing:

docker compose -f docker-compose.test.yml up -d --build
# run tests, explore, etc.
make integration-down   # stop + remove volumes

Integration tests use pytest.mark.asyncio(loop_scope="session") so all tests share one event loop — this avoids RuntimeError: Event loop is closed when httpx transports are cleaned up across test boundaries on Python 3.14.

See CONTRIBUTING.md for details on adding new tools or submitting a PR.

Changelog

See CHANGELOG.md.

License

MIT
