Skip to main content

Event-Driven message consumtion (aka pump model) - Windows Azure Service Bus

The new version of Windows Azure SDK was launched. I’m glad to see that new feature of Service Bus are available. For me, one of the most important features that was added to Service Bus is the one that give us the possibility to consume messages from a topic or queue using an event-driven model – “pump model”.
Until now, we had to have an infinite loop which check if a new message is available. This simple solution was good, but it was very primitive. In a real production application, a new message would trigger an event that would execute a specific code.
To be able to support something like this, the user would need to create an infinite loop, check if a new message is available and trigger a specific event. This infinite pooling creates a lot of problems, especially when something would go wrong – detecting an error and managing it.
while (true)
{
try
{
  if (isNotStopped)
  {
    break;
  }

  BrokeredMessage message = null;
  message = queueClient.Receive();

  if (mesage == null)
  {
    // No message available;
    continue;
  }

  Process(message);
  message.Complete();
}
catch (Exception ex)
{
  ...
}
}
With the new SDK, we don’t need to do this anymore. We can remote the infinite loop from our code and switch to something more elegant.
OnMessageOptions messageOptions =new OnMessageOptions()
  {
    AutoComplete = true,
    MaxConcurrentCalls = 7
  }
messageOptions.ExceptionReceived += (sender, e) =>
  {
    ...
  }

QueueClient client = QueueClient.Create(...);
client.OnMessage(
  (message)=>
    {
      ... process message
    },
  messageOptions);
As we can see, we need to specify an event action that will be executing when a new message is available. The parameter or this event is the message itself – BrokeredMessage. This message can be process in any way we want.
In the code, beside the message arrived event I think that you notice the exception received event. This event will be called when an error occurs during the message processing. For example, if during the message arrive event an error occurs, the error will trigger the exception received event.
It is important to know that you should not catch errors in the message received event. Doing something like this will not notify the Windows Azure SDK that something went wrong during the message processing. Because of this, in the case we want to consume message in the peek and lock pattern (and not remove messages that could not be processed with success) we will end up with an expected behavior - removing the messages that were not processed with success. This behavior will appear because we catch all the exception in the message arrive event and the SDK will not know that the message processing went wrong.
OnMessageOption class is used to specify different options about consuming model. Using this class we can specify the level of concurrency (MaxConccurentCalls) and how we want to consume the messages (peek and lock or received and delete). When the AutoComplete flag is set to true, the peek and lock mechanism will be used. This mean that in the moment when the message received event will end, our message will be marked as consumed and removed from Service Bus.
If we want to control this behavior, we need to set this flag to false. Because of this the Complete() method of the BrokeredMessage will not be called automatically. In the message received event we will need to call this method manually. Without calling this method and setting the AutoComplete flag set to false, message will not be removed from Service Bus anymore – Service Bus will expect the success confirmation call.
OnMessageOptions messageOptions =new OnMessageOptions()
  {
    AutoComplete = false,
    MaxConcurrentCalls = 7
  }
messageOptions.ExceptionReceived += (sender, e) =>
  {
    ...
  }

QueueClient client = QueueClient.Create(...);
client.OnMessage(
  (message)=>
    {
      ... process message
      message.Complete();
    },
  messageOptions);
These option class contains also an exception event that will be triggered in the moment when an error occurs. I recommend to always register to this event. Don’t forget – this event will be called when an exception occurs in the message received event.
This new feature of Service Bus will improve the user experience and our application code.
PS: Don't forget to use a a retry policy when you call the Complete() event.

Comments

Popular posts from this blog

Windows Docker Containers can make WIN32 API calls, use COM and ASP.NET WebForms

After the last post , I received two interesting questions related to Docker and Windows. People were interested if we do Win32 API calls from a Docker container and if there is support for COM. WIN32 Support To test calls to WIN32 API, let’s try to populate SYSTEM_INFO class. [StructLayout(LayoutKind.Sequential)] public struct SYSTEM_INFO { public uint dwOemId; public uint dwPageSize; public uint lpMinimumApplicationAddress; public uint lpMaximumApplicationAddress; public uint dwActiveProcessorMask; public uint dwNumberOfProcessors; public uint dwProcessorType; public uint dwAllocationGranularity; public uint dwProcessorLevel; public uint dwProcessorRevision; } ... [DllImport("kernel32")] static extern void GetSystemInfo(ref SYSTEM_INFO pSI); ... SYSTEM_INFO pSI = new SYSTEM_INFO(

ADO.NET provider with invariant name 'System.Data.SqlClient' could not be loaded

Today blog post will be started with the following error when running DB tests on the CI machine: threw exception: System.InvalidOperationException: The Entity Framework provider type 'System.Data.Entity.SqlServer.SqlProviderServices, EntityFramework.SqlServer' registered in the application config file for the ADO.NET provider with invariant name 'System.Data.SqlClient' could not be loaded. Make sure that the assembly-qualified name is used and that the assembly is available to the running application. See http://go.microsoft.com/fwlink/?LinkId=260882 for more information. at System.Data.Entity.Infrastructure.DependencyResolution.ProviderServicesFactory.GetInstance(String providerTypeName, String providerInvariantName) This error happened only on the Continuous Integration machine. On the devs machines, everything has fine. The classic problem – on my machine it’s working. The CI has the following configuration: TeamCity .NET 4.51 EF 6.0.2 VS2013 It see

Navigating Cloud Strategy after Azure Central US Region Outage

 Looking back, July 19, 2024, was challenging for customers using Microsoft Azure or Windows machines. Two major outages affected customers using CrowdStrike Falcon or Microsoft Azure computation resources in the Central US. These two outages affected many people and put many businesses on pause for a few hours or even days. The overlap of these two issues was a nightmare for travellers. In addition to blue screens in the airport terminals, they could not get additional information from the airport website, airline personnel, or the support line because they were affected by the outage in the Central US region or the CrowdStrike outage.   But what happened in reality? A faulty CrowdStrike update affected Windows computers globally, from airports and healthcare to small businesses, affecting over 8.5m computers. Even if the Falson Sensor software defect was identified and a fix deployed shortly after, the recovery took longer. In parallel with CrowdStrike, Microsoft provided a too