Receiving messages from a queue in the push model

Push and pull are two general approaches to distributing data between services. In the context of messaging, they describe how the client receives messages. In the previous post I showed how to receive messages by waiting for them to arrive; this is the pull model. The Receive method waits a certain amount of time for messages to arrive, not necessarily blocking a thread. In the push model, the timing of code execution is driven by data arriving at the client, in this case by messages appearing in the queue. You can register one method to be executed when a message arrives and another for when an error occurs. Let’s see some code:

static void GetMessagesBySubscribing()
{
    var queueClient = GetQueueClient();
    queueClient.OnMessage(OnMessage);
}

private static void OnMessage(BrokeredMessage brokeredMessage)
{
    Console.WriteLine("Received a message: " + brokeredMessage.GetBody<string>());
}

The code could hardly be simpler. The whole message life cycle is handled for us by queueClient.OnMessage, even if we specify a different ReceiveMode. Let’s add error handling:

static void GetMessagesBySubscribing()
{
    var queueClient = GetQueueClient();
    var options = new OnMessageOptions
    {
        AutoComplete = true,
        MaxConcurrentCalls = 5
    };
    options.ExceptionReceived += OptionsOnExceptionReceived;
    queueClient.OnMessage(OnMessage, options);
}

private static void OptionsOnExceptionReceived(object sender, ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    if (exceptionReceivedEventArgs?.Exception != null)
    {
        Console.WriteLine("Exception occurred: " + exceptionReceivedEventArgs.Exception.Message);
    }
}

private static void OnMessage(BrokeredMessage brokeredMessage)
{
    try
    {
        Console.WriteLine("Received a message: " + brokeredMessage.GetBody<string>());
    }
    catch (Exception e)
    {
        Console.WriteLine("Exception occurred while processing: " + e.Message);
    }
}

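A method that logs errors can be subscribed to the ExceptionReceived event of the OnMessageOptions class. It will be called for every error that happens while handling the connection to the queue and while handling messages. I also added a try-catch block to catch all exceptions that may occur during message processing. If an error that we do not catch occurs while processing a message, it will show up in the ExceptionReceived handler, but it’s better to keep the two kinds of errors separated. Keep in mind that an exception swallowed inside the handler counts as successful processing, so with AutoComplete enabled the message will still be completed.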

There are two more options specified in OnMessageOptions. When AutoComplete is set to true, the queue client will try to complete a message after it is processed and abandon it on error. However, there may be scenarios where we would like to decide during processing whether a message should be completed or abandoned. This can be accomplished by setting AutoComplete to false. MaxConcurrentCalls defines how many threads should process messages from the queue in parallel. For example, if it is set to 5 and many messages appear in the queue, the queue client will use up to 5 different threads to process consecutive messages. Each thread works on a separate message and handles its life cycle. If it is important that only one thread processes messages at a time, MaxConcurrentCalls should be set to 1.
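The manual completion scenario mentioned above could be sketched roughly like this. It assumes the queue client works in the PeekLock receive mode, and the class name ManualCompletionExample and the Subscribe method are only illustrative names, not part of the SDK. With AutoComplete set to false, the handler decides the outcome of each message itself by calling Complete or Abandon on the BrokeredMessage.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

// Illustrative sketch only: the class and method names are made up for this example.
static class ManualCompletionExample
{
    // Assumes queueClient was created in PeekLock receive mode,
    // so every received message has to be settled explicitly.
    public static void Subscribe(QueueClient queueClient)
    {
        var options = new OnMessageOptions
        {
            AutoComplete = false,   // the handler settles each message itself
            MaxConcurrentCalls = 1  // process one message at a time
        };
        queueClient.OnMessage(OnMessage, options);
    }

    private static void OnMessage(BrokeredMessage brokeredMessage)
    {
        try
        {
            Console.WriteLine("Received a message: " + brokeredMessage.GetBody<string>());
        }
        catch (Exception e)
        {
            Console.WriteLine("Exception occurred while processing: " + e.Message);
            // Release the lock so the message becomes available for another delivery.
            brokeredMessage.Abandon();
            return;
        }

        // Processing succeeded, so remove the message from the queue.
        brokeredMessage.Complete();
    }
}
```

Complete and Abandon can fail themselves, for example when the message lock has already expired. In this sketch such an exception is not caught, so it would surface in the ExceptionReceived handler if one is registered.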

Proper error handling in ExceptionReceived

In the ExceptionReceived handler you will get all kinds of errors, but you shouldn’t worry about transient ones. Don’t be surprised to get a MessagingException even if you define a retry policy: those errors are bubbled up for monitoring purposes, so it is up to you whether to log them. After a connection problem, the client should recreate itself and try to receive messages again. The code can look like this:

private static void OptionsOnExceptionReceived(object sender, ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    if (exceptionReceivedEventArgs?.Exception != null)
    {
        if (!(exceptionReceivedEventArgs.Exception is MessagingException && ((MessagingException)exceptionReceivedEventArgs.Exception).IsTransient))
        {
            Console.WriteLine("Exception occurred: " + exceptionReceivedEventArgs.Exception.Message);
        }
    }
}
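One practical detail when trying these samples in a console application: OnMessage only registers the callback and returns immediately, so the process has to stay alive for messages to be delivered. Below is a minimal sketch of such a host. The class name PushReceiverHost and the connectionString and queueName parameters are placeholders, and the client is created directly instead of through the GetQueueClient helper from the previous post.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

// Illustrative sketch only: the class name and parameters are placeholders.
static class PushReceiverHost
{
    public static void Run(string connectionString, string queueName)
    {
        var queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);

        var options = new OnMessageOptions
        {
            AutoComplete = true,
            MaxConcurrentCalls = 1
        };

        // Log everything except transient messaging errors.
        options.ExceptionReceived += (sender, args) =>
        {
            if (args?.Exception == null)
            {
                return;
            }
            var messagingException = args.Exception as MessagingException;
            if (messagingException == null || !messagingException.IsTransient)
            {
                Console.WriteLine("Exception occurred: " + args.Exception.Message);
            }
        };

        // Registers the callback and returns right away.
        queueClient.OnMessage(
            message => Console.WriteLine("Received a message: " + message.GetBody<string>()),
            options);

        // Keep the process running while messages are pushed to the callback.
        Console.WriteLine("Listening for messages. Press Enter to stop.");
        Console.ReadLine();

        // Stop receiving and release the connection.
        queueClient.Close();
    }
}
```

Waiting on Console.ReadLine is just the simplest way to block a demo application; a real service would tie the client's lifetime to its own start and stop logic.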
