Data Subscriptions in RavenDB

time to read 4 min | 680 words

Most of the time, RavenDB  is being used for OLTP scenarios, and it is doing great work there. However, we have customers that use RavenDB as the source for data processing jobs. Those jobs calls for processing all / most of the documents in the database.

A typical example would be an ordering system, where each document is an order, and we need to process the orders. You can certainly handle this right now, but that requires you to maintain quite a bit of state in order to do so efficiently. What we have done is to take this entire process and make this very simple to use. I know that this is a bit vague, but let me try showing you some code, and hopefully it will clear things up.

var id = store.Subscriptions.Create(new SubscriptionCriteria
{
    BelongsToCollection = "Orders",
    PropertiesMatch =
    {
        {"Valid", true}
    },
});

Here we create an subscription, along with its configuration. In this case, give me all the orders, but only those which are valid. We can also do a bunch more basic filters like that. The result of this operation is an id, which I can use to open the subscription.

Note that subscriptions are persistent and long lived, so you are expected to hold on to that id and make use of it. Using the subscription id, we can open the subscription:

IObservable<User> subscription = store.Subscriptions.Open<User>(id, new SubscriptionConnectionOptions
{
    IgnoreSubscribersErrors = false,
    BatchOptions = new SubscriptionBatchOptions()
    {
        AcknowledgmentTimeout = TimeSpan.FromMinutes(1),
        MaxDocCount = 1024 * 16,
        MaxSize = 1024 * 1024 * 4
    },
    ClientAliveNotificationInterval = TimeSpan.FromSeconds(10),
});

You can see that we specify some details about the running connection. In particular, we limit the size of a single batch, and the heart beat intervals. I’ll touch on the error handling a bit later, first, let us see how we actually get the data. Because the subscription is an IObservable<RavenJObject>, that means that you can just utilize reactive extensions and subscribe to the incoming stream as you would expect to. And it means that you’ll continue to get them, even for items that were added after you opened the subscription.

What is interesting is the error handling scenario. Data subscriptions will ensure that you’ll receive each document at least once, but what happens if there is an error? In that case, it depends on your configuration. In the code above, we don’t allow errors, so we you’ll get the same document over and over until you have successfully processed it. If you set IgnoreSubscribersErrors to true, we’ll ignore the errors raised by subscribers.

The nice thing about this is that it works even in the presence of crashes. Under the scenes, once you have successfully processed a document, we’ll send a confirmation to the server about it, so if there is a crash, we know we already processed it. If you crashed midway, we’ll just resend you the relevant document when you open the subscription again.