Tag Archives: subscription

Managing ServiceBus queues, topics and subscriptions in .Net Core

From version 3.1 of Microsoft.Azure.ServiceBus it is finally possible to manage queues, topics and subscriptions in .Net Core. Let’s have a look at how we can use it in real life scenarios.

Previously we would use sample code for getting a queue:

public IQueueClient GetQueueClient(string _serviceBusConnectionString, string _queueName)
{
    var queueClient = new QueueClient(_serviceBusConnectionString, _queueName);
    return queueClient;
}

Using ManagementClient we can write much better code.

public async Task<IQueueClient> GetOrCreateQueue(string _serviceBusConnectionString, string _queueName)
{
    var managementClient = new ManagementClient(_serviceBusConnectionString);
    if (!(await managementClient.QueueExistsAsync(_queueName)))
    {
        await managementClient.CreateQueueAsync(new QueueDescription(_queueName));
    }

    var queueClient = new QueueClient(_serviceBusConnectionString, _queueName);
    return queueClient;
}

Now before getting a queue, we are checking if a queue exists and if not, we are creating it. So when executing this code:

manager.GetOrCreateQueue(configuration["ServiceBusConnectionString"], "createTest").GetAwaiter().GetResult();

We will get a queue that we can use.

Customizing your ServiceBus subscription

It is pretty easy to create a topic subscription or a queue, but SubscriptionDescription object offers a lot more then just that. This is a simple code that creates a subscription:

public async Task<ISubscriptionClient> GetOrCreateTopicSubscription(string serviceBusConnectionString, string topicPath, string subscriptionName)
{
    var managementClient = new ManagementClient(serviceBusConnectionString);
    if (!(await managementClient.SubscriptionExistsAsync(topicPath, subscriptionName)))
    {
        await managementClient.CreateSubscriptionAsync(new SubscriptionDescription(topicPath, subscriptionName));
    }

    var subscriptionClient = new SubscriptionClient(serviceBusConnectionString, topicPath, subscriptionName);
    return subscriptionClient;
}

Let’s have a look at a few most important properties:

TopicPath The path of the topic that this subscription description belongs to
Name Name of the subscription
DefaultMessageTimeToLive This is the duration after which the message expires, starting from when the message is sent to the Service Bus. After that time, the message will be removed from a subscription
EnableDeadLetteringOnMessageExpiration Support for dead-letter queue. When you enable it, messages will come here instead of being removed from the main queue
EnableBatchedOperations It’s a good idea to set it to true, no matter if the reader supports batch operations or not. Doing things in batches is usually faster
LockDuration This is the duration for which message can be locked for processing
MaxDeliveryCount Maximum count of message returning to the subscription after failure processing. After that count message will be removed from the subscription

Let’s have a look what we can fill in in a real project.

public async Task<ISubscriptionClient> GetOrCreateTopicSubscription(string serviceBusConnectionString, string topicPath, string subscriptionName)
{
    var managementClient = new ManagementClient(serviceBusConnectionString);
    if (!(await managementClient.SubscriptionExistsAsync(topicPath, subscriptionName)))
    {
        await managementClient.CreateSubscriptionAsync(
            new SubscriptionDescription(topicPath, subscriptionName)
            {
                EnableBatchedOperations = true,
                AutoDeleteOnIdle = System.TimeSpan.FromDays(100),
                EnableDeadLetteringOnMessageExpiration = true,
                DefaultMessageTimeToLive = System.TimeSpan.FromDays(100),
                MaxDeliveryCount = 100,
                LockDuration = System.TimeSpan.FromMinutes(5)
            });
    }

    var subscriptionClient = new SubscriptionClient(serviceBusConnectionString, topicPath, subscriptionName);
    return subscriptionClient;
}

AutoDeleteOnIdle – subscription will be removed from the topic after 100 days idle – that is very unlikely. DefaultMessageTimeToLive and EnableDeadLetteringOnMessageExpiration – messages will be kept in the queue for very long – 100 days, then they will be sent to a dead letter queue. MaxDeliveryCount and LockDuration – message will be processed up to 100 times and for a maximum of 5 minutes.

We can do one more thing. When testing a project while development locally it’s ideal to work with real data. In the real case, we would probably have different Service Bus namespace and separate connection string for every environment. There is, however, a trick to use DEV data locally – just create your testing subscription! This is how it can look like:

    public async Task<ISubscriptionClient> GetOrCreateTopicSubscription(string serviceBusConnectionString, string topicPath, string subscriptionName)
    {
        var managementClient = new ManagementClient(serviceBusConnectionString);
#if DEBUG
        if (!(await managementClient.SubscriptionExistsAsync(topicPath, subscriptionName + "_MikTesting")))
        {
            await managementClient.CreateSubscriptionAsync(
                new SubscriptionDescription(topicPath, subscriptionName + "_MikTesting")
                {
                    EnableBatchedOperations = true,
                    AutoDeleteOnIdle = System.TimeSpan.FromDays(100),
                    EnableDeadLetteringOnMessageExpiration = false,
                    DefaultMessageTimeToLive = System.TimeSpan.FromDays(2),
                    MaxDeliveryCount = 5,
                    LockDuration = System.TimeSpan.FromMinutes(5)
                });
        }
#else       
        if (!(await managementClient.SubscriptionExistsAsync(topicPath, subscriptionName)))
        {
            await managementClient.CreateSubscriptionAsync(
            new SubscriptionDescription(topicPath, subscriptionName)
            {
                EnableBatchedOperations = true,
                AutoDeleteOnIdle = System.TimeSpan.FromDays(100),
                EnableDeadLetteringOnMessageExpiration = true,
                DefaultMessageTimeToLive = System.TimeSpan.FromDays(100),
                MaxDeliveryCount = 100,
                LockDuration = System.TimeSpan.FromMinutes(5)
            });
        }
#endif
        var subscriptionClient = new SubscriptionClient(serviceBusConnectionString, topicPath, subscriptionName);
        return subscriptionClient;
    }

Testing subscription will have it’s own name, it will still be there up to 100 days of idle, but messages will be kept only for 2 days and they will not end up in dead letter queue. MaxDeliveryCount is only 5, cause if something goes wrong, we will end up having 5 the same errors in logs instead of 100 and this is much more likely to happen when testing locally.

Hope you found it useful, every code posted here is in my GitHub repository: https://github.com/mikuam/Blog

If you’re interested in more posts about Service Bus in .Net Core, have a look at:

Azure Service Bus – introduction

Azure Service Bus is a Microsoft implementation of a messaging system, that works seamlessly in the cloud and does not require to set up a server of any kind. Messaging is a good alternative for communication between micro-services. Let’s compare the two.

REST communication

service-bus-communication

  • contract constrains may be a risk
  • synchronous model be default
  • load balancing is harder

Messaging communication

REST-communication

  • asynchronous
  • easy for load balancing, can have multiple competing readers
  • fire and forget

Using messages for communication is much more elastic in planning architecture. You can add move receivers and more senders at will. It’s a fire and forget model, when you do not care when message will be processed and you don’t need to worry how it will reach the receiver – your messaging system will care about it for you.

Azure Service Bus

To explore Service Bus options, go to your Azure portal and search for Service Bus.

service-bus-search

 

Go inside and you’ll need to create of choose a namespace. Namespaces can be useful when you would like to group multiple queries or topics for example by different contexts. It’s much easier to browse through them when you have for example all orders related queues in a orders namespace. Inside the namespace you’ll see list of your queues and topics.

topics-and-queues

 

Queues

Queue is a FIFO (First In, First out) is a messages delivery to one or more competing consumers.  Each message is received and processed by only one consumer and messaging system centrally manage this process,  so no deadlock will occur.

queue

It is expected that messages are processed in the same order which they arrived. Messages however do not have to be processed right away when they arrived. They can wait safely in the queue for first free consumer. With possibility of having multiple consumers is in very easy to balance load and add new consumers in the infrastructure, so messages will be processed faster. This is of course with an assumption that messages can be processed independently and do not relate to one another. Another key feature is that work of consumers do not affect work done by publisher. During heavy load or high usage of the system messages will be stored in the queue and consumers will not be overloaded with multiple call as it is in REST services, but continue to work and process messages with the same speed as usual.

In Azure Service Bus queues size of a queue can be huge, even up to 16GB. Message size limit is rather small – maximum of 256KB. However sessions support allows creation of unlimited-size sequences of related messages.

Topics

topic

 

Comparing to queue where only one consumer is processing a message, in topics messages are cloned in to subscriptions, which contains the same messages. This represents one-to-many form of communication in a publish/subscribe pattern. The subscriptions can use additional filters to restrict the messages that they want to receive. Messages can be filtered by their attributes, where published can for example set recipient who should receive that message. Consumers instead of connecting directly to topic, connects to a subscriptions, that can be understood as a virtual queue. The same way as queues, subscriptions can have multiple competing consumers and this gives even more possibilities to plan services architecture.

An example usage a topic can be for example notifications about product stock status changed. Topic where messages are sent can have multiple subscriptions each filtering messages according to their needs. One may need all updates but other may be interested only in digital products like games. With SqlFilterExpression class filter rule can look like this:

ProductType = ‘Digital’ AND QuantityForSale > 0

Queues, topics and subscriptions gives a lot possibilities and are easier to scale and adapt in the future. Comparing to REST services they are slightly more complicated to implement, needs centralized system but offers much more in return.