azure-servicebus-ts
作者: microsoft
使用 Azure Service Bus 的 JavaScript SDK(@azure/service-bus)构建消息传递应用程序。适用于实现队列、主题/订阅、消息…
npx skills add https://github.com/microsoft/skills --skill azure-servicebus-tsAzure Service Bus SDK for TypeScript
Enterprise messaging with queues, topics, and subscriptions.
Installation
npm install @azure/service-bus @azure/identity
Environment Variables
SERVICEBUS_NAMESPACE=<namespace>.servicebus.windows.net
SERVICEBUS_QUEUE_NAME=my-queue
SERVICEBUS_TOPIC_NAME=my-topic
SERVICEBUS_SUBSCRIPTION_NAME=my-subscription
AZURE_TOKEN_CREDENTIALS=prod # Required only if DefaultAzureCredential is used in production
Authentication
import { ServiceBusClient } from "@azure/service-bus";
import { DefaultAzureCredential, ManagedIdentityCredential } from "@azure/identity";
// Local dev: DefaultAzureCredential. Production: set AZURE_TOKEN_CREDENTIALS=prod or AZURE_TOKEN_CREDENTIALS=<specific_credential>
const credential = new DefaultAzureCredential({requiredEnvVars: ["AZURE_TOKEN_CREDENTIALS"]});
// Or use a specific credential directly in production:
// See https://learn.microsoft.com/javascript/api/overview/azure/identity-readme?view=azure-node-latest#credential-classes
// const credential = new ManagedIdentityCredential();
const fullyQualifiedNamespace = process.env.SERVICEBUS_NAMESPACE!;
const client = new ServiceBusClient(fullyQualifiedNamespace, credential);
Core Workflow
Send Messages to Queue
const sender = client.createSender("my-queue");
// Single message
await sender.sendMessages({
body: { orderId: "12345", amount: 99.99 },
contentType: "application/json",
});
// Batch messages
const batch = await sender.createMessageBatch();
batch.tryAddMessage({ body: "Message 1" });
batch.tryAddMessage({ body: "Message 2" });
await sender.sendMessages(batch);
await sender.close();
Receive Messages from Queue
const receiver = client.createReceiver("my-queue");
// Receive batch
const messages = await receiver.receiveMessages(10, { maxWaitTimeInMs: 5000 });
for (const message of messages) {
console.log(`Received: ${message.body}`);
await receiver.completeMessage(message);
}
await receiver.close();
Subscribe to Messages (Event-Driven)
const receiver = client.createReceiver("my-queue");
const subscription = receiver.subscribe({
processMessage: async (message) => {
console.log(`Processing: ${message.body}`);
// Message auto-completed on success
},
processError: async (args) => {
console.error(`Error: ${args.error}`);
},
});
// Stop after some time
setTimeout(async () => {
await subscription.close();
await receiver.close();
}, 60000);
Topics and Subscriptions
// Send to topic
const topicSender = client.createSender("my-topic");
await topicSender.sendMessages({
body: { event: "order.created", data: { orderId: "123" } },
applicationProperties: { eventType: "order.created" },
});
// Receive from subscription
const subscriptionReceiver = client.createReceiver("my-topic", "my-subscription");
const messages = await subscriptionReceiver.receiveMessages(10);
Message Sessions
// Send session message
const sender = client.createSender("session-queue");
await sender.sendMessages({
body: { step: 1, data: "First step" },
sessionId: "workflow-123",
});
// Receive session messages
const sessionReceiver = await client.acceptSession("session-queue", "workflow-123");
const messages = await sessionReceiver.receiveMessages(10);
// Get/set session state
const state = await sessionReceiver.getSessionState();
await sessionReceiver.setSessionState(Buffer.from(JSON.stringify({ progress: 50 })));
await sessionReceiver.close();
Dead-Letter Handling
// Move to dead-letter
await receiver.deadLetterMessage(message, {
deadLetterReason: "Validation failed",
deadLetterErrorDescription: "Missing required field: orderId",
});
// Process dead-letter queue
const dlqReceiver = client.createReceiver("my-queue", { subQueueType: "deadLetter" });
const dlqMessages = await dlqReceiver.receiveMessages(10);
for (const msg of dlqMessages) {
console.log(`DLQ Reason: ${msg.deadLetterReason}`);
// Reprocess or log
await dlqReceiver.completeMessage(msg);
}
Scheduled Messages
const sender = client.createSender("my-queue");
// Schedule for future delivery
const scheduledTime = new Date(Date.now() + 60000); // 1 minute from now
const sequenceNumber = await sender.scheduleMessages(
{ body: "Delayed message" },
scheduledTime
);
// Cancel scheduled message
await sender.cancelScheduledMessages(sequenceNumber);
Message Deferral
// Defer message for later
await receiver.deferMessage(message);
// Receive deferred message by sequence number
const deferredMessage = await receiver.receiveDeferredMessages(message.sequenceNumber!);
await receiver.completeMessage(deferredMessage[0]);
Peek Messages (Non-Destructive)
const receiver = client.createReceiver("my-queue");
// Peek without removing
const peekedMessages = await receiver.peekMessages(10);
for (const msg of peekedMessages) {
console.log(`Peeked: ${msg.body}`);
}
Key Types
import {
ServiceBusClient,
ServiceBusSender,
ServiceBusReceiver,
ServiceBusSessionReceiver,
ServiceBusMessage,
ServiceBusReceivedMessage,
ProcessMessageCallback,
ProcessErrorCallback,
} from "@azure/service-bus";
Receive Modes
// Peek-Lock (default) - message locked until completed/abandoned
const receiver = client.createReceiver("my-queue", { receiveMode: "peekLock" });
await receiver.completeMessage(message); // Remove from queue
await receiver.abandonMessage(message); // Return to queue
await receiver.deferMessage(message); // Defer for later
await receiver.deadLetterMessage(message); // Move to DLQ
// Receive-and-Delete - message removed immediately
const receiver = client.createReceiver("my-queue", { receiveMode: "receiveAndDelete" });
Best Practices
- Use Microsoft Entra Token Credential - Use
DefaultAzureCredentialfor local development; useManagedIdentityCredentialorWorkloadIdentityCredentialfor production - Reuse clients - Create
ServiceBusClientonce, share across senders/receivers - Close resources - Always close senders/receivers when done
- Handle errors - Implement
processErrorcallback for subscription receivers - Use sessions for ordering - When message order matters within a group
- Configure dead-letter - Always handle DLQ messages
- Batch sends - Use
createMessageBatch()for multiple messages
Reference Documentation
For detailed patterns, see:
- Queues vs Topics Patterns - Queue/topic patterns, sessions, receive modes, message settlement
- Error Handling and Reliability - ServiceBusError codes, DLQ handling, lock renewal, graceful shutdown
来自 microsoft 的更多技能
oss-growth
microsoft
OSS增长黑客角色
official
microsoft-foundry
microsoft
端到端部署、评估和管理Foundry代理:Docker构建、ACR推送、托管/提示代理创建、容器启动、批量评估、持续评估、提示优化工作流、agent.yaml、从追踪中整理数据集。用途:将代理部署到Foundry、托管代理、创建代理、调用代理、评估代理、运行批量评估、持续评估、持续监控、持续评估状态、优化提示、改进提示、提示优化器、优化代理指令、改进代理...
officialdevelopmentdevops
azure-ai
microsoft
用于Azure AI:搜索、语音、OpenAI、文档智能。支持搜索、向量/混合搜索、语音转文字、文字转语音、转录、OCR。适用场景:AI搜索、查询搜索、向量搜索、混合搜索、语义搜索、语音转文字、文字转语音、转录、OCR、文字转语音。
officialdevelopmentapi
azure-deploy
microsoft
对已准备好的应用程序执行Azure部署,这些程序需包含现有的.azure/deployment-plan.md和基础设施文件。当用户要求创建新应用程序时,请勿使用此技能——应改用azure-prepare。此技能运行azd up、azd deploy、terraform apply和az deployment命令,并内置错误恢复机制。需要来自azure-prepare的.azure/deployment-plan.md以及来自azure-validate的已验证状态。适用场景:"运行azd up"、"运行azd deploy"、"执行部署"...
officialdevopsaws
azure-storage
microsoft
Azure存储服务,包括Blob存储、文件共享、队列存储、表存储和Data Lake。解答关于存储访问层(热、冷、冷、归档)的问题,说明各层的使用场景及对比。提供对象存储、SMB文件共享、异步消息传递、NoSQL键值存储和大数据分析。包含生命周期管理。用途:Blob存储、文件共享、队列存储、表存储、Data Lake、上传文件、下载Blob、存储账户、访问层等。
officialdevelopmentdatabase
azure-diagnostics
microsoft
使用AppLens、Azure Monitor、资源健康和安全分类调试Azure生产问题。适用场景:调试生产问题、排查应用服务、应用服务CPU过高、应用服务部署失败、排查容器应用、排查函数、排查AKS、kubectl无法连接、kube-system/CoreDNS故障、Pod挂起、CrashLoop、节点未就绪、升级失败、分析日志、KQL、洞察、镜像拉取失败、冷启动问题、健康探测失败……
officialdevopsdevelopment
azure-prepare
microsoft
为Azure应用准备部署(基础设施Bicep/Terraform、azure.yaml、Dockerfile)。用于创建/现代化或创建+部署;不用于跨云迁移(使用azure-cloud-migrate)。请勿用于:copilot-sdk应用(使用azure-hosted-copilot-sdk)。适用场景:"创建应用"、"构建Web应用"、"创建API"、"创建无服务器HTTP API"、"创建前端"、"创建后端"、"构建服务"、"现代化应用"、"更新应用"、"添加身份验证"、"添加缓存"、"托管在Azure上"、"创建并...
officialdevelopmentdevops
azure-validate
microsoft
部署前对Azure就绪状态进行验证。对配置、基础设施(Bicep或Terraform)、RBAC角色分配、托管标识权限及先决条件进行深度检查,然后再部署。适用场景:验证我的应用、检查部署就绪状态、运行预检、验证配置、检查是否可部署、验证azure.yaml、验证Bicep、部署前测试、排查部署错误、验证Azure Functions、验证函数应用、验证无服务器...
officialdevopstesting