🌸 celery-flower-mcp

CI codecov PyPI Python 3.14+ MCP Ruff uv License: MIT

Give your AI assistant full control over Celery — monitor workers, manage tasks, inspect queues.

Features · Quick Start · Configuration · Tools · Development · Contributing


What is this?

celery-flower-mcp is a Model Context Protocol server that exposes the full Celery Flower REST API as MCP tools. Point it at your Flower instance and your AI assistant (Claude, Cursor, Windsurf, etc.) can:

  • Monitor workers, tasks, and queues in real time
  • Control worker pools — grow, shrink, autoscale, restart, shut down
  • Manage tasks — apply, revoke, abort, set timeouts and rate limits
  • Inspect queues — check depths, add/remove consumers

All 21 Flower API endpoints are covered.

Features

  • Full API coverage — every Flower REST endpoint exposed as an MCP tool
  • Dependency injection via dishka — clean, testable architecture
  • Pydantic Settings — typed configuration with .env file support
  • Async throughout — built on httpx + FastMCP
  • 65 tests — 49 unit tests (99% coverage) + 16 integration tests against real Flower
  • Strict typing — mypy strict mode, fully annotated

Quick Start

Install via uvx

FLOWER_URL=http://localhost:5555 uvx celery-flower-mcp

Install from source

git clone https://github.com/Darius1223/celery-flower-mcp
cd celery-flower-mcp
uv sync
uv run python -m source.main

Claude Desktop

Add to ~/Library/Application Support/Claude/claude_desktop_config.json:

{
  "mcpServers": {
    "celery-flower": {
      "command": "uvx",
      "args": ["celery-flower-mcp"],
      "env": {
        "FLOWER_URL": "http://localhost:5555"
      }
    }
  }
}

Configuration

Configuration is read from environment variables or a .env file in the project root. Copy .env.example to get started:

cp .env.example .env

| Variable | Default | Description |
| --- | --- | --- |
| `FLOWER_URL` | `http://localhost:5555` | Base URL of your Flower instance |
| `FLOWER_USERNAME` | | Basic auth username |
| `FLOWER_PASSWORD` | | Basic auth password |
| `FLOWER_API_TOKEN` | | Bearer token (takes priority over basic auth) |
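
For example, a minimal `.env` for a Flower instance behind basic auth might look like this (the values shown are illustrative):

```ini
FLOWER_URL=http://localhost:5555
FLOWER_USERNAME=admin
FLOWER_PASSWORD=changeme
# Or use a bearer token instead (takes priority over basic auth):
# FLOWER_API_TOKEN=my-secret-token
```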

Available Tools

Workers (8 tools)

| Tool | Description |
| --- | --- |
| `list_workers` | List all workers — optionally filter by name, refresh live stats, or get status only |
| `shutdown_worker` | Gracefully shut down a worker |
| `restart_worker_pool` | Restart a worker's process pool |
| `grow_worker_pool` | Add N processes to a worker's pool |
| `shrink_worker_pool` | Remove N processes from a worker's pool |
| `autoscale_worker_pool` | Configure autoscale min/max bounds |
| `add_queue_consumer` | Make a worker start consuming from a queue |
| `cancel_queue_consumer` | Make a worker stop consuming from a queue |
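
Under the hood, each of these tools maps onto one of Flower's worker REST endpoints. A minimal sketch of the URL construction behind `grow_worker_pool` (the path follows Flower's documented `POST /api/worker/pool/grow/<name>` route; the helper name is ours, not part of this package's API):

```python
from urllib.parse import quote, urlencode

def grow_pool_url(base_url: str, worker: str, n: int) -> str:
    # grow_worker_pool issues POST /api/worker/pool/grow/<worker>?n=<n>
    # against the Flower instance; worker names like "celery@host1"
    # need percent-encoding in the path.
    return f"{base_url}/api/worker/pool/grow/{quote(worker)}?{urlencode({'n': n})}"

print(grow_pool_url("http://localhost:5555", "celery@host1", 2))
# -> http://localhost:5555/api/worker/pool/grow/celery%40host1?n=2
```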

Tasks (11 tools)

| Tool | Description |
| --- | --- |
| `list_tasks` | List tasks with filters: state, worker, name, date range, search, pagination |
| `list_task_types` | List all registered task types across workers |
| `get_task_info` | Get full details for a task by UUID |
| `get_task_result` | Retrieve a task's result (with optional timeout) |
| `apply_task` | Execute a task synchronously and wait for the result |
| `async_apply_task` | Dispatch a task asynchronously, returns task UUID |
| `send_task` | Send a task by name — no registration required on worker side |
| `abort_task` | Abort a running task |
| `revoke_task` | Revoke a task; optionally terminate with a signal |
| `set_task_timeout` | Set soft and/or hard time limits for a task on a worker |
| `set_task_rate_limit` | Set rate limit for a task on a worker (e.g. `100/m`) |
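
As with the worker tools, each task tool wraps a Flower REST endpoint. A sketch of the request `send_task` would build (the path follows Flower's `POST /api/task/send-task/<name>` route; the helper name and payload values are illustrative):

```python
import json
from urllib.parse import quote

def send_task_request(base_url: str, task_name: str, args: list, kwargs: dict) -> tuple[str, str]:
    # Returns (url, json_body) for POST /api/task/send-task/<name>.
    # The task is addressed by name, so only the consuming worker
    # needs to know how to run it.
    url = f"{base_url}/api/task/send-task/{quote(task_name)}"
    body = json.dumps({"args": args, "kwargs": kwargs})
    return url, body

url, body = send_task_request("http://localhost:5555", "tasks.add", [2, 3], {})
print(url)
print(body)
```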

Queues & Health (2 tools)

| Tool | Description |
| --- | --- |
| `get_queue_lengths` | Get the current depth of all configured queues |
| `healthcheck` | Check whether the Flower instance is reachable and healthy |
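
For illustration, here is one way a `get_queue_lengths` response could be turned into a name-to-depth mapping. The response shape shown is an assumption about Flower's `/api/queues/length` endpoint, so treat this as a sketch, not a contract:

```python
import json

# Illustrative sample of the JSON a queue-length endpoint might return
# (the "active_queues" shape is an assumption, not a guarantee).
sample = '{"active_queues": [{"name": "celery", "messages": 4}, {"name": "emails", "messages": 0}]}'

def queue_depths(payload: str) -> dict:
    # Collapse the list of queue objects into {queue_name: message_count}.
    data = json.loads(payload)
    return {q["name"]: q["messages"] for q in data["active_queues"]}

print(queue_depths(sample))
# {'celery': 4, 'emails': 0}
```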

Architecture

source/
├── main.py        # FastMCP server entry point + dishka container wiring
├── settings.py    # Pydantic Settings — typed config from env / .env
├── client.py      # Async HTTP client wrapping Flower REST API
├── providers.py   # dishka Provider — manages FlowerClient lifecycle
└── tools/
    ├── workers.py # 8 worker management tools
    ├── tasks.py   # 11 task management tools
    └── queues.py  # 2 queue / health tools

dishka manages the FlowerClient lifecycle: created once at startup, closed cleanly on shutdown via an async generator provider.

Development

make fmt        # auto-format with ruff
make lint       # lint with ruff
make typecheck  # type-check with mypy (strict)
make test       # run 49 unit tests
make cov        # unit tests + coverage report
make all        # fmt + lint + typecheck

Testing

The test suite is split into two layers:

Unit tests (tests/) — fast, no external dependencies, use pytest-httpx to mock HTTP calls:

make test
# or
uv run pytest tests/ -m "not integration"

Integration tests (tests/integration/) — run against a real Flower instance backed by Redis and a live Celery worker, all managed by Docker Compose:

make integration

This command:

  1. Builds and starts the Docker Compose stack (docker-compose.test.yml) — Redis → Celery worker → Flower
  2. Waits for Flower's /healthcheck endpoint to return OK
  3. Runs the 16 integration tests against http://localhost:5555
  4. Tears down the stack when done

The stack is defined in docker-compose.test.yml. The worker and Flower images are built from tests/integration/Dockerfile.worker and tests/integration/Dockerfile.flower.
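
A sketch of what such a stack could look like (service wiring, ports, and option names here are illustrative, not a copy of the real docker-compose.test.yml):

```yaml
services:
  redis:
    image: redis:7
  worker:
    build:
      context: .
      dockerfile: tests/integration/Dockerfile.worker
    depends_on: [redis]
  flower:
    build:
      context: .
      dockerfile: tests/integration/Dockerfile.flower
    ports: ["5555:5555"]
    depends_on: [worker]
```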

To start the stack manually for exploratory testing:

docker compose -f docker-compose.test.yml up -d --build
# run tests, explore, etc.
make integration-down   # stop + remove volumes

Integration tests use pytest.mark.asyncio(loop_scope="session") so all tests share one event loop — this avoids RuntimeError: Event loop is closed when httpx transports are cleaned up across test boundaries on Python 3.14.
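
The marker described above, as it might appear in a shared conftest (a sketch; `loop_scope` is pytest-asyncio's session-loop API, and the file path is illustrative):

```python
# tests/integration/conftest.py (sketch)
import pytest

# Share one event loop across the whole test session so httpx transports
# are not torn down on an already-closed loop between tests.
pytestmark = pytest.mark.asyncio(loop_scope="session")
```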

See CONTRIBUTING.md for details on adding new tools or submitting a PR.

Changelog

See CHANGELOG.md.

License

MIT
