Azure Service Bus Message Settlement
Post by: syed hussain in All Microsoft Azure Reference Architecture
Summary
When sending and receiving messages with Azure Service Bus, there are several ways to settle a message. This post discusses these methods.
Message Settlement
- When sending a message to Azure Service Bus, the send operation completes once the broker has accepted the message. If no error is returned, it can be assumed that the message was delivered to the Queue successfully.
- A Receiver can pull a message from the Service Bus channel with a Receive & Delete method. This is a destructive read and suitable for scenarios where the consequence of a message being lost does not matter. In most scenarios, the Receive & Delete method will not be used.
- When instantiating a new Receiver, the default consumption method is PeekLock. PeekLock places a lock on the message, which guarantees that no other Receiver will consume it until the lock is released or the lock duration configured on the Queue runs out.
- After a message has been delivered to the Receiver, the Receiver can choose to Abandon it. This might be for a simple reason, such as the Receiver depending on an endpoint that is currently unavailable. Abandoning releases the lock, so the message becomes available in the Queue again, and the Receiver can move on to the next message.
- If the Receiver takes longer than the lock duration to process a message, the lock expires and the message becomes available in the Service Bus Queue again, where the same or another Receiver can pick it up. Because the original Receiver may still be in the middle of processing at that point, the business logic must cope with the same message being processed more than once (idempotent receiver).
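- The Receiver may choose to send a Complete response back to Azure Service Bus, which removes the message from the Queue.
- The Receiver can also Defer a message. A deferred message stays in the Queue but is set aside, and it can only be retrieved later by its sequence number.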
- In some cases, the Receiver can force a message to the Dead Letter Queue. This is a special sub-queue that holds messages removed from the main Queue so that they can be reviewed manually. Azure Service Bus may also move a message there itself: when the message has been abandoned or its lock has expired more times than the configured threshold (the maximum delivery count), or when its time-to-live elapses and dead-lettering on expiration is enabled for the Queue.
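The two receive modes and the lock described above can be sketched as follows. This is a minimal illustration rather than a definitive implementation: it assumes a .NET 6+ console project with the Azure.Messaging.ServiceBus package, uses the same placeholder connection string and queue name as the rest of this post, and the variable names are only illustrative.

```csharp
using System;
using Azure.Messaging.ServiceBus;

await using var client = new ServiceBusClient("<CONNECTION STRING>");

// Receive & Delete: a destructive read, the message is gone once it is received.
await using ServiceBusReceiver destructiveReceiver = client.CreateReceiver(
    "<QUEUE>",
    new ServiceBusReceiverOptions { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete });

// PeekLock: the default mode, set explicitly here for comparison.
await using ServiceBusReceiver lockingReceiver = client.CreateReceiver(
    "<QUEUE>",
    new ServiceBusReceiverOptions { ReceiveMode = ServiceBusReceiveMode.PeekLock });

ServiceBusReceivedMessage message = await lockingReceiver.ReceiveMessageAsync();
if (message is not null)
{
    // LockedUntil is the point at which the lock lapses if nothing is done.
    Console.WriteLine($"Locked until {message.LockedUntil:O}, delivery count {message.DeliveryCount}");

    // For long-running work, the lock can be renewed before it lapses.
    await lockingReceiver.RenewMessageLockAsync(message);

    // Settle the message so that it is removed from the queue.
    await lockingReceiver.CompleteMessageAsync(message);
}
```

Renewing the lock reduces the chance of a redelivery but does not remove it, so the receiver should still be idempotent.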
Abandon
ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();
await receiver.AbandonMessageAsync(message);
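As a hedged sketch of a slightly fuller Abandon, the receiver can inspect the delivery count and attach a note to the message before releasing it. The property name "LastFailure" and its value are hypothetical and used only for illustration. The connection string and queue name are placeholders.

```csharp
using System;
using System.Collections.Generic;
using Azure.Messaging.ServiceBus;

await using var client = new ServiceBusClient("<CONNECTION STRING>");
await using ServiceBusReceiver receiver = client.CreateReceiver("<QUEUE>");

ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();
if (message is not null)
{
    // DeliveryCount shows how many times this message has been delivered so far.
    Console.WriteLine($"Delivery attempt {message.DeliveryCount} for {message.MessageId}");

    // "LastFailure" is a hypothetical application property, not a built-in one.
    var propertiesToModify = new Dictionary<string, object>
    {
        ["LastFailure"] = "Endpoint unavailable"
    };

    // Release the lock; the message becomes available in the queue again.
    await receiver.AbandonMessageAsync(message, propertiesToModify);
}
```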
Defer
ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();
await receiver.DeferMessageAsync(message);
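A deferred message is no longer returned by a normal receive call, so the receiver has to remember its sequence number in order to fetch it later. The following is a minimal sketch under that assumption. Here the sequence number is simply kept in a local variable, whereas a real application would probably persist it somewhere durable.

```csharp
using Azure.Messaging.ServiceBus;

await using var client = new ServiceBusClient("<CONNECTION STRING>");
await using ServiceBusReceiver receiver = client.CreateReceiver("<QUEUE>");

ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();
if (message is not null)
{
    // Remember the sequence number before deferring.
    long sequenceNumber = message.SequenceNumber;
    await receiver.DeferMessageAsync(message);

    // Later: fetch the deferred message by its sequence number and settle it.
    ServiceBusReceivedMessage deferred = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
    await receiver.CompleteMessageAsync(deferred);
}
```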
Complete
ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();
await receiver.CompleteMessageAsync(message);
Dead Letter
ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();
await receiver.DeadLetterMessageAsync(message);
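To make the manual review easier, a reason and description can be supplied when dead-lettering, and the Dead Letter Queue can be read with a receiver that targets the dead-letter sub-queue. This is a sketch only: the reason and description strings are hypothetical examples, and the five second wait is an arbitrary choice.

```csharp
using System;
using Azure.Messaging.ServiceBus;

await using var client = new ServiceBusClient("<CONNECTION STRING>");
await using ServiceBusReceiver receiver = client.CreateReceiver("<QUEUE>");

ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();
if (message is not null)
{
    // The reason and description below are illustrative values.
    await receiver.DeadLetterMessageAsync(message, "ValidationFailed", "Payload could not be parsed.");
}

// A receiver that reads from the dead-letter sub-queue of the same queue.
await using ServiceBusReceiver deadLetterReceiver = client.CreateReceiver(
    "<QUEUE>",
    new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });

ServiceBusReceivedMessage deadLettered = await deadLetterReceiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));
if (deadLettered is not null)
{
    Console.WriteLine($"{deadLettered.DeadLetterReason}: {deadLettered.DeadLetterErrorDescription}");

    // Completing removes the message from the Dead Letter Queue after review.
    await deadLetterReceiver.CompleteMessageAsync(deadLettered);
}
```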
Full Code
// Assumes a .NET 6+ Worker Service project with implicit usings enabled.
using Azure.Messaging.ServiceBus;

namespace AzureServiceBusBasic
{
    public class Worker : BackgroundService
    {
        private const string ConnectionString = "<CONNECTION STRING>";
        private const string Queue = "<QUEUE>";

        private readonly ILogger<Worker> _logger;

        public Worker(ILogger<Worker> logger)
        {
            _logger = logger;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // One client for the lifetime of the worker, disposed on shutdown.
            await using var client = new ServiceBusClient(ConnectionString);

            int order = 1;
            // Stop looping when the host requests shutdown.
            while (!stoppingToken.IsCancellationRequested)
            {
                await SendReceiveAsync(client, order, stoppingToken);
                order++;
            }
        }

        private async Task SendReceiveAsync(ServiceBusClient client, int count, CancellationToken cancellationToken)
        {
            // Send one message to the queue.
            await using ServiceBusSender sender = client.CreateSender(Queue);
            string payload = @"{""employee"":{""name"":""sonoo"",""salary"":56000,""married"":true}}";
            ServiceBusMessage outgoing = new()
            {
                MessageId = Guid.NewGuid().ToString(),
                Subject = $"Order {count}",
                ContentType = "application/json",
                CorrelationId = Guid.NewGuid().ToString(),
                TimeToLive = TimeSpan.FromSeconds(600),
                Body = BinaryData.FromString(payload)
            };
            await sender.SendMessageAsync(outgoing, cancellationToken);

            // Receive one message in PeekLock mode (the default).
            await using ServiceBusReceiver receiver = client.CreateReceiver(Queue);
            ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync(cancellationToken: cancellationToken);
            if (message is null)
            {
                // No message arrived within the default wait time.
                return;
            }

            // Settle the message randomly. The upper bound of Next is exclusive,
            // so 6 is needed for case 5 (Complete) to be reachable.
            int settle = Random.Shared.Next(1, 6);
            switch (settle)
            {
                case 1:
                    // Abandon
                    ProcessMessage(message);
                    await receiver.AbandonMessageAsync(message, cancellationToken: cancellationToken);
                    break;
                case 2:
                    // Expire: simulate a 10 minute process so that the lock lapses without settlement
                    ProcessMessage(message);
                    await Task.Delay(TimeSpan.FromSeconds(600), cancellationToken);
                    break;
                case 3:
                    // Dead Letter
                    ProcessMessage(message);
                    await receiver.DeadLetterMessageAsync(message, cancellationToken: cancellationToken);
                    break;
                case 4:
                    // Defer: the message can only be retrieved later by its sequence number
                    ProcessMessage(message);
                    await receiver.DeferMessageAsync(message, cancellationToken: cancellationToken);
                    _logger.LogInformation("Deferred message with sequence number {SequenceNumber}", message.SequenceNumber);
                    break;
                case 5:
                    // Complete
                    ProcessMessage(message);
                    await receiver.CompleteMessageAsync(message, cancellationToken);
                    break;
                default:
                    // Exit
                    break;
            }
        }

        private void ProcessMessage(ServiceBusReceivedMessage message)
        {
            // Placeholder for real business logic: log the message body.
            string body = message.Body.ToString();
            _logger.LogInformation("Processing message {MessageId}: {Body}", message.MessageId, body);
        }
    }
}