azure-eventhub-py
作者: microsoft
高吞吐量事件擷取的大數據串流平台。
npx skills add https://github.com/microsoft/skills --skill azure-eventhub-pyAzure Event Hubs SDK for Python
Big data streaming platform for high-throughput event ingestion.
Installation
pip install azure-eventhub azure-identity
# For checkpointing with blob storage
pip install azure-eventhub-checkpointstoreblob-aio
Environment Variables
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net # Required for all auth methods
EVENT_HUB_NAME=my-eventhub # Required for all auth methods
STORAGE_ACCOUNT_URL=https://<account>.blob.core.windows.net # Required for checkpoint storage
CHECKPOINT_CONTAINER=checkpoints # Required for checkpoint storage
AZURE_TOKEN_CREDENTIALS=prod # Required only if DefaultAzureCredential is used in production
Authentication & Lifecycle
🔑 Two rules apply to every code sample below:
- Prefer
DefaultAzureCredential. It works locally (Azure CLI / VS Code / Developer CLI) and in Azure (managed identity, workload identity) with no code change. Avoid connection strings, account/API keys — they bypass Entra audit and rotation.
- Local dev:
DefaultAzureCredentialworks as-is.- Production: set
AZURE_TOKEN_CREDENTIALS=prod(orAZURE_TOKEN_CREDENTIALS=<specific_credential>) to constrain the credential chain to production-safe credentials.- Wrap every client in a context manager so HTTP transports, sockets, and token caches are released deterministically:
- Sync:
with <Client>(...) as client:- Async:
async with <Client>(...) as client:andasync with DefaultAzureCredential() as credential:(fromazure.identity.aio)Snippets may abbreviate this setup, but production code should always follow both rules.
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient
# Local dev: DefaultAzureCredential. Production: set AZURE_TOKEN_CREDENTIALS=prod or AZURE_TOKEN_CREDENTIALS=<specific_credential>
credential = DefaultAzureCredential(require_envvar=True)
# Or use a specific credential directly in production:
# See https://learn.microsoft.com/python/api/overview/azure/identity-readme?view=azure-python#credential-classes
# credential = ManagedIdentityCredential()
namespace = "<namespace>.servicebus.windows.net"
eventhub_name = "my-eventhub"
# Producer
with EventHubProducerClient(
fully_qualified_namespace=namespace,
eventhub_name=eventhub_name,
credential=credential
) as producer:
# Use producer here (see following sections for operations)
...
# Consumer
with EventHubConsumerClient(
fully_qualified_namespace=namespace,
eventhub_name=eventhub_name,
consumer_group="$Default",
credential=credential
) as consumer:
# Use consumer here (see following sections for operations)
...
Client Types
| Client | Purpose |
|---|---|
EventHubProducerClient | Send events to Event Hub |
EventHubConsumerClient | Receive events from Event Hub |
BlobCheckpointStore | Track consumer progress |
Send Events
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential
with EventHubProducerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
credential=DefaultAzureCredential()
) as producer:
# Create batch (handles size limits)
event_data_batch = producer.create_batch()
for i in range(10):
try:
event_data_batch.add(EventData(f"Event {i}"))
except ValueError:
# Batch is full, send and create new one
producer.send_batch(event_data_batch)
event_data_batch = producer.create_batch()
event_data_batch.add(EventData(f"Event {i}"))
# Send remaining
producer.send_batch(event_data_batch)
Send to Specific Partition
# By partition ID
event_data_batch = producer.create_batch(partition_id="0")
# By partition key (consistent hashing)
event_data_batch = producer.create_batch(partition_key="user-123")
Receive Events
Simple Receive
from azure.eventhub import EventHubConsumerClient
def on_event(partition_context, event):
print(f"Partition: {partition_context.partition_id}")
print(f"Data: {event.body_as_str()}")
partition_context.update_checkpoint(event)
with EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential()
) as consumer:
consumer.receive(
on_event=on_event,
starting_position="-1", # Beginning of stream
)
With Blob Checkpoint Store (Production)
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential
checkpoint_store = BlobCheckpointStore(
blob_account_url="https://<account>.blob.core.windows.net",
container_name="checkpoints",
credential=DefaultAzureCredential()
)
with EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential(),
checkpoint_store=checkpoint_store
) as consumer:
def on_event(partition_context, event):
print(f"Received: {event.body_as_str()}")
# Checkpoint after processing
partition_context.update_checkpoint(event)
consumer.receive(on_event=on_event)
Async Client
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential
import asyncio
async def send_events():
credential = DefaultAzureCredential()
async with EventHubProducerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
credential=credential
) as producer:
batch = await producer.create_batch()
batch.add(EventData("Async event"))
await producer.send_batch(batch)
async def receive_events():
async def on_event(partition_context, event):
print(event.body_as_str())
await partition_context.update_checkpoint(event)
async with EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential()
) as consumer:
await consumer.receive(on_event=on_event)
asyncio.run(send_events())
Event Properties
event = EventData("My event body")
# Set properties
event.properties = {"custom_property": "value"}
event.content_type = "application/json"
# Read properties (on receive)
print(event.body_as_str())
print(event.sequence_number)
print(event.offset)
print(event.enqueued_time)
print(event.partition_key)
Get Event Hub Info
with producer:
info = producer.get_eventhub_properties()
print(f"Name: {info['name']}")
print(f"Partitions: {info['partition_ids']}")
for partition_id in info['partition_ids']:
partition_info = producer.get_partition_properties(partition_id)
print(f"Partition {partition_id}: {partition_info['last_enqueued_sequence_number']}")
Best Practices
- Pick sync OR async and stay consistent. Do not mix
azure.xxxsync clients withazure.xxx.aioasync clients in the same call path. Choose one mode per module. - Always use context managers for clients and async credentials. Wrap every client in
with Client(...) as client:(sync) orasync with Client(...) as client:(async) for proper cleanup. For asyncDefaultAzureCredentialfromazure.identity.aio, also useasync with credential:so tokens and transports are cleaned up. - Use
DefaultAzureCredentialfor portable auth across local dev and Azure (avoid connection strings / API keys when possible). - Use batches for sending multiple events
- Use checkpoint store in production for reliable processing
- Use async client for high-throughput scenarios
- Use partition keys for ordered delivery within a partition
- Handle batch size limits — catch ValueError when batch is full
- Set appropriate consumer groups for different applications
Reference Files
| File | Contents |
|---|---|
| references/checkpointing.md | Checkpoint store patterns, blob checkpointing, checkpoint strategies |
| references/partitions.md | Partition management, load balancing, starting positions |
| scripts/setup_consumer.py | CLI for Event Hub info, consumer setup, and event sending/receiving |
來自 microsoft 的更多技能
oss-growth
microsoft
開源增長駭客角色
official
microsoft-foundry
microsoft
端到端部署、評估與管理 Foundry 代理:Docker 建置、ACR 推送、託管/提示代理建立、容器啟動、批次評估、持續評估、提示最佳化工作流程、agent.yaml、從追蹤資料集整理。用途:將代理部署至 Foundry、託管代理、建立代理、調用代理、評估代理、執行批次評估、持續評估、持續監控、持續評估狀態、最佳化提示、改善提示、提示最佳化器、最佳化代理指令、改善代理...
officialdevelopmentdevops
azure-ai
microsoft
用於 Azure AI:搜尋、語音、OpenAI、文件智慧。協助搜尋、向量/混合搜尋、語音轉文字、文字轉語音、轉錄、OCR。適用情境:AI 搜尋、查詢搜尋、向量搜尋、混合搜尋、語意搜尋、語音轉文字、文字轉語音、轉錄、OCR、將文字轉換為語音。
officialdevelopmentapi
azure-deploy
microsoft
對已準備好的應用程式執行 Azure 部署,這些應用程式需具備現有的 .azure/deployment-plan.md 與基礎架構檔案。當使用者要求建立新應用程式時,請勿使用此技能——應改用 azure-prepare。此技能會執行 azd up、azd deploy、terraform apply 及 az deployment 命令,並內建錯誤復原機制。需具備來自 azure-prepare 的 .azure/deployment-plan.md,以及來自 azure-validate 的驗證狀態。適用時機:「執行 azd up」、「執行 azd deploy」、「執行部署」……
officialdevopsaws
azure-storage
microsoft
Azure Storage Services 包括 Blob 儲存體、檔案共用、佇列儲存體、表格儲存體和 Data Lake。回答關於儲存存取層(熱、冷、凍結、封存)、各層使用時機及層級比較的問題。提供物件儲存、SMB 檔案共用、非同步訊息、NoSQL 鍵值及大數據分析。包含生命週期管理。用於:blob 儲存體、檔案共用、佇列儲存體、表格儲存體、data lake、上傳檔案、下載 blob、儲存帳戶、存取層...
officialdevelopmentdatabase
azure-diagnostics
microsoft
在 Azure 上使用 AppLens、Azure Monitor、資源健康狀態和安全分類來偵錯 Azure 生產問題。適用時機:偵錯生產問題、疑難排解應用程式服務、應用程式服務高 CPU、應用程式服務部署失敗、疑難排解容器應用程式、疑難排解函數、疑難排解 AKS、kubectl 無法連線、kube-system/CoreDNS 失敗、Pod 擱置、CrashLoop、節點未就緒、升級失敗、分析記錄、KQL、深入解析、映像提取失敗、冷啟動問題、健康狀態探查失敗...
officialdevopsdevelopment
azure-prepare
microsoft
準備 Azure 應用程式以進行部署(基礎架構 Bicep/Terraform、azure.yaml、Dockerfile)。用於建立/現代化或建立+部署;不適用於跨雲端遷移(請使用 azure-cloud-migrate)。請勿用於:copilot-sdk 應用程式(請使用 azure-hosted-copilot-sdk)。適用時機:「建立應用程式」、「建置 Web 應用程式」、「建立 API」、「建立無伺服器 HTTP API」、「建立前端」、「建立後端」、「建置服務」、「現代化應用程式」、「更新應用程式」、「新增驗證」、「新增快取」、「託管於 Azure」、「建立並...」
officialdevelopmentdevops
azure-validate
microsoft
部署前驗證 Azure 就緒狀態。對設定、基礎架構(Bicep 或 Terraform)、RBAC 角色指派、受控身分權限及先決條件進行深度檢查,再進行部署。適用時機:驗證我的應用程式、檢查部署就緒狀態、執行預檢檢查、驗證設定、確認是否可部署、驗證 azure.yaml、驗證 Bicep、部署前測試、疑難排解部署錯誤、驗證 Azure Functions、驗證函式應用程式、驗證無伺服器...
officialdevopstesting