Getting messages from Service Bus queue

In the previous post I discussed how to implement sending messages. In this post I will show how to receive messages. The simplest code looks like this:

static void GetMessage()
{
    try
    {
        var queueClient = GetQueueClient();
        var message = queueClient.Receive();

        Console.WriteLine("Received a message: " + message.GetBody<string>());
    }
    catch (Exception ex)
    {
        Console.WriteLine("An exception occured: " + ex.Message);
    }
}

private static QueueClient GetQueueClient(ReceiveMode receiveMode = ReceiveMode.ReceiveAndDelete)
{
    const string queueName = "stockchangerequest";
    var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
    var queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName, receiveMode);
    queueClient.RetryPolicy = new RetryExponential(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(30), 10);
    return queueClient;
}

Program will now receive a message, parse body as string and output it on console. But messages should be read as they appear, so there need to be some kind of a loop. It will be also nice if it wouldn’t block the main thread.

static async Task GetMessage()
{
    var queueClient = GetQueueClient();

    while (true)
    {
        try
        {
            var message = await queueClient.ReceiveAsync();

            Console.WriteLine("Received a message: " + message.GetBody<string>());
        }
        catch (Exception ex)
        {
            Console.WriteLine("An exception occured: " + ex.Message);
        }
    }
}

Now GetMessage can be run in a separate thread and will read messages endlessly even if an error occurs. This is ok, but in order to process large amount of messages it’s better to handle messages in batches with ReceiveBatchAsync method.

Why choosing right receive mode is important

getting-messages-ReceiveAndDelete

You may have noticed, that when creating a queue client I set receive mode to ReceiveAndDelete. This is a receiving mode that is faster, but less safe. In this mode when message is received by the client it is instantly taken out of the queue and it doesn’t exists there any more. This is now up to the client to process it properly so that in case of error or system shut down, no data will be lost. In distributed systems there is always a risk, that a server will temporary lost connection to any resource and message will not be processed, therefore it should somehow be recreated in a queue. This can be done with ScheduleMessageAsync method, that will put a brokered message on a queue with a delay, but system should know, that this message is in the queue for the second time. There is a message property called DeliveryCount that basically says that, but it cannot be set manually and will not be incremented when putting a message with ScheduleMessageAsync. When DeliveryCount will reach 10, message will be moved to dead letter queue by default. This is useful, cause message such as that will not be hanging in a queue for ever. So to have the same functionality you will have to implement your custom old messages handling mechanism.

PeekLock to the rescue

There is a simpler way to do it – PeelLock receive mode. This receive mode is slower, but provides safety and persistence. In this mode when message is retrieved be a client it stays in a queue, but is locked for others client to read. When the lock expires, the message will become available for other consumers. When client successfully processed the message it has to notify the queue that processing is completed and when it fails, it should abandon the messages. Abandoned messages goes back to the queue with incremented DeliveryCount.

getting-messages-PeekLock

This is what the code looks like:

static async Task GetMessagesWithPeekLock()
{
    var queueClient = GetQueueClient(ReceiveMode.PeekLock);

    while (true)
    {
        try
        {
            var messages = await queueClient.ReceiveBatchAsync(50);

            try
            {
                // processing
                foreach (var message in messages)
                {
                    Console.WriteLine("Received a message: " + message.GetBody<string>());
                }

                await queueClient.CompleteBatchAsync(messages.Select(m => m.LockToken));
            }
            catch (Exception e)
            {
                Console.WriteLine("An exception occured while processing: " + e.Message);
                foreach (var message in messages)
                {
                    await queueClient.AbandonAsync(message.LockToken);
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("An exception occured: " + ex.Message);
        }
    }
}

This code is a bit slower then using ReceiveBatch with ReceiveAndDelete mode, but it’s much much faster then receiving message one by one. You probably noticed that there are to try-catch blocks. This is because first ensures as that there is a inifinite loop even if a connection problem occurs and second one is to catch exceptions while processing. When something goes wrong there, all messages will be abandoned and will return to the queue, regardless if they were processed or not. This of course can be improved when we can process messages one by one and abandon only those, that were not processed. The most important thing here is that this implementation is safe and no data will be lost.

Leave a Reply

Your email address will not be published. Required fields are marked *