EIP – Competing Consumer Enterprise Integration Pattern for faster data processing
Summary
In this post I’ll demonstrate how to design an integration architecture that uses the tried and tested integration design pattern Competing Consumer.
Competing Consumer
The Competing Consumer integration pattern works by running multiple instances of the same consumer process so that messages are processed concurrently. Messages sent by a Producer to a queue are therefore processed much faster. A queueing technology isn’t strictly required: if a Producer generates messages faster than a single Consumer can handle them, you can run the same Consumer endpoint in parallel within one application (see Async/Await), or run several instances of the same Consumer endpoint in an API Gateway architecture.
This solves the problem of ‘bottlenecking’: a single Consumer consuming messages at a rate of 100 messages per hour can offload some of that work to another ‘competing Consumer’, theoretically halving the processing time. Add a few more Consumers and you can reduce that time even further.
Whether or not you use a message broker, as with the other integration patterns you should consider adding resilience to the design.
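To make the idea concrete without any broker, here is a minimal sketch that uses an in-memory channel from System.Threading.Channels. The class name CompetingConsumerSketch, the RunAsync method, the channel capacity and the 1 ms delay that stands in for real work are all my own illustrative assumptions; they are not part of the project further down.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompetingConsumerSketch
{
    // Pushes messageCount messages through one shared channel and lets
    // consumerCount identical consumers compete for them.
    // Returns how many messages each consumer ended up processing.
    public static async Task<int[]> RunAsync(int messageCount, int consumerCount)
    {
        // Bounded channel: the producer waits whenever the consumers fall behind.
        var channel = Channel.CreateBounded<int>(100);
        var processed = new int[consumerCount];

        // Start the consumers. They all run the same logic against the same channel,
        // and each message is delivered to exactly one of them.
        Task[] consumers = Enumerable.Range(0, consumerCount)
            .Select(id => Task.Run(async () =>
            {
                await foreach (int orderId in channel.Reader.ReadAllAsync())
                {
                    await Task.Delay(1); // stand-in for real work (assumption)
                    processed[id]++;     // each slot is only written by its own consumer
                }
            }))
            .ToArray();

        // Producer: write the messages, then signal that no more will arrive.
        for (int i = 0; i < messageCount; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        channel.Writer.Complete();

        await Task.WhenAll(consumers);
        return processed;
    }

    public static async Task Main()
    {
        int[] perConsumer = await RunAsync(messageCount: 200, consumerCount: 4);
        Console.WriteLine($"Total processed: {perConsumer.Sum()}");
        Console.WriteLine($"Per consumer: {string.Join(", ", perConsumer)}");
    }
}
```

The total always equals the number of messages sent, while the split between consumers varies from run to run, because whichever consumer is free takes the next message.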
Some benefits:
- Load balancing – messages entering the queue can be picked up by a consumer that is free.
- Scalability – instances of the same consumer process can be added or removed (scaled out or in) depending on load.
- Performance – data is consumed at a faster rate than would otherwise be possible.
Design considerations:
- Consumers process messages if and when they are free, so messages may be processed out of order. If you need to maintain sequence, consider combining this strategy with a Resequencer or, on Azure Service Bus, message sessions.
- A Consumer that fails while processing a message will also break the ordering of messages, because the failed message is retried after later messages have already completed. Some form of monitoring/logging strategy will need to be used.
- The tasks that are created after the consumption of messages must also be independent. They should not rely on the other messages being sent by the Producer.
- The Producer and Consumer should be using asynchronous endpoints in order for competing consumer to really work.
- If a Consumer fails to consume a message, the message should be returned to the queue so that another Consumer can pick up the message.
- The aim should be to create an equilibrium: messages should be consumed by Receivers (Consumers) at the same rate as they are submitted to the channel.
- Some integration tools allow Competing Consumers to occur natively through some form of parallel and concurrent processing. With Azure Logic Apps and Power Automate Flows, the number of parallel processes can be increased or decreased.
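The point about returning a failed message to the queue can be sketched in memory as follows. This is only an illustration under my own assumptions: the Envelope class, the RequeueSketch class, the limit of three attempts and the rule for which orders fail are all hypothetical. A real broker such as Azure Service Bus tracks delivery attempts and dead-lettering for you.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// A message plus the number of the current delivery attempt.
public sealed class Envelope
{
    public Envelope(int orderId, int attempt) { OrderId = orderId; Attempt = attempt; }
    public int OrderId { get; }
    public int Attempt { get; }
}

public static class RequeueSketch
{
    public const int MaxAttempts = 3; // assumption: give up after three tries

    // Hypothetical handler: order 13 always fails (a poison message),
    // orders divisible by 10 fail on their first attempt only.
    private static void Handle(Envelope msg)
    {
        if (msg.OrderId == 13) throw new InvalidOperationException("poison message");
        if (msg.OrderId % 10 == 0 && msg.Attempt == 1) throw new TimeoutException("transient failure");
    }

    public static async Task<(int Completed, int[] DeadLettered)> RunAsync(int messageCount, int consumerCount)
    {
        var queue = Channel.CreateUnbounded<Envelope>();
        var deadLetters = new ConcurrentBag<int>();
        int completed = 0;
        int outstanding = messageCount; // messages not yet completed or dead-lettered

        for (int i = 0; i < messageCount; i++) queue.Writer.TryWrite(new Envelope(i, 1));
        if (messageCount == 0) queue.Writer.Complete();

        Task[] consumers = Enumerable.Range(0, consumerCount).Select(_ => Task.Run(async () =>
        {
            await foreach (Envelope msg in queue.Reader.ReadAllAsync())
            {
                try
                {
                    Handle(msg);
                    Interlocked.Increment(ref completed);
                }
                catch (Exception) when (msg.Attempt < MaxAttempts)
                {
                    // Return the message to the queue so that any free consumer can retry it.
                    queue.Writer.TryWrite(new Envelope(msg.OrderId, msg.Attempt + 1));
                    continue; // the message is still outstanding
                }
                catch (Exception)
                {
                    // Out of attempts: park the message instead of retrying forever.
                    deadLetters.Add(msg.OrderId);
                }

                // Close the queue once every message has reached a final state.
                if (Interlocked.Decrement(ref outstanding) == 0) queue.Writer.Complete();
            }
        })).ToArray();

        await Task.WhenAll(consumers);
        return (completed, deadLetters.OrderBy(id => id).ToArray());
    }

    public static async Task Main()
    {
        var result = await RunAsync(messageCount: 20, consumerCount: 3);
        // prints "Completed: 19, dead-lettered: 13"
        Console.WriteLine($"Completed: {result.Completed}, dead-lettered: {string.Join(", ", result.DeadLettered)}");
    }
}
```

Orders 0 and 10 succeed on their second attempt, possibly on a different consumer than the one that first picked them up, which is also why retries disturb message ordering.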
Take the following scenario – during the holiday period, there may be a heavy increase in customer purchases. A retail owner needs to ensure that products purchased are removed from stock, and the website updated to reflect this change.
Conceptual Architecture
One million messages will be sent to a Message Broker by a Source System. These messages will be consumed by a Target Application. In the conceptual implementation, Azure Functions will act as the Message Sender, Azure Service Bus queues will be used as a Message Broker/Channel and the Receiver Endpoint will be a .NET Console application.
In this conceptual integration, the aim will be to remove the bottleneck that has occurred in the Messaging Channel. The snapshot below shows what we’ll be aiming for (Single Consumer vs. Competing Consumer).
Conceptual Architecture with Azure Functions
With Microsoft Azure, some implementation design work is automatically abstracted away, so you don’t need to design solutions for these components yourself. For instance, Azure Service Bus uses the AMQP 1.0 protocol as its primary means of communication.
AMQP enables you to build cross-platform, hybrid applications using a vendor-neutral and implementation-neutral, open standard protocol. You can construct applications using components that are built using different languages and frameworks, and that run on different operating systems. All these components can connect to Service Bus and seamlessly exchange structured business messages efficiently and at full fidelity.
- Source System (Sender): Azure Functions
- Message Construction: AMQP with content-type, correlation-id, subject, message-id, reply-to, to. Read more about AMQP here.
- Sender Endpoint: Azure Functions abstracted service.
- Message Channel: Point-to-Point (a queue, so each message is delivered to only one of the competing Consumers).
- Message Routing: Message Broker.
- Target System (Receiver): Azure Functions
- Receiver Endpoint: Azure Functions abstracted service endpoint.
- Receiver Message Transformation: Azure Functions, abstracted service endpoint.
- Systems Management: Azure Application Insights.
*Note that there is often overlap between Microsoft technologies. Therefore in some cases, a choreography tool like Azure Logic Apps could orchestrate all nine points mentioned above.
C# Implementation of Competing Consumer
This is quite a simple project, so I haven’t included a detailed set of instructions for getting it up and running. The GitHub source can be found here:
Create the required Azure resources
az group create --name RG100 --location uksouth
az servicebus namespace create --resource-group RG100 --name CompetingConsumer --location uksouth
az servicebus queue create --resource-group RG100 --namespace-name CompetingConsumer --name OrderQueue
az servicebus namespace authorization-rule keys list --resource-group RG100 --namespace-name CompetingConsumer --name RootManageSharedAccessKey --query primaryConnectionString --output tsv
Dependency Injection
This is a simple project, however I always use dependency injection to decouple my services. The Startup.cs class registers MessageConstruction as a Singleton implementation of IMessageConstruction.
Startup.cs
using CompetingConsumer.Services;
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;

[assembly: FunctionsStartup(typeof(CompetingConsumer.Startup))]

namespace CompetingConsumer
{
    public class Startup : FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            // One shared MessageConstruction instance serves every function invocation
            builder.Services.AddSingleton<IMessageConstruction, MessageConstruction>();
        }
    }
}
Azure Functions Class
The Functions class uses two helper classes, OrderApp and MessageTransform, and then calls the CreateSendMessage method.
using CompetingConsumer.Services;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;

namespace CompetingConsumer
{
    public class AzureFunction
    {
        private readonly IMessageConstruction _message;

        // IMessageConstruction is supplied by the container configured in Startup.cs
        public AzureFunction(IMessageConstruction createMessage) => _message = createMessage;

        [FunctionName("Sendtobus")]
        public async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");
            try
            {
                // Create a new Order
                OrderApp o = new OrderApp();
                o.CreateOrders();

                // Apply Transformation
                MessageTransform m = new MessageTransform();

                // Send messages asynchronously
                await _message.CreateSendMessage(m);
                log.LogInformation("Finished sending messages");

                // Return a real response instead of null so the caller gets a 200
                return new OkResult();
            }
            catch (Exception e)
            {
                // Pass the exception itself so the stack trace is logged
                log.LogError(e, "Failed to send messages to the queue");
                return new StatusCodeResult(StatusCodes.Status500InternalServerError);
            }
        }
    }
}
MessageConstruction.cs
This class is responsible for creating the messages and sending them to the Message Broker (Azure Service Bus). The Service Bus client library sends each message over AMQP 1.0.
using Azure.Messaging.ServiceBus;
using Serilog;
using System;
using System.Threading.Tasks;

namespace CompetingConsumer.Services
{
    internal class MessageConstruction : IMessageConstruction
    {
        private const int MessageCount = 1000000;
        private static readonly string connectionString = Environment.GetEnvironmentVariable("ConnectionString");
        private static readonly string queueName = Environment.GetEnvironmentVariable("queue");

        public async Task CreateSendMessage(MessageTransform m)
        {
            // Client and sender are closed automatically when the method exits
            await using ServiceBusClient client = new ServiceBusClient(connectionString);
            await using ServiceBusSender sender = client.CreateSender(queueName);

            // Clear the existing queue (awaited rather than blocked on with .Wait())
            await ClearServiceBus(client);

            // Send 1,000,000 messages
            for (int i = 0; i < MessageCount; i++)
            {
                ServiceBusMessage message = new ServiceBusMessage()
                {
                    MessageId = Guid.NewGuid().ToString(), // Unique id for this message
                    CorrelationId = $"OrderApp-{Guid.NewGuid()}", // ID used to correlate the message back to the sender if required
                    ContentType = "application/json",
                    Subject = $"Order id: {i}",
                    ReplyTo = "orderapp-100",
                    Body = BinaryData.FromString($"{m.MessageTransformation()}")
                };

                try
                {
                    await sender.SendMessageAsync(message);
                }
                catch (Exception e)
                {
                    // Log the failure instead of silently swallowing it
                    Log.Error(e, "Failed to send message {MessageId}", message.MessageId);
                }
            }
        }

        private static async Task ClearServiceBus(ServiceBusClient client)
        {
            // ReceiveAndDelete removes each message from the queue as soon as it is received
            await using ServiceBusReceiver receiver = client.CreateReceiver(
                queueName,
                new ServiceBusReceiverOptions { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete });

            while ((await receiver.PeekMessageAsync()) != null)
            {
                // receive in batches of 100 messages.
                await receiver.ReceiveMessagesAsync(100);
            }
        }
    }
}
Receiver.cs
The Receiver application processes messages one at a time as they arrive. PeekLock mode (the default for the processor) locks each message while it is being handled, ensuring that competing Receivers do not pick up the same message from the Queue.
using System;
using System.Configuration;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

namespace QueueReceiver
{
    internal class Program
    {
        private static readonly string connectionString = ConfigurationManager.AppSettings.Get("connection");
        private static readonly string queueName = ConfigurationManager.AppSettings.Get("queue");
        private static ServiceBusClient client;
        private static ServiceBusProcessor processor;

        private static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            string body = args.Message.Body.ToString();
            Console.WriteLine($"Received: {body}");

            // Completing the message removes it from the queue
            await args.CompleteMessageAsync(args.Message);
        }

        private static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }

        private static async Task Main()
        {
            client = new ServiceBusClient(connectionString);

            // Messages are completed explicitly in MessageHandler, so auto-complete is switched off
            processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions
            {
                AutoCompleteMessages = false
            });

            try
            {
                processor.ProcessMessageAsync += MessageHandler;
                processor.ProcessErrorAsync += ErrorHandler;
                await processor.StartProcessingAsync();

                // Keep receiving until a key is pressed, then stop and terminate
                Console.ReadKey();
                await processor.StopProcessingAsync();
            }
            finally
            {
                await processor.DisposeAsync();
                await client.DisposeAsync();
            }
        }
    }
}
Results
When the Azure Function is invoked, an Order is created and transformed into a JSON object. An AMQP 1.0 message is then constructed and the Order is injected into the Body of the message. The message includes several header details, including MessageId and CorrelationId. The Endpoint (Azure Functions) sends 1,000,000 messages to a Queue called OrderQueue. When multiple Receivers (the .NET Console App) are run, messages are retrieved from the Azure Service Bus queue in parallel and the overall execution time is reduced.
The implementation documented on this page demonstrates the Competing Consumer Integration pattern. The Competing Consumer Integration pattern can also be achieved with any large orchestration or choreography tool such as Azure Data Factory or Azure Logic Apps just by turning on concurrent or parallel runs.
A single Receiver processing 1 million messages:
6 Receivers processing 1 million messages