Celery Flower MCP

MCP server for Celery Flower — monitor workers, manage tasks and queues from any AI assistant

🌸 celery-flower-mcp

Python 3.14+ · License: MIT

Give your AI assistant full control over Celery — monitor workers, manage tasks, inspect queues.

Features · Quick Start · Configuration · Tools · Development · Contributing


What is this?

celery-flower-mcp is a Model Context Protocol server that exposes the full Celery Flower REST API as MCP tools. Point it at your Flower instance and your AI assistant (Claude, Cursor, Windsurf, etc.) can:

  • Monitor workers, tasks, and queues in real time
  • Control worker pools — grow, shrink, autoscale, restart, shut down
  • Manage tasks — apply, revoke, abort, set timeouts and rate limits
  • Inspect queues — check depths, add/remove consumers

All 21 Flower API endpoints are covered.

Features

  • Full API coverage — every Flower REST endpoint exposed as an MCP tool
  • Dependency injection via dishka — clean, testable architecture
  • Pydantic Settings — typed configuration with .env file support
  • Async throughout — built on httpx + FastMCP
  • 65 tests — 49 unit tests (99% coverage) + 16 integration tests against real Flower
  • Strict typing — mypy strict mode, fully annotated

Quick Start

Install via uvx

FLOWER_URL=http://localhost:5555 uvx celery-flower-mcp

Install from source

git clone https://github.com/Darius1223/celery-flower-mcp
cd celery-flower-mcp
uv sync
uv run python -m source.main

Claude Desktop

Add to ~/Library/Application Support/Claude/claude_desktop_config.json (the macOS location of the Claude Desktop config file):

{
  "mcpServers": {
    "celery-flower": {
      "command": "uvx",
      "args": ["celery-flower-mcp"],
      "env": {
        "FLOWER_URL": "http://localhost:5555"
      }
    }
  }
}

Configuration

Configuration is read from environment variables or a .env file in the project root. Copy .env.example to get started:

cp .env.example .env

| Variable | Default | Description |
|---|---|---|
| FLOWER_URL | http://localhost:5555 | Base URL of your Flower instance |
| FLOWER_USERNAME | (none) | Basic auth username |
| FLOWER_PASSWORD | (none) | Basic auth password |
| FLOWER_API_TOKEN | (none) | Bearer token (takes priority over basic auth) |

Available Tools

Workers (8 tools)

| Tool | Description |
|---|---|
| list_workers | List all workers; optionally filter by name, refresh live stats, or get status only |
| shutdown_worker | Gracefully shut down a worker |
| restart_worker_pool | Restart a worker's process pool |
| grow_worker_pool | Add N processes to a worker's pool |
| shrink_worker_pool | Remove N processes from a worker's pool |
| autoscale_worker_pool | Configure autoscale min/max bounds |
| add_queue_consumer | Make a worker start consuming from a queue |
| cancel_queue_consumer | Make a worker stop consuming from a queue |

Tasks (11 tools)

| Tool | Description |
|---|---|
| list_tasks | List tasks with filters: state, worker, name, date range, search, pagination |
| list_task_types | List all registered task types across workers |
| get_task_info | Get full details for a task by UUID |
| get_task_result | Retrieve a task's result (with optional timeout) |
| apply_task | Execute a task synchronously and wait for the result |
| async_apply_task | Dispatch a task asynchronously, returns task UUID |
| send_task | Send a task by name; no registration required on worker side |
| abort_task | Abort a running task |
| revoke_task | Revoke a task; optionally terminate with a signal |
| set_task_timeout | Set soft and/or hard time limits for a task on a worker |
| set_task_rate_limit | Set rate limit for a task on a worker (e.g. 100/m) |

Queues & Health (2 tools)

| Tool | Description |
|---|---|
| get_queue_lengths | Get the current depth of all configured queues |
| healthcheck | Check whether the Flower instance is reachable and healthy |
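
To give a feel for how a tool name could translate into a Flower HTTP call, here is a minimal sketch covering a handful of the tools listed above. The `ROUTES` table and `build_request` helper are hypothetical and are not taken from this project's source. The paths follow Flower's REST API as commonly documented, so verify them against your Flower version. Sending parameters in the query string is a simplification for illustration.

```python
from urllib.parse import quote, urlencode

# Assumed mapping from tool name to (HTTP method, path template).
ROUTES: dict[str, tuple[str, str]] = {
    "list_workers": ("GET", "/api/workers"),
    "grow_worker_pool": ("POST", "/api/worker/pool/grow/{target}"),
    "revoke_task": ("POST", "/api/task/revoke/{target}"),
    "get_queue_lengths": ("GET", "/api/queues/length"),
    "healthcheck": ("GET", "/healthcheck"),
}


def build_request(base_url: str, tool: str, target: str = "", **params: object) -> tuple[str, str]:
    """Return (method, url) for a tool call; raises KeyError for unknown tools."""
    method, template = ROUTES[tool]
    # Keep "@" readable because Celery worker names look like "celery@host".
    path = template.format(target=quote(target, safe="@"))
    url = base_url.rstrip("/") + path
    if params:
        url += "?" + urlencode(params)
    return method, url
```

For example, `build_request("http://localhost:5555", "grow_worker_pool", "celery@worker1", n=2)` yields a POST to `/api/worker/pool/grow/celery@worker1?n=2`.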

Architecture

source/
├── main.py        # FastMCP server entry point + dishka container wiring
├── settings.py    # Pydantic Settings — typed config from env / .env
├── client.py      # Async HTTP client wrapping Flower REST API
├── providers.py   # dishka Provider — manages FlowerClient lifecycle
└── tools/
    ├── workers.py # 8 worker management tools
    ├── tasks.py   # 11 task management tools
    └── queues.py  # 2 queue / health tools

dishka manages the FlowerClient lifecycle: created once at startup, closed cleanly on shutdown via an async generator provider.
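
The create-once, close-on-shutdown pattern described above can be sketched without dishka, using a plain async generator. Everything here is a stand-in: `FakeFlowerClient`, `provide_client`, and `main` are hypothetical names, and `contextlib.asynccontextmanager` plays the role that the dishka container plays in the real server.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeFlowerClient:
    """Hypothetical stand-in for the project's FlowerClient."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


async def provide_client() -> AsyncIterator[FakeFlowerClient]:
    client = FakeFlowerClient()  # setup: runs once, at startup
    try:
        yield client  # the server uses the client while the generator is suspended here
    finally:
        await client.aclose()  # teardown: runs once, at shutdown


async def main() -> FakeFlowerClient:
    # asynccontextmanager drives the generator: enter runs setup, exit runs teardown.
    async with asynccontextmanager(provide_client)() as client:
        assert not client.closed  # still open while the "server" is running
    return client
```

Running `asyncio.run(main())` returns a client whose `closed` flag is set, showing that the teardown code after `yield` ran on exit.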

Development

make fmt        # auto-format with ruff
make lint       # lint with ruff
make typecheck  # type-check with mypy (strict)
make test       # run 49 unit tests
make cov        # unit tests + coverage report
make all        # fmt + lint + typecheck

Testing

The test suite is split into two layers:

Unit tests (tests/) — fast, no external dependencies, use pytest-httpx to mock HTTP calls:

make test
# or
uv run pytest tests/ -m "not integration"

Integration tests (tests/integration/) — run against a real Flower instance backed by Redis and a live Celery worker, all managed by Docker Compose:

make integration

This command:

  1. Builds and starts the Docker Compose stack (docker-compose.test.yml) — Redis → Celery worker → Flower
  2. Waits for Flower's /healthcheck endpoint to return OK
  3. Runs the 16 integration tests against http://localhost:5555
  4. Tears down the stack when done
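
Step 2 above is essentially a poll-until-healthy loop. A minimal sketch of that idea follows, assuming that an HTTP 200 from /healthcheck means Flower is ready. The function names, the 60-second timeout, and the 1-second interval are illustrative choices, not values taken from the project's Makefile.

```python
import time
import urllib.request
from collections.abc import Callable


def flower_is_healthy(url: str) -> bool:
    """Return True if the URL answers with HTTP 200 within 2 seconds."""
    try:
        with urllib.request.urlopen(url, timeout=2) as resp:
            return resp.status == 200
    except OSError:  # covers URLError, connection refused, and timeouts
        return False


def wait_for_flower(
    url: str = "http://localhost:5555/healthcheck",
    *,
    timeout: float = 60.0,
    interval: float = 1.0,
    probe: Callable[[str], bool] = flower_is_healthy,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll until the probe succeeds or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if probe(url):
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval)
```

The `probe` and `sleep` parameters exist only so the loop can be exercised without a network or real delays.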

The worker and Flower images are built from tests/integration/Dockerfile.worker and tests/integration/Dockerfile.flower, respectively.

To start the stack manually for exploratory testing:

docker compose -f docker-compose.test.yml up -d --build
# run tests, explore, etc.
make integration-down   # stop + remove volumes

Integration tests use pytest.mark.asyncio(loop_scope="session") so that all tests share one event loop. This avoids "RuntimeError: Event loop is closed", which otherwise occurs when httpx transports are cleaned up across test boundaries on Python 3.14.

See CONTRIBUTING.md for details on adding new tools or submitting a PR.

Changelog

See CHANGELOG.md.

License

MIT
