azure-servicebus-py
作者: microsoft
適用於佇列與發佈/訂閱主題的企業級訊息傳遞,實現可靠的雲端通訊。
npx skills add https://github.com/microsoft/skills --skill azure-servicebus-pyAzure Service Bus SDK for Python
Enterprise messaging for reliable cloud communication with queues and pub/sub topics.
Installation
pip install azure-servicebus azure-identity
Environment Variables
SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net # Required for all auth methods
SERVICEBUS_QUEUE_NAME=myqueue # Required for queue operations
SERVICEBUS_TOPIC_NAME=mytopic # Required for topic operations
SERVICEBUS_SUBSCRIPTION_NAME=mysubscription # Required for subscription operations
AZURE_TOKEN_CREDENTIALS=prod # Required only if DefaultAzureCredential is used in production
Authentication & Lifecycle
🔑 Two rules apply to every code sample below:
- Prefer
DefaultAzureCredential. It works locally (Azure CLI / VS Code / Developer CLI) and in Azure (managed identity, workload identity) with no code change. Avoid connection strings, account/API keys — they bypass Entra audit and rotation.
- Local dev:
DefaultAzureCredentialworks as-is.- Production: set
AZURE_TOKEN_CREDENTIALS=prod(orAZURE_TOKEN_CREDENTIALS=<specific_credential>) to constrain the credential chain to production-safe credentials.- Wrap every client in a context manager so HTTP transports, sockets, and token caches are released deterministically:
- Sync:
with <Client>(...) as client:- Async:
async with <Client>(...) as client:andasync with DefaultAzureCredential() as credential:(fromazure.identity.aio)Snippets may abbreviate this setup, but production code should always follow both rules.
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
from azure.servicebus import ServiceBusClient
# Local dev: DefaultAzureCredential. Production: set AZURE_TOKEN_CREDENTIALS=prod or AZURE_TOKEN_CREDENTIALS=<specific_credential>
credential = DefaultAzureCredential(require_envvar=True)
# Or use a specific credential directly in production:
# See https://learn.microsoft.com/python/api/overview/azure/identity-readme?view=azure-python#credential-classes
# credential = ManagedIdentityCredential()
namespace = "<namespace>.servicebus.windows.net"
with ServiceBusClient(
fully_qualified_namespace=namespace,
credential=credential
) as client:
# Use client here (see following sections for operations)
...
Client Types
| Client | Purpose | Get From |
|---|---|---|
ServiceBusClient | Connection management | Direct instantiation |
ServiceBusSender | Send messages | client.get_queue_sender() / get_topic_sender() |
ServiceBusReceiver | Receive messages | client.get_queue_receiver() / get_subscription_receiver() |
Send Messages (Async)
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential
async def send_messages():
credential = DefaultAzureCredential()
async with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=credential
) as client:
sender = client.get_queue_sender(queue_name="myqueue")
async with sender:
# Single message
message = ServiceBusMessage("Hello, Service Bus!")
await sender.send_messages(message)
# Batch of messages
messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
await sender.send_messages(messages)
# Message batch (for size control)
batch = await sender.create_message_batch()
for i in range(100):
try:
batch.add_message(ServiceBusMessage(f"Batch message {i}"))
except ValueError: # Batch full
await sender.send_messages(batch)
batch = await sender.create_message_batch()
batch.add_message(ServiceBusMessage(f"Batch message {i}"))
await sender.send_messages(batch)
asyncio.run(send_messages())
Receive Messages (Async)
async def receive_messages():
credential = DefaultAzureCredential()
async with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=credential
) as client:
receiver = client.get_queue_receiver(queue_name="myqueue")
async with receiver:
# Receive batch
messages = await receiver.receive_messages(
max_message_count=10,
max_wait_time=5 # seconds
)
for msg in messages:
print(f"Received: {str(msg)}")
await receiver.complete_message(msg) # Remove from queue
asyncio.run(receive_messages())
Receive Modes
| Mode | Behavior | Use Case |
|---|---|---|
PEEK_LOCK (default) | Message locked, must complete/abandon | Reliable processing |
RECEIVE_AND_DELETE | Removed immediately on receive | At-most-once delivery |
from azure.servicebus import ServiceBusReceiveMode
receiver = client.get_queue_receiver(
queue_name="myqueue",
receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE
)
Message Settlement
async with receiver:
messages = await receiver.receive_messages(max_message_count=1)
for msg in messages:
try:
# Process message...
await receiver.complete_message(msg) # Success - remove from queue
except ProcessingError:
await receiver.abandon_message(msg) # Retry later
except PermanentError:
await receiver.dead_letter_message(
msg,
reason="ProcessingFailed",
error_description="Could not process"
)
| Action | Effect |
|---|---|
complete_message() | Remove from queue (success) |
abandon_message() | Release lock, retry immediately |
dead_letter_message() | Move to dead-letter queue |
defer_message() | Set aside, receive by sequence number |
Topics and Subscriptions
# Send to topic
sender = client.get_topic_sender(topic_name="mytopic")
async with sender:
await sender.send_messages(ServiceBusMessage("Topic message"))
# Receive from subscription
receiver = client.get_subscription_receiver(
topic_name="mytopic",
subscription_name="mysubscription"
)
async with receiver:
messages = await receiver.receive_messages(max_message_count=10)
Sessions (FIFO)
# Send with session
message = ServiceBusMessage("Session message")
message.session_id = "order-123"
await sender.send_messages(message)
# Receive from specific session
receiver = client.get_queue_receiver(
queue_name="session-queue",
session_id="order-123"
)
# Receive from next available session
from azure.servicebus import NEXT_AVAILABLE_SESSION
receiver = client.get_queue_receiver(
queue_name="session-queue",
session_id=NEXT_AVAILABLE_SESSION
)
Scheduled Messages
from datetime import datetime, timedelta, timezone
message = ServiceBusMessage("Scheduled message")
scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=10)
# Schedule message
sequence_number = await sender.schedule_messages(message, scheduled_time)
# Cancel scheduled message
await sender.cancel_scheduled_messages(sequence_number)
Dead-Letter Queue
from azure.servicebus import ServiceBusSubQueue
# Receive from dead-letter queue
dlq_receiver = client.get_queue_receiver(
queue_name="myqueue",
sub_queue=ServiceBusSubQueue.DEAD_LETTER
)
async with dlq_receiver:
messages = await dlq_receiver.receive_messages(max_message_count=10)
for msg in messages:
print(f"Dead-lettered: {msg.dead_letter_reason}")
await dlq_receiver.complete_message(msg)
Sync Client (for simple scripts)
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential
with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=DefaultAzureCredential()
) as client:
with client.get_queue_sender("myqueue") as sender:
sender.send_messages(ServiceBusMessage("Sync message"))
with client.get_queue_receiver("myqueue") as receiver:
for msg in receiver:
print(str(msg))
receiver.complete_message(msg)
Best Practices
- Pick sync OR async and stay consistent. Do not mix
azure.xxxsync clients withazure.xxx.aioasync clients in the same call path. Choose one mode per module. - Always use context managers for clients and async credentials. Wrap every client in
with Client(...) as client:(sync) orasync with Client(...) as client:(async) for proper cleanup. For asyncDefaultAzureCredentialfromazure.identity.aio, also useasync with credential:so tokens and transports are cleaned up. - Use
DefaultAzureCredentialfor portable auth across local dev and Azure (avoid connection strings / API keys when possible). - Use async client for production workloads
- Complete messages after successful processing
- Use dead-letter queue for poison messages
- Use sessions for ordered, FIFO processing
- Use message batches for high-throughput scenarios
- Set
max_wait_timeto avoid infinite blocking
Reference Files
| File | Contents |
|---|---|
| references/patterns.md | Competing consumers, sessions, retry patterns, request-response, transactions |
| references/dead-letter.md | DLQ handling, poison messages, reprocessing strategies |
| scripts/setup_servicebus.py | CLI for queue/topic/subscription management and DLQ monitoring |
來自 microsoft 的更多技能
oss-growth
microsoft
開源增長駭客角色
official
microsoft-foundry
microsoft
端到端部署、評估與管理 Foundry 代理:Docker 建置、ACR 推送、託管/提示代理建立、容器啟動、批次評估、持續評估、提示最佳化工作流程、agent.yaml、從追蹤資料集整理。用途:將代理部署至 Foundry、託管代理、建立代理、調用代理、評估代理、執行批次評估、持續評估、持續監控、持續評估狀態、最佳化提示、改善提示、提示最佳化器、最佳化代理指令、改善代理...
officialdevelopmentdevops
azure-ai
microsoft
用於 Azure AI:搜尋、語音、OpenAI、文件智慧。協助搜尋、向量/混合搜尋、語音轉文字、文字轉語音、轉錄、OCR。適用情境:AI 搜尋、查詢搜尋、向量搜尋、混合搜尋、語意搜尋、語音轉文字、文字轉語音、轉錄、OCR、將文字轉換為語音。
officialdevelopmentapi
azure-deploy
microsoft
對已準備好的應用程式執行 Azure 部署,這些應用程式需具備現有的 .azure/deployment-plan.md 與基礎架構檔案。當使用者要求建立新應用程式時,請勿使用此技能——應改用 azure-prepare。此技能會執行 azd up、azd deploy、terraform apply 及 az deployment 命令,並內建錯誤復原機制。需具備來自 azure-prepare 的 .azure/deployment-plan.md,以及來自 azure-validate 的驗證狀態。適用時機:「執行 azd up」、「執行 azd deploy」、「執行部署」……
officialdevopsaws
azure-storage
microsoft
Azure Storage Services 包括 Blob 儲存體、檔案共用、佇列儲存體、表格儲存體和 Data Lake。回答關於儲存存取層(熱、冷、凍結、封存)、各層使用時機及層級比較的問題。提供物件儲存、SMB 檔案共用、非同步訊息、NoSQL 鍵值及大數據分析。包含生命週期管理。用於:blob 儲存體、檔案共用、佇列儲存體、表格儲存體、data lake、上傳檔案、下載 blob、儲存帳戶、存取層...
officialdevelopmentdatabase
azure-diagnostics
microsoft
在 Azure 上使用 AppLens、Azure Monitor、資源健康狀態和安全分類來偵錯 Azure 生產問題。適用時機:偵錯生產問題、疑難排解應用程式服務、應用程式服務高 CPU、應用程式服務部署失敗、疑難排解容器應用程式、疑難排解函數、疑難排解 AKS、kubectl 無法連線、kube-system/CoreDNS 失敗、Pod 擱置、CrashLoop、節點未就緒、升級失敗、分析記錄、KQL、深入解析、映像提取失敗、冷啟動問題、健康狀態探查失敗...
officialdevopsdevelopment
azure-prepare
microsoft
準備 Azure 應用程式以進行部署(基礎架構 Bicep/Terraform、azure.yaml、Dockerfile)。用於建立/現代化或建立+部署;不適用於跨雲端遷移(請使用 azure-cloud-migrate)。請勿用於:copilot-sdk 應用程式(請使用 azure-hosted-copilot-sdk)。適用時機:「建立應用程式」、「建置 Web 應用程式」、「建立 API」、「建立無伺服器 HTTP API」、「建立前端」、「建立後端」、「建置服務」、「現代化應用程式」、「更新應用程式」、「新增驗證」、「新增快取」、「託管於 Azure」、「建立並...」
officialdevelopmentdevops
azure-validate
microsoft
部署前驗證 Azure 就緒狀態。對設定、基礎架構(Bicep 或 Terraform)、RBAC 角色指派、受控身分權限及先決條件進行深度檢查,再進行部署。適用時機:驗證我的應用程式、檢查部署就緒狀態、執行預檢檢查、驗證設定、確認是否可部署、驗證 azure.yaml、驗證 Bicep、部署前測試、疑難排解部署錯誤、驗證 Azure Functions、驗證函式應用程式、驗證無伺服器...
officialdevopstesting