Event-driven message consumption (aka pump model) - Windows Azure Service Bus

A new version of the Windows Azure SDK has been released, and I’m glad to see that new Service Bus features are available. For me, one of the most important additions is the ability to consume messages from a topic or queue using an event-driven model, the so-called “pump model”.
Until now, we had to write an infinite loop that checked whether a new message was available. This simple solution worked, but it was very primitive. In a real production application, we want a new message to trigger an event that executes specific code.
To support something like this, the user had to create an infinite loop, check whether a new message was available, and trigger the event by hand. This infinite polling creates a lot of problems, especially when something goes wrong: errors have to be detected and managed manually.
while (true)
{
  try
  {
    if (isStopped)
    {
      break;
    }

    // Receive() returns null when no message arrives before the timeout.
    BrokeredMessage message = queueClient.Receive();

    if (message == null)
    {
      // No message available.
      continue;
    }

    Process(message);
    message.Complete();
  }
  catch (Exception ex)
  {
    ...
  }
}
With the new SDK, we don’t need to do this anymore. We can remove the infinite loop from our code and switch to something more elegant.
OnMessageOptions messageOptions = new OnMessageOptions()
  {
    AutoComplete = true,
    MaxConcurrentCalls = 7
  };
messageOptions.ExceptionReceived += (sender, e) =>
  {
    ...
  };

QueueClient client = QueueClient.Create(...);
client.OnMessage(
  (message) =>
    {
      // ... process message
    },
  messageOptions);
As we can see, we need to specify an action that will be executed when a new message is available. The parameter of this action is the message itself, a BrokeredMessage, which can be processed in any way we want.
Besides the message handler, you probably noticed the ExceptionReceived event in the code. It is raised when an error occurs during message processing. For example, if the message handler throws an exception, that exception triggers the ExceptionReceived event.
It is important to know that you should not catch and swallow errors in the message handler. Doing so means the Windows Azure SDK is never notified that something went wrong during message processing. If we consume messages using the peek-and-lock pattern (so that messages that could not be processed are not removed), we end up with unexpected behavior: messages that were not processed successfully are removed anyway. This happens because we catch every exception in the message handler, so the SDK considers the processing successful.
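To make the point about not swallowing exceptions concrete, here is a minimal sketch. It assumes the WindowsAzure.ServiceBus NuGet package, a queue named "orders", and messages whose body was sent as a string; the connection string, the queue name, and the Handle method are placeholders invented for illustration, not part of the original post.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

class PumpWithoutCatch
{
    static void Main()
    {
        // Placeholder values (assumptions): replace with your own namespace and queue.
        string connectionString = "<your Service Bus connection string>";
        string queueName = "orders";

        // A client created this way uses peek-and-lock receive mode by default.
        QueueClient client = QueueClient.CreateFromConnectionString(connectionString, queueName);

        var options = new OnMessageOptions
        {
            AutoComplete = true,
            MaxConcurrentCalls = 7
        };

        // Exceptions thrown by the handler below are reported here.
        options.ExceptionReceived += (sender, e) =>
        {
            Console.WriteLine("Pump error during {0}: {1}", e.Action, e.Exception.Message);
        };

        client.OnMessage(message =>
        {
            // No try/catch that swallows the error: if Handle throws, the
            // exception reaches the SDK and the message is not completed.
            Handle(message.GetBody<string>());
        }, options);

        Console.WriteLine("Listening. Press Enter to stop.");
        Console.ReadLine();
        client.Close();
    }

    // Hypothetical processing step; throws when the payload cannot be processed.
    internal static void Handle(string body)
    {
        if (string.IsNullOrWhiteSpace(body))
        {
            throw new InvalidOperationException("Empty message body.");
        }
        Console.WriteLine("Processed: " + body);
    }
}
```

If you do need a try/catch in the handler (for logging, for example), rethrow with `throw;` at the end of the catch block so the SDK still sees the failure.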
The OnMessageOptions class is used to configure how messages are consumed. With it we can specify the level of concurrency (MaxConcurrentCalls) and whether a message is completed automatically (AutoComplete). When the AutoComplete flag is set to true and the peek-and-lock mechanism is used, the message is marked as consumed and removed from Service Bus as soon as the message handler finishes.
If we want to control this behavior, we need to set the flag to false. The Complete() method of BrokeredMessage will then no longer be called automatically, so we need to call it ourselves in the message handler. If AutoComplete is false and we never call Complete(), the message is not removed from Service Bus, which keeps waiting for the success confirmation.
OnMessageOptions messageOptions = new OnMessageOptions()
  {
    AutoComplete = false,
    MaxConcurrentCalls = 7
  };
messageOptions.ExceptionReceived += (sender, e) =>
  {
    ...
  };

QueueClient client = QueueClient.Create(...);
client.OnMessage(
  (message) =>
    {
      // ... process message
      message.Complete();
    },
  messageOptions);
The options class also exposes the ExceptionReceived event, which is raised whenever an error occurs. I recommend always subscribing to it. Don’t forget: this event is also raised when an exception occurs in the message handler.
This new feature of Service Bus will improve the user experience and our application code.
PS: Don't forget to use a retry policy when you call the Complete() method.
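One possible shape for such a retry policy is sketched below. The Retry class, its Execute method, and the chosen delays are hypothetical helpers written for this illustration and are not part of the Service Bus SDK. The sketch retries on any exception with a simple doubling delay, which is a simplification: in real code you would probably retry only on transient errors.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not part of the SDK): retries an action with exponential backoff.
static class Retry
{
    // Runs the action up to maxAttempts times in total.
    // The delay doubles after each failed attempt.
    public static void Execute(Action action, int maxAttempts, TimeSpan initialDelay)
    {
        if (action == null) throw new ArgumentNullException("action");
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException("maxAttempts");

        TimeSpan delay = initialDelay;
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                action();
                return;
            }
            catch (Exception)
            {
                // Out of attempts: surface the last error to the caller.
                if (attempt >= maxAttempts) throw;
                Thread.Sleep(delay);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
        }
    }
}

class RetryDemo
{
    static void Main()
    {
        int calls = 0;

        // Simulated operation that fails twice before succeeding.
        Retry.Execute(() =>
        {
            calls++;
            if (calls < 3) throw new TimeoutException("simulated transient failure");
        }, 5, TimeSpan.FromMilliseconds(10));

        Console.WriteLine(calls); // prints 3
    }
}
```

Inside a message handler with AutoComplete set to false, the call could look like `Retry.Execute(message.Complete, 3, TimeSpan.FromSeconds(1));`. Keep the total retry time shorter than the message lock duration, because once the lock has expired, retrying Complete() will not help.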
