Skip to main content

Event-Driven message consumtion (aka pump model) - Windows Azure Service Bus

The new version of Windows Azure SDK was launched. I’m glad to see that new feature of Service Bus are available. For me, one of the most important features that was added to Service Bus is the one that give us the possibility to consume messages from a topic or queue using an event-driven model – “pump model”.
Until now, we had to have an infinite loop which check if a new message is available. This simple solution was good, but it was very primitive. In a real production application, a new message would trigger an event that would execute a specific code.
To be able to support something like this, the user would need to create an infinite loop, check if a new message is available and trigger a specific event. This infinite pooling creates a lot of problems, especially when something would go wrong – detecting an error and managing it.
while (true)
{
try
{
  if (isNotStopped)
  {
    break;
  }

  BrokeredMessage message = null;
  message = queueClient.Receive();

  if (mesage == null)
  {
    // No message available;
    continue;
  }

  Process(message);
  message.Complete();
}
catch (Exception ex)
{
  ...
}
}
With the new SDK, we don’t need to do this anymore. We can remote the infinite loop from our code and switch to something more elegant.
OnMessageOptions messageOptions =new OnMessageOptions()
  {
    AutoComplete = true,
    MaxConcurrentCalls = 7
  }
messageOptions.ExceptionReceived += (sender, e) =>
  {
    ...
  }

QueueClient client = QueueClient.Create(...);
client.OnMessage(
  (message)=>
    {
      ... process message
    },
  messageOptions);
As we can see, we need to specify an event action that will be executing when a new message is available. The parameter or this event is the message itself – BrokeredMessage. This message can be process in any way we want.
In the code, beside the message arrived event I think that you notice the exception received event. This event will be called when an error occurs during the message processing. For example, if during the message arrive event an error occurs, the error will trigger the exception received event.
It is important to know that you should not catch errors in the message received event. Doing something like this will not notify the Windows Azure SDK that something went wrong during the message processing. Because of this, in the case we want to consume message in the peek and lock pattern (and not remove messages that could not be processed with success) we will end up with an expected behavior - removing the messages that were not processed with success. This behavior will appear because we catch all the exception in the message arrive event and the SDK will not know that the message processing went wrong.
OnMessageOption class is used to specify different options about consuming model. Using this class we can specify the level of concurrency (MaxConccurentCalls) and how we want to consume the messages (peek and lock or received and delete). When the AutoComplete flag is set to true, the peek and lock mechanism will be used. This mean that in the moment when the message received event will end, our message will be marked as consumed and removed from Service Bus.
If we want to control this behavior, we need to set this flag to false. Because of this the Complete() method of the BrokeredMessage will not be called automatically. In the message received event we will need to call this method manually. Without calling this method and setting the AutoComplete flag set to false, message will not be removed from Service Bus anymore – Service Bus will expect the success confirmation call.
OnMessageOptions messageOptions =new OnMessageOptions()
  {
    AutoComplete = false,
    MaxConcurrentCalls = 7
  }
messageOptions.ExceptionReceived += (sender, e) =>
  {
    ...
  }

QueueClient client = QueueClient.Create(...);
client.OnMessage(
  (message)=>
    {
      ... process message
      message.Complete();
    },
  messageOptions);
These option class contains also an exception event that will be triggered in the moment when an error occurs. I recommend to always register to this event. Don’t forget – this event will be called when an exception occurs in the message received event.
This new feature of Service Bus will improve the user experience and our application code.
PS: Don't forget to use a a retry policy when you call the Complete() event.

Comments

Popular posts from this blog

Windows Docker Containers can make WIN32 API calls, use COM and ASP.NET WebForms

After the last post , I received two interesting questions related to Docker and Windows. People were interested if we do Win32 API calls from a Docker container and if there is support for COM. WIN32 Support To test calls to WIN32 API, let’s try to populate SYSTEM_INFO class. [StructLayout(LayoutKind.Sequential)] public struct SYSTEM_INFO { public uint dwOemId; public uint dwPageSize; public uint lpMinimumApplicationAddress; public uint lpMaximumApplicationAddress; public uint dwActiveProcessorMask; public uint dwNumberOfProcessors; public uint dwProcessorType; public uint dwAllocationGranularity; public uint dwProcessorLevel; public uint dwProcessorRevision; } ... [DllImport("kernel32")] static extern void GetSystemInfo(ref SYSTEM_INFO pSI); ... SYSTEM_INFO pSI = new SYSTEM_INFO(...

How to audit an Azure Cosmos DB

In this post, we will talk about how we can audit an Azure Cosmos DB database. Before jumping into the problem let us define the business requirement: As an Administrator I want to be able to audit all changes that were done to specific collection inside my Azure Cosmos DB. The requirement is simple, but can be a little tricky to implement fully. First of all when you are using Azure Cosmos DB or any other storage solution there are 99% odds that you’ll have more than one system that writes data to it. This means that you have or not have control on the systems that are doing any create/update/delete operations. Solution 1: Diagnostic Logs Cosmos DB allows us activate diagnostics logs and stream the output a storage account for achieving to other systems like Event Hub or Log Analytics. This would allow us to have information related to who, when, what, response code and how the access operation to our Cosmos DB was done. Beside this there is a field that specifies what was th...

Cloud Myths: Cloud is Cheaper (Pill 1 of 5 / Cloud Pills)

Cloud Myths: Cloud is Cheaper (Pill 1 of 5 / Cloud Pills) The idea that moving to the cloud reduces the costs is a common misconception. The cloud infrastructure provides flexibility, scalability, and better CAPEX, but it does not guarantee lower costs without proper optimisation and management of the cloud services and infrastructure. Idle and unused resources, overprovisioning, oversize databases, and unnecessary data transfer can increase running costs. The regional pricing mode, multi-cloud complexity, and cost variety add extra complexity to the cost function. Cloud adoption without a cost governance strategy can result in unexpected expenses. Improper usage, combined with a pay-as-you-go model, can result in a nightmare for business stakeholders who cannot track and manage the monthly costs. Cloud-native services such as AI services, managed databases, and analytics platforms are powerful, provide out-of-the-shelve capabilities, and increase business agility and innovation. H...