Tag Archives: Service Bus

Why duplication isn’t always a bad thing in micro-services

Early in my development career I was taught that duplication is a bad thing, especially when it comes to storing data. Relational databases were invented to model data that relates to each other and to store it efficiently. There are even normalization rules designed to avoid data redundancy. We are slowly moving away from this approach towards non-relational databases because of their simplicity and falling storage prices. Nevertheless, having the same thing in two places leads to ambiguity and chaos. This also ties in with the DRY rule:

DRY – don’t repeat yourself. Every piece of knowledge must have a single, unambiguous, authoritative representation within a system. (Wikipedia)

Breaking the DRY rule is called WET, commonly taken to stand for either “write everything twice”, “we enjoy typing” or “waste everyone’s time”. That isn’t nice, right? So when is duplication acceptable?

Let’s look at the example.

In this example, we see a network of stateful services that exchange data about products. Data can come from many sources and needs to be sent to many destinations, so it flows from one service to another. Here are a couple of rules:

  • every micro-service has data about its own specific thing and keeps it persistent
  • services work in a publisher-subscriber model, where every service can publish the data and receive the data it needs
  • services don’t know about each other
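The rules above can be sketched with a tiny in-memory bus. This is only an illustration under assumptions: InMemoryBus, ProductService, CartService and the "product-prices" topic are hypothetical names, and a real system would use a message broker instead of a dictionary of callbacks.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a message bus: topics map to subscriber callbacks.
public class InMemoryBus
{
    private readonly Dictionary<string, List<Action<string, decimal>>> _subscribers =
        new Dictionary<string, List<Action<string, decimal>>>();

    public void Subscribe(string topic, Action<string, decimal> handler)
    {
        if (!_subscribers.TryGetValue(topic, out var handlers))
        {
            handlers = new List<Action<string, decimal>>();
            _subscribers[topic] = handlers;
        }
        handlers.Add(handler);
    }

    public void Publish(string topic, string productId, decimal price)
    {
        if (!_subscribers.TryGetValue(topic, out var handlers)) return;
        foreach (var handler in handlers) handler(productId, price);
    }
}

// Owns product prices: keeps its own state and publishes every change.
public class ProductService
{
    private readonly InMemoryBus _bus;
    public Dictionary<string, decimal> Prices { get; } = new Dictionary<string, decimal>();

    public ProductService(InMemoryBus bus) { _bus = bus; }

    public void SetPrice(string productId, decimal price)
    {
        Prices[productId] = price;
        _bus.Publish("product-prices", productId, price);
    }
}

// Keeps its own copy of the prices it needs; it only knows the topic, not the publisher.
public class CartService
{
    public Dictionary<string, decimal> KnownPrices { get; } = new Dictionary<string, decimal>();

    public CartService(InMemoryBus bus)
    {
        bus.Subscribe("product-prices", (id, price) => KnownPrices[id] = price);
    }
}

public static class Program
{
    public static void Main()
    {
        var bus = new InMemoryBus();
        var products = new ProductService(bus);
        var cart = new CartService(bus);

        products.SetPrice("book-1", 19.99m);
        Console.WriteLine(cart.KnownPrices["book-1"]);
    }
}
```

Neither service holds a reference to the other. Both only share the bus and a topic name, and both end up with their own persistent copy of the price.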

Does this sound familiar?

Event-driven microservices

Event-driven programming isn’t a new thing, as a quick look at Wikipedia confirms. It is a programming paradigm in which the flow of the program is determined by events. It is used extensively in graphical user interfaces and web applications, where user interactions trigger program execution.

Every major cloud platform supports event-driven programming: there is AWS Lambda by Amazon, Azure Functions by Microsoft and Google Cloud Functions by Google. All of these technologies offer event triggering.

In back-end micro-services, an event can be for example a web request or a service bus message. Let’s use Service Bus messages, where every service will be connected to the bus and can act both as a publisher and subscriber.

In this architecture the Service Bus is crucial, because it provides some distinctive features:

  • services are loosely coupled and don’t know about each other. Building another micro-service that needs specific data is just a matter of subscribing to the right publishers
  • it’s good for scalability: it doesn’t matter whether 1 or 10 instances of a service subscribe to a certain topic, it will work without any code change
  • it can handle a big load: messages are kept in a queue and a service can consume them at its own pace
  • it has built-in mechanisms for handling failure: if a message cannot be processed, it is put back in the queue a set number of times. You can also specify a custom retry policy that exponentially extends the wait time between retries
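To make the last point concrete, here is a minimal sketch of retrying with exponentially growing waits. It is not the Service Bus API: RetryHelper, DelayFor and ProcessWithRetryAsync are hypothetical helpers that only illustrate how such a policy could compute its delays.

```csharp
using System;
using System.Threading.Tasks;

public static class RetryHelper
{
    // Delay before retry number `attempt` (1-based): baseDelay * 2^(attempt - 1), capped at maxDelay.
    public static TimeSpan DelayFor(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        var factor = Math.Pow(2, attempt - 1);
        var delayMs = Math.Min(baseDelay.TotalMilliseconds * factor, maxDelay.TotalMilliseconds);
        return TimeSpan.FromMilliseconds(delayMs);
    }

    // Runs the handler and retries up to maxRetries times, waiting longer after each failure.
    // Once the retries are used up, the last exception propagates to the caller.
    public static async Task ProcessWithRetryAsync(
        Func<Task> handler, int maxRetries, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await handler();
                return;
            }
            catch (Exception) when (attempt <= maxRetries)
            {
                await Task.Delay(DelayFor(attempt, baseDelay, maxDelay));
            }
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var calls = 0;

        // Simulated handler that fails twice before succeeding.
        await RetryHelper.ProcessWithRetryAsync(
            () =>
            {
                calls++;
                if (calls < 3) throw new InvalidOperationException("transient failure");
                return Task.CompletedTask;
            },
            maxRetries: 5,
            baseDelay: TimeSpan.FromMilliseconds(10),
            maxDelay: TimeSpan.FromSeconds(1));

        Console.WriteLine($"Succeeded after {calls} attempts");
    }
}
```

With a base delay of one second the waits grow as 1 s, 2 s, 4 s, 8 s and so on, until they hit the cap. The cap keeps a long outage from pushing a single message's wait time to hours.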

If you’d like to know more about Microsoft Service Bus in .Net Core, jump to my blog posts:

What happens when it fails?

When we notice that something fishy might be going on with our micro-service, we have to be able to fix it. It might be missing some data, but how do we know exactly what data this micro-service should have? When micro-services are stateful, the whole state is saved in every one of them. This means that we can make other services send data to the one in a failed state. Or even better: tell a service to fix itself!

You can see how a micro-service’s state can be fixed with a single admin request. The service can get all of the data only because the other services are stateful. This comes in handy not only in crisis situations, but also when debugging. Investigating stateless services when you don’t actually know what data came in and what went out can be really painful.
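A minimal sketch of such a repair, under stated assumptions: ProductService, SearchService, ReplayAll and Repair are hypothetical names, and the two services talk directly only to keep the example short. In the architecture described here the repair request and the replayed data would travel over the bus.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical owner of product data. Because it is stateful, it can replay everything it knows.
public class ProductService
{
    private readonly Dictionary<string, string> _titles = new Dictionary<string, string>();

    public void Save(string productId, string title) => _titles[productId] = title;

    // Returns a snapshot of the full state, as if every product were published again.
    public IReadOnlyDictionary<string, string> ReplayAll() =>
        new Dictionary<string, string>(_titles);
}

// Hypothetical subscriber that keeps its own copy of the data and may lose or corrupt it.
public class SearchService
{
    public Dictionary<string, string> Titles { get; } = new Dictionary<string, string>();

    // The "admin request": drop the local copy and rebuild it from the owner's replay.
    public void Repair(ProductService owner)
    {
        Titles.Clear();
        foreach (var pair in owner.ReplayAll())
        {
            Titles[pair.Key] = pair.Value;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var products = new ProductService();
        products.Save("p1", "Keyboard");
        products.Save("p2", "Mouse");

        var search = new SearchService();
        search.Titles["p1"] = "Keybord"; // corrupted local copy, and p2 is missing entirely

        search.Repair(products);
        Console.WriteLine($"{search.Titles.Count} products, p1 = {search.Titles["p1"]}");
    }
}
```

The repair is possible only because ProductService persisted its data. A stateless publisher would have nothing left to replay.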

But it’s data duplication!

That’s right! In the scenario above each micro-service is stateful and has its own database. Apart from all the good things I mentioned, I should add that those services are small and easy to maintain. They could be merged into one bigger micro-service, but that wouldn’t be a micro-service anymore, would it? Also, when services have less to do, they work faster.

Ok, but back to the problem. It’s data duplication! With a big D! Almost all services share some parts of the same thing, how do we know which one is correct and which one to use? The answer is simple: keep data everywhere, but use only one source.

Single source of truth – it is one source for getting certain data. Whenever you want some data that are consistent and up-to-date, you need to take it from the source of truth. It guarantees that when two clients request the data at the same time, they will get the same result. This is very important in a distributed system, where one client can feed data on a product page showing titles and prices and another one should show the same data in a cart or checkout.

In our example single source of truth for products would be Product Service, and for marketing content would be Marketing Content Service.

An inspiration

Some time ago I got inspired by Mastering Chaos – A Netflix Guide to Microservices by Josh Evans talking about Netflix micro-services architecture. I strongly encourage you to watch it.

Below you can see how micro-services talk to each other and process data.

Yes, it’s a cool gif from mentioned presentation that I really wanted to show you 🙂

Receive Service Bus messages in Service Fabric

This is one of the proofs of concept that I did at work. The idea was to add another Service Bus application to an existing solution, instead of starting a whole new micro-service. It was a lot faster just to add another .Net Core console application, but setting up a Service Fabric cluster always brings some unexpected experiences.

What are my requirements:

  • everything has to be written in .Net Core
  • reading Service Bus messages is placed in a new console application
  • logging has to be configured
  • dependency injection needs to be configured
  • reading Service Bus messages needs to be registered in stateless service

Let’s get to work!

The entry point in console application

Console applications are a bit specific. In most cases, we write console applications that are small and don’t require dependency injection or logging, apart from logging to the console. But here I want to build a professional console application that is not run just once, but is a proper part of a bigger thing that we will need to maintain in the future.

The specific thing about console applications is that they have a Main method, which runs as soon as the application starts, and everything you’d like to do has to start there. That means that both the configuration and the execution of the app need to be in this one method. Let’s see what the code looks like:

    public class Program
    {
        private const string ServiceName = "MichalBialecki.com.SF.ServiceBusExample.MessageProcessorType";

        private static IConfigurationRoot Configuration;

        public static async Task Main(string[] args)
        {
            var builder = new ConfigurationBuilder()
                .SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
            Configuration = builder.Build();

            try
            {
                await ServiceRuntime.RegisterServiceAsync(ServiceName, CreateService);

                ServiceEventSource.Current.ServiceTypeRegistered(Process.GetCurrentProcess().Id, typeof(ServiceBusStatelessService).Name);
                
                Thread.Sleep(Timeout.Infinite);
            }
            catch (Exception e)
            {
                ServiceEventSource.Current.ServiceHostInitializationFailed(e.ToString());
                throw;
            }
        }

        private static ServiceBusStatelessService CreateService(StatelessServiceContext context)
        {
            ContainerConfig.Init(context, Configuration);
            return ContainerConfig.GetInstance<ServiceBusStatelessService>();
        }
    }

Logging

In order to have logging provided by the framework, we need to install nuget packages:

  • Microsoft.Extensions.Logging
  • Microsoft.Extensions.Logging.Abstractions
  • Microsoft.Extensions.Configuration.Json

All the configuration is done at the beginning:

var builder = new ConfigurationBuilder()
    .SetBasePath(Directory.GetCurrentDirectory())
    .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
Configuration = builder.Build();

In this example logging is simple, but if you’d like to configure log4net, just add this code when configuring IoC (I’ll show that later):

var loggerFactory = new LoggerFactory();
loggerFactory.AddLog4Net(skipDiagnosticLogs: false);

Appsettings.json file:

{
  "Logging": {
    "IncludeScopes": false,
    "LogLevel": {
      "Default": "Warning",
      "System": "Warning",
      "Microsoft": "Warning"
    }
  },
  "ConnectionStrings": {
    "ServiceBusConnectionString": "[your Service Bus connection]"
  },
  "Settings": {
    "TopicName": "balanceupdates",
    "SubscriptionName": "SFSubscription"
  }
}

Registering a stateless service

In order to register a service we need to run this line:

await ServiceRuntime.RegisterServiceAsync(ServiceName, CreateService);

You might wonder what the CreateService method is. It looks like this:

private static ServiceBusStatelessService CreateService(StatelessServiceContext context)
{
    ContainerConfig.Init(context, Configuration);
    return ContainerConfig.GetInstance<ServiceBusStatelessService>();
}

This is the place where I configure the IoC container. It has to be done here, because only when registering a Service Fabric service do we have an instance of StatelessServiceContext, which we need later.

Configuring IoC container

In order to have container implementation provided by the framework, just install Microsoft.Extensions.DependencyInjection nuget package. ContainerConfig class, in this case, looks like this:

    public static class ContainerConfig
    {
        private static ServiceProvider ServiceProvider;

        public static void Init(
            StatelessServiceContext context,
            IConfigurationRoot configuration)
        {
            ServiceProvider = new ServiceCollection()
                .AddLogging()
                .AddSingleton(context)
                .AddSingleton<ServiceBusStatelessService>()
                .AddSingleton<IServiceBusCommunicationListener, ServiceBusCommunicationListener>()
                .AddSingleton<IConfigurationRoot>(configuration)
                .BuildServiceProvider();
        }

        public static TService GetInstance<TService>() where TService : class
        {
            return ServiceProvider.GetService<TService>();
        }
    }

Adding a stateless service

In Program class I registered ServiceBusStatelessService class, that looks like this:

    public class ServiceBusStatelessService : StatelessService
    {
        private readonly IServiceBusCommunicationListener _serviceBusCommunicationListener;

        public ServiceBusStatelessService(StatelessServiceContext serviceContext, IServiceBusCommunicationListener serviceBusCommunicationListener)
            : base(serviceContext)
        {
            _serviceBusCommunicationListener = serviceBusCommunicationListener;
        }

        protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
        {
            yield return new ServiceInstanceListener(context => _serviceBusCommunicationListener);
        }
    }

ServiceBusStatelessService inherits from StatelessService and provides an instance of the Service Bus listener. The listener looks like this:

    public class ServiceBusCommunicationListener : IServiceBusCommunicationListener
    {
        private readonly IConfigurationRoot _configurationRoot;
        private readonly ILogger _logger;

        private SubscriptionClient subscriptionClient;
        
        public ServiceBusCommunicationListener(IConfigurationRoot configurationRoot, ILoggerFactory loggerFactory)
        {
            _logger = loggerFactory.CreateLogger(nameof(ServiceBusCommunicationListener));
            _configurationRoot = configurationRoot;
        }

        public Task<string> OpenAsync(CancellationToken cancellationToken)
        {
            var sbConnectionString = _configurationRoot.GetConnectionString("ServiceBusConnectionString");
            var topicName = _configurationRoot.GetValue<string>("Settings:TopicName");
            var subscriptionName = _configurationRoot.GetValue<string>("Settings:SubscriptionName");

            subscriptionClient = new SubscriptionClient(sbConnectionString, topicName, subscriptionName);
            subscriptionClient.RegisterMessageHandler(
                async (message, token) =>
                {
                    var messageJson = Encoding.UTF8.GetString(message.Body);
                    // process here

                    Console.WriteLine($"Received message: {messageJson}");

                    await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
                },
                new MessageHandlerOptions(LogException)
                    { MaxConcurrentCalls = 1, AutoComplete = false });

            return Task.FromResult(string.Empty);
        }
        
        public Task CloseAsync(CancellationToken cancellationToken)
        {
            Stop();

            return Task.CompletedTask;
        }

        public void Abort()
        {
            Stop();
        }

        private void Stop()
        {
            subscriptionClient?.CloseAsync().GetAwaiter().GetResult();
        }

        private Task LogException(ExceptionReceivedEventArgs args)
        {
            _logger.LogError(args.Exception, args.Exception.Message);

            return Task.CompletedTask;
        }
    }

Notice that all the work is done in the OpenAsync method, which is run only once. In it I just register a standard message handler that reads from a Service Bus subscription.

Configure Service Fabric cluster

All Service Fabric configuration is done in XML files. This can cause a huge headache when trying to debug and find errors, because the only place you can find fairly useful information is the console window.

It starts with adding a reference to the console application in the SF project.

The next thing is to have the right names in the console application’s ServiceManifest.xml:

<?xml version="1.0" encoding="utf-8"?>
<ServiceManifest Name="MichalBialecki.com.SF.ServiceBusExample.MessageProcessorPkg"
                 Version="1.0.0"
                 xmlns="http://schemas.microsoft.com/2011/01/fabric"
                 xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <ServiceTypes>
    <!-- This is the name of your ServiceType. 
         This name must match the string used in the RegisterServiceAsync call in Program.cs. -->
    <StatelessServiceType ServiceTypeName="MichalBialecki.com.SF.ServiceBusExample.MessageProcessorType" />
  </ServiceTypes>

  <!-- Code package is your service executable. -->
  <CodePackage Name="Code" Version="1.0.0">
    <EntryPoint>
      <ExeHost>
        <Program>MichalBialecki.com.SF.ServiceBusExample.MessageProcessor.exe</Program>
        <WorkingFolder>CodePackage</WorkingFolder>
      </ExeHost>
    </EntryPoint>
  </CodePackage>
</ServiceManifest>

Notice that ServiceTypeName has the same value as provided when registering a service in Program class.

The next place to set things up is ApplicationManifest.xml in the SF project.

<?xml version="1.0" encoding="utf-8"?>
<ApplicationManifest xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ApplicationTypeName="MichalBialecki.com.SF.ServiceBusExampleType" ApplicationTypeVersion="1.0.0" xmlns="http://schemas.microsoft.com/2011/01/fabric">
  <Parameters>
    <Parameter Name="InstanceCount" DefaultValue="1" />
  </Parameters>
  <!-- Import the ServiceManifest from the ServicePackage. The ServiceManifestName and ServiceManifestVersion 
       should match the Name and Version attributes of the ServiceManifest element defined in the 
       ServiceManifest.xml file. -->
  <ServiceManifestImport>
    <ServiceManifestRef ServiceManifestName="MichalBialecki.com.SF.ServiceBusExample.MessageProcessorPkg" ServiceManifestVersion="1.0.0" />
    <ConfigOverrides />
  </ServiceManifestImport>
  <DefaultServices>
    <!-- The section below creates instances of service types, when an instance of this 
         application type is created. You can also create one or more instances of service type using the 
         ServiceFabric PowerShell module.
         
         The attribute ServiceTypeName below must match the name defined in the imported ServiceManifest.xml file. -->
    <Service Name="MichalBialecki.com.SF.ServiceBusExample.MessageProcessor" ServicePackageActivationMode="ExclusiveProcess">
      <StatelessService ServiceTypeName="MichalBialecki.com.SF.ServiceBusExample.MessageProcessorType" InstanceCount="[InstanceCount]">
        <SingletonPartition />
      </StatelessService>
    </Service>
  </DefaultServices>
</ApplicationManifest>

There are a few things you need to remember:

  • ServiceManifestName has the same value as the Name attribute of ServiceManifest in the console app’s ServiceManifest.xml
  • ServiceTypeName is the same as ServiceTypeName in the console app’s ServiceManifest.xml
  • the MichalBialecki.com.SF.ServiceBusExample.MessageProcessor service has to be configured as a StatelessService
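Since mismatched names are easy to miss by eye, the first two checks can be automated. The sketch below is a hypothetical helper (ManifestCheck is not part of Service Fabric) that uses System.Xml.Linq and assumes an application with a single service manifest. The XML in Main is a shortened stand-in for the real files.

```csharp
using System;
using System.Linq;
using System.Xml.Linq;

public static class ManifestCheck
{
    // Both manifests declare this default XML namespace.
    private static readonly XNamespace Fabric = "http://schemas.microsoft.com/2011/01/fabric";

    // Returns true when the names used in ApplicationManifest.xml match ServiceManifest.xml.
    public static bool NamesMatch(string serviceManifestXml, string applicationManifestXml)
    {
        var service = XDocument.Parse(serviceManifestXml).Root;
        var application = XDocument.Parse(applicationManifestXml).Root;

        var packageName = (string)service.Attribute("Name");
        var declaredTypes = service.Descendants(Fabric + "StatelessServiceType")
            .Select(e => (string)e.Attribute("ServiceTypeName"))
            .ToList();

        var referencedPackages = application.Descendants(Fabric + "ServiceManifestRef")
            .Select(e => (string)e.Attribute("ServiceManifestName"));
        var referencedTypes = application.Descendants(Fabric + "StatelessService")
            .Select(e => (string)e.Attribute("ServiceTypeName"));

        // The package must be imported, and every referenced service type must be declared.
        return referencedPackages.Contains(packageName)
            && referencedTypes.All(type => declaredTypes.Contains(type));
    }
}

public static class Program
{
    public static void Main()
    {
        const string serviceManifest = @"<ServiceManifest Name=""ExamplePkg"" Version=""1.0.0"" xmlns=""http://schemas.microsoft.com/2011/01/fabric"">
  <ServiceTypes>
    <StatelessServiceType ServiceTypeName=""ExampleType"" />
  </ServiceTypes>
</ServiceManifest>";

        const string applicationManifest = @"<ApplicationManifest ApplicationTypeName=""ExampleAppType"" ApplicationTypeVersion=""1.0.0"" xmlns=""http://schemas.microsoft.com/2011/01/fabric"">
  <ServiceManifestImport>
    <ServiceManifestRef ServiceManifestName=""ExamplePkg"" ServiceManifestVersion=""1.0.0"" />
  </ServiceManifestImport>
  <DefaultServices>
    <Service Name=""Example"">
      <StatelessService ServiceTypeName=""ExampleType"" InstanceCount=""1"">
        <SingletonPartition />
      </StatelessService>
    </Service>
  </DefaultServices>
</ApplicationManifest>";

        Console.WriteLine(ManifestCheck.NamesMatch(serviceManifest, applicationManifest));
    }
}
```

To run it against a real project, the two strings could be loaded with File.ReadAllText from the manifest paths in your solution.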

Here is a proof that it really works:

That’s it, it should work. And remember that when it doesn’t, starting the whole thing again and rebuilding after every small code change isn’t a crazy idea 🙂

 

All the code posted here can be found on my GitHub: https://github.com/mikuam/service-fabric-service-bus-example

 

Microsoft Orleans – is it fast?

Microsoft Orleans is a developer-friendly framework for building distributed, high-scale computing applications. It does not require the developer to implement a concurrency and data storage model. Instead, it requires the developer to use predefined building blocks and enforces that the application is built in a certain way. In return, Microsoft Orleans gives the developer a framework with exceptional performance.

Orleans proved its strengths in many scenarios, where the most recognizable ones are cloud services for Halo 4 and 5 games.

You can have a look at full introduction in my previous post: Getting started with Microsoft Orleans

The Scenario

To test the performance of Microsoft Orleans I’ll compare it to simple micro-service implementation. The scenario is about transferring money from one account to another using a persistent storage. Here is the idea:

  • Both services will use .Net Core
  • Data will be saved in Azure CosmosDB database
  • Services will read and send messages from Service Bus
  • One message will trigger a money transfer: the service will need to read and save data in the DB and will then send two messages with account balance updates
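The message classes themselves are not shown in this post, so the sketch below is an assumption: the property names and types are inferred from how the messages are used in the code that follows, and System.Text.Json stands in for the Json.NET serializer used in the listings, so that the example needs no extra packages. MessageCodec is a hypothetical helper.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Assumed shape of the incoming transfer message.
public class AccountTransferMessage
{
    public int From { get; set; }
    public int To { get; set; }
    public decimal Amount { get; set; }
}

// Assumed shape of the outgoing balance notification.
public class BalanceUpdateMessage
{
    public int AccountNumber { get; set; }
    public decimal Balance { get; set; }
}

public static class MessageCodec
{
    // Service Bus message bodies are byte arrays, so messages travel as UTF-8 encoded JSON.
    public static byte[] Encode<T>(T message) =>
        Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));

    public static T Decode<T>(byte[] body) =>
        JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(body));
}

public static class Program
{
    public static void Main()
    {
        var body = MessageCodec.Encode(new AccountTransferMessage { From = 1, To = 2, Amount = 10.5m });
        var transfer = MessageCodec.Decode<AccountTransferMessage>(body);

        // One transfer results in two balance updates, one per account.
        // Both accounts are assumed to start with a balance of 1000.
        var updates = new[]
        {
            new BalanceUpdateMessage { AccountNumber = transfer.From, Balance = 1000m - transfer.Amount },
            new BalanceUpdateMessage { AccountNumber = transfer.To, Balance = 1000m + transfer.Amount }
        };

        foreach (var update in updates)
        {
            Console.WriteLine(Encoding.UTF8.GetString(MessageCodec.Encode(update)));
        }
    }
}
```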

Simple Micro-service approach

This app is really simple. It is a console application that registers a message handler and processes messages. This is what the architecture looks like. Simple, right?

Code that handles message looks like this:

    public void Run()
    {
        var service = new TableStorageService(_configuration);

        try
        {
            var subscriptionClient = new SubscriptionClient(
                _configuration[ServiceBusKey],
                "accountTransferUpdates",
                "commonSubscription");
            subscriptionClient.PrefetchCount = 1000;

            subscriptionClient.RegisterMessageHandler(
                async (message, token) =>
                {
                    var messageJson = Encoding.UTF8.GetString(message.Body);
                    var updateMessage = JsonConvert.DeserializeObject<AccountTransferMessage>(messageJson);

                    await service.UpdateAccount(updateMessage.From, -updateMessage.Amount);
                    await service.UpdateAccount(updateMessage.To, updateMessage.Amount);

                    Console.WriteLine($"Processed a message from {updateMessage.From} to {updateMessage.To}");
                },
                new MessageHandlerOptions(OnException)
                {
                    MaxAutoRenewDuration = TimeSpan.FromMinutes(60),
                    MaxConcurrentCalls = 1,
                    AutoComplete = true
                });
        }
        catch (Exception e)
        {
            Console.WriteLine("Exception: " + e.Message);
        }
    }

    private Task OnException(ExceptionReceivedEventArgs args)
    {
        Console.WriteLine(args.Exception);

        return Task.CompletedTask;
    }

TableStorageService is used to synchronize state with the database, which in this case means reading and updating the account balance.

    public class TableStorageService
    {
        private const string EndpointUriKey = "CosmosDbEndpointUri";
        private const string PrimaryKeyKey = "CosmosDbPrimaryKey";
        private const string ServiceBusKey = "ServiceBusConnectionString";

        private readonly DocumentClient client;
        private readonly TopicClient topic;

        public TableStorageService(IConfigurationRoot configuration)
        {
            client = new DocumentClient(new Uri(configuration[EndpointUriKey]), configuration[PrimaryKeyKey]);
            topic = new TopicClient(configuration[ServiceBusKey], "balanceUpdates");
        }
        
        public async Task UpdateAccount(int accountNumber, decimal amount)
        {
            Account document;
            try
            {
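                // ReadDocumentAsync and ReplaceDocumentAsync expect a document URI (or link), not a bare id.
                var documentUri = UriFactory.CreateDocumentUri("bialecki", "accounts", accountNumber.ToString());
                var response = await client.ReadDocumentAsync<Account>(documentUri);
                document = response.Document;
                document.Balance += amount;
                await client.ReplaceDocumentAsync(documentUri, document);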
            }
            catch (DocumentClientException de)
            {
                if (de.StatusCode == HttpStatusCode.NotFound)
                {
                    document = new Account { Id = accountNumber.ToString(), Balance = amount };
                    await client.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri("bialecki", "accounts"), document);
                }
                else
                {
                    throw;
                }
            }

            await NotifyBalanceUpdate(accountNumber, document.Balance);
        }

        private async Task NotifyBalanceUpdate(int accountNumber, decimal balance)
        {
            var balanceUpdate = new BalanceUpdateMessage
            {
                AccountNumber = accountNumber,
                Balance = balance
            };

            var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(balanceUpdate)));
            await topic.SendAsync(message);
        }
    }

DocumentClient is the CosmosDB client provided by the framework. You might be intrigued by the try-catch clause. Currently, the CosmosDB package for .Net Core offers no way to check whether a document exists, and the proposed solution is to handle the exception thrown when the document is not found. In that case, a new document is created. NotifyBalanceUpdate sends messages to Service Bus.

When we go to Azure portal, we can query the data to check if it is really there:

This is what reading 100 messages looks like:

Microsoft Orleans approach

Microsoft Orleans is an actor framework, where each actor can be understood as a separate service that performs some simple operations and can have its own state. In this case, every account can be an actor; it doesn’t matter whether we have a few or a few hundred thousand of them, the framework will handle that. Another big advantage is that we do not need to care about concurrency and persistence, as the framework handles those for us. In Orleans, accounts can perform operations in parallel. In this case, the architecture looks quite different.
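To get a feeling for what the framework takes off our hands, here is a rough, Orleans-free sketch of the core idea: each account processes one operation at a time, while different accounts work in parallel. AccountActor and AccountRegistry are hypothetical names. Real grains also get persistence, activation and distribution across a cluster, none of which this toy version attempts.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical mini "actor": owns its state and processes one operation at a time.
public class AccountActor
{
    private readonly SemaphoreSlim _turn = new SemaphoreSlim(1, 1);
    private decimal _balance;

    public AccountActor(decimal initialBalance) { _balance = initialBalance; }

    public async Task<decimal> ApplyAsync(decimal amount)
    {
        await _turn.WaitAsync();   // only one caller at a time per account
        try
        {
            await Task.Yield();    // stand-in for asynchronous work such as persisting state
            _balance += amount;
            return _balance;
        }
        finally
        {
            _turn.Release();
        }
    }
}

// Hypothetical registry that creates an actor the first time an account is used.
public class AccountRegistry
{
    private readonly ConcurrentDictionary<int, AccountActor> _actors =
        new ConcurrentDictionary<int, AccountActor>();

    public AccountActor Get(int accountNumber) =>
        _actors.GetOrAdd(accountNumber, _ => new AccountActor(1000m));
}

public static class Program
{
    public static async Task Main()
    {
        var registry = new AccountRegistry();

        // 100 concurrent transfers of 1 from account 1 to account 2.
        var transfers = Enumerable.Range(0, 100).Select(async _ =>
        {
            await registry.Get(1).ApplyAsync(-1m);
            await registry.Get(2).ApplyAsync(1m);
        });
        await Task.WhenAll(transfers);

        // Applying 0 simply reads the current balance through the same serialized path.
        Console.WriteLine(await registry.Get(1).ApplyAsync(0m));
        Console.WriteLine(await registry.Get(2).ApplyAsync(0m));
    }
}
```

Because every update to an account goes through that account's own lock, no update is lost even with 100 transfers in flight: the two balances end at 900 and 1100.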

Project structure looks like this:

  • SiloHost – sets up and runs a silo to host grains, which is just another name for actors
  • OrleansClient – the second application. This one connects to the silo and runs client code that uses grains
  • AccountTransfer.Interfaces – the abstractions for grains
  • AccountTransfer.Grains – the grain implementations, which handle the business logic

Let’s have a look at what running a silo looks like:

    public class Program
    {
        private static IConfigurationRoot configuration;

        public static int Main(string[] args)
        {
            return RunMainAsync().Result;
        }

        private static async Task<int> RunMainAsync()
        {
            try
            {
                var builder = new ConfigurationBuilder()
                .SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);

                configuration = builder.Build();

                var host = await StartSilo();
                Console.WriteLine("Press Enter to terminate...");
                Console.ReadLine();

                await host.StopAsync();

                return 0;
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
                return 1;
            }
        }

        private static async Task<ISiloHost> StartSilo()
        {
            var builder = new SiloHostBuilder()
                .UseLocalhostClustering()
                .Configure<EndpointOptions>(options => options.AdvertisedIPAddress = IPAddress.Loopback)
                .ConfigureServices(context => ConfigureDI(context))
                .ConfigureLogging(logging => logging.AddConsole())
                .UseInClusterTransactionManager()
                .UseInMemoryTransactionLog()
                .AddAzureTableGrainStorageAsDefault(
                    (options) =>
                    {
                        options.ConnectionString = configuration.GetConnectionString("CosmosBDConnectionString");
                        options.UseJson = true;
                    })
                .UseTransactionalState();

            var host = builder.Build();
            await host.StartAsync();
            return host;
        }

        private static IServiceProvider ConfigureDI(IServiceCollection services)
        {
            services.AddSingleton<IServiceBusClient>((sp) => new ServiceBusClient(configuration.GetConnectionString("ServiceBusConnectionString")));

            return services.BuildServiceProvider();
        }
    }

This is the whole code. It is amazingly short considering what it does. Notice that configuring Azure Table grain storage on CosmosDB takes just a few lines. I even configured dependency injection, which I will use in the account grain.

This is what connecting to the silo looks like:

    public class Program
    {
        private static IConfigurationRoot configuration;

        static int Main(string[] args)
        {
            return RunMainAsync().Result;
        }

        private static async Task<int> RunMainAsync()
        {
            try
            {
                var builder = new ConfigurationBuilder()
                .SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);

                configuration = builder.Build();

                using (var client = await StartClientWithRetries())
                {
                    DoClientWork(client);
                    Console.ReadKey();
                }

                return 0;
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                return 1;
            }
        }

        private static async Task<IClusterClient> StartClientWithRetries(int initializeAttemptsBeforeFailing = 5)
        {
            int attempt = 0;
            IClusterClient client;
            while (true)
            {
                try
                {
                    client = new ClientBuilder()
                        .UseLocalhostClustering()
                        .Configure<ClusterOptions>(options =>
                        {
                            options.ClusterId = "dev";
                            options.ServiceId = "AccountTransferApp";
                        })
                        .ConfigureApplicationParts(parts => parts.AddApplicationPart(typeof(IAccountGrain).Assembly).WithReferences())
                        .ConfigureLogging(logging => logging.AddConsole())
                        .Build();

                    await client.Connect();
                    Console.WriteLine("Client successfully connect to silo host");
                    break;
                }
                catch (SiloUnavailableException)
                {
                    attempt++;
                    Console.WriteLine($"Attempt {attempt} of {initializeAttemptsBeforeFailing} failed to initialize the Orleans client.");
                    if (attempt > initializeAttemptsBeforeFailing)
                    {
                        throw;
                    }
                    await Task.Delay(TimeSpan.FromSeconds(4));
                }
            }

            return client;
        }

        private static Task HandleException(ExceptionReceivedEventArgs args)
        {
            Console.WriteLine(args.Exception + ", stack trace: " + args.Exception.StackTrace);
            return Task.CompletedTask;
        }
    }

This is also a simple console application. Both apps need to run together, because the client connects to the silo and, if that fails, tries again after a few seconds. The only part missing here is the DoClientWork method:

    private static void DoClientWork(IClusterClient client)
    {
        var subscriptionClient = new SubscriptionClient(
            configuration.GetConnectionString("ServiceBusConnectionString"),
            "accountTransferUpdates",
            "orleansSubscription",
            ReceiveMode.ReceiveAndDelete);
        subscriptionClient.PrefetchCount = 1000;

        try
        {
            subscriptionClient.RegisterMessageHandler(
                async (message, token) =>
                {
                    var messageJson = Encoding.UTF8.GetString(message.Body);
                    var updateMessage = JsonConvert.DeserializeObject<AccountTransferMessage>(messageJson);

                    await client.GetGrain<IAccountGrain>(updateMessage.From).Withdraw(updateMessage.Amount);
                    await client.GetGrain<IAccountGrain>(updateMessage.To).Deposit(updateMessage.Amount);
                        
                    Console.WriteLine($"Processed a message from {updateMessage.From} to {updateMessage.To}");
                    await Task.CompletedTask;
                },
                new MessageHandlerOptions(HandleException)
                {
                    MaxAutoRenewDuration = TimeSpan.FromMinutes(60),
                    MaxConcurrentCalls = 20,
                    AutoComplete = true
                });
        }
        catch (Exception e)
        {
            Console.WriteLine("Exception: " + e.Message);
        }
    }

This is almost the same code that we had in the micro-service approach. We read Service Bus messages and deserialize them, but then we use actors. From this point on, execution is handled by them. AccountGrain looks like this:

    [Serializable]
    public class Balance
    {
        public decimal Value { get; set; } = 1000;
    }

    public class AccountGrain : Grain<Balance>, IAccountGrain
    {
        private readonly IServiceBusClient serviceBusClient;

        public AccountGrain(
            IServiceBusClient serviceBusClient)
        {
            this.serviceBusClient = serviceBusClient;
        }

        async Task IAccountGrain.Deposit(decimal amount)
        {
            try
            {
                this.State.Value += amount;
                await this.WriteStateAsync();

                await NotifyBalanceUpdate();
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }

        async Task IAccountGrain.Withdraw(decimal amount)
        {
            this.State.Value -= amount;
            await this.WriteStateAsync();

            await NotifyBalanceUpdate();
        }

        Task<decimal> IAccountGrain.GetBalance()
        {
            return Task.FromResult(this.State.Value);
        }

        private async Task NotifyBalanceUpdate()
        {
            var balanceUpdate = new BalanceUpdateMessage
            {
                AccountNumber = (int)this.GetPrimaryKeyLong(),
                Balance = this.State.Value
            };

            var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(balanceUpdate)));
            await serviceBusClient.SendMessageAsync(message);
        }
    }

Notice that at the top we have a serializable Balance class. Defining an actor as AccountGrain : Grain<Balance> means that Balance is our state, which we can later refer to as this.State. Reading and updating the state is trivial, and both Withdraw and Deposit send a Service Bus message by calling NotifyBalanceUpdate.

In the Azure portal we can have a look at how the data is saved. I chose to serialize it to JSON, so we can easily see the account state:

Let’s have a look at what reading 1000 messages by a single thread with Microsoft Orleans looks like:

It runs noticeably faster, but what’s more interesting is that we can read messages with even 20 concurrent threads at a time:

Comparison

As you could see, I used two approaches to read and process 100 and 1000 Service Bus messages, written in .NET Core with a persistent state in a remote CosmosDB database. The results can be seen here:

Blue represents reading 100 messages and red represents reading 1000 messages. As you can see, Microsoft Orleans is a few times faster.

To sum up, using Microsoft Orleans:

Pros:

  • The Microsoft actor framework can give you outstanding performance
  • It requires minimal knowledge to write your first app
  • Documentation is great
  • The code is open source, you can post issues

Cons:

  • It doesn’t fit every scenario
  • Maintenance and deployment are a bit more difficult than for a simple IIS app

 

If you’re interested in the code, have a look at my GitHub:

 

Managing ServiceBus queues, topics and subscriptions in .Net Core

From version 3.1 of Microsoft.Azure.ServiceBus it is finally possible to manage queues, topics and subscriptions in .Net Core. Let’s have a look at how we can use it in real life scenarios.

Previously, we would use code like this to get a queue:

public IQueueClient GetQueueClient(string _serviceBusConnectionString, string _queueName)
{
    var queueClient = new QueueClient(_serviceBusConnectionString, _queueName);
    return queueClient;
}

Using ManagementClient we can write much better code.

public async Task<IQueueClient> GetOrCreateQueue(string _serviceBusConnectionString, string _queueName)
{
    var managementClient = new ManagementClient(_serviceBusConnectionString);
    if (!(await managementClient.QueueExistsAsync(_queueName)))
    {
        await managementClient.CreateQueueAsync(new QueueDescription(_queueName));
    }

    var queueClient = new QueueClient(_serviceBusConnectionString, _queueName);
    return queueClient;
}

Now, before getting a queue, we check whether it exists and, if not, we create it. So when executing this code:

manager.GetOrCreateQueue(configuration["ServiceBusConnectionString"], "createTest").GetAwaiter().GetResult();

We will get a queue that we can use.

Customizing your ServiceBus subscription

It is pretty easy to create a topic subscription or a queue, but the SubscriptionDescription object offers a lot more than just that. This is simple code that creates a subscription:

public async Task<ISubscriptionClient> GetOrCreateTopicSubscription(string serviceBusConnectionString, string topicPath, string subscriptionName)
{
    var managementClient = new ManagementClient(serviceBusConnectionString);
    if (!(await managementClient.SubscriptionExistsAsync(topicPath, subscriptionName)))
    {
        await managementClient.CreateSubscriptionAsync(new SubscriptionDescription(topicPath, subscriptionName));
    }

    var subscriptionClient = new SubscriptionClient(serviceBusConnectionString, topicPath, subscriptionName);
    return subscriptionClient;
}

Let’s have a look at a few of the most important properties:

  • TopicPath: The path of the topic that this subscription description belongs to
  • Name: The name of the subscription
  • DefaultMessageTimeToLive: The duration after which a message expires, starting from when the message is sent to the Service Bus. After that time, the message is removed from the subscription
  • EnableDeadLetteringOnMessageExpiration: Support for the dead-letter queue. When enabled, expired messages are moved there instead of just being removed from the subscription
  • EnableBatchedOperations: It’s a good idea to set it to true, no matter whether the reader supports batch operations or not. Doing things in batches is usually faster
  • LockDuration: The duration for which a message is locked for processing
  • MaxDeliveryCount: The maximum number of times a message is delivered again after failed processing. After that count, the message is removed from the subscription and moved to the dead-letter queue

Let’s have a look at what we can fill in in a real project.

public async Task<ISubscriptionClient> GetOrCreateTopicSubscription(string serviceBusConnectionString, string topicPath, string subscriptionName)
{
    var managementClient = new ManagementClient(serviceBusConnectionString);
    if (!(await managementClient.SubscriptionExistsAsync(topicPath, subscriptionName)))
    {
        await managementClient.CreateSubscriptionAsync(
            new SubscriptionDescription(topicPath, subscriptionName)
            {
                EnableBatchedOperations = true,
                AutoDeleteOnIdle = System.TimeSpan.FromDays(100),
                EnableDeadLetteringOnMessageExpiration = true,
                DefaultMessageTimeToLive = System.TimeSpan.FromDays(100),
                MaxDeliveryCount = 100,
                LockDuration = System.TimeSpan.FromMinutes(5)
            });
    }

    var subscriptionClient = new SubscriptionClient(serviceBusConnectionString, topicPath, subscriptionName);
    return subscriptionClient;
}

AutoDeleteOnIdle: the subscription will be removed from the topic after 100 days of being idle, which is very unlikely. DefaultMessageTimeToLive and EnableDeadLetteringOnMessageExpiration: messages will be kept in the subscription for a very long time, 100 days, and then they will be sent to the dead-letter queue. MaxDeliveryCount and LockDuration: a message will be delivered up to 100 times, and each time it is locked for processing for a maximum of 5 minutes.
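To make the interplay of these settings easier to follow, here is a small in-memory model of what happens to a message. It is only a sketch and not SDK code: SubscriptionRules, MessageFate and MessageFateModel are names made up for this illustration, and the model is simplified (it treats deliveryCount as the number of deliveries that ended without completing the message).

```csharp
using System;

public enum MessageFate { StaysInSubscription, DeadLettered, Removed }

// Hypothetical mirror of the SubscriptionDescription properties discussed above.
public class SubscriptionRules
{
    public TimeSpan DefaultMessageTimeToLive { get; set; }
    public bool EnableDeadLetteringOnMessageExpiration { get; set; }
    public int MaxDeliveryCount { get; set; }
}

public static class MessageFateModel
{
    // age: time since the message was sent.
    // deliveryCount: deliveries that ended without completing the message.
    public static MessageFate Decide(SubscriptionRules rules, TimeSpan age, int deliveryCount)
    {
        // Too many failed deliveries always end in the dead-letter queue.
        if (deliveryCount >= rules.MaxDeliveryCount)
        {
            return MessageFate.DeadLettered;
        }

        // An expired message is dead-lettered only when that option is enabled.
        if (age >= rules.DefaultMessageTimeToLive)
        {
            return rules.EnableDeadLetteringOnMessageExpiration
                ? MessageFate.DeadLettered
                : MessageFate.Removed;
        }

        return MessageFate.StaysInSubscription;
    }
}
```

With the settings from the example above (100 days to live, dead-lettering on expiration enabled, MaxDeliveryCount of 100), a message that is one day old and failed three times stays in the subscription, while a message older than 100 days ends up in the dead-letter queue.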

We can do one more thing. When testing a project during local development, it’s ideal to work with real data. In a real project, we would probably have a different Service Bus namespace and a separate connection string for every environment. There is, however, a trick to use DEV data locally: just create your own testing subscription! This is how it can look:

    public async Task<ISubscriptionClient> GetOrCreateTopicSubscription(string serviceBusConnectionString, string topicPath, string subscriptionName)
    {
        var managementClient = new ManagementClient(serviceBusConnectionString);
#if DEBUG
        // Local debugging works on its own subscription, so it does not compete with the real reader.
        subscriptionName += "_MikTesting";
        if (!(await managementClient.SubscriptionExistsAsync(topicPath, subscriptionName)))
        {
            await managementClient.CreateSubscriptionAsync(
                new SubscriptionDescription(topicPath, subscriptionName)
                {
                    EnableBatchedOperations = true,
                    AutoDeleteOnIdle = System.TimeSpan.FromDays(100),
                    EnableDeadLetteringOnMessageExpiration = false,
                    DefaultMessageTimeToLive = System.TimeSpan.FromDays(2),
                    MaxDeliveryCount = 5,
                    LockDuration = System.TimeSpan.FromMinutes(5)
                });
        }
#else
        if (!(await managementClient.SubscriptionExistsAsync(topicPath, subscriptionName)))
        {
            await managementClient.CreateSubscriptionAsync(
                new SubscriptionDescription(topicPath, subscriptionName)
                {
                    EnableBatchedOperations = true,
                    AutoDeleteOnIdle = System.TimeSpan.FromDays(100),
                    EnableDeadLetteringOnMessageExpiration = true,
                    DefaultMessageTimeToLive = System.TimeSpan.FromDays(100),
                    MaxDeliveryCount = 100,
                    LockDuration = System.TimeSpan.FromMinutes(5)
                });
        }
#endif
        // Connect to the same subscription that was checked or created above.
        var subscriptionClient = new SubscriptionClient(serviceBusConnectionString, topicPath, subscriptionName);
        return subscriptionClient;
    }

The testing subscription will have its own name. It will still be there for up to 100 days of being idle, but messages will be kept only for 2 days and they will not end up in the dead-letter queue. MaxDeliveryCount is only 5, because if something goes wrong, we will end up with the same error 5 times in the logs instead of 100, and this is much more likely to happen when testing locally.

Hope you found it useful. All the code posted here is in my GitHub repository: https://github.com/mikuam/Blog
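If more than one developer uses this trick, a hard-coded suffix means they would all share one testing subscription. A possible way around it, shown here only as a sketch, is to build the name from the developer’s name. TestSubscriptionName is a hypothetical helper, and the 50-character limit is my understanding of the Service Bus quota for subscription names, so treat it as an assumption and check the current limits.

```csharp
using System;

public static class TestSubscriptionName
{
    // Assumed limit for subscription names; verify against the current Service Bus quotas.
    private const int MaxLength = 50;

    // Builds a per-developer testing name, e.g. "orders" + "Mik" gives "orders_MikTesting".
    public static string For(string subscriptionName, string developer)
    {
        if (string.IsNullOrWhiteSpace(developer))
        {
            throw new ArgumentException("Developer name is required.", nameof(developer));
        }

        var name = subscriptionName + "_" + developer + "Testing";
        if (name.Length > MaxLength)
        {
            throw new ArgumentException(
                $"'{name}' is longer than {MaxLength} characters.", nameof(subscriptionName));
        }

        return name;
    }
}
```

The developer part could come from Environment.UserName, so every machine gets its own copy of the DEV messages without any extra configuration.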

If you’re interested in more posts about Service Bus in .Net Core, have a look at:

Implementing deferral mechanism in ServiceBus

Deferral is a way to leave a message in a queue or subscription when you cannot process it at the moment. When using the PeekLock receive mode, you read a message but leave it in the queue. When processing is done, you call Complete and the message is removed from the queue; when something goes wrong, you can call Abandon and the message becomes available in the queue for the next read. An important thing to remember is that a message is locked in the queue only for a limited time, the LockDuration. If the message is not processed and completed within that time, it becomes available in the queue again and you will get a MessageLockLostException when trying to do anything with it. Whenever a message loses its lock and stays in the queue, either because it was abandoned or because the lock expired, its DeliveryCount is incremented. After reaching the limit, which is 10 by default, the message is moved to the dead-letter queue.

message-lifecycle

It is a great life-cycle, where a message that we just cannot process goes to the dead-letter queue. With a retry policy, all transient errors that occur while connecting to Service Bus are handled internally, so you do not have to worry about them. Problems may occur when you would like to handle connection problems not related to Service Bus. The solution that Microsoft proposes is to defer a message that you cannot process: leave it in the queue, but hide it from receivers. Such a message is available only when you ask for it by its sequence number. The whole loop can look like this:

private static async Task StartListenLoopWithDeferral()
{
    var client = GetQueueClient(ReceiveMode.PeekLock);
    // sequence number of a deferred message and the time after which it should be retried
    var deferredMessages = new List<KeyValuePair<long, DateTimeOffset>>();

    while (true)
    {
        var messages = Enumerable.Empty<BrokeredMessage>();

        try
        {
            messages = await client.ReceiveBatchAsync(50, TimeSpan.FromSeconds(10));
            messages = messages ?? Enumerable.Empty<BrokeredMessage>();
            if (!messages.Any())
            {
                continue;
            }

            foreach (var message in messages)
            {
                Console.WriteLine("Received a message: " + message.GetBody<string>());
            }
            await client.CompleteBatchAsync(messages.Select(m => m.LockToken));

            // handling deferred messages
            var messagesToProcessAgain = deferredMessages.Where(d => d.Value < DateTimeOffset.Now).Take(10).ToList();
            foreach (var messageToProcess in messagesToProcessAgain)
            {
                BrokeredMessage message = null;
                try
                {
                    deferredMessages.Remove(messageToProcess);
                    message = await client.ReceiveAsync(messageToProcess.Key);

                    if (message != null)
                    {
                        // processing
                        Console.WriteLine("Received a message: " + message.GetBody<string>());

                        await client.CompleteAsync(message.LockToken);
                    }
                }
                catch (MessageNotFoundException) { }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    // message is null when ReceiveAsync itself failed, so use the stored sequence number
                    deferredMessages.Add(new KeyValuePair<long, DateTimeOffset>(
                        messageToProcess.Key,
                        DateTimeOffset.Now + TimeSpan.FromMinutes(2)));
                }
            }
        }
        catch (MessageLockLostException e)
        {
            Console.WriteLine(e);

            foreach (var message in messages)
            {
                await message.AbandonAsync();
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e);

            // defer messages
            foreach (var message in messages)
            {
                deferredMessages.Add(new KeyValuePair<long, DateTimeOffset>(
                    message.SequenceNumber,
                    DateTimeOffset.Now + TimeSpan.FromMinutes(2)));
                await message.DeferAsync();
            }
        }
    }
}

The code consists of an endless loop that receives messages with ReceiveBatchAsync and processes them. If something goes wrong, the messages are added to the deferredMessages list with a retry time 2 minutes ahead. After completing messages successfully, the program checks whether there are any messages that should be processed again. If there is a problem again, the message is added to the deferredMessages list once more. There is also a check for MessageLockLostException, which might occur when a message went to the dead-letter queue and we should no longer ask for it. The sequence numbers are kept in memory, and that is an obvious potential issue: when the program is restarted, the list is wiped and there is no way to get those messages from the queue.
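One way to soften the restart problem would be to keep the list on disk instead of only in memory. The class below is a minimal sketch of that idea, not part of the original solution: DeferredMessageStore is a hypothetical name, it stores one "sequence number;retry time" pair per line in a local text file, and it is not safe for use from multiple processes.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;

// Hypothetical helper: keeps deferred sequence numbers in a text file so they survive a restart.
public class DeferredMessageStore
{
    private readonly string path;
    private readonly Dictionary<long, DateTimeOffset> entries = new Dictionary<long, DateTimeOffset>();

    public DeferredMessageStore(string path)
    {
        this.path = path;
        if (!File.Exists(path))
        {
            return;
        }

        // Each line has the form "<sequenceNumber>;<retry time in round-trip format>".
        foreach (var line in File.ReadAllLines(path))
        {
            var parts = line.Split(';');
            if (parts.Length != 2)
            {
                continue;
            }

            entries[long.Parse(parts[0], CultureInfo.InvariantCulture)] =
                DateTimeOffset.ParseExact(parts[1], "o", CultureInfo.InvariantCulture);
        }
    }

    public void Add(long sequenceNumber, DateTimeOffset retryAt)
    {
        entries[sequenceNumber] = retryAt;
        Save();
    }

    // Returns up to 'max' sequence numbers whose retry time has passed and removes them from the store.
    public IReadOnlyList<long> TakeDue(DateTimeOffset now, int max)
    {
        var due = entries.Where(e => e.Value <= now).Select(e => e.Key).OrderBy(k => k).Take(max).ToList();
        foreach (var key in due)
        {
            entries.Remove(key);
        }

        if (due.Count > 0)
        {
            Save();
        }

        return due;
    }

    private void Save()
    {
        File.WriteAllLines(path, entries.Select(e =>
            e.Key.ToString(CultureInfo.InvariantCulture) + ";" + e.Value.ToString("o", CultureInfo.InvariantCulture)));
    }
}
```

In the loop above, Add would replace deferredMessages.Add and TakeDue would replace the Where(...).Take(10) query. Because TakeDue removes entries before they are processed, a failed retry has to call Add again, just as the loop already does.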

Deferral pros and cons

The deferral mechanism can be useful because it is based on the original Azure infrastructure: handling messages this way keeps incrementing the message’s DeliveryCount property and eventually moves the message to the dead-letter queue. It is also a way to handle a single message and make it available again after a custom time.

However, the algorithm itself is not easy and needs to handle many edge cases, which makes it error-prone. The second thing is that it breaks the order of messages, and in some cases it is essential to keep FIFO. I wouldn’t recommend it in every scenario. Maybe a better approach would be to add a simple wait when something goes wrong and try again after a while.
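The simple wait can be sketched as a small retry helper. SimpleRetry is a hypothetical name and the attempt count and delay are up to you. The one thing to keep in mind is that in PeekLock mode the total waiting time has to stay below LockDuration, otherwise the lock is lost while we wait.

```csharp
using System;
using System.Threading.Tasks;

public static class SimpleRetry
{
    // Runs the action and waits 'delay' after each failed attempt.
    // When the last attempt fails, its exception is propagated to the caller.
    public static async Task RunAsync(Func<Task> action, int maxAttempts, TimeSpan delay)
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        }

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action();
                return;
            }
            catch (Exception e) when (attempt < maxAttempts)
            {
                Console.WriteLine($"Attempt {attempt} failed: {e.Message}. Retrying in {delay}.");
                await Task.Delay(delay);
            }
        }
    }
}
```

Since the message stays locked by the same receiver during the wait, this approach does not need any bookkeeping of sequence numbers, at the price of blocking that receiver for a while.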

Receive message from queue in push model

Push and pull models are general approaches to distributing data between services. In the context of messaging, they describe how messages are received by the client. In the previous post I showed how to receive messages by waiting for them to come: this is the pull model. The Receive method waits some amount of time for messages to come, not necessarily blocking the thread. In the push model, the timing of code execution is driven by data coming to the client, in this case by messages appearing in the queue. The user can register a method to be executed when a message comes and when an error occurs. Let’s see some code:

static void GetMessagesBySubscribing()
{
    var queueClient = GetQueueClient();
    queueClient.OnMessage(OnMessage);
}

private static void OnMessage(BrokeredMessage brokeredMessage)
{
    Console.WriteLine("Received a message: " + brokeredMessage.GetBody<string>());
}

The code could not look simpler. The whole message life-cycle is handled inside the OnMessage method, even if we specify a different ReceiveMode. Let’s add error handling:

static void GetMessagesBySubscribing()
{
    var queueClient = GetQueueClient();
    var options = new OnMessageOptions
    {
        AutoComplete = true,
        MaxConcurrentCalls = 5
    };
    options.ExceptionReceived += OptionsOnExceptionReceived;
    queueClient.OnMessage(OnMessage, options);
}

private static void OptionsOnExceptionReceived(object sender, ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    if (exceptionReceivedEventArgs?.Exception != null)
    {
        Console.WriteLine("Exception occurred: " + exceptionReceivedEventArgs.Exception.Message);
    }
}

private static void OnMessage(BrokeredMessage brokeredMessage)
{
    try
    {
        Console.WriteLine("Received a message: " + brokeredMessage.GetBody<string>());
    }
    catch (Exception e)
    {
        Console.WriteLine("Exception occurred while processing: " + e.Message);
    }
}

A method that logs errors can be registered to the ExceptionReceived event in the OnMessageOptions class. It will log every error that happens while handling the connection to the queue and while handling messages. I also added a try-catch block to catch all exceptions that may occur during message processing. If an error that we do not catch occurs while processing a message, it will appear in the ExceptionReceived method, but it’s better to have it separated.

There are two more options specified in OnMessageOptions. When AutoComplete is set to true, the queue client will try to complete a message after it is processed and abandon it on error. However, there might be a scenario where we would like to decide during processing whether a message should be completed or abandoned. That can be accomplished with AutoComplete set to false. MaxConcurrentCalls defines how many threads should process messages from the queue in parallel. For example, if it is set to 5 and many messages appear in the queue, the queue client will create up to 5 different threads processing consecutive messages. Each thread will work on a separate message and will handle its life-cycle. If it is important that only one thread processes messages, then MaxConcurrentCalls should be set to 1.
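To get a feeling for what a limit like MaxConcurrentCalls means, the effect can be imitated in plain C# with a SemaphoreSlim. This is only an illustration of bounded concurrency, not how the Service Bus client is implemented, and BoundedPump is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedPump
{
    // Processes all messages with at most maxConcurrentCalls handlers running at once.
    // Returns the highest number of handlers that were observed running simultaneously.
    public static async Task<int> RunAsync(
        IEnumerable<string> messages, int maxConcurrentCalls, Func<string, Task> handler)
    {
        var running = 0;
        var peak = 0;
        var sync = new object();

        using (var gate = new SemaphoreSlim(maxConcurrentCalls))
        {
            var tasks = messages.Select(async message =>
            {
                // Wait for a free slot before handling the message.
                await gate.WaitAsync();
                try
                {
                    lock (sync)
                    {
                        running++;
                        peak = Math.Max(peak, running);
                    }

                    await handler(message);
                }
                finally
                {
                    lock (sync)
                    {
                        running--;
                    }

                    gate.Release();
                }
            }).ToList();

            await Task.WhenAll(tasks);
        }

        return peak;
    }
}
```

Called with 20 messages and a limit of 5, the returned peak never exceeds 5, and with a limit of 1 the messages are handled strictly one at a time.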

Proper error handling in ExceptionReceived

In the ExceptionReceived method you will get all kinds of errors, but you shouldn’t worry about transient ones. Don’t be surprised that you can get a MessagingException even if you define a retry policy: those errors are bubbled up for monitoring purposes, so it is up to you whether you log them or not. After a connection problem, the client should recreate itself and try to receive messages again. The code can look like this:

private static void OptionsOnExceptionReceived(object sender, ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    if (exceptionReceivedEventArgs?.Exception != null)
    {
        if (!(exceptionReceivedEventArgs.Exception is MessagingException && ((MessagingException)exceptionReceivedEventArgs.Exception).IsTransient))
        {
            Console.WriteLine("Exception occurred: " + exceptionReceivedEventArgs.Exception.Message);
        }
    }
}

Azure Service Bus – introduction

Azure Service Bus is Microsoft’s implementation of a messaging system that works seamlessly in the cloud and does not require setting up a server of any kind. Messaging is a good alternative for communication between micro-services. Let’s compare the two.

REST communication

service-bus-communication

  • contract constraints may be a risk
  • synchronous model by default
  • load balancing is harder

Messaging communication

REST-communication

  • asynchronous
  • easy for load balancing, can have multiple competing readers
  • fire and forget

Using messages for communication gives much more flexibility when planning the architecture. You can add more receivers and more senders at will. It’s a fire and forget model, where you do not care when a message will be processed and you don’t need to worry how it will reach the receiver: your messaging system will take care of it for you.

Azure Service Bus

To explore Service Bus options, go to your Azure portal and search for Service Bus.

service-bus-search

 

Go inside and you’ll need to create or choose a namespace. Namespaces can be useful when you would like to group multiple queues or topics, for example by different contexts. It’s much easier to browse through them when you have, for example, all order-related queues in an orders namespace. Inside the namespace you’ll see a list of your queues and topics.

topics-and-queues

 

Queues

A queue offers FIFO (First In, First Out) message delivery to one or more competing consumers. Each message is received and processed by only one consumer, and the messaging system manages this process centrally, so no deadlock will occur.

queue

Messages are expected to be processed in the same order in which they arrived. However, they do not have to be processed right away. They can wait safely in the queue for the first free consumer. With the possibility of having multiple consumers, it is very easy to balance the load and add new consumers to the infrastructure, so messages will be processed faster. This is of course with the assumption that messages can be processed independently and do not relate to one another. Another key feature is that the work of consumers does not affect the work done by the publisher. During heavy load or high usage of the system, messages are stored in the queue and consumers are not overloaded with multiple calls, as happens with REST services, but continue to work and process messages at the same speed as usual.

In Azure Service Bus a queue can be huge, even up to 16GB. The message size limit is rather small: a maximum of 256KB. However, session support allows the creation of unlimited-size sequences of related messages.
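The competing consumers idea can be tried out locally without Azure. The sketch below uses an in-memory BlockingCollection as a stand-in for the queue. CompetingConsumersDemo is a hypothetical name, and a real Service Bus queue adds locking, retries and dead-lettering on top of this.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CompetingConsumersDemo
{
    // Returns, for every consumer, the list of messages it processed.
    public static Dictionary<string, List<int>> Run(int messageCount, int consumerCount)
    {
        var queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
        for (var i = 0; i < messageCount; i++)
        {
            queue.Add(i);
        }

        // No more messages will arrive in this demo, so consumers stop when the queue is empty.
        queue.CompleteAdding();

        var results = new ConcurrentDictionary<string, List<int>>();
        var consumers = Enumerable.Range(1, consumerCount).Select(n => Task.Run(() =>
        {
            var processed = new List<int>();

            // Every message taken from the queue is handed to exactly one consumer.
            foreach (var message in queue.GetConsumingEnumerable())
            {
                processed.Add(message);
            }

            results["consumer-" + n] = processed;
        })).ToArray();

        Task.WaitAll(consumers);
        return new Dictionary<string, List<int>>(results);
    }
}
```

Every message shows up in the list of exactly one consumer. How the messages are split between consumers differs from run to run, which is exactly why this only works well when messages can be processed independently.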

Topics

topic

 

Compared to a queue, where only one consumer processes a message, in topics messages are cloned into subscriptions, each of which contains the same messages. This represents a one-to-many form of communication in a publish/subscribe pattern. Subscriptions can use additional filters to restrict the messages that they want to receive. Messages can be filtered by their attributes, where the publisher can, for example, set the recipient who should receive that message. Consumers, instead of connecting directly to a topic, connect to a subscription, which can be understood as a virtual queue. In the same way as queues, subscriptions can have multiple competing consumers, and this gives even more possibilities for planning the services architecture.

An example use of a topic is notifications about product stock status changes. The topic where messages are sent can have multiple subscriptions, each filtering messages according to its needs. One may need all updates, but another may be interested only in digital products like games. With the SqlFilterExpression class, a filter rule can look like this:

ProductType = 'Digital' AND QuantityForSale > 0

Queues, topics and subscriptions give a lot of possibilities and are easier to scale and adapt in the future. Compared to REST services, they are slightly more complicated to implement and need a centralized system, but they offer much more in return.
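The same fan-out with filters can be modelled in a few lines of plain C#. This is an in-memory sketch rather than Service Bus code: StockUpdate and InMemoryTopic are hypothetical names, and the lambda filters stand in for SQL filter rules like the one above.

```csharp
using System;
using System.Collections.Generic;

public class StockUpdate
{
    public string ProductType { get; set; }
    public int QuantityForSale { get; set; }
}

// In-memory stand-in for a topic: every subscription whose filter matches gets its own copy.
public class InMemoryTopic
{
    private readonly Dictionary<string, Func<StockUpdate, bool>> filters =
        new Dictionary<string, Func<StockUpdate, bool>>();
    private readonly Dictionary<string, Queue<StockUpdate>> subscriptions =
        new Dictionary<string, Queue<StockUpdate>>();

    public void AddSubscription(string name, Func<StockUpdate, bool> filter)
    {
        filters[name] = filter;
        subscriptions[name] = new Queue<StockUpdate>();
    }

    public void Publish(StockUpdate update)
    {
        foreach (var pair in filters)
        {
            // The publisher does not know the subscriptions; the topic decides who gets a copy.
            if (pair.Value(update))
            {
                subscriptions[pair.Key].Enqueue(update);
            }
        }
    }

    public int PendingCount(string name)
    {
        return subscriptions[name].Count;
    }
}
```

A subscription added with the filter u => true receives every update, while one added with u => u.ProductType == "Digital" && u.QuantityForSale > 0 receives only digital products that are still for sale.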

Getting Azure subscription

There are a couple of ways you can get access to Azure services. You might get it from work or through the BizSpark program. However, if you want to get it for free, just go to the Azure website and start a trial.

 

azure-getting-started

After a couple of minutes of configuration you will have a fully functional Azure environment with a vast variety of options, overwhelming at first. Feel free to browse and explore what you can do with it. In the next blog posts I’ll try to bring some of the options closer.

azure-dashboard-net