In one of my last post I presented the Aggregator Pattern. The main purpose of this pattern is the ability of the consumer to combine (aggregate) messages. This pattern can be implemented in Windows Azure using Windows Azure Service Bus Queues or Topics.
There are two different implementation for this pattern. The implementations are extremely different and also can affect our performance.
The first implementation requires using the Session support from BrokeredMessage. What this means from the code perspective? We need to set the Session each time when we want to send a message. The consumer will start to consume the messages with the specific session id. This solution is simple and it works great when we don’t have a lot of messages. For example if we have only 9-10 messages.
The first advantage of this implementation is on the consumer side. We don’t need to create the consume before messages are added to the Service Bus. The messages will be persisted in the Service Bus infrastructure. One of the downside of this implementation is the numbers of the consumers that we can have for the same session. We can have only one consumer. Because of this, if we want to broadcast a message to more than one consumer … we will have a problem. Also, the current implementation of session doesn’t use any kind of hashing for the id field. Because of this, if we want to process thousands of messages per second, maybe we will have a problem.
Producer
Consumer
But, what should we do if we want more than this. For example if we want to send messages to more than one consumer. In this case, if we need a way to group messages, like session we will need to think twice. We have some options for this situation. We can use the CorrelationId for this purpose of to add a custom property to the BrokeredMessage. CorrelationId use hashing, because of this is faster than the first option.
Both solutions will work, but we will encounter the same problem in both of them. How we can create a subscription for the given CorrelationId of property before starting receiving messages. This is the biggest problem that we need to resolve.
Before talking about this problem, I would like to talk a little about CorrelationId. This fields that can be set for each BrokeredMessage that is send on the wire. The advantage using is how we can define the filter. We have a pre-define filter for the CorrelationId that can be use when we want to create a subscription. Also each id is hashed; because of this the check is not made using string comparison.
But what is our problem using CorrelationId. We can broadcast a message to more than one subscriber, but… Yes, there is a but. We need to know the correlation id in the moment when we create the subscription. Why? Because the correlation id need to specify in the moment when we are creating the subscriber. Correlation id need to be specified for a subscriber as a CorrelationFilterExpression.
This is not the end of the road. We can find solutions for this problem. The trick is to notify the consumers before adding messages to the Service Bus Topic about the new group of messages that will be added to the system. For this purpose we can use the same Service Bus Topic, or another topic for this. We can have some special messages that have some custom property that describe what will be content of the next flow of messages with the given correlation id. Based on this information, each consumer will be able to decide if we want to receive the given messages.
The trick here is to register the subscribers before the moment when you start broadcast the messages with a given correlation id. The messages will be sending only to the subscribers that are already listening. Because of this, if you are not register from the beginning, there are chances to lose some of the messages.
You need some kind of callback or a timer. Because in a Service Bus pattern, the producer doesn’t know the numbers of subscribers, we cannot create a mechanism where each subscriber notifies the producer using another topic. We could have a waiting time (5s) on the producer side, after he sends the message that notify about the new correlation id.
Here is the code that we should have on the producer and on the consumer side.
-on the producer side, we need to create and send the message that contains the new correlation id. After this we will need to wait a period of time, until the consumers will be able to register to it.
We can imagine a lot of implementation. What we need to remember when using correlation id that we need to create a subscription for the given correlation id before starting sending the messages to it. This waiting period that I described in the above paragraphs is the most challenging part.
In conclusion, we should use session when we need to have only one consumer. If we need more than one consumer, than we should use correlation id.
There are two different implementation for this pattern. The implementations are extremely different and also can affect our performance.
The first implementation requires using the Session support from BrokeredMessage. What this means from the code perspective? We need to set the Session each time when we want to send a message. The consumer will start to consume the messages with the specific session id. This solution is simple and it works great when we don’t have a lot of messages. For example if we have only 9-10 messages.
The first advantage of this implementation is on the consumer side. We don’t need to create the consume before messages are added to the Service Bus. The messages will be persisted in the Service Bus infrastructure. One of the downside of this implementation is the numbers of the consumers that we can have for the same session. We can have only one consumer. Because of this, if we want to broadcast a message to more than one consumer … we will have a problem. Also, the current implementation of session doesn’t use any kind of hashing for the id field. Because of this, if we want to process thousands of messages per second, maybe we will have a problem.
Producer
Stream messageStream = message.GetBody<Stream>();
for (int offset = 0;
offset < length;
offset += 255)
{
long messageLength = (length - offset) > 255
? 255
: length - offset;
byte[] currentMessageContent = new byte[messageLength];
int result = messageStream.Read(currentMessageContent, 0, (int)messageLength);
BrokeredMessage currentMessage = new BrokeredMessage(
new MemoryStream(currentMessageContent),
true);
subMessage.SessionId = currentContentId;
qc.Send(currentMessage);
}
Consumer
MemoryStream finalStream = new MemoryStream();
MessageSession messageSession = qc.AcceptMessageSession(mySessionId);
while(true)
{
BrokeredMessage message = messageSession.Receive(TimeSpan.FromSeconds(20));
if(message != null)
{
message.GetBody<Stream>().CopyTo(finalStream);
message.Complete();
continue;
}
break;
}
In the above example we are spitting a stream in small parts and sending it on the Service Bus.But, what should we do if we want more than this. For example if we want to send messages to more than one consumer. In this case, if we need a way to group messages, like session we will need to think twice. We have some options for this situation. We can use the CorrelationId for this purpose of to add a custom property to the BrokeredMessage. CorrelationId use hashing, because of this is faster than the first option.
Both solutions will work, but we will encounter the same problem in both of them. How we can create a subscription for the given CorrelationId of property before starting receiving messages. This is the biggest problem that we need to resolve.
Before talking about this problem, I would like to talk a little about CorrelationId. This fields that can be set for each BrokeredMessage that is send on the wire. The advantage using is how we can define the filter. We have a pre-define filter for the CorrelationId that can be use when we want to create a subscription. Also each id is hashed; because of this the check is not made using string comparison.
But what is our problem using CorrelationId. We can broadcast a message to more than one subscriber, but… Yes, there is a but. We need to know the correlation id in the moment when we create the subscription. Why? Because the correlation id need to specify in the moment when we are creating the subscriber. Correlation id need to be specified for a subscriber as a CorrelationFilterExpression.
This is not the end of the road. We can find solutions for this problem. The trick is to notify the consumers before adding messages to the Service Bus Topic about the new group of messages that will be added to the system. For this purpose we can use the same Service Bus Topic, or another topic for this. We can have some special messages that have some custom property that describe what will be content of the next flow of messages with the given correlation id. Based on this information, each consumer will be able to decide if we want to receive the given messages.
The trick here is to register the subscribers before the moment when you start broadcast the messages with a given correlation id. The messages will be sending only to the subscribers that are already listening. Because of this, if you are not register from the beginning, there are chances to lose some of the messages.
You need some kind of callback or a timer. Because in a Service Bus pattern, the producer doesn’t know the numbers of subscribers, we cannot create a mechanism where each subscriber notifies the producer using another topic. We could have a waiting time (5s) on the producer side, after he sends the message that notify about the new correlation id.
Here is the code that we should have on the producer and on the consumer side.
-on the producer side, we need to create and send the message that contains the new correlation id. After this we will need to wait a period of time, until the consumers will be able to register to it.
BrokeredMessage message = new BrokeredMessage();
message.Properties["NewCorrelationId"] = 1;
message.Properties["Content"] = "new available cars for rent";
topicClient.Send(message);
Thread.Sleep(10000);
-creating the subscription that check if the message contains the property that specify the correlation id.namespaceManager.CreateSubscription(
“myTopic”,
“listenToNewCorrelationIds”,
new SqlFilterExpression("EXISTS(NewCorrelationId”));
-create the consumer that processes the message, by creating a new subscription.SubscriptionClient client =
SubscriptionClient.CreateFromConnectionString(
connectionString,
"myTopic",
“listenToNewCorrelationIds”);
BrokeredMessage correlationIdMessage = client.Receive();
namespaceManager.CreateSubscription(
“myTopic”,
“listenToMyCorrelationId”,
new CorrelationFilterExpression(correlationIdMessage.Properties["NewCorrelationId"]));
SubscriptionClient client =
SubscriptionClient.CreateFromConnectionString(
connectionString,
"myTopic",
“listenToMyCorrelationId);
... client.Receive() …
From performance perspective is would be better to use a different topic to send the messages that contains the new correlation id.We can imagine a lot of implementation. What we need to remember when using correlation id that we need to create a subscription for the given correlation id before starting sending the messages to it. This waiting period that I described in the above paragraphs is the most challenging part.
In conclusion, we should use session when we need to have only one consumer. If we need more than one consumer, than we should use correlation id.
Comments
Post a Comment