Improving Map/Reduce performance in RavenDB

time to read 12 min | 2323 words

This is another stream of conciseness post, with me trying to figure out what is the best way to resolve a given problem.

Update: I ended up solving this in a drastically different way. I'll post about this later on. I am keeping this here because it is a good post about how I think about a problem.

A while ago, I posted a visual description of how Map/Reduce is supposed to work. That was while I was working on RavenDB map/reduce implementation, but that isn’t actually how the current RavenDB map/reduce works.

Instead, it works like this:

Map phase:

for item in docs:
   for result in reduce(map(item)):
        Persist(item.id, result.key, result)

And the reduce phase:

for result in reduce(results):
     WriteToIndex(result)

There are a few things that are interesting here.  First, we have both map and reduce run during the map phase, why is that?

Well, to do an immediate reduction of the values, of course. This has two major benefits.

  • It means that in the reduce phase, the reduce accepts the output of the reduce – this is important because it prepare us for the next step that we would have to do, multiple reduce steps.
  • Second, it means that if you have multiple results from the same documents that share the same key, they would be reduced immediately, rather than have multiple results with the same key.

You might notice an issue with the code above, though. It seems like we are only running reduce once, and only once.

Indeed, this is how RavenDB 1.0 behaves. It only run the reduce once, regardless of how many entries you have per key.

Wait! I probably need to explain things. Let us talk for a second about the following map/reduce index:

//map
from order in docs.Orders
from line in order.OrderLines
select new
{
   line.Product,
   line.Qty
}

//reduce
from result in results
group result by result.Product into g
select new
{
    Product = g.Key,
    Qty = g.Sum(x=>x.Qty)
}

Now, if we have an order with two line items for the same product, they would be immediately reduced (on the same document) and saved as the map results.

Now, the reduce key in this case is the line item product. So when we execute the reduce, we load all the map results that share the same reduce key and run them through the reduce function together.

As I said, this is how RavenDB 1.0 behaves. And it works really nicely, except that it behave less nicely if you have a lot of results for the same reduce key. What happen if we had a really popular product, that was purchased by a million different order?

Every time that we would get a new order for this product, we would have to re-reduce the entire set. That means that we would have to re-reduce 1 millions items.

We recently got a problem when one of our customers run into an issue with running map/reduce indexes over the entire US census data. One of the problematic indexes was something like:

//map 
from person in docs.CensusPeople
select new
{
  person.State,
  Count = 1
}

//reduce
from result in results
group result by result.State into g
select new
{
  State = g.Key,
  Count = g.Sum(x=>x.Count)
}

As you can imagine, this sort of thing is going to have a lot of items for the same key.

For example, for California, we would need to run reduce over about 38 million items, and for Texas it would be over 25 million items. This is really not what we had in mind, so we turned to a long standing bug in RavenDB and started to implement multi step reduce.

The issue is how to do so. Ideally, we do not want to have to make any changes between map/reduce indexes that have a large number of items per key and map/reduce indexes that have small number of indexes per key.

The question is how to do this, and how to make sure that we don’t affect the overall system performance. As I mentioned before, it is very easy to modify things to fit one particular scenario, while forgetting all about the other scenarios.

Things get interesting after that, and here is where you might get lost, because this part is mostly written from the point of view of the implementers.

The actual behavior of the system in the map phase is more complex, because we need to invalidate old items, it looks more like this:

for item in docs:
   keys = DeleteMapResultsFor(item.id)
   for result in reduce(map(item)):
           keys.Remove(result.key)
        Persist(item.id, result.key, result)
   ReReduceRemovedKeys(keys)

Instead of this, we will have this:

for item in docs:
   result = DeleteMapResultsFor(item.id)
   keys = new HashSet<string>(result.Select(x=>x.Key))
   lookups = result.ToLookup(x=>new {x.id, x.key})

   for result in reduce(map(item)):
           keys.Remove(result.key)
           int bucketId
           if not lookups.TryFind(new { item.Id, result.key}, out bucketId):
               bucketId = -1
        Persist(item.id, result.key, bucketId, result)
   ReReduceRemovedKeys(keys)

Note that we now have the notion of a bucket, and by default that bucket is set to –1. But note that we keep the same bucketId if it already has one, this will be important later on.

The interesting thing happens in the reduce phase:

def Reduce(string key, int level):
    bool hasMore =  true
    bool hadMore = false
    while hasMore:
        results = GetMappedResults(key, level, out hasMore)
        hadMore |= hasMore
        if hasMore:
            newBucketId = GetNewBucketId(key, level)
            UpdateBucketId(results, newBucketId)
        for result in reduce(results):
                Persist(key, level +1, result)
                if not hadMore:
                    WriteToIndex(key, result)
    if hadMore:
        ScheduleReduce(key, level +1)

Here is where the important things happen. If we have less than 1,024 items for the same key, we just proceed normally, there is nothing to see there.

If we have more than that, then we create a bucket for all of those results and schedule a re-reduce for the next level up.

In other words, it looks like this, here it the map phase, notice that we start out with all of the bucket ids being –1.

image

When running the reduce phase with level = 0, we get three buckets, like this:

 

image

Which we now need to re-reduce again, this is where we are called with level = 1. Let us assume that the results of bucket 1 & 2 are over 1,024 still, so we will have:

 

image

And finally:

image

 

So far, this looks nice, but there are several practical problems that we still need to solve.

To start with, when does this end? We have users who write map/reduce queries like this:

//map #1
from customer in docs.Customers
select new 
{
   CustomerId = customer.Id,
   CustomerName = customer.Name,
   OrderId = (string)null,
}

// map #2
from order in docs.Orders
select new
{
  order.CustomerId,
  CustomerName = (string)null,
  OrderId = order.Id
}

//reduce
from result in results
group result by result.CustomerId into g
let name = g.FirstOrDefault(x=>x.CustomerName!=null).CustomerName
from item in g
select new
{
   CustomerId = g.Key,
   CustomerName = name,
   item.OrderId
}

This is a frowned upon (but working) way of allow you to query and sort by the customer name while searching for indexes. The problem with this method is that if we have 15,000 orders per customer, we are going to have the same number come out of the reduce phase as well.

Now, the reason this is frowned upon? Because while this is using map/reduce, it isn’t actually… you know.. reducing the data. In order to resolve this issue, we are going to make sure that all of the items generated from a single reduce step will always go into the same bucket. This will mean that we keep pretty much the same behavior as we have now, it is going to be inefficient, but that was always going to be the case.

We are also going to limit the number of levels to three, which still gives us the ability to handle over a billion results before a reduce phase would need to see more than 1,024 items at once.

Take the California example, we would have 37,691,912 people, each of them reduce to 37,691,912 map results at bucket –1. Then we have 36,809 buckets for the second level. And finally 35 levels at the third level. All of which are computed for the final result.

The next step from here is to actually handle updates, which means that we have to keep track of the bucket ids going forward, so we start with deleting a person, which means that we need to delete their map result. Which means that we need to re-reduce the bucket they belong to at the appropriate level, and then upward, etc. In total, we would have to compute 1,024 + 1,024 + 35 items, instead of 37,691,912.

Okay, enough talking, let us see if I straighten things out enough for me to actually be able to implement this.