Monthly Archives: February 2018

Receiving messages from Azure Service Bus in .Net Core

For some time now we can observe how new .Net Core framework is growing and become more mature. Version 2.0 was released in August 2017 and it is more capable and supports more platforms then it’s previous releases. But the biggest feature of this brand new Microsoft framework is its performance and it’s ability to handle http requests much faster then it’s bigger brother.

However introduction of new creation is only the beginning as more and more packages are being ported or rewritten to new, lighter framework. This is a natural opportunity for developers to implement some additional changes,  refactor existing code or maybe slightly simplify existing API. It also means that porting existing solutions to .Net Core might not be straightforward and in this article I’ll check how receiving messages from Service Bus looks like.

First things first

In order to start receiving messages, we need to have:

After all of this, my topic looks like this:

Receiving messages

To demonstrate how to receive messages I created a console application in .Net Core 2.0 framework. Then I installed Microsoft.Azure.ServiceBus (v2.0) nuget package and also Newtonsoft.Json to parse messages body. My ProductRatingUpdateMessage message class looks like this:

    public class ProductRatingUpdateMessage
    {
        public int ProductId { get; set; }

        public int SellerId { get; set; }

        public int RatingSum { get; set; }

        public int RatingCount { get; set; }
    }

All of the logic is inside MessageReceiver class:

    public class MessageReceiver
    {
        private const string ServiceBusConnectionString = "Endpoint=sb://bialecki.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[privateKey]";

        public void Receive()
        {
            var subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, "productRatingUpdates", "sampleSubscription");

            try
            {
                subscriptionClient.RegisterMessageHandler(
                    async (message, token) =>
                    {
                        var messageJson = Encoding.UTF8.GetString(message.Body);
                        var updateMessage = JsonConvert.DeserializeObject<ProductRatingUpdateMessage>(messageJson);

                        Console.WriteLine($"Received message with productId: {updateMessage.ProductId}");

                        await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
                    },
                    new MessageHandlerOptions(async args => Console.WriteLine(args.Exception))
                    { MaxConcurrentCalls = 1, AutoComplete = false });
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: " + e.Message);
            }
        }
    }

 

Notice that creating a SubscriptionClient is very simple, it takes just one line and handles all the work. Next up is RegisterMessageHandler which handles messages one by one and completes them in the end. If something goes wrong message will not be completed and after lock on it will be removed – it will be available for processing again. If you set AutoComplete option to true, then message will be automatically completed after returning from User Callback. This message pump approach won’t let you handle incoming messages in batches, but parameter MaxConcurrentCalls can be set to handle multiple messages in parallel.

Is it ready?

Microsoft.Azure.ServiceBus nuget package in version 2.0 offers most desirable functionality. It should be enough for most cases but it also has huge gaps:

  • Cannot receive messages in batches
  • Cannot manage entities – create topics, queues and subscriptions
  • Cannot check if queue, topic or subscription exist

Especially entities management features are important. It is reasonable that when scaling-up a service that read from topic, it creates it’s own subscription and handles it on it’s own. Currently developer needs to go to Azure Portal to create subscriptions for each micro-service manually.

Update! 

Version 3.1 supports entities management – have a look at my post about it: Managing ServiceBus queues, topics and subscriptions in .Net Core

If you liked code posted here, you can find it (and a lot more) in my github blog repo: https://github.com/mikuam/Blog.

 

 

 

Sending and receiving big files using Egnyte.API nuget package

Handling big files can be a problem when sending it through web. Simple REST calls are enough for small or medium files, but it’s limitation is a size of a request, that cannot be larger then 2GB. For files larger than that, you have to send or download file in chunks or as a stream.

In this post I’ll describe how to send and download really big files, bigger then 2GB connecting to Egnyte cloud storage with Egnyte.Api nuget package. I have written an introduction to Egnyte api here and wrote about using Egnyte.Api nuget package here.

Sending big files in chunks.

Egnyte API exposes dedicated method for sending big files, which is described here: Egnyte file chunked upload. First you need to install Egnyte.Api nuget package. Simple code can look like this:

    var client = new EgnyteClient(Token, Domain);

    var fileStream = new MemoryStream(File.ReadAllBytes("C:/test/big-file.zip"));
    var response = await ChunkUploadFile(client, "Shared/MikTests/Blog/big-file.zip", fileStream);

And ChunkUploadFile asynchronous helper method looks like this:

    private async Task<UploadedFileMetadata> ChunkUploadFile(
        EgnyteClient client,
        string serverFilePath,
        MemoryStream fileStream)
    {
        // first chunk
        var defaultChunkLength = 10485760;
        var firstChunkLength = defaultChunkLength;
        if (fileStream.Length < firstChunkLength)
        {
            firstChunkLength = (int)fileStream.Length;
        }

        var bytesRead = firstChunkLength;
        var buffer = new byte[firstChunkLength];
        fileStream.Read(buffer, 0, firstChunkLength);

        var response = await client.Files.ChunkedUploadFirstChunk(serverFilePath, new MemoryStream(buffer))
            .ConfigureAwait(false);
        int number = 2;

        while (bytesRead < fileStream.Length)
        {
            var nextChunkLength = defaultChunkLength;
            bool isLastChunk = false;
            if (bytesRead + nextChunkLength >= fileStream.Length)
            {
                nextChunkLength = (int)fileStream.Length - bytesRead;
                isLastChunk = true;
            }

            buffer = new byte[nextChunkLength];
            fileStream.Read(buffer, 0, nextChunkLength);

            if (!isLastChunk)
            {
                await client.Files.ChunkedUploadNextChunk(
                    serverFilePath,
                    number,
                    response.UploadId,
                    new MemoryStream(buffer)).ConfigureAwait(false);
            }
            else
            {
                return await client.Files.ChunkedUploadLastChunk(
                    serverFilePath,
                    number,
                    response.UploadId,
                    new MemoryStream(buffer)).ConfigureAwait(false);
            }
            number++;
            bytesRead += nextChunkLength;
        }

        throw new Exception("Something went wrong - unable to enumerate to next chunk.");
    }

Notice, that this code uses three methods that are reflected to three web requests and they are used for sending firs, next and last data chunk. Response of ChunkedUploadFirstChunk gives you UploadId that will identify upload and must be provided in other two methods. Buffer size I used is 10485760 bytes, that is 10 Megabytes, but you can use whatever suites you between 10 MB and 1 GB. Memory usage of sample console application looks like this:

Downloading big files

Downloading is much simpler then uploading. Important thing is to use streams the right way, so that application would not allocate to much memory.

    var client = new EgnyteClient(Token, Domain);

    var responseStream = await client.Files.DownloadFileAsStream("Shared/MikTests/Blog/big-file.zip");

    using (FileStream file = new FileStream("C:/test/big-file01.zip", FileMode.OpenOrCreate, FileAccess.Write))
    {
        CopyStream(responseStream.Data, file);
    }

And CopyStream helper method looks like this:

    /// <summary>
    /// Copies the contents of input to output. Doesn't close either stream.
    /// </summary>
    public static void CopyStream(Stream input, Stream output)
    {
        byte[] buffer = new byte[8 * 1024];
        int len;
        while ((len = input.Read(buffer, 0, buffer.Length)) > 0)
        {
            output.Write(buffer, 0, len);
        }
    }

I tested this code by sending and downloading 2.5GB files and many smaller ones and it works great.

All posted code is available in my public github repository: https://github.com/mikuam/Blog.

If you’d like to see other examples of usage Egnyte.Api, let me know.

How to handle error 0x800703E3, when user cancells file download

Recently at work I came across a difficult error, that gives an error message, that would lead me nowhere.

The remote host closed the connection. The error code is 0x800703E3.

I’ll give you more context – error occurs in the micro-service that serves big files across the web with REST interface. Service was working perfectly and none of our clients rose issues. But something was wrong. After some hours I finally managed to reproduce it. The error occurred when client was downloading file, but intentionally canceled it. How to handle such situation? Exception did not have any distinct type that can be handled separately.

In odrer to handle it in ASP.NET MVC properly I added an exception logger:

public static class RegisterFilters
{
    public static void Execute(HttpConfiguration configuration)
    {
        configuration.Services.Add(typeof(IExceptionLogger), new WebExceptionLogger());
    }
}

And WebExceptionLogger class implementation:

public class WebExceptionLogger : ExceptionLogger
{
    private const int RequestCancelledByUserExceptionCode = -2147023901;

    public override void Log(ExceptionLoggerContext context)
    {
        var dependencyScope = context.Request.GetDependencyScope();
        var loggerFactory = dependencyScope.GetService(typeof(ILoggerFactory)) as ILoggerFactory;
        if (loggerFactory == null)
        {
            throw new IoCResolutionException<ILoggerFactory>();
        }

        var logger = loggerFactory.GetTechnicalLogger<WebExceptionLogger>();
        if (context.Exception.HResult == RequestCancelledByUserExceptionCode)
        {
            logger.Info($"Request to url {context.Request.RequestUri} was cancelled by user.");
        }
        else
        {
            logger.Error("An unhandled exception has occured", context.Exception);
        }
    }
}

I noticed that this specific error type has HResult = -2147023901, so this is what I’m filtering by.

Hope this helps you.