
Different methods to implement Message Aggregator pattern using Service Bus Topic – CorrelationId

In one of my last posts I presented the Aggregator Pattern. The main purpose of this pattern is to give the consumer the ability to combine (aggregate) related messages. This pattern can be implemented in Windows Azure using Windows Azure Service Bus Queues or Topics.
There are two different ways to implement this pattern. The implementations are quite different from each other and can also affect performance in different ways.
The first implementation relies on the Session support of BrokeredMessage. What does this mean from the code perspective? We need to set the SessionId every time we send a message. The consumer will then consume only the messages with that specific session id. This solution is simple and works great when we don't have a lot of messages, for example only 9-10 messages per group.
The first advantage of this implementation is on the consumer side: we don't need to create the consumer before messages are added to the Service Bus, because the messages will be persisted in the Service Bus infrastructure. One of the downsides of this implementation is the number of consumers we can have for the same session: only one. Because of this, if we want to broadcast a message to more than one consumer, we will have a problem. Also, the current implementation of sessions doesn't use any kind of hashing for the id field, so if we need to process thousands of messages per second we may run into problems.
Producer
// Split the original payload into chunks of at most 255 bytes and send each
// chunk as a separate BrokeredMessage. All chunks share the same SessionId
// (currentContentId) so that the consumer can aggregate them back together.
Stream messageStream = message.GetBody<Stream>();
long length = messageStream.Length;

for (int offset = 0; offset < length; offset += 255)
{
    long messageLength = (length - offset) > 255
        ? 255
        : length - offset;

    byte[] currentMessageContent = new byte[messageLength];
    messageStream.Read(currentMessageContent, 0, (int)messageLength);

    BrokeredMessage currentMessage = new BrokeredMessage(
        new MemoryStream(currentMessageContent),
        true);
    currentMessage.SessionId = currentContentId;

    qc.Send(currentMessage);
}

Consumer

// Accept the session with the expected id and drain all messages that belong
// to it, copying each chunk into the final stream. Receive returns null when
// no message arrives within the timeout, which ends the aggregation.
MemoryStream finalStream = new MemoryStream();
MessageSession messageSession = qc.AcceptMessageSession(mySessionId);

while (true)
{
    BrokeredMessage message = messageSession.Receive(TimeSpan.FromSeconds(20));
    if (message != null)
    {
        message.GetBody<Stream>().CopyTo(finalStream);
        message.Complete();
        continue;
    }

    break;
}
In the above example we are splitting a stream into small parts and sending them over the Service Bus.
But what should we do if we want more than this, for example if we want to send messages to more than one consumer? In that case, if we still need a way to group messages the way sessions do, we will need to think twice. We have two options for this situation: we can use the CorrelationId for this purpose, or we can add a custom property to the BrokeredMessage (a rough sketch of the custom-property option follows). Because the CorrelationId is matched using a hash, filtering on it is faster than evaluating a SQL filter on a custom property.
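As a minimal sketch of the custom-property option (the property name GroupId, the subscription name, and the namespaceManager/topicClient objects are illustrative, not part of the original samples):

// Group messages with a custom property and filter on it with a SqlFilter.
// Unlike a CorrelationFilter, the SQL expression is evaluated per message.
namespaceManager.CreateSubscription(
    "myTopic",
    "listenToGroup42",
    new SqlFilter("GroupId = '42'"));

BrokeredMessage groupedMessage = new BrokeredMessage("some content");
groupedMessage.Properties["GroupId"] = "42";
topicClient.Send(groupedMessage);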
Both solutions will work, but we will encounter the same problem with both of them: how can we create a subscription for the given CorrelationId or property before we start receiving messages? This is the biggest problem that we need to solve.
Before talking about this problem, I would like to say a few words about CorrelationId. This is a field that can be set on each BrokeredMessage that is sent over the wire. The advantage of using it lies in how we can define the filter: there is a pre-defined filter for the CorrelationId that can be used when we create a subscription. Also, each id is hashed, so the check is not made using string comparison.
But what is our problem with CorrelationId? We can broadcast a message to more than one subscriber, but... yes, there is a but. We need to know the correlation id at the moment when we create the subscription. Why? Because the correlation id has to be specified when we create the subscriber, in the form of a CorrelationFilter.
This is not the end of the road; we can find solutions for this problem. The trick is to notify the consumers about the new group of messages before adding those messages to the Service Bus Topic. For this purpose we can use the same Service Bus Topic, or a separate one. We can send special messages carrying a custom property that describes the content of the next flow of messages with the given correlation id. Based on this information, each consumer can decide whether it wants to receive those messages.
The trick here is to register the subscribers before you start broadcasting the messages with a given correlation id. The messages will be delivered only to the subscriptions that already exist; because of this, if you don't register from the beginning, there is a chance of losing some of the messages.
You need some kind of callback or a timer. Because in a Service Bus pattern the producer doesn't know the number of subscribers, we cannot create a mechanism where each subscriber notifies the producer over another topic. Instead, we can have a waiting time on the producer side (10 seconds in the sample below) after it sends the message that announces the new correlation id.

Here is the code that we should have on the producer and on the consumer side.
- On the producer side, we need to create and send the message that contains the new correlation id. After this, we need to wait for a period of time so that the consumers have a chance to register for it.
// Announce the new correlation id on the topic, then give the consumers
// time to create their CorrelationFilter subscriptions before broadcasting.
BrokeredMessage message = new BrokeredMessage();
message.Properties["NewCorrelationId"] = 1;
message.Properties["Content"] = "new available cars for rent";
topicClient.Send(message);

Thread.Sleep(10000);
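After the waiting period, once the consumers described below have had the chance to create their subscriptions, the producer can broadcast the actual group of messages with the CorrelationId set to the announced value, so that the consumers' CorrelationFilter subscriptions match them. A minimal sketch, reusing the same topicClient (the availableCars collection is just an illustrative payload):

// Broadcast the actual messages; CorrelationId must match the announced
// value ("1" in this sample) for the CorrelationFilter subscriptions to pass.
foreach (string car in availableCars)
{
    BrokeredMessage carMessage = new BrokeredMessage(car);
    carMessage.CorrelationId = "1";
    topicClient.Send(carMessage);
}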
- Creating the subscription that checks whether a message contains the property that specifies the new correlation id:
namespaceManager.CreateSubscription(
    "myTopic",
    "listenToNewCorrelationIds",
    new SqlFilter("EXISTS(NewCorrelationId)"));
- Creating the consumer that processes the announcement message and then creates a new subscription for the announced correlation id:
// Wait for the announcement, then create a subscription that matches only
// the announced correlation id and start receiving from it.
SubscriptionClient announcementClient =
    SubscriptionClient.CreateFromConnectionString(
        connectionString,
        "myTopic",
        "listenToNewCorrelationIds");
BrokeredMessage correlationIdMessage = announcementClient.Receive();

namespaceManager.CreateSubscription(
    "myTopic",
    "listenToMyCorrelationId",
    new CorrelationFilter(
        correlationIdMessage.Properties["NewCorrelationId"].ToString()));

SubscriptionClient client =
    SubscriptionClient.CreateFromConnectionString(
        connectionString,
        "myTopic",
        "listenToMyCorrelationId");

... client.Receive() …
From a performance perspective, it would be better to use a different topic for the messages that announce the new correlation id.
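A minimal sketch of that separation, assuming a dedicated topic called newCorrelationIdTopic already exists (the topic names are illustrative):

// Announcements about new correlation ids go to a dedicated topic,
// while the actual messages keep flowing through the data topic.
TopicClient announcementTopic =
    TopicClient.CreateFromConnectionString(connectionString, "newCorrelationIdTopic");
TopicClient dataTopic =
    TopicClient.CreateFromConnectionString(connectionString, "myTopic");

// Announce the new correlation id.
BrokeredMessage announcement = new BrokeredMessage();
announcement.Properties["NewCorrelationId"] = 1;
announcementTopic.Send(announcement);

// Later, after the waiting period, the data messages are sent here.
BrokeredMessage dataMessage = new BrokeredMessage("new available cars for rent");
dataMessage.CorrelationId = "1";
dataTopic.Send(dataMessage);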
We can imagine a lot of implementations. What we need to remember when using the correlation id is that we need to create a subscription for the given correlation id before we start sending messages with it. The waiting period described in the paragraphs above is the most challenging part.
In conclusion, we should use sessions when we need only one consumer. If we need more than one consumer, then we should use the correlation id.
