Receive Service Bus messages in Service Fabric

This is one of the proof of concent that I did at work. The idea was to add another Service Bus application to an existing solution, instead of starting a whole new micro-service. It was a lot faster just to add another .net core console application, but setting up Service Fabric cluster always brings some unexpected experiences.

What are my requirements:

  • everything has to be written in .Net Core
  • reading Service Bus messages is placed in a new console application
  • logging has to be configured
  • dependency injection needs to be configured
  • reading Service Bus messages needs to be registered in stateless service

Let’s get to work!

The entry point in console application

Console application are a bit specific. In most cases, we write console applications that are small and doesn’t require dependency injection or logging, apart from that to the console. But here I want to build a professional console application, that is not run once, but is a decent part of a bigger thing that we would need to maintain in the future.

The specific thing about console applications is that they have Main method and this method is run instantly after the execution and everything that you’d like to do, has to be there. That means, that both configuration and execution of an app needs to be in this one method. Let’s see how the code looks like:

    public class Program
    {
        private const string ServiceName = "MichalBialecki.com.SF.ServiceBusExample.MessageProcessorType";

        private static IConfigurationRoot Configuration;

        public static async Task Main(string[] args)
        {
            var builder = new ConfigurationBuilder()
                .SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
            Configuration = builder.Build();

            try
            {
                await ServiceRuntime.RegisterServiceAsync(ServiceName, CreateService);

                ServiceEventSource.Current.ServiceTypeRegistered(Process.GetCurrentProcess().Id, typeof(ServiceBusStatelessService).Name);
                
                Thread.Sleep(Timeout.Infinite);
            }
            catch (Exception e)
            {
                ServiceEventSource.Current.ServiceHostInitializationFailed(e.ToString());
                throw;
            }
        }

        private static ServiceBusStatelessService CreateService(StatelessServiceContext context)
        {
            ContainerConfig.Init(context, Configuration);
            return ContainerConfig.GetInstance<ServiceBusStatelessService>();
        }
    }

Logging

In order to have logging provided by the framework, we need to install nuget packages:

  • Microsoft.Extensions.Logging
  • Microsoft.Extensions.Logging.Abstractions
  • Microsoft.Extensions.Configuration.Json

The all configuration stuff is done in the beginning:

var builder = new ConfigurationBuilder()
    .SetBasePath(Directory.GetCurrentDirectory())
    .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
Configuration = builder.Build();

In this example logging is simple, but if you’d like to configure log4net, just add this code when configuring IoC (I’ll show that later):

var loggerFactory = new LoggerFactory();
loggerFactory.AddLog4Net(skipDiagnosticLogs: false);

Appsettings.json file:

{
  "Logging": {
    "IncludeScopes": false,
    "LogLevel": {
      "Default": "Warning",
      "System": "Warning",
      "Microsoft": "Warning"
    }
  },
  "ConnectionStrings": {
    "ServiceBusConnectionString": "[your Service Bus connection]"
  },
  "Settings": {
    "TopicName": "balanceupdates",
    "SubscriptionName": "SFSubscription"
  }
}

Registering a stateless service

In order to register a service we need to run this line:

await ServiceRuntime.RegisterServiceAsync(ServiceName, CreateService);

You might wonder what is CreateService method, it looks like this:

private static ServiceBusStatelessService CreateService(StatelessServiceContext context)
{
    ContainerConfig.Init(context, Configuration);
    return ContainerConfig.GetInstance<ServiceBusStatelessService>();
}

Here is a place where I configure IoC container. It has to be done here, cause only when registering a Service Fabric service, we have an instance of StatelessServiceContext, that we need later.

Configuring IoC container

In order to have container implementation provided by the framework, just install Microsoft.Extensions.DependencyInjection nuget package. ContainerConfig class, in this case, looks like this:

    public static class ContainerConfig
    {
        private static ServiceProvider ServiceProvider;

        public static void Init(
            StatelessServiceContext context,
            IConfigurationRoot configuration)
        {
            ServiceProvider = new ServiceCollection()
                .AddLogging()
                .AddSingleton(context)
                .AddSingleton<ServiceBusStatelessService>()
                .AddSingleton<IServiceBusCommunicationListener, ServiceBusCommunicationListener>()
                .AddSingleton<IConfigurationRoot>(configuration)
                .BuildServiceProvider();
        }

        public static TService GetInstance<TService>() where TService : class
        {
            return ServiceProvider.GetService<TService>();
        }
    }

Adding a stateless service

In Program class I registered ServiceBusStatelessService class, that looks like this:

    public class ServiceBusStatelessService : StatelessService
    {
        private readonly IServiceBusCommunicationListener _serviceBusCommunicationListener;

        public ServiceBusStatelessService(StatelessServiceContext serviceContext, IServiceBusCommunicationListener serviceBusCommunicationListener)
            : base(serviceContext)
        {
            _serviceBusCommunicationListener = serviceBusCommunicationListener;
        }

        protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
        {
            yield return new ServiceInstanceListener(context => _serviceBusCommunicationListener);
        }
    }

ServiceBusStatelessService inherits from StatelessService and provides an instance of Service Bus listener. It looks like this:

    public class ServiceBusCommunicationListener : IServiceBusCommunicationListener
    {
        private readonly IConfigurationRoot _configurationRoot;
        private readonly ILogger _logger;

        private SubscriptionClient subscriptionClient;
        
        public ServiceBusCommunicationListener(IConfigurationRoot configurationRoot, ILoggerFactory loggerFactory)
        {
            _logger = loggerFactory.CreateLogger(nameof(ServiceBusCommunicationListener));
            _configurationRoot = configurationRoot;
        }

        public Task<string> OpenAsync(CancellationToken cancellationToken)
        {
            var sbConnectionString = _configurationRoot.GetConnectionString("ServiceBusConnectionString");
            var topicName = _configurationRoot.GetValue<string>("Settings:TopicName");
            var subscriptionName = _configurationRoot.GetValue<string>("Settings:SubscriptionName");

            subscriptionClient = new SubscriptionClient(sbConnectionString, topicName, subscriptionName);
            subscriptionClient.RegisterMessageHandler(
                async (message, token) =>
                {
                    var messageJson = Encoding.UTF8.GetString(message.Body);
                    // process here

                    Console.WriteLine($"Received message: {messageJson}");

                    await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
                },
                new MessageHandlerOptions(LogException)
                    { MaxConcurrentCalls = 1, AutoComplete = false });

            return Task.FromResult(string.Empty);
        }
        
        public Task CloseAsync(CancellationToken cancellationToken)
        {
            Stop();

            return Task.CompletedTask;
        }

        public void Abort()
        {
            Stop();
        }

        private void Stop()
        {
            subscriptionClient?.CloseAsync().GetAwaiter().GetResult();
        }

        private Task LogException(ExceptionReceivedEventArgs args)
        {
            _logger.LogError(args.Exception, args.Exception.Message);

            return Task.CompletedTask;
        }
    }

Notice, that all the work is done in OpenAsync method, that is run only once. In here I just register standard message handler, that reads from a Service Bus Subscription.

Configure Service Fabric cluster

All Service Fabric configuration is done in xml files. This can cause a huge headache when trying to debug and find errors, cause the only place you can find fairly useful information is console window.

It starts with adding a reference in SF project to a console application.

Next this is to have right name in console application ServiceManifest.xml

<?xml version="1.0" encoding="utf-8"?>
<ServiceManifest Name="MichalBialecki.com.SF.ServiceBusExample.MessageProcessorPkg"
                 Version="1.0.0"
                 xmlns="http://schemas.microsoft.com/2011/01/fabric"
                 xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <ServiceTypes>
    <!-- This is the name of your ServiceType. 
         This name must match the string used in the RegisterServiceAsync call in Program.cs. -->
    <StatelessServiceType ServiceTypeName="MichalBialecki.com.SF.ServiceBusExample.MessageProcessorType" />
  </ServiceTypes>

  <!-- Code package is your service executable. -->
  <CodePackage Name="Code" Version="1.0.0">
    <EntryPoint>
      <ExeHost>
        <Program>MichalBialecki.com.SF.ServiceBusExample.MessageProcessor.exe</Program>
        <WorkingFolder>CodePackage</WorkingFolder>
      </ExeHost>
    </EntryPoint>
  </CodePackage>
</ServiceManifest>

Notice that ServiceTypeName has the same value as provided when registering a service in Program class.

Next place to set-up things is ApplicationManifest.xml in SF project.

<?xml version="1.0" encoding="utf-8"?>
<ApplicationManifest xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ApplicationTypeName="MichalBialecki.com.SF.ServiceBusExampleType" ApplicationTypeVersion="1.0.0" xmlns="http://schemas.microsoft.com/2011/01/fabric">
  <Parameters>
    <Parameter Name="InstanceCount" DefaultValue="1" />
  </Parameters>
  <!-- Import the ServiceManifest from the ServicePackage. The ServiceManifestName and ServiceManifestVersion 
       should match the Name and Version attributes of the ServiceManifest element defined in the 
       ServiceManifest.xml file. -->
  <ServiceManifestImport>
    <ServiceManifestRef ServiceManifestName="MichalBialecki.com.SF.ServiceBusExample.MessageProcessorPkg" ServiceManifestVersion="1.0.0" />
    <ConfigOverrides />
  </ServiceManifestImport>
  <DefaultServices>
    <!-- The section below creates instances of service types, when an instance of this 
         application type is created. You can also create one or more instances of service type using the 
         ServiceFabric PowerShell module.
         
         The attribute ServiceTypeName below must match the name defined in the imported ServiceManifest.xml file. -->
    <Service Name="MichalBialecki.com.SF.ServiceBusExample.MessageProcessor" ServicePackageActivationMode="ExclusiveProcess">
      <StatelessService ServiceTypeName="MichalBialecki.com.SF.ServiceBusExample.MessageProcessorType" InstanceCount="[InstanceCount]">
        <SingletonPartition />
      </StatelessService>
    </Service>
  </DefaultServices>
</ApplicationManifest>

There are a few things you need to remember:

  • ServiceManifestName has the same value as ServiceManifest in ServiceManifest.xml in console app
  • ServiceTypeName type is the same as ServiceTypeName in ServiceManifest.xml in console app
  • MichalBialecki.com.SF.ServiceBusExample.MessageProcessor service has to be configured as StatelessService

Here is a proof that it really works:

That’s it, it should work. And remember that when it doesn’t, starting the whole thing again and build every small code change isn’t crazy idea 🙂

 

 All code posted here you can find on my GitHub: https://github.com/mikuam/service-fabric-service-bus-example

 

3 thoughts on “Receive Service Bus messages in Service Fabric

  1. Justin Self

    I’m curious why you decided to make a customer listener for Azure Service Bus instead of just managing it explicitly from the Run Method.

    With this approach, you’ve got app logic mixed in with your infrastructure management code.

    What’s the benefit of letting SF manage the lifecycle of the service bus connection via the listener VS you managing it explicitly via the Run method?

    Reply
    1. Michał Białecki Post author

      Hi Justin, those are tough questions you’re asking. I’ll try to answer the best I can.
      You can always manage Service Bus Listener in Run method, that’s the easiest thing to do and it just works. When registering SB listener in Service Fabric as a service, it will be managed as a service, so in case of problems, you will be able to restart it. You also have a possibility to scale it differently and the rest of your application. However, I haven’t run into a practical case, where that would be needed. Also, in real life, when having listener in SF didn’t cause any problems. So I think it is your own choice, depending on what you need. I guess that in most cases, there is no difference. There is one more thing – I’m not sure how SF is handling long-running threads, so stateless service could be safer.

      When it comes to mixing app code and infrastructure code, I wouldn’t agree. I created ServiceBusStatelessService, that encapsulates everything connected to Service Bus and there is a separate ServiceBusCommunicationListener for listening, that contains app-specific logic.

      Reply
  2. Bjorn

    Hi,

    Looks great the code. I try to add it in a stateless service fabric service.

    I dont understand though, where is OpenAsync called? now it doesnt work for me, nothing happens.

    Cheers

    Reply

Leave a Reply

Your email address will not be published. Required fields are marked *