Service Bus Topics - How to use it (part 2)

In this post I continue the series on how we can use Service Bus Topics. In the previous post we saw how to create a topic and send messages to it. Now we will see how to consume the messages that were sent to a topic.
Each topic can have from one to n subscribers. A subscriber represents a consumer of messages from the topic. Each subscriber can register for all the messages that are added to a topic or only for a part of them. Each subscriber can have a filter set; only the messages that satisfy the filter condition are delivered to it. We can imagine that a private queue is created for each subscriber, and the messages intended for that subscriber are placed there.
The simplest subscriber is the one that subscribes to all the messages that are sent to a topic. The filter used in this case is called “MatchAll”, and it is the default filter. Each subscriber has a unique name and, as when creating a topic, we need to check whether the subscriber already exists before creating it.
if (!namespaceManager.SubscriptionExists("myFooTopic", "retriveAllMessagesSubscriber"))
{
    namespaceManager.CreateSubscription("myFooTopic", "retriveAllMessagesSubscriber");
}
The first parameter specifies the name of the topic and the second one represents the subscriber name. The naming conventions for the subscriber name are the same as for a topic name. Besides the default behavior, we can specify custom filters that are applied to messages. For example, we can define a subscriber that accepts only messages that have a property named “value” set to 10.
This custom filter can be specified in two ways: filters and rule descriptions.
The first way, using filters, relies on the SqlFilter class, which accepts SQL92-style expressions. If you know SQL and its WHERE clause, this filter will be very easy to use. For the above example we need to create a new instance of SqlFilter and specify that the “value” property must be equal to 10.
namespaceManager.CreateSubscription(
    "myFooTopic",
    "valueIs10Subscriber",
    new SqlFilter("value = 10"));
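To see the filter from the sending side, here is a minimal sketch of a message that this subscription would accept. It assumes the same Microsoft.ServiceBus.Messaging library as the rest of the post, and it assumes that the connection string is stored in an environment variable named SERVICEBUS_CONNECTION_STRING (a name chosen only for this example).

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

class SendFilteredMessage
{
    static void Main()
    {
        // Assumption: the connection string lives in this environment variable.
        string connectionString =
            Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");

        TopicClient topicClient =
            TopicClient.CreateFromConnectionString(connectionString, "myFooTopic");

        using (var message = new BrokeredMessage("hello"))
        {
            // SqlFilter("value = 10") is evaluated against this custom property.
            message.Properties["value"] = 10;
            topicClient.Send(message);
        }

        topicClient.Close();
    }
}
```

A message sent with a different value, or without the “value” property at all, would not match the filter of valueIs10Subscriber.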
The other way is by using the RuleDescription class. Compared with a plain filter, a rule description can also carry a custom action that is executed when the filter condition is true. For example, we can change a property of the BrokeredMessage that is delivered to the subscription. Suppose that when value is 10 we want to set the property isValid to false. For this we create a RuleDescription whose filter condition is the same as in the above example and whose action sets the isValid property to false.
RuleDescription ruleDescription = new RuleDescription()
{
    // The action runs only for messages that match the filter below.
    Action = new SqlRuleAction("set isValid = false"),
    Filter = new SqlFilter("value = 10")
};
namespaceManager.CreateSubscription(
    "myFooTopic",
    "secondSubscription",
    ruleDescription);
We can have only the action or only the filter set. Also, each rule can have a name, which makes it easier to edit a subscription later. Once a subscription is created, a SubscriptionClient for it lets us add or remove any custom rule that we defined. In the same manner we can have more than one rule defined on a subscription.
namespaceManager.CreateSubscription(
    "myFooTopic",
    "thirdSubscription");
// Rules are added through a SubscriptionClient; the SubscriptionDescription
// returned by CreateSubscription has no method for adding rules.
SubscriptionClient subscription = messagingFactory.CreateSubscriptionClient(
    "myFooTopic",
    "thirdSubscription");
subscription.AddRule(ruleDescription1);
subscription.AddRule(ruleDescription2);
subscription.AddRule(ruleDescription3);
The rules will be applied in the order that they were registered.
You may have the following question: will a message be added to the subscription if one of the rules is not satisfied?
Yes, because by default a rule with a TrueFilter is added to the subscription, and it matches every message. Any modification that a rule makes to the message (using Action) is not persisted to the topic; it is persisted only to the subscription, if the message is added.
Besides TrueFilter, we also have FalseFilter, which matches no message. If we want all messages to be blocked by default and only the messages that satisfy our rules to be added to the subscription, the first step after creating a subscription is to remove all its rules, including the default one. After this step we can add our custom rules.
var subscriptionRules = namespaceManager.GetRules(
    "myFooTopic",
    "thirdSubscription");
SubscriptionClient subscription = messagingFactory.CreateSubscriptionClient(
    "myFooTopic",
    "thirdSubscription");

foreach (var rule in subscriptionRules)
{
    subscription.RemoveRule(rule.Name);
}
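When only the default rule is present, the same result can be obtained without enumerating the rules. The sketch below removes the default rule by its well-known name and adds one custom rule. The rule name valueIs10Rule and the environment variable that holds the connection string are assumptions made for this example.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

class ReplaceDefaultRule
{
    static void Main()
    {
        // Assumption: the connection string lives in this environment variable.
        string connectionString =
            Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");

        SubscriptionClient subscription = SubscriptionClient.CreateFromConnectionString(
            connectionString, "myFooTopic", "thirdSubscription");

        // RuleDescription.DefaultRuleName is the name of the match-all rule
        // that is added when a subscription is created without a filter.
        subscription.RemoveRule(RuleDescription.DefaultRuleName);

        // "valueIs10Rule" is a hypothetical name; from now on only messages
        // with value = 10 are added to the subscription.
        subscription.AddRule("valueIs10Rule", new SqlFilter("value = 10"));

        subscription.Close();
    }
}
```

RemoveRule fails if the rule no longer exists, so this sketch is meant to run once, right after the subscription is created.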
In the action of a rule, custom properties of the current message are read from the “user” scope. This scope is the default, so “user.ValueA” and “ValueA” refer to the same property; the “sys” scope is reserved for system properties of the message, such as sys.Label. In the following example, I set the value of a property as the sum of two other properties of the message.
RuleDescription ruleDescription = new RuleDescription()
{
    Action = new SqlRuleAction("set sum = user.ValueA + user.ValueB"),
    Name = "sumRule"
};
Each subscription can also have custom properties set, such as the message expiration time, the lock duration, the maximum number of deliveries and so on. Using this functionality we can build a fairly complex flow.
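As a rough illustration, these properties can be set through a SubscriptionDescription that is passed to CreateSubscription. The subscription name configuredSubscription, the concrete values and the environment variable are assumptions chosen only for this sketch.

```csharp
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

class CreateConfiguredSubscription
{
    static void Main()
    {
        // Assumption: the connection string lives in this environment variable.
        string connectionString =
            Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");

        NamespaceManager namespaceManager =
            NamespaceManager.CreateFromConnectionString(connectionString);

        // "configuredSubscription" is a hypothetical subscription name.
        var description = new SubscriptionDescription("myFooTopic", "configuredSubscription")
        {
            // How long a received message stays locked in PeekLock mode.
            LockDuration = TimeSpan.FromSeconds(45),
            // After this many failed deliveries the message is dead-lettered.
            MaxDeliveryCount = 5,
            // Expiration time for messages that are not consumed.
            DefaultMessageTimeToLive = TimeSpan.FromHours(1),
            // Move expired messages to the dead-letter queue instead of dropping them.
            EnableDeadLetteringOnMessageExpiration = true
        };

        if (!namespaceManager.SubscriptionExists(description.TopicPath, description.Name))
        {
            namespaceManager.CreateSubscription(description);
        }
    }
}
```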
Now we can talk about how to receive messages from a subscription. As with a Service Bus Queue, we can receive messages in two different modes: PeekLock and ReceiveAndDelete. In the first mode a message is not removed until we call the Complete() method of the message. In the second mode a message is deleted automatically as soon as it is sent to the receiver, so if any error appears while it is processed, the message is lost. I recommend the first mode if you want to consume messages in a safe way. To create a subscription client that is able to receive messages we need to specify the credentials, the topic name and the subscription name. After this we can receive messages from the subscription and consume them.
SubscriptionClient subscriptionClient = SubscriptionClient.CreateFromConnectionString(
    CloudConfigurationManager.GetSetting("ServiceBusConnectionString"),
    "myFooTopic",
    "thirdSubscription");

// Receive() returns null when no message arrives before the timeout.
BrokeredMessage message = subscriptionClient.Receive();

if (message != null)
{
    try
    {
        ...
        // Processing succeeded: remove the message from the subscription.
        message.Complete();
    }
    catch (Exception)
    {
        // Processing failed: release the lock so the message is delivered again.
        message.Abandon();
    }
}
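For comparison, here is a minimal sketch of the ReceiveAndDelete mode, written as a loop that drains the subscription. It assumes that the message bodies were sent as strings and that the connection string is stored in an environment variable; the five-second timeout is an arbitrary choice for the example.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

class DrainSubscription
{
    static void Main()
    {
        // Assumption: the connection string lives in this environment variable.
        string connectionString =
            Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");

        // The receive mode is chosen when the client is created.
        SubscriptionClient client = SubscriptionClient.CreateFromConnectionString(
            connectionString,
            "myFooTopic",
            "thirdSubscription",
            ReceiveMode.ReceiveAndDelete);

        while (true)
        {
            // Wait up to 5 seconds; null means that no message arrived in time.
            BrokeredMessage message = client.Receive(TimeSpan.FromSeconds(5));
            if (message == null)
            {
                break;
            }

            // The message is already deleted from the subscription here,
            // so Complete() and Abandon() must not be called in this mode.
            // Assumption: the sender used a string body.
            Console.WriteLine(message.GetBody<string>());
        }

        client.Close();
    }
}
```

If the processing step throws in this mode, the message cannot be recovered, which is why PeekLock remains the safer choice.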
In the above example we consume messages in PeekLock mode. At this level, every action that we can perform with Service Bus Queues can also be performed with Service Bus Topics; they have the same base class, MessageClientEntity, and use the same message type (BrokeredMessage).
Finally, we should know that if we have two clients that must each receive the same messages with the same rules, we need to create two subscriptions, one for each client. If instead the messages of a subscription should be shared among 2, 3 or n instances, each message being consumed by only one of them, we should define only one subscription.
In conclusion, receiving messages from a Service Bus Topic can be a simple thing, but it can also become a complicated one once rules are defined. It depends very much on what we want to do. The good part of these rules is that they use SQL92 syntax, so we don’t need to learn a new language to specify them.
