Tag Archives: azure service bus

Buffered sending Service Bus messages

Recently I ran into a challenge. I’m working on a micro-service that receives and processes messages one by one. How can I send Service Bus messages not instantly, but once they pile up? The reason is that sending every message separately is expensive when it comes to performance. Let’s send messages once 10 of them pile up or every 20 seconds.

It is not an obvious task, because Microsoft’s implementation does not support it. However, simple buffering can be done like this:

    public class SimpleBufferMessagesService
    {
        private const string ServiceBusConnectionString = "Endpoint=sb://bialecki.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[key]";

        private static readonly List<Message> _messages = new List<Message>();

        private static DateTime _lastMessageSent = DateTime.Now;

        private readonly TopicClient _topicClient;

        public SimpleBufferMessagesService()
        {
            _topicClient = new TopicClient(ServiceBusConnectionString, "accountTransferUpdates");
        }

        public async Task AddMessage(string message)
        {
            _messages.Add(new Message(Encoding.UTF8.GetBytes(message)));

            if (_messages.Count >= 10
                || DateTime.Now - _lastMessageSent > TimeSpan.FromSeconds(20))
            {
                await SendMessages(_messages);
                _messages.Clear();
                _lastMessageSent = DateTime.Now;
            }
        }

        private async Task SendMessages(List<Message> messages)
        {
            await _topicClient.SendAsync(messages);
        }
    }

This solution works quite well. Notice that I used static fields, so that they are preserved between requests, because a new instance of SimpleBufferMessagesService is created on every request.

There are a few problems with it:

  • it is not thread-safe. Two instances of SimpleBufferMessagesService can use the same _messages field and interfere with each other. This is a rather big risk, because sending a Service Bus message takes some time
  • some messages can wait a long time to be sent. When messages stay in the buffer and 20 seconds pass, another request has to arrive before they are sent. There is a risk of losing messages when the service is restarted. We shouldn’t keep messages longer than we need to

Having that in mind we need to think of something, that executes every 20 seconds, in intervals like… like… like Timer!

Timer solution

The timer needs to be registered in the Startup class. I did that at the end of the Configure method.

    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        // many things here

        var timeoutInMiliseconds = 20000;
        new Timer(s => { ServiceBusTimerCallback(); }, null, 0, timeoutInMiliseconds);
    }

    private static void ServiceBusTimerCallback()
    {
        var bufferService = new TimerBufferMessagesService();
        bufferService.SendMessages();
    }

And the class that sends messages can be modified like this:

    public class TimerBufferMessagesService
    {
        private const string ServiceBusConnectionString = "Endpoint=sb://bialecki.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[key]";

        private static readonly ICollection<Message> _messages = new List<Message>();

        private readonly TopicClient _topicClient;

        public TimerBufferMessagesService()
        {
            _topicClient = new TopicClient(ServiceBusConnectionString, "accountTransferUpdates");
        }

        public void AddMessage(string message)
        {
            lock (((ICollection) _messages).SyncRoot)
            {
                _messages.Add(new Message(Encoding.UTF8.GetBytes(message)));
            }
        }

        public void SendMessages()
        {
            if (_messages.Count == 0)
            {
                return;
            }

            List<Message> localMessages;
            lock (((ICollection)_messages).SyncRoot)
            {
                localMessages = new List<Message>(_messages);
                _messages.Clear();
            }

            Task.Run(async () => { await _topicClient.SendAsync(localMessages); });
        }
    }

This implementation is much better. It runs every 20 seconds and sends messages if there are any. The SendMessages method will be called by one instance, and AddMessage will be called by many instances, but it is written in a thread-safe way.
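The same idea, taking a copy of the buffer under a lock and sending outside of it, can be sketched without any Service Bus dependency. Below is a minimal illustration in Go (the other language used on this blog). The Buffer type, the FlushEvery function and the send callback are hypothetical names made up for this sketch, and send only stands in for the real call to the topic client.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Buffer collects messages from many goroutines.
type Buffer struct {
	mu       sync.Mutex
	messages []string
}

// Add appends a message; it is safe to call from many goroutines.
func (b *Buffer) Add(msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.messages = append(b.messages, msg)
}

// Drain returns everything buffered so far and leaves the buffer empty.
// The lock is held only for the swap, never while sending.
func (b *Buffer) Drain() []string {
	b.mu.Lock()
	defer b.mu.Unlock()
	drained := b.messages
	b.messages = nil
	return drained
}

// FlushEvery drains the buffer on every tick and passes non-empty batches
// to send (a stand-in for the real Service Bus call) until stop is closed.
func FlushEvery(b *Buffer, interval time.Duration, send func([]string), stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if batch := b.Drain(); len(batch) > 0 {
				send(batch)
			}
		case <-stop:
			return
		}
	}
}

func main() {
	buf := &Buffer{}
	stop := make(chan struct{})
	go FlushEvery(buf, 50*time.Millisecond, func(batch []string) {
		fmt.Printf("sending %d messages\n", len(batch))
	}, stop)

	for i := 0; i < 25; i++ {
		buf.Add(fmt.Sprintf("message %d", i))
	}
	time.Sleep(120 * time.Millisecond) // give the ticker time to flush
	close(stop)
}
```

Because Drain swaps the slice while holding the lock, a slow send never blocks callers of Add, which is the same reason the C# version copies _messages into localMessages before sending.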

It was perfect till the moment I realized it wasn’t working.

The thing is that sooner or later the timer is destroyed by the garbage collector. Even when I tried to save a reference to the timer or use GC.KeepAlive(timer), it always got collected.

But it can be done right

According to this StackOverflow question: https://stackoverflow.com/questions/3635852/system-threading-timer-not-firing-after-some-time/ we can use ThreadPool.RegisterWaitForSingleObject.

That method can be used instead of the timer:

    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        // many lines here

        const int timeoutInMiliseconds = 20000;
        var allTasksWaitHandle = new AutoResetEvent(true);

        ThreadPool.RegisterWaitForSingleObject(
            allTasksWaitHandle,
            (s, b) =>
            {
                ServiceBusTimerCallback();
            },
            null,
            timeoutInMiliseconds,
            false);
    }

    private static void ServiceBusTimerCallback()
    {
        var bufferService = new TimerBufferMessagesService();
        bufferService.SendMessages();
    }

The result is the same, but this one keeps working.

Full code can be found in my github repository: https://github.com/mikuam/Blog/

If you’re more interested in Service Bus, have a look at my post: https://www.michalbialecki.com/2017/12/21/sending-a-azure-service-bus-message-in-asp-net-core/

Or maybe this one: https://www.michalbialecki.com/2018/04/19/how-to-send-many-requests-in-parallel-in-asp-net-core/

Enjoy!

Receiving only one message from Azure Service Bus

Some time ago I got a question from Eto: “How would I go about this if I just want to receive one message only?” And I started thinking… is it possible in .Net Core?

I used the newest Microsoft.Azure.ServiceBus package, which is dedicated to .Net Core, but it has no method to receive only one message. So I used the regular RegisterMessageHandler with a twist:

    public void ReceiveOne()
    {
        var queueClient = new QueueClient(ServiceBusConnectionString, "go_testing");

        queueClient.RegisterMessageHandler(
            async (message, token) =>
            {
                var messageBody = Encoding.UTF8.GetString(message.Body);
                Console.WriteLine($"Received: {messageBody}, time: {DateTime.Now}");
                await queueClient.CompleteAsync(message.SystemProperties.LockToken);

                await queueClient.CloseAsync();
            },
            new MessageHandlerOptions(async args => Console.WriteLine(args.Exception))
            { MaxConcurrentCalls = 1, AutoComplete = false });
    }

As you can see it is the standard approach, but after a message is successfully processed, I close the queueClient. This works and it receives only one message, but it also gives an error.

I wasn’t fully satisfied with the solution, so I asked a question on StackOverflow: https://stackoverflow.com/questions/50438466/azure-service-bus-in-net-core-how-to-receive-only-one-message

After a few hours I got an answer that it is possible and I should just… use a different package!

Using the old package

So far I haven’t managed to use the old package in a .Net Core project. So in order to install the WindowsAzure.ServiceBus package, you need a project that references the full framework.

And here is the code:

    public async Task ReceiveOne()
    {
        var queueClient = QueueClient.CreateFromConnectionString(ServiceBusConnectionString, "go_testing", ReceiveMode.PeekLock);

        var message = await queueClient.ReceiveAsync();
        Console.WriteLine($"Received: {message.GetBody<string>()}, time: {DateTime.Now}");

        await message.CompleteAsync();
    }

So that’s it and it works, but sadly not in .Net Core. However, I’ll keep this post up to date in case it becomes possible.

Sending Service Bus message in Go

Go, or Golang, is an open source programming language. It’s a server-side, C-like language created by Google. I won’t take much of your time to introduce you to the language, but here is a short summary of why it’s worth trying.

  • Go is open-source, but backed by Google and used by big companies (Google, Dropbox, Docker, etc.)
  • It is something we know: it resembles C++ and it is easy to read
  • It’s fast, because it compiles directly to machine code, with no virtual machine in the middle
  • It’s a modern language, with packages instead of classes
  • Unlike many older languages, Go is designed to work in parallel

The easiest way

I found a package on github: https://github.com/michaelbironneau/asbclient. I needed to modify it a bit to work properly, so I forked that into my repo: https://github.com/mikuam/asbclient.

I found an existing sample and provided my credentials.

package main

import (
	"fmt"
	"log"

	"github.com/michaelbironneau/asbclient"
)

func main() {

	i := 0
	log.Printf("Send: %d", i)

	namespace := "bialecki"
	keyname := "RootManageSharedAccessKey"
	keyvalue := "[SharedAccessKeyValue]"

	client := asbclient.New(asbclient.Topic, namespace, keyname, keyvalue)

	err := client.Send("go_testing", &asbclient.Message{
		Body: []byte(fmt.Sprintf("message %d", i)),
	})

	if err != nil {
		log.Printf("Send error: %s", err)
	} else {
		log.Printf("Sent: %d", i)
	}
}

And the result can be seen very quickly:

Receiving Service Bus message is also trivial with this package and takes only a few lines of code. It looks like this:

package main

import (
	"log"

	"github.com/mikuam/asbclient"
)

func main() {

	namespace := "bialecki"
	keyname := "RootManageSharedAccessKey"
	keyvalue := "[SharedAccessKeyValue]"

	client := asbclient.New(asbclient.Queue, namespace, keyname, keyvalue)
	log.Printf("Peeking...")

	for {
		msg, err := client.PeekLockMessage("go_testing", 30)

		if err != nil {
			log.Printf("Peek error: %s", err)
		} else {
			log.Printf("Peeked message: '%s'", string(msg.Body))
			err = client.DeleteMessage(msg)
			if err != nil {
				log.Printf("Delete error: %s", err)
			}
		}
	}
}

It works, simple as that. So…

How fast is it?

Let’s say I need to send 1000 messages and receive them. As the asbclient package supports only sending messages one by one, I will implement the same logic in a .Net Core app. The sending part can look like this:

    public async Task Send1000()
    {
        var queueClient = new QueueClient(ServiceBusConnectionString, "go_testing");
        for (int i = 0; i < 1000; i++)
        {
            await queueClient.SendAsync(new Message(Encoding.UTF8.GetBytes("Message number " + i)));
        }
    }

And the receiving part:

    public void ReceiveAll()
    {
        var queueClient = new QueueClient(ServiceBusConnectionString, "go_testing");

        queueClient.RegisterMessageHandler(
            async (message, token) =>
            {
                var messageBody = Encoding.UTF8.GetString(message.Body);

                Console.WriteLine($"Received: {messageBody}, time: {DateTime.Now}");

                await queueClient.CompleteAsync(message.SystemProperties.LockToken);
            },
            new MessageHandlerOptions(async args => Console.WriteLine(args.Exception))
            { MaxConcurrentCalls = 1, AutoComplete = false });
    }

So what are the times for 1000 messages?

Sending messages is faster in .Net Core, but receiving is slightly slower. However, sending can be done much faster in .Net Core with a batch send. Receiving can also be faster in some cases, if you can write safe code that processes messages in parallel. Notice that in the last code snippet MaxConcurrentCalls is set to 1, which means messages are handled one at a time.
The Go code can probably be made faster as well. Golang is famous for its support for parallel code with goroutines. Should you go with Go for Service Bus? I can’t really say if it’s worth it at this point, but it is definitely possible.
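To give an idea of what a parallel version could look like, here is a minimal sketch of sending messages with a small pool of goroutines. It was not part of the measurements above. The sendAll function and the send callback are hypothetical names for this example; send is only a stand-in for something like client.Send from the asbclient package, and whether that client is safe to share between goroutines is an assumption you would need to verify.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errSimulated is returned by the fake sender in main.
var errSimulated = errors.New("simulated send failure")

// sendAll sends every message using a fixed number of worker goroutines
// and returns how many sends failed.
func sendAll(messages []string, workers int, send func(string) error) int {
	if workers < 1 {
		workers = 1
	}
	jobs := make(chan string)
	var wg sync.WaitGroup
	var mu sync.Mutex
	failed := 0

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range jobs {
				if err := send(msg); err != nil {
					mu.Lock()
					failed++
					mu.Unlock()
				}
			}
		}()
	}

	for _, msg := range messages {
		jobs <- msg
	}
	close(jobs) // lets the workers finish their range loops
	wg.Wait()
	return failed
}

func main() {
	messages := make([]string, 1000)
	for i := range messages {
		messages[i] = fmt.Sprintf("message %d", i)
	}
	// Fake sender that rejects one specific message; swap in a real client call.
	fakeSend := func(msg string) error {
		if msg == "message 500" {
			return errSimulated
		}
		return nil
	}
	failed := sendAll(messages, 10, fakeSend)
	fmt.Printf("sent %d, failed %d\n", len(messages)-failed, failed)
}
```

The number of workers plays a similar role to MaxConcurrentCalls in the .Net Core handler: it caps how many operations are in flight at once.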

All the code posted here can be found in my github repo; the Go code itself is here: https://github.com/mikuam/Blog/tree/master/Go/src

You can read more about Service Bus in .Net Core in my post: Receiving messages from Azure Service Bus in .Net Core.

Receiving messages from Azure Service Bus in .Net Core

For some time now we have been able to observe how the new .Net Core framework is growing and becoming more mature. Version 2.0 was released in August 2017 and it is more capable and supports more platforms than its previous releases. But the biggest feature of this brand new Microsoft framework is its performance and its ability to handle http requests much faster than its bigger brother.

However, the introduction of a new framework is only the beginning, as more and more packages are being ported or rewritten for the new, lighter framework. This is a natural opportunity for developers to implement some additional changes, refactor existing code or maybe slightly simplify an existing API. It also means that porting existing solutions to .Net Core might not be straightforward, and in this article I’ll check what receiving messages from Service Bus looks like.

First things first

In order to start receiving messages, we need to have:

After all of this, my topic looks like this:

Receiving messages

To demonstrate how to receive messages I created a console application in .Net Core 2.0. Then I installed the Microsoft.Azure.ServiceBus (v2.0) nuget package and also Newtonsoft.Json to parse the message body. My ProductRatingUpdateMessage message class looks like this:

    public class ProductRatingUpdateMessage
    {
        public int ProductId { get; set; }

        public int SellerId { get; set; }

        public int RatingSum { get; set; }

        public int RatingCount { get; set; }
    }

All of the logic is inside the MessageReceiver class:

    public class MessageReceiver
    {
        private const string ServiceBusConnectionString = "Endpoint=sb://bialecki.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[privateKey]";

        public void Receive()
        {
            var subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, "productRatingUpdates", "sampleSubscription");

            try
            {
                subscriptionClient.RegisterMessageHandler(
                    async (message, token) =>
                    {
                        var messageJson = Encoding.UTF8.GetString(message.Body);
                        var updateMessage = JsonConvert.DeserializeObject<ProductRatingUpdateMessage>(messageJson);

                        Console.WriteLine($"Received message with productId: {updateMessage.ProductId}");

                        await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
                    },
                    new MessageHandlerOptions(async args => Console.WriteLine(args.Exception))
                    { MaxConcurrentCalls = 1, AutoComplete = false });
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: " + e.Message);
            }
        }
    }

Notice that creating a SubscriptionClient is very simple: it takes just one line and handles all the work. Next up is RegisterMessageHandler, which handles messages one by one and completes them at the end. If something goes wrong, the message will not be completed, and once the lock on it expires it will be available for processing again. If you set the AutoComplete option to true, the message will be completed automatically after the handler callback returns. This message pump approach won’t let you handle incoming messages in batches, but the MaxConcurrentCalls parameter can be set to handle multiple messages in parallel.

Is it ready?

The Microsoft.Azure.ServiceBus nuget package in version 2.0 offers the most desirable functionality. It should be enough for most cases, but it also has huge gaps:

  • Cannot receive messages in batches
  • Cannot manage entities – create topics, queues and subscriptions
  • Cannot check if queue, topic or subscription exist

Entity management features are especially important. It is reasonable that when scaling up a service that reads from a topic, it creates its own subscription and handles it on its own. Currently a developer needs to go to the Azure Portal and create subscriptions for each micro-service manually.

Update! 

Version 3.1 supports entities management – have a look at my post about it: Managing ServiceBus queues, topics and subscriptions in .Net Core

If you liked code posted here, you can find it (and a lot more) in my github blog repo: https://github.com/mikuam/Blog.

Getting messages from Service Bus queue

In the previous post I discussed how to implement sending messages. In this post I will show how to receive messages. The simplest code looks like this:

static void GetMessage()
{
    try
    {
        var queueClient = GetQueueClient();
        var message = queueClient.Receive();

        Console.WriteLine("Received a message: " + message.GetBody<string>());
    }
    catch (Exception ex)
    {
        Console.WriteLine("An exception occured: " + ex.Message);
    }
}

private static QueueClient GetQueueClient(ReceiveMode receiveMode = ReceiveMode.ReceiveAndDelete)
{
    const string queueName = "stockchangerequest";
    var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
    var queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName, receiveMode);
    queueClient.RetryPolicy = new RetryExponential(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(30), 10);
    return queueClient;
}

The program will now receive a message, parse its body as a string and output it to the console. But messages should be read as they appear, so there needs to be some kind of loop. It would also be nice if it didn’t block the main thread.

static async Task GetMessage()
{
    var queueClient = GetQueueClient();

    while (true)
    {
        try
        {
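            var message = await queueClient.ReceiveAsync();
            if (message == null)
            {
                // ReceiveAsync returns null when no message arrives before the timeout
                continue;
            }

            Console.WriteLine("Received a message: " + message.GetBody<string>());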
        }
        catch (Exception ex)
        {
            Console.WriteLine("An exception occured: " + ex.Message);
        }
    }
}

Now GetMessage can be run in a separate thread and will read messages endlessly, even if an error occurs. This is OK, but in order to process a large number of messages it’s better to handle them in batches with the ReceiveBatchAsync method.

Why choosing the right receive mode is important

[Image: getting-messages-ReceiveAndDelete]

You may have noticed that when creating a queue client I set the receive mode to ReceiveAndDelete. This receive mode is faster, but less safe. In this mode, when a message is received by the client it is instantly taken out of the queue and doesn’t exist there any more. It is now up to the client to process it properly, so that in case of an error or system shutdown no data is lost. In distributed systems there is always a risk that a server will temporarily lose its connection to some resource and the message will not be processed, so it should somehow be recreated in the queue. This can be done with the ScheduleMessageAsync method, which puts a brokered message on the queue with a delay, but the system should know that this message is in the queue for the second time. There is a message property called DeliveryCount that basically says that, but it cannot be set manually and will not be incremented when putting a message back with ScheduleMessageAsync. When DeliveryCount reaches 10, the message is moved to the dead letter queue by default. This is useful, because such a message will not hang in the queue forever. So to have the same functionality in ReceiveAndDelete mode, you will have to implement your own mechanism for handling old messages.

PeekLock to the rescue

There is a simpler way to do it: the PeekLock receive mode. This receive mode is slower, but provides safety and persistence. In this mode, when a message is retrieved by a client it stays in the queue, but is locked so that other clients cannot read it. When the lock expires, the message becomes available to other consumers. When the client has successfully processed the message, it has to notify the queue that processing is completed, and when processing fails, it should abandon the message. Abandoned messages go back to the queue with an incremented DeliveryCount.
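To make the life cycle of a message easier to follow, here is a toy, in-memory model of these rules, written in Go. It is not a Service Bus client and all names in it (peekLockQueue, Receive, Complete, Abandon) are made up for the illustration. It follows the description in this post: abandoning a message increments DeliveryCount, and at 10 the message is dead-lettered. Lock expiry is left out to keep it short.

```go
package main

import "fmt"

// maxDeliveryCount mirrors the default limit of 10 mentioned in this post.
const maxDeliveryCount = 10

type message struct {
	Body          string
	DeliveryCount int
}

// peekLockQueue is a simplified model of peek-lock semantics.
type peekLockQueue struct {
	available  []*message
	locked     map[*message]bool
	deadLetter []*message
}

func newQueue(bodies ...string) *peekLockQueue {
	q := &peekLockQueue{locked: map[*message]bool{}}
	for _, b := range bodies {
		q.available = append(q.available, &message{Body: b})
	}
	return q
}

// Receive locks the next available message, or returns nil if there is none.
func (q *peekLockQueue) Receive() *message {
	if len(q.available) == 0 {
		return nil
	}
	m := q.available[0]
	q.available = q.available[1:]
	q.locked[m] = true
	return m
}

// Complete removes a locked message from the queue for good.
func (q *peekLockQueue) Complete(m *message) {
	delete(q.locked, m)
}

// Abandon releases the lock and increments DeliveryCount. The message goes
// back to the queue, or to the dead-letter list once the limit is reached.
func (q *peekLockQueue) Abandon(m *message) {
	if !q.locked[m] {
		return
	}
	delete(q.locked, m)
	m.DeliveryCount++
	if m.DeliveryCount >= maxDeliveryCount {
		q.deadLetter = append(q.deadLetter, m)
		return
	}
	q.available = append(q.available, m)
}

func main() {
	q := newQueue("good", "poison")
	attempts := 0
	for m := q.Receive(); m != nil; m = q.Receive() {
		attempts++
		if m.Body == "poison" {
			q.Abandon(m) // processing failed, give the message back
			continue
		}
		q.Complete(m)
	}
	fmt.Printf("attempts: %d, dead-lettered: %d\n", attempts, len(q.deadLetter))
}
```

The point of the model is that a message that keeps failing does not circulate forever: after enough abandons it ends up in the dead-letter list, while completed messages simply disappear.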

[Image: getting-messages-PeekLock]

This is what the code looks like:

static async Task GetMessagesWithPeekLock()
{
    var queueClient = GetQueueClient(ReceiveMode.PeekLock);

    while (true)
    {
        try
        {
            var messages = await queueClient.ReceiveBatchAsync(50);

            try
            {
                // processing
                foreach (var message in messages)
                {
                    Console.WriteLine("Received a message: " + message.GetBody<string>());
                }

                await queueClient.CompleteBatchAsync(messages.Select(m => m.LockToken));
            }
            catch (Exception e)
            {
                Console.WriteLine("An exception occured while processing: " + e.Message);
                foreach (var message in messages)
                {
                    await queueClient.AbandonAsync(message.LockToken);
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("An exception occured: " + ex.Message);
        }
    }
}

This code is a bit slower than using ReceiveBatch with the ReceiveAndDelete mode, but it’s much, much faster than receiving messages one by one. You probably noticed that there are two try-catch blocks. The first one ensures that the infinite loop keeps running even if a connection problem occurs, and the second one catches exceptions thrown while processing. When something goes wrong there, all messages will be abandoned and will return to the queue, regardless of whether they were processed or not. This can of course be improved by processing messages one by one and abandoning only those that were not processed. The most important thing here is that this implementation is safe and no data will be lost.
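The improvement mentioned above, settling each message on its own outcome, can be sketched like this. It is a minimal illustration in Go with no Service Bus dependency. The settler interface, processBatch and the recorder type are hypothetical names for this example, with Complete and Abandon standing in for the corresponding calls on the queue client.

```go
package main

import (
	"errors"
	"fmt"
)

// settler stands in for the queue client's complete and abandon calls.
type settler interface {
	Complete(id string)
	Abandon(id string)
}

type msg struct {
	ID   string
	Body string
}

var errBadBody = errors.New("cannot process message")

// handleMessage is a sample handler that rejects empty bodies.
func handleMessage(m msg) error {
	if m.Body == "" {
		return errBadBody
	}
	return nil
}

// processBatch handles messages one by one and settles each of them
// according to its own result, instead of abandoning the whole batch.
func processBatch(batch []msg, handle func(msg) error, s settler) (completed, abandoned int) {
	for _, m := range batch {
		if err := handle(m); err != nil {
			s.Abandon(m.ID)
			abandoned++
			continue
		}
		s.Complete(m.ID)
		completed++
	}
	return completed, abandoned
}

// recorder is a fake settler that remembers what was done with each message.
type recorder struct {
	completed []string
	abandoned []string
}

func (r *recorder) Complete(id string) { r.completed = append(r.completed, id) }
func (r *recorder) Abandon(id string)  { r.abandoned = append(r.abandoned, id) }

func main() {
	batch := []msg{{ID: "1", Body: "a"}, {ID: "2", Body: ""}, {ID: "3", Body: "c"}}
	r := &recorder{}
	completed, abandoned := processBatch(batch, handleMessage, r)
	fmt.Printf("completed: %d %v, abandoned: %d %v\n", completed, r.completed, abandoned, r.abandoned)
}
```

The trade-off is that completing messages individually means more calls to the queue than one CompleteBatchAsync, so it pays off mainly when failures are common or reprocessing is expensive.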

Sending messages to Azure Service Bus queue

To connect to Azure Service Bus you need to import the nuget package WindowsAzure.ServiceBus. The next step is getting a connection string to your Service Bus namespace. It can be found in the Azure Portal, in the Service Bus section.

It should look like this:

[Image: azure-connection-string]

Copy the primary connection string and paste it into the App.config file, as the value of the Microsoft.ServiceBus.ConnectionString key in appSettings. That key should already have been generated when you imported the package.

    using Microsoft.ServiceBus.Messaging;
    using System.Configuration;

    class Program
    {
        static void Main(string[] args)
        {
            var queueName = "stockchangerequest";
            var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
            var queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);

            queueClient.Send(new BrokeredMessage("This is a test message content"));
        }
    }

Only a few lines of code let you send a message to a queue. However, what if something goes wrong? This code can also be written asynchronously, so that sending a message does not block the current thread. I’ll also add a try-catch block with logging.

    using Microsoft.ServiceBus.Messaging;
    using System;
    using System.Configuration;
    using System.Threading.Tasks;

    class Program
    {
        static void Main(string[] args)
        {
            Task.Run(SendMessageAsync).GetAwaiter().GetResult();
        }

        static async Task SendMessageAsync()
        {
            try
            {
                var queueName = "stockchangerequest";
                var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
                var queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);

                await queueClient.SendAsync(new BrokeredMessage("This is a test message content"));
            }
            catch (Exception ex)
            {
                Console.WriteLine("An exception occured: " + ex.Message);
            }
        }
    }

Transient error and retry policy

This is OK code. However, connecting to a remote server such as Azure Service Bus will fail from time to time. Those failures are called transient, because they last for a very short time. No matter how rarely they occur or how fast the service comes back up, the code has to be prepared for that. In Azure Service Bus there is a mechanism implemented to cope with those problems: a retry policy.

Retry policies allow you to execute an operation repeatedly if an error occurred. There are two types of retry policy: linear and exponential. A linear retry policy retries an operation periodically with the same backoff period. An exponential retry policy retries the operation after a growing waiting time, typically a multiple of the previous one. Let’s see how it can be implemented.

queueClient.RetryPolicy = new RetryExponential(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(30), 10);

This line means that if there are any problems with the connection to Azure Service Bus, the queue client will seamlessly try again, up to 10 times, after waiting between 5 and 30 seconds, and that wait time will grow exponentially.
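To show what such a schedule can look like, here is a simplified sketch in Go. It is not the algorithm used by RetryExponential, which I have not reproduced here; it simply doubles the wait on every retry and caps it at the maximum. The names backoff and retry are made up for this example, and the constants mirror the values passed in the line above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Values taken from the RetryExponential call above.
const (
	minBackoff    = 5 * time.Second
	maxBackoff    = 30 * time.Second
	maxRetryCount = 10
)

// backoff returns the wait before retry number attempt (0-based):
// it starts at minWait, doubles with each attempt and is capped at maxWait.
func backoff(attempt int, minWait, maxWait time.Duration) time.Duration {
	wait := minWait
	for i := 0; i < attempt && wait < maxWait; i++ {
		wait *= 2
	}
	if wait > maxWait {
		wait = maxWait
	}
	return wait
}

// retry runs op and, while it fails, waits and runs it again, for at most
// maxRetries additional attempts. sleep is injected so that the demo below
// does not really wait.
func retry(op func() error, maxRetries int, sleep func(time.Duration)) error {
	err := op()
	for attempt := 0; err != nil && attempt < maxRetries; attempt++ {
		sleep(backoff(attempt, minBackoff, maxBackoff))
		err = op()
	}
	return err
}

func main() {
	calls := 0
	op := func() error { // fails three times, then succeeds
		calls++
		if calls <= 3 {
			return errors.New("transient failure")
		}
		return nil
	}
	err := retry(op, maxRetryCount, func(d time.Duration) {
		fmt.Printf("waiting %v before the next attempt\n", d)
	})
	fmt.Printf("calls: %d, error: %v\n", calls, err)
}
```

With these values the waits grow as 5, 10 and 20 seconds and then stay at 30 seconds. In real code you would pass time.Sleep as the sleep function.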

You can find more about good practices and patterns in this amazing Microsoft article.

The Service Bus package also provides a NoRetry policy and gives you the RetryPolicy base class to implement your own policy.

One more thing needs mentioning. For sending a large number of messages, you can use SendBatch and SendBatchAsync.
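When the set of messages is large, it may make sense to split it into smaller batches before sending, since a single send has a size limit. Below is a minimal sketch of such a split, written in Go. The chunk function is a hypothetical helper, and the batch size of 100 is an arbitrary assumption for the demo, not a Service Bus limit.

```go
package main

import "fmt"

// chunk splits messages into consecutive batches of at most size items.
func chunk(messages []string, size int) [][]string {
	if size < 1 {
		size = 1
	}
	var batches [][]string
	for start := 0; start < len(messages); start += size {
		end := start + size
		if end > len(messages) {
			end = len(messages)
		}
		batches = append(batches, messages[start:end])
	}
	return batches
}

func main() {
	messages := make([]string, 1000)
	for i := range messages {
		messages[i] = fmt.Sprintf("Message number %d", i)
	}
	// Each batch would be passed to one batch send call.
	for i, batch := range chunk(messages, 100) {
		fmt.Printf("batch %d: %d messages\n", i+1, len(batch))
	}
}
```

Splitting by count is the simplest option; if message bodies vary a lot in size, splitting by total byte size would be a safer choice.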