Azure Service Bus Message Settlement

Summary

Once a Receiver has taken a message from Azure Service Bus, there are several ways to settle that message. This post discusses these methods: Abandon, Defer, Complete and Dead Letter.

Message Settlement

  1. When sending a message to Azure Service Bus, delivery can be assumed to have succeeded if no error comes back: a send call that completes without an exception means the message was accepted.
  2. A Receiver can pull a message from the queue in Receive & Delete mode. This is a destructive read: the message is removed as soon as it is handed over, so it only suits scenarios where losing a message is acceptable. Most scenarios will not use Receive & Delete.
  3. When a new Receiver is created, the default receive mode is PeekLock. PeekLock locks the message so that no other Receiver can consume it until the lock duration elapses or the message is settled.
  4. After a message has been delivered, the Receiver can choose to Abandon it, for example because an endpoint the Receiver depends on is not responding. Abandoning releases the lock, so the message becomes available for delivery again and the Receiver can move on to the next message in the queue.
  5. If the Receiver takes longer than the lock duration to process the message, the lock expires and the message becomes available on the queue again. The Receiver may still be in the middle of its work when the same message is redelivered, so its business logic must cope with processing the same message more than once (an idempotent receiver).
  6. The Receiver can send a Complete response back to Azure Service Bus, which removes the message from the queue.
  7. The Receiver can also force a message into the Dead-letter Queue, a special queue that takes messages out of the main queue so that they can be reviewed manually. Azure Service Bus moves a message there on its own once the message has been abandoned, or its lock has expired, more times than the maximum delivery count set on the queue.
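
To make the two receive modes from the list above concrete, here is a minimal sketch of how each kind of Receiver might be created with the Azure.Messaging.ServiceBus package. The connection string and queue name are placeholders and the variable names are only illustrative.

```csharp
using System;
using Azure.Messaging.ServiceBus;

string connectionString = "<CONNECTION STRING>"; // placeholder
string queue = "<QUEUE>";                        // placeholder

await using var client = new ServiceBusClient(connectionString);

// No options supplied: the receiver uses PeekLock, so every message
// it receives must be settled (or its lock left to expire).
await using ServiceBusReceiver peekLockReceiver = client.CreateReceiver(queue);

// Receive & Delete has to be requested explicitly. The message is removed
// from the queue as soon as it is handed over, so nothing is settled.
await using ServiceBusReceiver destructiveReceiver = client.CreateReceiver(
    queue,
    new ServiceBusReceiverOptions { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete });

// Returns null if no message arrives within the wait time.
ServiceBusReceivedMessage message =
    await destructiveReceiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));

if (message is not null)
{
    Console.WriteLine($"Received and deleted: {message.MessageId}");
}
```

With the second receiver there is no Abandon or Complete step: if the process crashes after the receive, the message is gone.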

Abandon

ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();

await receiver.AbandonMessageAsync(message);

Defer

ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();

await receiver.DeferMessageAsync(message);
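
A deferred message stays in the queue but is no longer handed out by a normal receive; it has to be requested by its sequence number. The sketch below shows one way this might look. Keeping the sequence number in a local variable is a simplification for illustration; a real application would need to store it somewhere durable. The connection string and queue name are placeholders.

```csharp
using Azure.Messaging.ServiceBus;

string connectionString = "<CONNECTION STRING>"; // placeholder
string queue = "<QUEUE>";                        // placeholder

await using var client = new ServiceBusClient(connectionString);
await using ServiceBusReceiver receiver = client.CreateReceiver(queue);

ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();

if (message is not null)
{
    // Remember the sequence number before deferring; it is the only
    // handle for getting the message back later.
    long sequenceNumber = message.SequenceNumber;
    await receiver.DeferMessageAsync(message);

    // Later, when the receiver is ready for it, fetch the deferred
    // message by sequence number and settle it as usual.
    ServiceBusReceivedMessage deferred =
        await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
    await receiver.CompleteMessageAsync(deferred);
}
```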

Complete

ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();

await receiver.CompleteMessageAsync(message);

Dead Letter

ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();

await receiver.DeadLetterMessageAsync(message);
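
Since dead-lettered messages are meant to be reviewed later, it can help to record why a message was moved and to know how to read it back. The following is a minimal sketch under a few assumptions: the reason and description strings are made up for illustration, and the connection string and queue name are placeholders.

```csharp
using System;
using Azure.Messaging.ServiceBus;

string connectionString = "<CONNECTION STRING>"; // placeholder
string queue = "<QUEUE>";                        // placeholder

await using var client = new ServiceBusClient(connectionString);
await using ServiceBusReceiver receiver = client.CreateReceiver(queue);

ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();

if (message is not null)
{
    // Attach a reason and description (illustrative values) so that
    // whoever reviews the message can see why it was dead-lettered.
    await receiver.DeadLetterMessageAsync(
        message,
        "ValidationFailed",
        "The payload could not be parsed.");
}

// The Dead-letter Queue is a sub-queue of the main queue and is read
// with a receiver that targets that sub-queue.
await using ServiceBusReceiver deadLetterReceiver = client.CreateReceiver(
    queue,
    new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });

ServiceBusReceivedMessage dead =
    await deadLetterReceiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));

if (dead is not null)
{
    Console.WriteLine($"{dead.MessageId}: {dead.DeadLetterReason} ({dead.DeadLetterErrorDescription})");

    // Completing removes the message from the Dead-letter Queue.
    await deadLetterReceiver.CompleteMessageAsync(dead);
}
```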

Full Code

using Azure.Messaging.ServiceBus;

namespace AzureServiceBusBasic
{
    public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;

        public Worker(ILogger<Worker> logger)
        {
            _logger = logger;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            int order = 1;

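            // Keep sending and receiving until the host requests shutdown
            while (!stoppingToken.IsCancellationRequested)
            {
                // Dispose each client so connections are not leaked across iterations
                await using ServiceBusClient client = await SendReceiveAsync(order);
                order++;
            }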
        }

        private static async Task<ServiceBusClient> SendReceiveAsync(int count)
        {
            string connectionString = "<CONNECTION STRING>";
            string queue = "<QUEUE>";
            var client = new ServiceBusClient(connectionString);

            bool create = true;
            int settle = 0;

            ServiceBusSender sender = client.CreateSender(queue);
            {
                if (create == true)
                {
                    // Valid JSON, to match the application/json content type set below
                    string payload = @"{""employee"":{""name"":""sonoo"",""salary"":56000,""married"":true}}";

                    ServiceBusMessage message = new()
                    {
                        MessageId = Guid.NewGuid().ToString(),
                        Subject = $"Order {count}",
                        ContentType = "application/json",
                        CorrelationId = Guid.NewGuid().ToString(),
                        TimeToLive = TimeSpan.FromSeconds(600),
                        Body = BinaryData.FromString($"{payload}")
                    };

                    await sender.SendMessageAsync(message);
                }
            }

            ServiceBusReceiver receiver = client.CreateReceiver(queue);
            {
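                // Receive a message in PeekLock mode (the default)
                ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();

                // ReceiveMessageAsync returns null if nothing arrives within the wait time
                if (message is null)
                {
                    return client;
                }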

                // Settle the message randomly. The upper bound of Next is exclusive,
                // so it must be 6 for case 5 (Complete) to be reachable.
                Random rand = new Random();
                settle = rand.Next(1, 6);

                switch (settle)
                {
                    case 1:
                        // Abandon
                        ProcessMessage(message);
                        await receiver.AbandonMessageAsync(message);
                        break;

                    case 2:
                        // Expire: simulate a 10 minute process and never settle,
                        // so the peek lock lapses before the work finishes
                        ProcessMessage(message);
                        await Task.Delay(TimeSpan.FromSeconds(600));
                        break;

                    case 3:
                        // Dead Letter
                        ProcessMessage(message);
                        await receiver.DeadLetterMessageAsync(message);
                        break;

                    case 4:
                        // Defer
                        ProcessMessage(message);
                        await receiver.DeferMessageAsync(message);
                        break;

                    case 5:
                        // Complete
                        ProcessMessage(message);
                        await receiver.CompleteMessageAsync(message);
                        break;

                    default:
                        // Exit
                        break;
                }
            }

            return client;
        }

        private static void ProcessMessage(ServiceBusReceivedMessage message)
        {
            string body = message.Body.ToString();
        }
    }
}
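
Because abandoned and lock-expired messages are delivered again, ProcessMessage in the code above may see the same message more than once. A minimal sketch of the idempotent receiver idea follows. The IdempotentProcessor class and its ProcessOnce method are hypothetical names invented for this example, and the in-memory set is an assumption made for brevity: it is lost on restart and is not shared between Receivers, so a real system would need a durable store.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: runs the work for a given message id at most once.
public sealed class IdempotentProcessor
{
    private readonly HashSet<string> _processedIds = new();
    private readonly object _gate = new();

    // Returns true if the work ran, false if this id was already processed.
    public bool ProcessOnce(string messageId, Action work)
    {
        lock (_gate)
        {
            if (_processedIds.Contains(messageId))
            {
                return false;
            }

            // Record the id only after the work succeeds, so that a failed
            // attempt is retried when the message is redelivered.
            work();
            _processedIds.Add(messageId);
            return true;
        }
    }
}
```

In the Worker, the call to ProcessMessage could be wrapped as processor.ProcessOnce(message.MessageId, () => ProcessMessage(message)) before the message is settled.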
