Receiving messages from Azure Service Bus in .Net Core

For some time now we can observe how new .Net Core framework is growing and become more mature. Version 2.0 was released in August 2017 and it is more capable and supports more platforms then it’s previous releases. But the biggest feature of this brand new Microsoft framework is its performance and it’s ability to handle http requests much faster then it’s bigger brother.

However introduction of new creation is only the beginning as more and more packages are being ported or rewritten to new, lighter framework. This is a natural opportunity for developers to implement some additional changes,  refactor existing code or maybe slightly simplify existing API. It also means that porting existing solutions to .Net Core might not be straightforward and in this article I’ll check how receiving messages from Service Bus looks like.

First things first

In order to start receiving messages, we need to have:

After all of this, my topic looks like this:

Receiving messages

To demonstrate how to receive messages I created a console application in .Net Core 2.0 framework. Then I installed Microsoft.Azure.ServiceBus (v2.0) nuget package and also Newtonsoft.Json to parse messages body. My ProductRatingUpdateMessage message class looks like this:

    public class ProductRatingUpdateMessage
    {
        public int ProductId { get; set; }

        public int SellerId { get; set; }

        public int RatingSum { get; set; }

        public int RatingCount { get; set; }
    }

All of the logic is inside MessageReceiver class:

    public class MessageReceiver
    {
        private const string ServiceBusConnectionString = "Endpoint=sb://bialecki.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[privateKey]";

        public void Receive()
        {
            var subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, "productRatingUpdates", "sampleSubscription");

            try
            {
                subscriptionClient.RegisterMessageHandler(
                    async (message, token) =>
                    {
                        var messageJson = Encoding.UTF8.GetString(message.Body);
                        var updateMessage = JsonConvert.DeserializeObject<ProductRatingUpdateMessage>(messageJson);

                        Console.WriteLine($"Received message with productId: {updateMessage.ProductId}");

                        await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
                    },
                    new MessageHandlerOptions(async args => Console.WriteLine(args.Exception))
                    { MaxConcurrentCalls = 1, AutoComplete = false });
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: " + e.Message);
            }
        }
    }

 

Notice that creating a SubscriptionClient is very simple, it takes just one line and handles all the work. Next up is RegisterMessageHandler which handles messages one by one and completes them in the end. If something goes wrong message will not be completed and after lock on it will be removed – it will be available for processing again. If you set AutoComplete option to true, then message will be automatically completed after returning from User Callback. This message pump approach won’t let you handle incoming messages in batches, but parameter MaxConcurrentCalls can be set to handle multiple messages in parallel.

Is it ready?

Microsoft.Azure.ServiceBus nuget package in version 2.0 offers most desirable functionality. It should be enough for most cases but it also has huge gaps:

  • Cannot receive messages in batches
  • Cannot manage entities – create topics, queues and subscriptions
  • Cannot check if queue, topic or subscription exist

Especially entities management features are important. It is reasonable that when scaling-up a service that read from topic, it creates it’s own subscription and handles it on it’s own. Currently developer needs to go to Azure Portal to create subscriptions for each micro-service manually.

Update! 

Version 3.1 supports entities management – have a look at my post about it: Managing ServiceBus queues, topics and subscriptions in .Net Core

If you liked code posted here, you can find it (and a lot more) in my github blog repo: https://github.com/mikuam/Blog.

 

 

 

12 thoughts on “Receiving messages from Azure Service Bus in .Net Core

  1. Eto

    Hi,

    How would I go about this if I just want to receive one message only? I don’t really want the message handler to be reading in all the messages in the subscription. I just want it to receive and pop off the first message and end.

    Thanks,

    Reply
    1. mbialecki Post author

      Hey Eto,
      This is a tough question and I’m not sure if Azure Service Bus package is build to work that way in .Net Core. However, with a few tests I was able to come with such code, that receives only one message, but also log an exception.

      public void ReceiveOne()
      {
      var queueClient = new QueueClient(ServiceBusConnectionString, “go_testing”);

      queueClient.RegisterMessageHandler(
      async (message, token) =>
      {
      var messageBody = Encoding.UTF8.GetString(message.Body);

      Console.WriteLine($”Received: {messageBody}, time: {DateTime.Now}”);

      await queueClient.CompleteAsync(message.SystemProperties.LockToken);

      await queueClient.CloseAsync();
      },
      new MessageHandlerOptions(async args => Console.WriteLine(args.Exception))
      { MaxConcurrentCalls = 1, AutoComplete = false });
      }

      Let me know if you find a better way!

      Reply
    1. Michał Białecki Post author

      Hi Reinhard!
      Your right, it is possible to manage queues and topic in an easy way. However, it is possible only from version 3.1 of Microsoft.Azure.ServiceBus package. You can have a look at my post about it: https://www.michalbialecki.com/2018/10/22/managing-servicebus-queues-topics-and-subscriptions-in-net-core/

      Microsoft.Azure.Management.ServiceBus package uses a different approach and it is much more powerful, but can manage topic and queues as well.

      Reply
  2. Jonathan Estrada

    Hi Michal, Is possible receiving the messages in a web application like MVC, instead of console application?

    Thanks.

    Reply
    1. Michał Białecki Post author

      Hi Jonathan,
      Service Bus messages can be read in any type of a project, you just need to reference NuGet package and you’re good to go 🙂

      Reply
      1. Sam Donald

        Considering webapp may not be available all the time (warm), I would probably use Azure Functions to actually process the messages instead of the webapp directly.

        Reply
        1. Michał Białecki Post author

          I agree.
          Keeping application always running can bring unnecessary costs when hosting in Azure.
          Back in 2018 when I wrote this post, Azure Functions wasn’t something common in projects that I worked on, but now it is a standard.

          Reply
  3. Ivan

    Hi Michal,

    I wonder if you could spare a few moments of your time to help us out.

    We are getting the following exception when executing a long-running task in our service bus subscriber.

    “Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue.”

    The long-running task does several things including reading 4.5M records from the database, building an Excel document (using OpenXml) with multiple sheets (~1M records per sheet), updating a few entities in the database, etc. It’s all wrapped inside one method of a service, and that’s the only method exposed to the subscriber.

    Since this task takes longer than the message is locked for, the message is released to the queue. The subscriber picks it up and starts working on it again.

    We’d like to successfully renew the message lock before it is released back to the queue. We’ve added logic using a timer to execute “await client.RenewLockAsync(message.SystemProperties.LockToken)” before the message is released based on the value of “message.SystemProperties.LockedUntilUtc”. This seems to work correctly and effectively if we mimic the task with a “Task.Delay(TimeSpan.FromHours(2))”, for example, but with the actual call “await internalService.ExportReport()”, the renew doesn’t seem to be effective. The message lock release seems to happen at the 5 minute mark always. We’ve registered our client’s “OperationTimeout” and “ServisceBusConnection.OperationTimeout” with “TimeSpan.FromHours(10)”, but that doesn’t seem to have an effect on the message lock timeout.

    Our core libraries target “netstandard2.0”, our application targets “netcoreapp2.2” and we’re using “Microsoft.Azure.ServiceBus v4.0.0”.

    Can you give us some suggestions on how better approach this case, or any long-running task in general?

    Reply
    1. Michał Białecki Post author

      Hi Ivan,
      This sounds like a looong running job. I haven’t used `RenewLockAsync`, but I can advise you to rather not use it in this case.
      In general reading message from Service Bus and processing it, should be quick. I had the same problem when processing the message took too long and I finally did a two-step process. The first part is reading the message and saving it. The second part is running the long job. The nice thing about Service Bus is that it has a sort-of retry mechanism when the message goes back to the queue, so in order to have a similar thing you would need to write a background-job, that would go through saved messages and check which one was processed.

      This is what I would do, but there are multiple solutions. As Service Bus is great with retry mechanism I am looking more into fire-and-forget model when writing new solutions. And I’m shifting from Service Bus to Event Grid.

      Hope that helps you,
      Cheers!

      Reply
  4. Nirmit Garg

    I want to wrap the subscriber in a class and return topic message to the calling class instead of printing on console. Can you post one example as I am facing challenges and all examples I have found are for console app only.

    Reply

Leave a Reply

Your email address will not be published. Required fields are marked *