Skip to main content

Different methods to implement Message Aggregator pattern using Service Bus Topic – CorrelationId

In one of my last post I presented the Aggregator Pattern. The main purpose of this pattern is the ability of the consumer to combine (aggregate) messages. This pattern can be implemented in Windows Azure using Windows Azure Service Bus Queues or Topics.
There are two different implementation for this pattern. The implementations are extremely different and also can affect our performance.
The first implementation requires using the Session support from BrokeredMessage. What this means from the code perspective? We need to set the Session each time when we want to send a message. The consumer will start to consume the messages with the specific session id. This solution is simple and it works great when we don’t have a lot of messages. For example if we have only 9-10 messages.
The first advantage of this implementation is on the consumer side. We don’t need to create the consume before messages are added to the Service Bus. The messages will be persisted in the Service Bus infrastructure. One of the downside of this implementation is the numbers of the consumers that we can have for the same session. We can have only one consumer. Because of this, if we want to broadcast a message to more than one consumer … we will have a problem. Also, the current implementation of session doesn’t use any kind of hashing for the id field. Because of this, if we want to process thousands of messages per second, maybe we will have a problem.
Producer
Stream messageStream = message.GetBody<Stream>();
for (int offset = 0;
    offset < length;
    offset += 255)
        {                     
            long messageLength = (length - offset) > 255
                ? 255
        : length - offset;
            byte[] currentMessageContent = new byte[messageLength];
            int result = messageStream.Read(currentMessageContent, 0, (int)messageLength);
            BrokeredMessage currentMessage = new BrokeredMessage(
        new MemoryStream(currentMessageContent),
        true);
            subMessage.SessionId = currentContentId;
            qc.Send(currentMessage);
        }

Consumer

MemoryStream finalStream = new MemoryStream();
MessageSession messageSession = qc.AcceptMessageSession(mySessionId);
while(true)
{
    BrokeredMessage message = messageSession.Receive(TimeSpan.FromSeconds(20));
    if(message != null)
    {
                message.GetBody<Stream>().CopyTo(finalStream);
                message.Complete();
                continue;
    }
   break;
}
In the above example we are spitting a stream in small parts and sending it on the Service Bus.
But, what should we do if we want more than this. For example if we want to send messages to more than one consumer. In this case, if we need a way to group messages, like session we will need to think twice. We have some options for this situation. We can use the CorrelationId for this purpose of to add a custom property to the BrokeredMessage. CorrelationId use hashing, because of this is faster than the first option.
Both solutions will work, but we will encounter the same problem in both of them. How we can create a subscription for the given CorrelationId of property before starting receiving messages. This is the biggest problem that we need to resolve.
Before talking about this problem, I would like to talk a little about CorrelationId. This fields that can be set for each BrokeredMessage that is send on the wire. The advantage using is how we can define the filter. We have a pre-define filter for the CorrelationId that can be use when we want to create a subscription. Also each id is hashed; because of this the check is not made using string comparison.
But what is our problem using CorrelationId. We can broadcast a message to more than one subscriber, but… Yes, there is a but. We need to know the correlation id in the moment when we create the subscription. Why? Because the correlation id need to specify in the moment when we are creating the subscriber. Correlation id need to be specified for a subscriber as a CorrelationFilterExpression.
This is not the end of the road. We can find solutions for this problem. The trick is to notify the consumers before adding messages to the Service Bus Topic about the new group of messages that will be added to the system. For this purpose we can use the same Service Bus Topic, or another topic for this. We can have some special messages that have some custom property that describe what will be content of the next flow of messages with the given correlation id. Based on this information, each consumer will be able to decide if we want to receive the given messages.
The trick here is to register the subscribers before the moment when you start broadcast the messages with a given correlation id. The messages will be sending only to the subscribers that are already listening. Because of this, if you are not register from the beginning, there are chances to lose some of the messages.
You need some kind of callback or a timer. Because in a Service Bus pattern, the producer doesn’t know the numbers of subscribers, we cannot create a mechanism where each subscriber notifies the producer using another topic. We could have a waiting time (5s) on the producer side, after he sends the message that notify about the new correlation id.

Here is the code that we should have on the producer and on the consumer side.
-on the producer side, we need to create and send the message that contains the new correlation id. After this we will need to wait a period of time, until the consumers will be able to register to it.
BrokeredMessage message = new BrokeredMessage();
message.Properties["NewCorrelationId"] = 1;
message.Properties["Content"] = "new available cars for rent";
topicClient.Send(message);
Thread.Sleep(10000);
-creating the subscription that check if the message contains the property that specify the correlation id.
namespaceManager.CreateSubscription(
     “myTopic”,
     “listenToNewCorrelationIds”,
     new SqlFilterExpression("EXISTS(NewCorrelationId”));
-create the consumer that processes the message, by creating a new subscription.
SubscriptionClient client =
    SubscriptionClient.CreateFromConnectionString(
        connectionString,
        "myTopic",
        “listenToNewCorrelationIds”);
BrokeredMessage correlationIdMessage = client.Receive();
namespaceManager.CreateSubscription(
     “myTopic”,
     “listenToMyCorrelationId”,
     new CorrelationFilterExpression(correlationIdMessage.Properties["NewCorrelationId"]));
SubscriptionClient client =
    SubscriptionClient.CreateFromConnectionString(
        connectionString,
        "myTopic",
        “listenToMyCorrelationId);

... client.Receive() …
From performance perspective is would be better to use a different topic to send the messages that contains the new correlation id.
We can imagine a lot of implementation. What we need to remember when using correlation id that we need to create a subscription for the given correlation id before starting sending the messages to it. This waiting period that I described in the above paragraphs is the most challenging part. 
In conclusion, we should use session when we need to have only one consumer. If we need more than one consumer, than we should use correlation id.

Comments

Popular posts from this blog

Windows Docker Containers can make WIN32 API calls, use COM and ASP.NET WebForms

After the last post , I received two interesting questions related to Docker and Windows. People were interested if we do Win32 API calls from a Docker container and if there is support for COM. WIN32 Support To test calls to WIN32 API, let’s try to populate SYSTEM_INFO class. [StructLayout(LayoutKind.Sequential)] public struct SYSTEM_INFO { public uint dwOemId; public uint dwPageSize; public uint lpMinimumApplicationAddress; public uint lpMaximumApplicationAddress; public uint dwActiveProcessorMask; public uint dwNumberOfProcessors; public uint dwProcessorType; public uint dwAllocationGranularity; public uint dwProcessorLevel; public uint dwProcessorRevision; } ... [DllImport("kernel32")] static extern void GetSystemInfo(ref SYSTEM_INFO pSI); ... SYSTEM_INFO pSI = new SYSTEM_INFO(...

How to audit an Azure Cosmos DB

In this post, we will talk about how we can audit an Azure Cosmos DB database. Before jumping into the problem let us define the business requirement: As an Administrator I want to be able to audit all changes that were done to specific collection inside my Azure Cosmos DB. The requirement is simple, but can be a little tricky to implement fully. First of all when you are using Azure Cosmos DB or any other storage solution there are 99% odds that you’ll have more than one system that writes data to it. This means that you have or not have control on the systems that are doing any create/update/delete operations. Solution 1: Diagnostic Logs Cosmos DB allows us activate diagnostics logs and stream the output a storage account for achieving to other systems like Event Hub or Log Analytics. This would allow us to have information related to who, when, what, response code and how the access operation to our Cosmos DB was done. Beside this there is a field that specifies what was th...

Cloud Myths: Cloud is Cheaper (Pill 1 of 5 / Cloud Pills)

Cloud Myths: Cloud is Cheaper (Pill 1 of 5 / Cloud Pills) The idea that moving to the cloud reduces the costs is a common misconception. The cloud infrastructure provides flexibility, scalability, and better CAPEX, but it does not guarantee lower costs without proper optimisation and management of the cloud services and infrastructure. Idle and unused resources, overprovisioning, oversize databases, and unnecessary data transfer can increase running costs. The regional pricing mode, multi-cloud complexity, and cost variety add extra complexity to the cost function. Cloud adoption without a cost governance strategy can result in unexpected expenses. Improper usage, combined with a pay-as-you-go model, can result in a nightmare for business stakeholders who cannot track and manage the monthly costs. Cloud-native services such as AI services, managed databases, and analytics platforms are powerful, provide out-of-the-shelve capabilities, and increase business agility and innovation. H...