Service Bus Topics - How to use it (part 2)

This post continues the series on how to use Service Bus Topics. In the previous post we saw how to create a topic and send messages to it. In this post we will see how to consume messages that were sent to a topic.
Each topic can have from one to n subscribers. A subscriber represents a consumer of messages from the topic. A subscriber can register for all messages that are added to a topic or only for a part of them. Each subscriber can have a filter set; only the messages that satisfy the filter condition are delivered to it. We can imagine that a private queue is created for each subscriber, and the messages intended for that subscriber are placed in it.
The simplest subscriber is one that subscribes to all messages sent to a topic. The filter used in this case is a "match all" filter (TrueFilter), which is the default filter. Each subscriber has a unique name and, as with creating a topic, we need to check whether the subscriber already exists before creating it.
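To make the "private queue per subscriber" picture concrete, here is a minimal in-memory sketch. It is not the Service Bus SDK: the InMemoryTopic class and its Subscribe, Send and PendingCount members are hypothetical names invented for this illustration, and a message is reduced to a dictionary of properties.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of a topic; it only illustrates the fan-out idea
// and is not part of the Service Bus SDK.
class InMemoryTopic
{
    private readonly Dictionary<string, Func<IDictionary<string, object>, bool>> filters =
        new Dictionary<string, Func<IDictionary<string, object>, bool>>();

    private readonly Dictionary<string, Queue<IDictionary<string, object>>> queues =
        new Dictionary<string, Queue<IDictionary<string, object>>>();

    // Registers a subscriber; without a filter it accepts every message.
    public void Subscribe(string name, Func<IDictionary<string, object>, bool> filter = null)
    {
        filters[name] = filter ?? (message => true);
        queues[name] = new Queue<IDictionary<string, object>>();
    }

    // Copies the message into the private queue of every subscriber whose filter accepts it.
    public void Send(IDictionary<string, object> message)
    {
        foreach (var entry in filters)
        {
            if (entry.Value(message))
            {
                queues[entry.Key].Enqueue(new Dictionary<string, object>(message));
            }
        }
    }

    // Number of messages waiting in the private queue of the given subscriber.
    public int PendingCount(string name)
    {
        return queues[name].Count;
    }
}
```

A subscriber registered without a filter sees every message, while one registered with a predicate such as "value equals 10" only sees the matching messages. Each subscriber gets its own copy, so consuming a message from one queue does not affect the others.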
if (!namespaceManager.SubscriptionExists("myFooTopic", "retriveAllMessagesSubscriber"))
{
    namespaceManager.CreateSubscription("myFooTopic", "retriveAllMessagesSubscriber");
}
The first parameter specifies the name of the topic and the second one represents the subscriber name. The naming conventions for the subscriber name are the same as for a topic name. Besides the default behavior, we can specify custom filters that are applied to messages. For example, we can define a subscriber that only accepts messages that have a property named “value” set to 10.
This custom filter can be specified in two ways: with a filter or with a rule description.
The first way, using filters, relies on the SqlFilter class, which uses SQL92-style expressions. If you have some knowledge of SQL and the WHERE clause, this filter will be very easy to use. For the above example we need to create a new instance of SqlFilter and specify that the “value” property must be equal to 10.
namespaceManager.CreateSubscription(
    "myFooTopic",
    "valueIs10Subscriber",
    new SqlFilter("value = 10"));
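For the filter to have something to match, the sender has to put the property on the message. The sketch below shows one way this could look with the same SDK (Microsoft.ServiceBus.Messaging). It assumes the connection string is stored in an environment variable named SERVICEBUS_CONNECTION_STRING, which is a hypothetical name chosen for this example, and that the topic "myFooTopic" already exists.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

class SendWithPropertySample
{
    static void Main()
    {
        // Assumption: the connection string lives in this (hypothetical) environment variable.
        string connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");

        TopicClient topicClient = TopicClient.CreateFromConnectionString(connectionString, "myFooTopic");

        using (var message = new BrokeredMessage("hello"))
        {
            // Custom properties are what SqlFilter expressions such as "value = 10" are evaluated against.
            message.Properties["value"] = 10;
            topicClient.Send(message);
        }

        topicClient.Close();
    }
}
```

A message sent this way should end up in "valueIs10Subscriber", while a message with a different value, or without the property at all, should not.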
The other way is to use the RuleDescription class. Compared with a plain filter, a rule description can also carry a custom action that is executed when the filter condition is true. For example, we can change a property of the BrokeredMessage: when the value is 10, we want to set the property isValid to false. For this we create a RuleDescription whose filter condition is the same as in the above example, and whose action sets the isValid property to false.
RuleDescription ruleDescription = new RuleDescription()
{
    Action = new SqlRuleAction("set isValid = false"),
    Filter = new SqlFilter("value = 10")
};
namespaceManager.CreateSubscription(
    "myFooTopic",
    "seccondSubscription",
    ruleDescription);
A rule can have only the action or only the filter set. Each rule can also have a name, which makes it easier to edit a subscription later. Rules are added, removed, or changed through a SubscriptionClient for the subscription, and more than one rule can be defined on the same subscription.
namespaceManager.CreateSubscription(
    "myFooTopic",
    "thirdSubscription");
SubscriptionClient subscription = messagingFactory.CreateSubscriptionClient(
    "myFooTopic",
    "thirdSubscription");
subscription.AddRule(ruleDescription1);
subscription.AddRule(ruleDescription2);
subscription.AddRule(ruleDescription3);
The rules will be applied in the order that they were registered.
You may have the following question: will a message be added to the subscription if one of the rules is not satisfied?
Yes. By default a rule with the TrueFilter is added to the subscription, and it matches every message. Any modification that a rule makes to the message (through its Action) is not persisted to the topic; it is persisted only in the subscription, and only if the message is added to it.
Besides TrueFilter, we also have the FalseFilter. In this case, by default all messages are blocked and only messages that satisfy the rules are added to the subscription. To change the default behavior, the first step after creating a subscription is to remove all its rules. After this step we can add our custom rules.
var subscriptionRules = namespaceManager.GetRules(
    "myFooTopic",
    "thirdSubscription");
SubscriptionClient subscription = messagingFactory.CreateSubscriptionClient(
    "myFooTopic",
    "thirdSubscription");

foreach (var rule in subscriptionRules)
{
    subscription.RemoveRule(rule.Name);
}
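Putting the two steps together, a minimal sketch of "remove the existing rules, then add a custom one" could look like the following. The rule name "valueIs10Rule" and the SERVICEBUS_CONNECTION_STRING environment variable are assumptions made for this example; the subscription "thirdSubscription" is assumed to exist already.

```csharp
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

class ReplaceRulesSample
{
    static void Main()
    {
        // Assumption: the connection string lives in this (hypothetical) environment variable.
        string connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");

        NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
        SubscriptionClient subscription = SubscriptionClient.CreateFromConnectionString(
            connectionString, "myFooTopic", "thirdSubscription");

        // Step 1: remove every existing rule, including the default match-all rule.
        foreach (RuleDescription rule in namespaceManager.GetRules("myFooTopic", "thirdSubscription"))
        {
            subscription.RemoveRule(rule.Name);
        }

        // Step 2: add the custom rule; "valueIs10Rule" is a name made up for this sketch.
        subscription.AddRule("valueIs10Rule", new SqlFilter("value = 10"));

        subscription.Close();
    }
}
```

Between the two steps the subscription has no rules at all, so messages published in that window would not be delivered to it. If that matters, it may be simpler to create the subscription with the custom filter from the start.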
In the action of a rule, custom properties of the current message are referenced by name, optionally with the “user.” prefix; the “sys.” prefix refers to system properties of the BrokeredMessage, such as Label. In the following example, a property is set to the sum of two other properties of the message.
RuleDescription ruleDescription = new RuleDescription()
{
    Action = new SqlRuleAction("set sum = ValueA + ValueB"),
    Name = "sumRule"
};
Each subscription can have custom properties set, such as the expiration time, the lock duration, the maximum delivery count, and so on. Using this functionality we can create a fairly complex flow.
Now we can talk about how to receive messages from a subscription. As with a Service Bus Queue, we can receive messages in two different modes: PeekLock and ReceiveAndDelete. In the first mode a message is locked when it is received and is not removed until we call the Complete() method of the message. In the second mode a message is deleted automatically as soon as it is delivered to the receiver. Because of this, if an error appears while processing it, the message is lost. I recommend the first mode if you want to consume messages in a safe way. To create a subscription client that is able to receive messages we need to specify the topic name, the credentials and the subscription name. After this we can receive messages from the subscription and consume them.
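As an illustration of these settings, the sketch below creates a subscription from a SubscriptionDescription. The subscription name "configuredSubscription", the concrete values and the SERVICEBUS_CONNECTION_STRING environment variable are all assumptions chosen for this example, not recommendations.

```csharp
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

class ConfiguredSubscriptionSample
{
    static void Main()
    {
        // Assumption: the connection string lives in this (hypothetical) environment variable.
        string connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");
        NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

        var description = new SubscriptionDescription("myFooTopic", "configuredSubscription")
        {
            // How long a received message stays locked in PeekLock mode.
            LockDuration = TimeSpan.FromSeconds(30),
            // How many delivery attempts are made before the message is dead-lettered.
            MaxDeliveryCount = 5,
            // Expiration time applied to messages that do not set their own.
            DefaultMessageTimeToLive = TimeSpan.FromHours(1),
            // Move expired messages to the dead-letter queue instead of dropping them.
            EnableDeadLetteringOnMessageExpiration = true
        };

        if (!namespaceManager.SubscriptionExists(description.TopicPath, description.Name))
        {
            namespaceManager.CreateSubscription(description);
        }
    }
}
```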
SubscriptionClient subscriptionClient = SubscriptionClient.CreateFromConnectionString(
    CloudConfigurationManager.GetSetting("ServiceBusConnectionString"),
    "myFooTopic",
    "thirdSubscription");

// PeekLock is the default receive mode; Receive() returns null if no message arrives in time.
BrokeredMessage message = subscriptionClient.Receive();

if (message != null)
{
    try
    {
        ...
        message.Complete();
    }
    catch (Exception)
    {
        message.Abandon();
    }
}
In the above example we consume messages in PeekLock mode. At this level, every action that is available for Service Bus Queues is also available for Service Bus Topics: they share the same base class, MessageClientEntity, and use the same message type (BrokeredMessage).
Finally, we should know the following: if we have two clients that should each receive the same message with the same rules, then we need two subscriptions, one for each client. If we have a subscription whose messages can be consumed by 2, 3 or n instances, then we should define only one subscription.
In conclusion, receiving messages from a Service Bus Topic can be a simple thing, but it can also be a complicated one (when defining the rules). It depends very much on what we want to do. The good part about these rules is that they use SQL92-style syntax. Because of this we don’t need to learn a new language to specify each rule.
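A single Receive() call handles at most one message. As a sketch of how a consumer might drain a subscription, the loop below keeps receiving in PeekLock mode until no message arrives within a timeout. The 5 second timeout, the SERVICEBUS_CONNECTION_STRING environment variable and the assumption that the message body is a string are all choices made for this example.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

class ReceiveLoopSample
{
    static void Main()
    {
        // Assumption: the connection string lives in this (hypothetical) environment variable.
        string connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");

        // The receive mode is passed explicitly here to make the choice visible.
        SubscriptionClient client = SubscriptionClient.CreateFromConnectionString(
            connectionString, "myFooTopic", "thirdSubscription", ReceiveMode.PeekLock);

        while (true)
        {
            // Returns null when no message arrives within the given wait time.
            BrokeredMessage message = client.Receive(TimeSpan.FromSeconds(5));
            if (message == null)
            {
                break;
            }

            try
            {
                // Assumption: the sender serialized a string as the message body.
                string body = message.GetBody<string>();
                Console.WriteLine("Received: " + body);
                message.Complete();   // removes the message from the subscription
            }
            catch (Exception)
            {
                message.Abandon();    // releases the lock so the message can be delivered again
            }
        }

        client.Close();
    }
}
```

If ReceiveMode.ReceiveAndDelete is passed instead, the Complete() and Abandon() calls should be dropped, since in that mode there is no lock left to settle once the message has been delivered.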
