Improving Map/Reduce performance in RavenDB
This is another stream of consciousness post, with me trying to figure out the best way to resolve a given problem.
Update: I ended up solving this in a drastically different way. I'll post about this later on. I am keeping this here because it is a good post about how I think about a problem.
A while ago, I posted a visual description of how Map/Reduce is supposed to work. That was while I was working on RavenDB's map/reduce implementation, but it isn't actually how the current RavenDB map/reduce works.
Instead, it works like this:
Map phase:
```
for item in docs:
    for result in reduce(map(item)):
        Persist(item.id, result.key, result)
```
And the reduce phase:
```
for result in reduce(results):
    WriteToIndex(result)
```
There are a few interesting things here. First, we run both map and reduce during the map phase. Why is that?
Well, to do an immediate reduction of the values, of course. This has two major benefits.
- First, it means that in the reduce phase, the reduce function accepts the output of the reduce function. This is important because it prepares us for the next step we will have to take: multiple reduce steps (see the sketch right after this list).
- Second, it means that if you have multiple results from the same document that share the same key, they are reduced immediately, rather than stored as multiple results with the same key.
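To see why the first point matters, here is a minimal Python sketch (not RavenDB code; the document shape and function names are made up for illustration) of the property the reduce function must have: feeding its own output back into it yields the same final result as reducing everything in one go.

```python
# Hypothetical count-style map/reduce, just to show that reduce has to accept
# its own output and produce the same shape of result.
def map_fn(doc):
    return [{"key": doc["state"], "count": 1}]

def reduce_fn(results):
    totals = {}
    for r in results:
        totals[r["key"]] = totals.get(r["key"], 0) + r["count"]
    return [{"key": k, "count": c} for k, c in totals.items()]

docs = [{"state": "CA"}, {"state": "CA"}, {"state": "TX"}]
mapped = [r for d in docs for r in map_fn(d)]

once = reduce_fn(mapped)                          # reduce over raw map output
twice = reduce_fn(reduce_fn(mapped[:2]) +         # reduce over reduce output...
                  reduce_fn(mapped[2:]))          # ...must give the same answer
assert sorted(once, key=lambda r: r["key"]) == sorted(twice, key=lambda r: r["key"])
```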
You might notice an issue with the code above, though. It seems like we are only running reduce once, and only once.
Indeed, this is how RavenDB 1.0 behaves. It only runs the reduce once, regardless of how many entries you have per key.
Wait! I probably need to explain things. Let us talk for a second about the following map/reduce index:
```csharp
//map
from order in docs.Orders
from line in order.OrderLines
select new { line.Product, line.Qty }

//reduce
from result in results
group result by result.Product into g
select new
{
    Product = g.Key,
    Qty = g.Sum(x => x.Qty)
}
```
Now, if we have an order with two line items for the same product, they would be immediately reduced (on the same document) and saved as the map results.
Now, the reduce key in this case is the line item product. So when we execute the reduce, we load all the map results that share the same reduce key and run them through the reduce function together.
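Here is a rough Python sketch of that map phase for the Orders index above. The `persist` function and the document shape are illustrative stand-ins, not the actual RavenDB storage API.

```python
# Sketch of the map phase: map a document, reduce its own results immediately,
# then persist one entry per (document, reduce key).
def map_order(order):
    return [{"Product": line["Product"], "Qty": line["Qty"]}
            for line in order["OrderLines"]]

def reduce_results(results):
    totals = {}
    for r in results:
        totals[r["Product"]] = totals.get(r["Product"], 0) + r["Qty"]
    return [{"Product": p, "Qty": q} for p, q in totals.items()]

def persist(doc_id, reduce_key, result):
    print(doc_id, reduce_key, result)   # stand-in for the real map-results storage

order = {"Id": "orders/1",
         "OrderLines": [{"Product": "products/1", "Qty": 2},
                        {"Product": "products/1", "Qty": 3},
                        {"Product": "products/2", "Qty": 1}]}

for result in reduce_results(map_order(order)):
    # the two lines for products/1 have already been collapsed into Qty = 5
    persist(order["Id"], result["Product"], result)
```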
As I said, this is how RavenDB 1.0 behaves. And it works really nicely, except that it behaves less nicely if you have a lot of results for the same reduce key. What happens if we have a really popular product, one that was purchased in a million different orders?
Every time we get a new order for this product, we would have to re-reduce the entire set. That means re-reducing a million items.
We recently got a problem when one of our customers ran into an issue with running map/reduce indexes over the entire US census data. One of the problematic indexes was something like:
```csharp
//map
from person in docs.CensusPeople
select new { person.State, Count = 1 }

//reduce
from result in results
group result by result.State into g
select new
{
    State = g.Key,
    Count = g.Sum(x => x.Count)
}
```
As you can imagine, this sort of thing is going to have a lot of items for the same key.
For example, for California, we would need to run reduce over about 38 million items, and for Texas it would be over 25 million items. This is really not what we had in mind, so we turned to a long-standing bug in RavenDB and started to implement multi-step reduce.
The issue is how to do so. Ideally, we do not want to have to make any changes between map/reduce indexes that have a large number of items per key and map/reduce indexes that have a small number of items per key.
The question is how to do this, and how to make sure that we don’t affect the overall system performance. As I mentioned before, it is very easy to modify things to fit one particular scenario, while forgetting all about the other scenarios.
Things get interesting after that, and here is where you might get lost, because this part is mostly written from the point of view of the implementers.
The actual behavior of the system in the map phase is more complex, because we need to invalidate old items. It looks more like this:
```
for item in docs:
    keys = DeleteMapResultsFor(item.id)
    for result in reduce(map(item)):
        keys.Remove(result.key)
        Persist(item.id, result.key, result)
    ReReduceRemovedKeys(keys)
```
Instead of this, we will have this:
```
for item in docs:
    result = DeleteMapResultsFor(item.id)
    keys = new HashSet<string>(result.Select(x => x.Key))
    lookups = result.ToLookup(x => new { x.id, x.key })
    for result in reduce(map(item)):
        keys.Remove(result.key)
        int bucketId
        if not lookups.TryFind(new { item.Id, result.key }, out bucketId):
            bucketId = -1
        Persist(item.id, result.key, bucketId, result)
    ReReduceRemovedKeys(keys)
```
Note that we now have the notion of a bucket, and by default that bucket is set to –1. But note that we keep the same bucketId if the result already has one; this will be important later on.
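As a small illustration of that rule (plain dictionaries here, not the actual storage): a re-indexed (document, reduce key) pair keeps whatever bucket it already lives in, while brand new results start out unassigned at –1.

```python
# Persisted map results, keyed by (doc id, reduce key) -> bucket id.
existing_buckets = {("orders/1", "products/1"): 7}   # assigned by an earlier reduce

def bucket_for(doc_id, reduce_key):
    # keep the bucket the pair is already in, otherwise start unassigned
    return existing_buckets.get((doc_id, reduce_key), -1)

print(bucket_for("orders/1", "products/1"))   # 7  -> stays in its existing bucket
print(bucket_for("orders/1", "products/2"))   # -1 -> new result, not yet bucketed
```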
The interesting thing happens in the reduce phase:
```
def Reduce(string key, int level):
    bool hasMore = true
    bool hadMore = false
    while hasMore:
        results = GetMappedResults(key, level, out hasMore)
        hadMore |= hasMore
        if hasMore:
            newBucketId = GetNewBucketId(key, level)
            UpdateBucketId(results, newBucketId)
        for result in reduce(results):
            Persist(key, level + 1, result)
            if not hadMore:
                WriteToIndex(key, result)
    if hadMore:
        ScheduleReduce(key, level + 1)
```
Here is where the important things happen. If we have fewer than 1,024 items for the same key, we just proceed normally; there is nothing to see there.
If we have more than that, then we create a bucket for all of those results and schedule a re-reduce for the next level up.
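Here is a toy Python simulation of that funneling, assuming a bucket size of 1,024 and a count-style reduce like the census index; it only shows how results flow up through the levels, not how RavenDB actually schedules the work.

```python
BUCKET_SIZE = 1024

def reduce_counts(results):
    return sum(results)                 # Count = g.Sum(x => x.Count)

def reduce_key(key, results, level=0):
    # split the results for this key into buckets and reduce each bucket
    buckets = [results[i:i + BUCKET_SIZE]
               for i in range(0, len(results), BUCKET_SIZE)]
    reduced = [reduce_counts(b) for b in buckets]
    if len(buckets) == 1:
        print(f"{key}: final result {reduced[0]} written to the index at level {level}")
        return reduced[0]
    print(f"{key}: level {level} -> {len(buckets)} buckets, scheduling re-reduce")
    return reduce_key(key, reduced, level + 1)

reduce_key("Texas", [1] * 5_000)        # 5,000 map results -> 5 buckets -> 1 result
```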
In other words, it looks like this. Here is the map phase; notice that we start out with all of the bucket ids being –1.
When running the reduce phase with level = 0, we get three buckets, like this:
We now need to re-reduce those; this is where we are called with level = 1. Let us assume that the results of buckets 1 & 2 are still over 1,024 items, so we will have:
And finally:
So far, this looks nice, but there are several practical problems that we still need to solve.
To start with, when does this end? We have users who write map/reduce queries like this:
```csharp
//map #1
from customer in docs.Customers
select new
{
    CustomerId = customer.Id,
    CustomerName = customer.Name,
    OrderId = (string)null,
}

//map #2
from order in docs.Orders
select new
{
    order.CustomerId,
    CustomerName = (string)null,
    OrderId = order.Id
}

//reduce
from result in results
group result by result.CustomerId into g
let name = g.FirstOrDefault(x => x.CustomerName != null).CustomerName
from item in g
select new
{
    CustomerId = g.Key,
    CustomerName = name,
    item.OrderId
}
```
This is a frowned-upon (but working) way of allowing you to query and sort by the customer name while searching for orders. The problem with this method is that if we have 15,000 orders per customer, we are going to have the same number of items come out of the reduce phase as well.
Now, the reason this is frowned upon? Because while this is using map/reduce, it isn't actually… you know… reducing the data. In order to resolve this issue, we are going to make sure that all of the items generated from a single reduce step will always go into the same bucket. This means that we keep pretty much the same behavior as we have now; it is going to be inefficient, but that was always going to be the case.
We are also going to limit the number of levels to three, which still gives us the ability to handle over a billion results before a reduce phase would need to see more than 1,024 items at once.
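A quick sanity check of that capacity claim, assuming 1,024 items per bucket and three reduce levels:

```python
BUCKET_SIZE = 1024
LEVELS = 3
# each level fans in up to 1,024 results, so three levels cover 1,024^3 map results
print(BUCKET_SIZE ** LEVELS)   # 1,073,741,824 -> a bit over a billion
```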
Take the California example: we would have 37,691,912 people, which map to 37,691,912 map results at bucket –1. Then we have 36,809 buckets for the second level, and finally 35 buckets at the third level, all of which are combined for the final result.
The next step from here is to actually handle updates, which means that we have to keep track of the bucket ids going forward. Say we delete a person: we need to delete their map result, re-reduce the bucket it belonged to at the appropriate level, and then work our way upward. In total, we would have to compute 1,024 + 1,024 + 35 items, instead of 37,691,912.
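And the back-of-the-envelope cost of such an incremental update, using the numbers from the paragraph above:

```python
# re-reduce one level-0 bucket, one level-1 bucket, and the ~35 top-level buckets,
# instead of re-reducing every map result for California
incremental = 1024 + 1024 + 35
full = 37_691_912
print(incremental, full // incremental)   # 2,083 items, roughly 18,000x less work
```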
Okay, enough talking. Let us see if I have straightened things out enough to actually be able to implement this.
Comments
Re: updates, did you see how CouchDB uses B-tree indexes to persist their intermediate reduce results? http://guide.couchdb.org/draft/views.html#reduce ?
So basically this is persisting the intermediate steps of the map/reduce step? For the California example there are 37 million input elements scattered throughout the census data, which has around 300 million entries (assuming most people filled out a census). As this is processed, the reduce phase should have separated these out into individual pieces with intermediate sums. Taken from your original post in 2010, one of the batches might have 15 million, another 100K, etc., depending on which states happened to be sent to whichever batch.
So is the concept of a bucket then to be seen as the equivalent of one of the batches created in map/reduce (before the final batch when all the keys are unique and reducing is completed)?
Or is it that a bucket holds data for specific keys, so bucket 1 may contain all the California data and bucket 2 the Texas data, for example?
I've read this post several times over and I can't wrap my head around what is going on at all.
avolkov, This is really interesting, but not very useful for our needs. The way CouchDB stores the results internally is quite different from the way we do.
ppatterson, Yes, we are persisting the intermediate results. We use the term buckets to split the data into multiple sections within a single key. Batch & bucket are probably the same thing. Bucket is local to a key. We have 100 buckets for CA at level 1, for example, each containing some data. Then we have 10 buckets for CA at level 2, each containing the reduced data from level 1. Then we have the final result for CA.