
How to guarantee the order in which the messages are processed inside AWS MQ and Azure Service Bus (FIFO on the consumer)

In the last post, we talked about queues where the ordering of messages is guaranteed (FIFO). Even if a messaging service guarantees FIFO at queue level, this is not enough to ensure that we consume the messages in the order they were added to the queue.
The challenge for a queue that offers FIFO is the number of consumers that run in parallel. To guarantee that FIFO is fully respected on the consumer side, you need to have only ONE consumer, or to assume that no exceptions or errors can occur on the consumer side while a message is consumed.
How FIFO is defined as a feature of a queue service sits somewhere between a ‘delivery guarantee’ and ‘guaranteeing message delivery in the same order that they were pushed’. A system that guarantees FIFO for message delivery is not the same as one that guarantees the order in which the messages are processed. With a FIFO that guarantees only message delivery, you can still achieve ordered processing if a single consumer consumes the messages one by one. With more than one consumer things are more complicated, but not impossible.
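
To make the difference concrete, here is a minimal sketch in Python. It is a toy simulation, not code against AWS MQ or Azure Service Bus: the function name, the message names and the processing times are all made up for illustration. Messages are always handed out in FIFO order, yet with two competing consumers the processing finishes in a different order.

```python
from collections import deque

def completion_order(messages, durations, consumers):
    """Return message ids in the order their processing finishes.

    messages  -- ids in the order they were pushed (FIFO delivery order)
    durations -- processing time per message id (hypothetical values)
    consumers -- number of competing consumers
    """
    queue = deque(messages)
    free_at = [0] * consumers      # time at which each consumer is idle again
    finished = []                  # (finish_time, delivery_index, message)
    for index in range(len(messages)):
        # The consumer that becomes idle first takes the next message in FIFO order.
        consumer = min(range(consumers), key=lambda c: free_at[c])
        message = queue.popleft()
        free_at[consumer] += durations[message]
        finished.append((free_at[consumer], index, message))
    return [message for _, _, message in sorted(finished)]

pushed = ["created", "paid", "shipped"]
cost = {"created": 5, "paid": 1, "shipped": 1}   # "created" is slow to process

print(completion_order(pushed, cost, consumers=1))  # ['created', 'paid', 'shipped']
print(completion_order(pushed, cost, consumers=2))  # ['paid', 'shipped', 'created']
```

With one consumer the processing order matches the push order. With two consumers the slow first message finishes last, even though the queue delivered everything in FIFO order.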

The tradeoffs of implementing a full FIFO that guarantees message delivery in the same order that the messages were pushed are:
  1. Scalability – We limit how much the queue can scale
  2. Buffering – The number of messages that can be consumed
  3. Batching – The number of messages that can be consumed in a batch
  4. Decoupling – The coupling level between the producer, the queue and the consumer
Use Case
Imagine that you are implementing an e-commerce application where, for each order, you have to process an online payment. The order and transaction state are crucial. Each change of state must be processed in the same order as it arrived (FIFO). For these cases, it is crucial to have a message-based system where the order in which the messages are processed is guaranteed.

Session / Group ID
If you remember from the last post, AWS MQ and Azure Service Bus both guarantee message delivery as FIFO. Both services offer service bus infrastructure that can be used to build a strong messaging system.
The session concept (called Group ID on AWS MQ) lets us specify that only one consumer can consume messages from a specific session. In theory, this means we reduce the parallelisation level only for the group of messages that are part of the same session, which allows us to keep scaling across sessions while the processing order inside each session stays guaranteed.
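
The idea of pinning a session to a single consumer can be sketched as below. This is only a toy model: the real brokers do the assignment internally, and the function name, the round-robin pinning and the sample messages are assumptions made for illustration.

```python
from collections import defaultdict

def deliver_by_group(messages, consumers):
    """Assign each (group_id, payload) message to a consumer, FIFO per group.

    Every group is pinned to one consumer the first time it is seen, so all
    messages of that group reach a single consumer in their original order.
    """
    owner = {}                     # group_id -> index of the owning consumer
    inbox = defaultdict(list)      # consumer index -> messages in delivery order
    for group_id, payload in messages:
        if group_id not in owner:
            owner[group_id] = len(owner) % consumers   # simple round-robin pinning
        inbox[owner[group_id]].append((group_id, payload))
    return dict(inbox)

stream = [
    ("order-1", "created"), ("order-2", "created"),
    ("order-1", "paid"), ("order-2", "paid"), ("order-1", "shipped"),
]
inboxes = deliver_by_group(stream, consumers=2)
for consumer, received in sorted(inboxes.items()):
    print(consumer, received)
```

Two consumers work in parallel, but each order is seen by only one of them, so the state changes of a single order cannot overtake each other.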
But this is not enough to say that the order in which the messages are consumed is guaranteed. If you consume messages from the same session using the Peek&Lock approach, you need to be careful not to request another message from the queue until the one that you locked has been completed or released. As long as you hold only one Peek&Lock message from the current session at a time, you can say that you have the guarantee of FIFO at the consumer level.
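
A minimal sketch of this rule is shown below. The SessionQueue class is a hypothetical in-memory model of one session, not the API of any real SDK. It refuses to lock a second message while one is still in flight, and an abandoned message goes back to the front of the session.

```python
from collections import deque

class SessionQueue:
    """Toy model of one session with Peek&Lock semantics (not a real SDK)."""

    def __init__(self, messages):
        self._pending = deque(messages)
        self._locked = None

    def peek_lock(self):
        # Refuse a second lock: only one message of the session may be in flight.
        if self._locked is not None:
            raise RuntimeError("previous message is still locked")
        if not self._pending:
            return None
        self._locked = self._pending.popleft()
        return self._locked

    def complete(self):
        # Processing succeeded: the message is removed for good.
        self._locked = None

    def abandon(self):
        # Processing failed: the message goes back to the FRONT of the session.
        self._pending.appendleft(self._locked)
        self._locked = None


def consume(session, handler):
    processed = []
    while (message := session.peek_lock()) is not None:
        try:
            handler(message)
        except Exception:
            session.abandon()      # retry the same message before any later one
            continue
        session.complete()
        processed.append(message)
    return processed


failures = {"paid": 1}             # hypothetical: "paid" fails once, then succeeds

def flaky_handler(message):
    if failures.get(message, 0) > 0:
        failures[message] -= 1
        raise ValueError("transient error")

result = consume(SessionQueue(["created", "paid", "shipped"]), flaky_handler)
print(result)                      # ['created', 'paid', 'shipped']
```

The retry loop here is deliberately naive: a message that always fails would be retried forever, so a real consumer would also need a retry limit or a dead-letter path.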
When you use Receive&Delete you are on your own. If an error occurs on the consumer side, there is no way to push the message back into the same session as the first message available to be consumed. You could try to handle this using a message priority, but it would be error-prone. Luckily, AWS MQ and Azure Service Bus do not support message priority.

How to guarantee the order in which the messages are processed
You should combine the Session / Group ID functionality with synchronous consumers. Avoid consuming messages in parallel from the same Session / Group ID in the same execution task. Otherwise you could hit cases where, because of an error, the message order is invalidated.
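
One way to picture this is the asyncio sketch below. It is an illustration under assumptions, not broker code: the handler, the session names and the delays are invented. Different sessions run concurrently, but inside one session each message is awaited before the next one starts.

```python
import asyncio
from collections import defaultdict

async def process_session(session_id, messages, handler, log):
    # Messages of ONE session are awaited one by one, never gathered together.
    for message in messages:
        await handler(session_id, message)
        log[session_id].append(message)

async def process_all(sessions, handler):
    log = defaultdict(list)
    # Different sessions may run concurrently; each session stays sequential.
    await asyncio.gather(
        *(process_session(sid, msgs, handler, log) for sid, msgs in sessions.items())
    )
    return dict(log)

async def slow_first_handler(session_id, message):
    # Hypothetical handler: the first state change is the slowest one.
    await asyncio.sleep(0.02 if message == "created" else 0.001)

sessions = {
    "order-1": ["created", "paid", "shipped"],
    "order-2": ["created", "paid"],
}
result = asyncio.run(process_all(sessions, slow_first_handler))
print(result)
```

If the messages of one session were passed to asyncio.gather together, the fast "paid" and "shipped" handlers would finish before the slow "created" one, which is exactly the situation to avoid.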

When you are using Azure Service Bus with the premium tier, you should always set SupportOrdering to true. Because topics use the pump model and you have multiple partitions, you need a mechanism to enforce that all messages from the same session land on the same partition. Setting SupportOrdering to true enforces exactly that. It needs to be configured only for the premium tier and affects the topic itself, not the subscriptions.

When the queue is in only one region, things are manageable and can be handled from the consumer implementation side. The moment you want to have your messages replicated in another region, things become more complex.
Azure Service Bus supports paired namespaces as an option, but it is not applicable to our case because the message order is not guaranteed. AWS MQ does not have support for geo-replication, but for our case the outcome is the same, because paired namespaces do not support our scenario either.
To be able to use geo-replication combined with FIFO on the consumer side, you will need two different instances of AWS MQ or Azure Service Bus that are fully managed by you. It means that on the consumer side you will need to use AWS DocumentDB or Azure CosmosDB to track the messages that were already consumed from a specific queue. In the case of failover, you will rely on this information to be able to continue from the right location.
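
A rough sketch of such tracking is shown below. Everything in it is an assumption made for illustration: CheckpointStore is an in-memory stand-in for a durable store such as DocumentDB or CosmosDB, and it assumes that the producer stamps each message with a per-session sequence number that is identical on both broker instances.

```python
class CheckpointStore:
    """In-memory stand-in for a durable document store (assumption)."""

    def __init__(self):
        self._last = {}            # session_id -> last processed sequence number

    def last_processed(self, session_id):
        return self._last.get(session_id, 0)

    def save(self, session_id, sequence):
        self._last[session_id] = sequence


def consume_from(broker_messages, store, handler):
    """Process (session_id, sequence, payload) tuples, skipping known ones."""
    handled = []
    for session_id, sequence, payload in broker_messages:
        if sequence <= store.last_processed(session_id):
            continue               # already consumed from the other instance
        handler(payload)
        store.save(session_id, sequence)   # checkpoint only after success
        handled.append((session_id, sequence))
    return handled


store = CheckpointStore()
all_messages = [
    ("order-1", 1, "created"), ("order-1", 2, "paid"),
    ("order-1", 3, "shipped"), ("order-1", 4, "delivered"),
]

# Primary instance: two messages are consumed, then the region fails.
first = consume_from(all_messages[:2], store, handler=lambda payload: None)
# Secondary instance: it still holds every message, the checkpoint skips 1 and 2.
second = consume_from(all_messages, store, handler=lambda payload: None)
print(first, second)
```

The checkpoint is written after the handler succeeds, so a crash between the two steps leads to a message being processed twice rather than skipped. The handler therefore needs to tolerate duplicates.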

Remember that a system that guarantees FIFO for message delivery is not the same as one that guarantees the order in which the messages are processed.
We need to be careful when we assume that FIFO on the consumer side is guaranteed. Each implementation has its own small flavours of FIFO that can mislead us. Be careful with async on the consumer side combined with parallel message processing. As long as you consume one message at a time from the same session / group ID, things can be managed pretty easily.
If you end up in a situation where both geo-replication and a guaranteed processing order are required, I would challenge you on whether you really need it and whether you are ready to pay the price for it.

