Data Subscriptions in RavenDB
Most of the time, RavenDB is used for OLTP scenarios, and it does great work there. However, we have customers that use RavenDB as the source for data processing jobs. Those jobs call for processing all, or most, of the documents in the database.
A typical example would be an ordering system, where each document is an order and we need to process those orders. You can certainly handle this right now, but doing so efficiently requires you to maintain quite a bit of state yourself. What we have done is take this entire process and make it very simple to use. I know that this is a bit vague, so let me show you some code, and hopefully it will clear things up.
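To keep the example concrete, assume the order documents look something like this (an illustrative class, not part of any API; the Valid flag is what the filter below matches on):

public class Order
{
    public string Id { get; set; }
    public bool Valid { get; set; }     // the subscription filter below matches on this
    public string ZipCode { get; set; }
    public decimal Total { get; set; }
}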
var id = store.Subscriptions.Create(new SubscriptionCriteria
{
    BelongsToCollection = "Orders",
    PropertiesMatch =
    {
        { "Valid", true }
    },
});
Here we create a subscription, along with its configuration. In this case: give me all the orders, but only those which are valid. We can apply a bunch more basic filters like that. The result of this operation is an id, which I can use to open the subscription.
Note that subscriptions are persistent and long lived, so you are expected to hold on to that id and make use of it. Using the subscription id, we can open the subscription:
IObservable<Order> subscription = store.Subscriptions.Open<Order>(id, new SubscriptionConnectionOptions
{
    IgnoreSubscribersErrors = false,
    BatchOptions = new SubscriptionBatchOptions
    {
        AcknowledgmentTimeout = TimeSpan.FromMinutes(1),
        MaxDocCount = 1024 * 16,
        MaxSize = 1024 * 1024 * 4
    },
    ClientAliveNotificationInterval = TimeSpan.FromSeconds(10),
});
You can see that we specify some details about the running connection. In particular, we limit the size of a single batch and set the heartbeat interval. I’ll touch on the error handling a bit later; first, let us see how we actually get the data. Because the subscription is an IObservable<Order>, you can just utilize Reactive Extensions and subscribe to the incoming stream as you would expect. And it means that you’ll continue to get documents, even those that were added after you opened the subscription.
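For example, with Reactive Extensions referenced, consuming the stream is just a Subscribe call (ProcessOrder here is a hypothetical handler, not part of the API):

subscription.Subscribe(
    order => ProcessOrder(order),   // runs for every matching document, past and future
    ex => Console.WriteLine(ex));   // runs if the subscription connection errors out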
What is interesting is the error handling scenario. Data subscriptions ensure that you’ll receive each document at least once, but what happens if there is an error? That depends on your configuration. In the code above, we don’t allow errors, so you’ll get the same document over and over until you have successfully processed it. If you set IgnoreSubscribersErrors to true, we’ll ignore errors raised by subscribers and move on.
The nice thing about this is that it works even in the presence of crashes. Behind the scenes, once you have successfully processed a document, we’ll send a confirmation to the server about it, so if there is a crash, we know which documents were already processed. If you crashed midway, we’ll just resend you the relevant documents when you open the subscription again.
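The practical implication of at-least-once delivery is that your processing should be idempotent. A minimal sketch, where AlreadyProcessed, ProcessOrder, and MarkProcessed stand in for whatever bookkeeping your own system keeps:

subscription.Subscribe(order =>
{
    // At-least-once delivery means the same document may arrive twice,
    // e.g. if we crashed before the acknowledgment reached the server.
    if (AlreadyProcessed(order.Id)) // hypothetical check against your own store
        return;

    ProcessOrder(order);            // hypothetical handler
    MarkProcessed(order.Id);        // hypothetical bookkeeping
});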
Comments
This looks like very useful functionality
What if two clients open the same subscription?
Andrew, Then the 2nd client will get an error; you can't have a subscription opened twice.
Ayende, it would be very useful functionality if you could allow more than one client and have the data load balanced between them.
Andrew, How would you figure out which data went to which client, and how would you confirm that it was correctly processed? If you want to do that, you can have each client open a separate subscription, then ignore the stuff that the other one already saw. Alternatively, you can ask the subscription to skip certain docs for one client, and only see those for another.
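For example, the split could be done with the same filters shown above, assuming the orders carry some hypothetical Shard property that each client claims a value of:

var idA = store.Subscriptions.Create(new SubscriptionCriteria
{
    BelongsToCollection = "Orders",
    PropertiesMatch = { { "Shard", "A" } }, // client A only sees these
});

var idB = store.Subscriptions.Create(new SubscriptionCriteria
{
    BelongsToCollection = "Orders",
    PropertiesMatch = { { "Shard", "B" } }, // client B only sees these
});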
Ayende, I guess you would need a unique id for each client (a guid on connection open, or something like that), and then the server would record which documents were sent where. Acknowledgment would work the way it does now, where you resend after a crash (maybe resending to the other client first?).
I could totally code it myself, but if RavenDB supported it, then you could use it for background batch queue processing very easily.
For example, say you need to look up the state for each order's zip code, in a background, parallel manner, as orders come in. As your order volume increases, you can increase the number of worker clients to keep the interval between an order coming in and its zip code being looked up within some required bound.
Andrew, That would require us to track, per client, each of the documents it was sent, and which documents went to which client. Right now we track only the latest etag that was sent, which is much smaller and simpler.
And you can do that by hooking a subscription to a queue or a work distributor. You pull the data from RavenDB, then send it to a worker, rather than having all the workers pull from RavenDB.
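As a rough sketch of that pattern, with an in-memory BlockingCollection standing in for a real queue (workerCount and ProcessOrder are placeholders):

using System.Collections.Concurrent;
using System.Threading.Tasks;

var queue = new BlockingCollection<Order>(boundedCapacity: 1024);

// A single subscriber pulls from RavenDB and feeds the queue...
subscription.Subscribe(order => queue.Add(order));

// ...while several workers drain it in parallel.
for (var i = 0; i < workerCount; i++)
{
    Task.Run(() =>
    {
        foreach (var order in queue.GetConsumingEnumerable())
            ProcessOrder(order); // hypothetical handler
    });
}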
True, maybe I'm trying to use the wrong tool. It would just allow me to remove the queueing system I'm using now and have one less tool that can go wrong.
This will help me a lot! I was doing something like that myself, but if the database supports it out of the box, then it will be my choice!
Just so I know, when will this feature be available to the public (in which version)?
It will be in the next stable 3.x release, in a couple of weeks.
Really looking forward to this; any news on a stable release?
Neil, We have been running last week's unstable build on our production system all week. We'll probably update to the latest unstable for a few days, then make a stable release.
Andrew, with what you're describing, what you would want to do is use a single subscriber to pump messages to an Azure Service Bus topic (or Redis, etc.) and let that handle the message pumping.