Tag Archives: cosmos db

Add CosmosDB persistent storage to Microsoft Orleans in .Net Core

Microsoft Orleans is a developer-friendly framework for building distributed, high-scale computing applications. It is a perfect solution for processing a large amount of data quickly. It shows it strengths especially when you need to use a storage while processing the data because it keeps a state in memory so save or update state operations are very fast.

If you want to know more about Microsoft Orleans, read my previous post about it: https://www.michalbialecki.com/2018/03/05/getting-started-microsoft-orleans/

Getting started with Microsoft Orleans for .Net Core

Microsoft Orleans 2.0 is the version written in .Net Standard, that can be used by applications targeting both .Net Core and the full framework. You can have a look at its github repo here: https://github.com/dotnet/orleans.

There is also a very good Microsoft page with an updated documentation: https://dotnet.github.io/orleans/Documentation/2.0/New.html

Regular Orleans solution consists of 4 projects: Grains – library with Orleans actor classes, Interfaces – abstraction for Grains to use in other libraries, Host – a project that runs a silos and a Client – project that connect to Host and execute clients code.

Have a look at the project structure, thanks to .Net Core it is simple and minimal.

Persistent storage in Microsoft Orleans

Microsoft Orleans offers a variety of options to save grain state. With one of the provided mechanisms, you can save grain state without writing any code, just providing proper configuration. You can also implement your own provider by implementing low-level interfaces. Here are some storage provider methods you can use when configuring silo host:

  • AddMemoryGrainStorage – grain state will be kept in memory and probably will be lost when the machine is down or new version is deployed
  • AddAzureBlobGrainStorage – Azure Blob storage will be used
  • AddAzureTableGrainStorage – Azure Table API will be used, Cosmos DB Table API is also compatible
  • AddAdoNetGrainStorage – ADO.Net storage in MSSQL database
  • AddDynamoDBGrainStorage – Amazon AWS DynamoDB storage

Note that adding Blob and Azure Table extension methods is possible when Microsoft.Orleans.Persistence.AzureStorage package is installed. ADO.Net extension method is in the Microsoft.Orleans.Persistence.AdoNet package and DynamoDB extension method is in Microsoft.Orleans.Persistence.DynamoDB package.

If you want to save grain state, in a grain class you need to extend Grain<T> instead of Grain, where T is an application data type, that will be persistent. You also can set a storage provider name in a grain class like this, but if you don’t, then a default provider will be used.

[StorageProvider(ProviderName="AzureTable")]
public class AccountGrain : Grain<Balance>, IAccountGrain

Read and write state in the grain

Grain state will be read automatically from storage provider when grain is activated and before OnActivateAsync() method is called. The grain is responsible for saving it’s state by calling base.WriteStateAsync() method. Orleans may perform performance optimizations and it is not guaranteed that state will be saved right after WriteStateAsync method is called. To be sure that grain uses the latest data from persistent storage, you can manually read data with base.ReadStateAsync() method.

Configuring CosmosDB Table API persistent storage

First I’ll extend an AccountGrain base class with Balance class, that will represent my state.

namespace MichalBialecki.com.OrleansCore.AccountTransfer.Grains
{
    [Serializable]
    public class Balance
    {
        public decimal Value { get; set; } = 0;
    }
    
    public class AccountGrain : Grain<Balance>, IAccountGrain
    {
        private readonly IServiceBusClient serviceBusClient;

        public AccountGrain(IServiceBusClient serviceBusClient)
        {
            this.serviceBusClient = serviceBusClient;
        }

        async Task IAccountGrain.Deposit(decimal amount)
        {
            this.State.Value += amount;
            await this.WriteStateAsync();

            await NotifyBalanceUpdate();
        }

        async Task IAccountGrain.Withdraw(decimal amount)
        {
            this.State.Value -= amount;
            await this.WriteStateAsync();

            await NotifyBalanceUpdate();
        }

        Task<decimal> IAccountGrain.GetBalance()
        {
            return Task.FromResult(this.State.Value);
        }

        private async Task NotifyBalanceUpdate()
        {
            var balanceUpdate = new BalanceUpdateMessage
            {
                AccountNumber = (int)this.GetPrimaryKeyLong(),
                Balance = this.State.Value
            };

            var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(balanceUpdate)));
            await serviceBusClient.SendMessageAsync(message);
        }
    }
}

I’m using a NotifyBalanceUpdate method to send Service Bus message with an updated state. Notice that I save a state with this.WriteStateAsync() method after I update it.

Next thing to do is set a right configuration in Host project Program.cs file.

    private static async Task<ISiloHost> StartSilo()
    {
        var builder = new SiloHostBuilder()
            .UseLocalhostClustering()
            .Configure<EndpointOptions>(options => options.AdvertisedIPAddress = IPAddress.Loopback)
            .ConfigureServices(context => ConfigureDI(context))
            .ConfigureLogging(logging => logging.AddConsole())
            .AddAzureTableGrainStorageAsDefault(
                (options) => {
                    options.ConnectionString = CosmosBDConnectionString;
                    options.UseJson = true;
                });

        var host = builder.Build();
        await host.StartAsync();
        return host;
    }

This is a very simple configuration, where I use AddAzureTableGrainStorageAsDefault extensions method and provide a connection string to CosmosDB Table API storage and a flag that I’d like data to be saved as json.

After running my application in Azure Portal I can see OrleansGrainState table, that was automatically created and this is what it contains:

You can read more about grain persistence in this Microsoft page: https://dotnet.github.io/orleans/Documentation/Core-Features/Grain-Persistence.html

All code that you saw is available at my GitHub repository: https://github.com/mikuam/orleans-core-example.

Azure Cosmos DB – key-value database in the cloud

Azure CosmosDB table API is a key-value storage hosted in the cloud. It’s a part of Azure Cosmos DB, that is Microsoft’s multi-model database. It’s a globally distributed, low latency, high throughput solution with client SDKs available for .NET, Java, Python, and Node.js.

Interesting thing is that Microsoft guarantees that for a typical 1KB item read will take under 10ms and indexed writes under 15ms, where’s median is under 5ms with 99.99% availability SLA.

azure-cosmos-db

Image from https://docs.microsoft.com/en-us/azure/cosmos-db/media/introduction/

Why NoSQL?

First of all, if you’re not that familiar with the differences between relational and non-relational databases, go have a look at my short article about it: https://www.michalbialecki.com/2018/03/16/relational-vs-non-relational-databases/

NoSQL databases are databases where data are kept without taking care of relations, consistency, and transactions. The most important thing here is scalability and performance. They gained it’s popularity thanks to Web 2.0 companies like Facebook, Google, and Amazon.

Different data organization – data can be kept is a few different forms, like key-value pairs, columns, documents or graphs.

No data consistency – there are no triggers, foreign keys, relations to guard data consistency, an application needs to be prepared for that.

Horizontal scaling – easily scaling by adding more machines, not by adding more power to an existing machine.

What is a key-value database

It is a data storage designed for storing simple key-value pairs, where a key is a unique identifier, that has a value assigned to it. Is it a storage similar in concept to dictionary or hashmap. On the contrary from relational databases, key-value databases don’t have predefined structure and every row can have a different collection of fields.

Using CosmosDB Table API

To start using CosmosDB Table API go to Azure Portal and create a CosmosDB account for table storage. Create a table – in my example it’s name is accounts. Then you need to copy the primary connection string – this is all you need.

Now let’s have a look at the simple retrieve operation.

// Create a retrieve operation that takes a customer entity.
TableOperation retrieveOperation = TableOperation.Retrieve<CustomerEntity>("Smith", "Ben");

// Execute the retrieve operation.
TableResult retrievedResult = table.Execute(retrieveOperation);

Notice, that in order to get an entity you needed to provide two keys: Smith and Ben. This is because every table entity have a PartitionKey and RowKey property. RowKey is unique among one PartitionKey and the combination of both is unique per table. This gives you a great opportunity to partition data inside of one table, without a need to build your own thing.

Before starting coding install: Microsoft.Azure.CosmosDB.Table, Microsoft.Azure.DozumentDB and Microsoft.Azure.KeyValue.Core. The last one is Microsoft.Azure.Storage.Common that I installed with v8.6.0-preview version(you need to check include prerelease in nuget package manager to see it). It might work with the newer one, but it is not available when I write this text.

You can create a table client in such a way:

    var storageAccount = CloudStorageAccount.Parse(CosmosBDConnectionString);
    var tableClient = storageAccount.CreateCloudTableClient();
    var accountsTable = tableClient.GetTableReference("accounts");

An entity that I use for my accounts table looks like this:

using Microsoft.Azure.CosmosDB.Table;

namespace MichalBialecki.com.ServiceBus.Examples
{
    public class AccountEntity : TableEntity
    {
        public AccountEntity() { }

        public AccountEntity(string partition, string accountNumber)
        {
            PartitionKey = partition;
            RowKey = accountNumber;
        }

        public double Balance { get; set; }
    }
}

Notice that there is a constructor with PartitionKey and RowKey as parameters – it has to be there in order for entity class to work.

In an example that I wrote, I need to update account balance by the amount I’m given. In order to do that, I need to retrieve an entity and update it if it exists or add it if it doesn’t. The code might look like this:

    private static readonly object _lock = new object();

    public double UpdateAccount(int accountNumber, double amount)
    {
        lock(_lock)
        {
            return UpdateAccountThreadSafe(accountNumber, amount);
        }
    }

    private double UpdateAccountThreadSafe(int accountNumber, double amount)
    {
        var getOperation = TableOperation.Retrieve<AccountEntity>(PartitionKey, accountNumber.ToString());
        var result = accountsTable.Execute(getOperation);
        if (result.Result != null)
        {
            var account = result.Result as AccountEntity;
            account.Balance += amount;
            var replaceOperation = TableOperation.Replace(account);
            accountsTable.Execute(replaceOperation);

            return account.Balance;
        }
        else
        {
            var account = new AccountEntity
            {
                PartitionKey = PartitionKey,
                RowKey = accountNumber.ToString(),
                Balance = amount
            };
            accountsTable.Execute(TableOperation.Insert(account));

            return amount;
        }
    }

I used a locking mechanism so that I’m sure that this operation is atomic. It is because I made this class as a singleton that I want to use in parallel while processing service bus messages.

After reading bunch of messages, my table looks like this. Also Balance is saved there without a need to define it in any schema.

If you’re interested in more simple examples, you can find it at this Microsoft page: https://docs.microsoft.com/en-us/azure/cosmos-db/tutorial-develop-table-dotnet.

If you’re interested in CosmosDB document storage, go to my article about it: https://www.michalbialecki.com/2017/12/30/getting-started-with-cosmosdb-in-azure-with-net-core/

Getting started with CosmosDB in Azure with .NET Core

CosmosDB is Microsoft’s new way of storing data in the cloud, comparing to good old MSSQL Server. It offers globally distributed, multi-model database. Interesting fact is that it offers multiple model of storing data: key-value, column-family, documents and graph as shown in this picture:

azure-cosmos-db

Image from https://docs.microsoft.com/en-us/azure/cosmos-db/media/introduction/

First you need a Cosmos DB account

Create a Cosmos DB account, then go to Keys tab – you will need PrimaryKey and EndpointUri.

cosmos-db-keys

Now go to Data Explorer and create a database and collection. I created Documents database and Messages collection.

cosmos-db-data-explorer

Connecting to Cosmos DB

I’m developing my app in .NET Core and for that I need to install Microsoft.Azure.DocumentDB.Core nuget package. Then I created a DocumentDbService class, that will connect to application to Cosmos DB api.

public class DocumentDbService
{
    private const string DatabaseName = "Documents";

    private const string CollectionName = "Messages";

    public async Task SaveDocumentAsync(DocumentDto document)
    {
        try
        {
            var client = new DocumentClient(new Uri(ConfigurationHelper.GetCosmosDbEndpointUri()), ConfigurationHelper.GetCosmosDbPrimaryKey());
            await client.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(DatabaseName, CollectionName), document);
        }
        catch (Exception e)
        {
            Console.WriteLine("Error: {0}, Message: {1}", e.Message, e.GetBaseException().Message);
        }
    }
}

ConfigurationHelper class is just a static class that gets EndpointUri and PrimaryKey as strings, so you can just paste them here directly. The code above will create a new document in Documents database and Messages collection.

DocumentDto is just a simple object that will be saved as json:

public class DocumentDto
{
    public string StockId { get; set; }

    public string Name { get; set; }

    public float Price { get; set; }

    public DateTime UpdatedAt { get; set; }
}

In order do use it in ASP.NET Core I created a controller:

public class MessagesController : Controller
{
    [HttpPost]
    public async Task<IActionResult> Save([FromBody]SendMessageDto message)
    {
        try
        {
            var document = new DocumentDto
            {
                StockId = message.StockId,
                Name = message.Name,
                Price = message.Price,
                UpdatedAt = DateTime.UtcNow
            };

            await new DocumentDbService().SaveDocumentAsync(document);

            return StatusCode(200);
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            return StatusCode(500, e.Message);
        }
    }
}

Usage of it is very simple – it creates DocumentDto and store it in Cosmos DB database. To see the result you need to go to Azure’s Data Explorer and fetch for Messages like in a screen above.

Getting data from Cosmos DB with SQL api

Microsoft’s new storage api has ability to store data in a multiple formats. Let’s try getting the latest updates from Messages collection. In DocumentDbService class we need a part of code to get data:

public IQueryable<DocumentDto> GetLatestDocuments()
{
    try
    {
        var client = new DocumentClient(new Uri(ConfigurationHelper.GetCosmosDbEndpointUri()), ConfigurationHelper.GetCosmosDbPrimaryKey());
        return client.CreateDocumentQuery<DocumentDto>(
            UriFactory.CreateDocumentCollectionUri(DatabaseName, CollectionName),
            "SELECT * FROM Messages ORDER BY Messages.UpdatedAt desc",
            new FeedOptions { MaxItemCount = 10 });
    }
    catch (Exception e)
    {
        Console.WriteLine("Error: {0}, Message: {1}", e.Message, e.GetBaseException().Message);
        return null;
    }
}

This is where magic happens. As you can see I used plain old SQL query as it would be Messages table, but instead I queried json objects that does not necessary need to have UpdatedAt field.

Code in the controller is very simple.

[HttpGet]
public IQueryable<DocumentDto> GetTenLatestUpdates()
{
    try
    {
        var documents = new DocumentDbService().GetLatestDocuments();

        return documents;
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
        return null;
    }
}

Notice that GetTenLatestUpdates controller method returns IQueryable interface that on web will be presented as json, but there is also a way to efficiently filter data with OData.