Oren Eini

CEO of RavenDB

a NoSQL Open Source Document Database

time to read 4 min | 787 words

In my previous post, we increased the capacity of the cluster by moving all new work to a new set of servers. In this post, I want to deal with a slightly harder problem: what to do when it isn’t new data that is causing the issue, but existing data. In that case we can’t just throw a new server at the problem; we need to actually move data between nodes.

We started with the following configuration:

var shards = new Dictionary<string, IDocumentStore>
{
    {"Shared", new DocumentStore {Url ="http://rvn1:8080", DefaultDatabase = "Shared"}},
    {"EU", new DocumentStore {Url = "http://rvn2:8080", DefaultDatabase = "Europe"}},
    {"NA", new DocumentStore {Url = "http://rvn3:8080", DefaultDatabase = "NorthAmerica"}},
};

And what we want is to add another server for EU and NA. Our new topology would be:

var shards = new Dictionary<string, IDocumentStore>
{
    {"Shared", new DocumentStore {Url ="http://rvn1:8080", DefaultDatabase = "Shared"}},
    {"EU1", new DocumentStore {Url = "http://rvn2:8080", DefaultDatabase = "Europe1"}},
    {"NA1", new DocumentStore {Url = "http://rvn3:8080", DefaultDatabase = "NorthAmerica1"}},
    {"EU2", new DocumentStore {Url = "http://rvn4:8080", DefaultDatabase = "Europe2"}},
    {"NA2", new DocumentStore {Url = "http://rvn5:8080", DefaultDatabase = "NorthAmerica2"}},
};

There are a couple of things that we need to pay attention to. First, we no longer use the EU / NA shard keys; they have been removed in favor of EU1 & EU2 / NA1 & NA2. We’ll also change the sharding configuration so that it splits new data evenly between the two new nodes for each region (see the previous post for the details on exactly how this is done). But what about the existing data? We need some way of actually moving it. That is where our ops tools come into play.

We use the smuggler to move the data between the servers:

Raven.Smuggler.exe  between http://rvn2:8080 http://rvn2:8080 --database=Europe --database2=Europe1 --transform-file=transform-1.js --incremental
Raven.Smuggler.exe  between http://rvn2:8080 http://rvn4:8080 --database=Europe --database2=Europe2 --transform-file=transform-2.js --incremental
Raven.Smuggler.exe  between http://rvn3:8080 http://rvn3:8080 --database=NorthAmerica --database2=NorthAmerica1 --transform-file=transform-1.js --incremental
Raven.Smuggler.exe  between http://rvn3:8080 http://rvn5:8080 --database=NorthAmerica --database2=NorthAmerica2 --transform-file=transform-2.js --incremental

The commands are pretty similar, differing only in their options, so let us try to figure out what is going on. We are asking the smuggler to move the data between two databases in an incremental fashion, while applying a transform script. The transform-1.js file looks like this:

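function(doc) {
    var id = doc['@metadata']['@id'];
    // 0 for even numeric id suffixes, 1 for odd ones
    var node = (parseInt(id.substring(id.lastIndexOf('/')+1), 10) % 2);

    // odd ids are handled by transform-2.js
    if(node == 1)
        return null;

    // "EU" becomes "EU1", "NA" becomes "NA1"
    doc["@metadata"]["Raven-Shard-Id"] = doc["@metadata"]["Raven-Shard-Id"] + (node+1);

    return doc;
}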

And transform-2.js is exactly the same, except that it returns early if node is 0. In this way, we are able to split the data between the two new servers.
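To see how the two scripts partition the data, here is a minimal sketch in plain JavaScript, run outside the smuggler. The `makeTransform` and `sample` helpers and the sample ids are hypothetical and exist only for illustration; the body mirrors transform-1.js, with the parity to skip passed in as a parameter.

```javascript
// Hypothetical helper: builds a transform that drops documents whose numeric
// id suffix has the given parity, mirroring transform-1.js / transform-2.js.
function makeTransform(skipParity) {
    return function (doc) {
        var id = doc['@metadata']['@id'];
        var node = parseInt(id.substring(id.lastIndexOf('/') + 1), 10) % 2;

        if (node === skipParity)
            return null; // this document belongs to the other new shard

        doc['@metadata']['Raven-Shard-Id'] = doc['@metadata']['Raven-Shard-Id'] + (node + 1);
        return doc;
    };
}

var transform1 = makeTransform(1); // keeps even ids, appends "1" to the shard id
var transform2 = makeTransform(0); // keeps odd ids, appends "2" to the shard id

// Made-up sample document carrying only the metadata the scripts look at.
function sample(id) {
    return { '@metadata': { '@id': id, 'Raven-Shard-Id': 'EU' } };
}

console.log(transform1(sample('EU/customers/4'))['@metadata']['Raven-Shard-Id']); // EU1
console.log(transform2(sample('EU/customers/7'))['@metadata']['Raven-Shard-Id']); // EU2
console.log(transform1(sample('EU/customers/7')));                                // null
```

Every document with a numeric id suffix is kept by exactly one of the two transforms. An id whose last segment is not a number makes `node` evaluate to NaN, which neither script filters out, so such ids would need separate handling.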

Note that we use an incremental approach so that we can do this even if the initial copy takes a long while. The window of time in which we actually switch is then very narrow, and requires us to pass only the recently changed data.

That still leaves the question of how we are going to deal with old ids. We are still going to have things like “EU/customers/###” in the database, even if those documents are on one of the two new nodes. We handle this, like most low level sharding behaviors, by customizing the sharding strategy. In this case, we modify the PotentialShardsFor(…) method:

public override IList<string> PotentialShardsFor(ShardRequestData requestData)
{
    var potentialShardsFor = base.PotentialShardsFor(requestData);
    if (potentialShardsFor.Contains("EU"))
    {
        potentialShardsFor.Remove("EU");
        potentialShardsFor.Add("EU1");
        potentialShardsFor.Add("EU2");
    }
    if (potentialShardsFor.Contains("NA"))
    {
        potentialShardsFor.Remove("NA");
        potentialShardsFor.Add("NA1");
        potentialShardsFor.Add("NA2");
    }
    return potentialShardsFor;
}

In this case, we are doing a very simple thing: when the default shard resolution strategy detects that we want to go to the old EU node, we’ll tell it to go to both EU1 and EU2. A more comprehensive solution would narrow it down to the exact server, but that depends on how exactly you split the data, and is left as an exercise for the reader.
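As a starting point for that exercise, here is a minimal sketch of the narrowing logic, written in JavaScript to match the transform scripts rather than as a real sharding strategy. It assumes the data was split by the parity of the numeric id suffix, exactly as transform-1.js and transform-2.js do. `narrowShard` is a hypothetical helper, not a RavenDB API.

```javascript
// Hypothetical helper: maps an id such as "EU/customers/7" to the shard(s)
// that can hold it, assuming the even/odd split done by the transform scripts.
function narrowShard(documentId) {
    var slash = documentId.indexOf('/');
    if (slash === -1)
        return null; // no shard prefix at all, nothing to narrow

    var oldShard = documentId.substring(0, slash);
    if (oldShard !== 'EU' && oldShard !== 'NA')
        return [oldShard]; // not one of the shards that were split

    var suffix = parseInt(documentId.substring(documentId.lastIndexOf('/') + 1), 10);
    if (isNaN(suffix))
        return [oldShard + '1', oldShard + '2']; // cannot tell, ask both shards

    // even suffixes went to "...1", odd suffixes went to "...2"
    return [oldShard + ((suffix % 2) + 1)];
}

console.log(narrowShard('EU/customers/7'));  // [ 'EU2' ]
console.log(narrowShard('NA/invoices/10'));  // [ 'NA1' ]
console.log(narrowShard('Shared/users/3'));  // [ 'Shared' ]
```

The same decision could be made inside the PotentialShardsFor override, but only when the request carries document ids to inspect; for anything else, falling back to both shards remains the safe answer.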

time to read 6 min | 1145 words

Continuing on the theme of giving a full answer to interesting questions on the mailing list in the blog, we have the following issue. 

We have a sharded cluster, and we want to add a new node to it. How do we do that? I’ll discuss a few ways in which you can handle this scenario. But first, let us lay out the actual scenario.

We’ll use the Customers & Invoices system, and we put the data in the various shards according to the following scheme:

  • Customers: sharded by region
  • Invoices: sharded by customer
  • Users: shared database (not sharded)

We can configure this using the following:

var shards = new Dictionary<string, IDocumentStore>
{
    {"Shared", new DocumentStore {Url ="http://rvn1:8080", DefaultDatabase = "Shared"}},
    {"EU", new DocumentStore {Url = "http://rvn2:8080", DefaultDatabase = "Europe"}},
    {"NA", new DocumentStore {Url = "http://rvn3:8080", DefaultDatabase = "NorthAmerica"}},
};
ShardStrategy shardStrategy = new ShardStrategy(shards)
    .ShardingOn<Company>(company => company.Region, region =>
    {
        switch (region)
        {
            case "USA":
            case "Canada":
                return "NA";
            case "UK":
            case "France":
                return "EU";
            default:
                return "Shared";
        }
    })
    .ShardingOn<Invoice>(invoice => invoice.CompanyId)
    .ShardingOn<User>(user => "Shared");


So far, so good. Now, we have so much work that we can’t just have two servers for customers & invoices, we need more. We change the sharding configuration to include 2 new servers, and we get:

var shards = new Dictionary<string, IDocumentStore>
{
    {"Shared", new DocumentStore {Url = "http://rvn1:8080", DefaultDatabase = "Shared"}},
    {"EU", new DocumentStore {Url = "http://rvn2:8080", DefaultDatabase = "Europe"}},
    {"NA", new DocumentStore {Url = "http://rvn3:8080", DefaultDatabase = "NorthAmerica"}},
    {"EU2", new DocumentStore {Url = "http://rvn4:8080", DefaultDatabase = "Europe-2"}},
    {"NA2", new DocumentStore {Url = "http://rvn5:8080", DefaultDatabase = "NorthAmerica-2"}},
};

var shardStrategy = new ShardStrategy(shards);
shardStrategy.ShardResolutionStrategy = new NewServerBiasedShardResolutionStrategy(shards.Keys, shardStrategy)
    .ShardingOn<Company>(company => company.Region, region =>
    {
        switch (region)
        {
            case "USA":
            case "Canada":
                return "NA";
            case "UK":
            case "France":
                return "EU";
            default:
                return "Shared";
        }
    })
    .ShardingOn<Invoice>(invoice => invoice.CompanyId)
    .ShardingOn<User>(user => user.Id, userId => "Shared");

Note that we have a new shard resolution strategy. What is that? This is how we control the lower level details of the sharding behavior. In this case, we want to control where we’ll write new documents.

public class NewServerBiasedShardResolutionStrategy : DefaultShardResolutionStrategy
{
    public NewServerBiasedShardResolutionStrategy(IEnumerable<string> shardIds, ShardStrategy shardStrategy)
        : base(shardIds, shardStrategy)
    {
    }

    public override string GenerateShardIdFor(object entity, ITransactionalDocumentSession sessionMetadata)
    {
        var generatedShardId = base.GenerateShardIdFor(entity, sessionMetadata);
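        if (entity is Company)
        {
            // Only the EU and NA shards have a "2" counterpart, so companies that
            // resolve to "Shared" stay where they are.
            if (generatedShardId != "Shared" &&
                (DateTime.Today < new DateTime(2015, 8, 1) ||
                 DateTime.Now.Ticks % 3 != 0))
            {
                return generatedShardId + "2";
            }
            return generatedShardId;
        }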
        return generatedShardId;
    }
}    

What is the logic? Whenever we store a new company, the GenerateShardIdFor(entity) method is called. For the next 3 months (until August 1, 2015), we’ll create all new companies (and, as a result, their invoices) on the new servers. After those 3 months have passed, we’ll still favor the new servers, creating two thirds of the new companies there vs. one third on the old servers.
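To make the two thirds vs. one third split concrete, here is a minimal sketch of the same decision in plain JavaScript. The clock and the tick counter are passed in as parameters so the behavior can be checked deterministically; `biasedShardId` and `CUTOFF` are hypothetical names, and the loop simply walks consecutive tick values rather than sampling a real clock.

```javascript
// Sketch of the decision made in GenerateShardIdFor, with time passed in.
var CUTOFF = new Date(2015, 7, 1); // August 1, 2015 (JavaScript months are 0-based)

function biasedShardId(baseShardId, now, ticks) {
    if (now < CUTOFF || ticks % 3 !== 0)
        return baseShardId + '2'; // new server
    return baseShardId;           // old server
}

// Before the cutoff everything goes to the new server, whatever the tick value.
console.log(biasedShardId('EU', new Date(2015, 5, 15), 0)); // EU2

// After the cutoff, count where 3000 consecutive tick values end up.
var counts = { EU: 0, EU2: 0 };
for (var ticks = 0; ticks < 3000; ticks++) {
    counts[biasedShardId('EU', new Date(2015, 8, 1), ticks)]++;
}
console.log(counts); // { EU: 1000, EU2: 2000 }
```

Only one tick value in three is divisible by 3, which is where the one third share for the old servers comes from.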

Note that in this case, we didn’t have to modify any data whatsoever. And over time, the data will balance itself out between the servers. In my next post, we’ll deal with how we actually split an existing shard into multiple shards.

time to read 7 min | 1265 words

A question came up on the mailing list: how do we enable sharding for an existing database? I’ll deal with data migration in this scenario in a later post.

The scenario is that we have a very successful application, and we start to feel the need to move the data to multiple shards. Currently all the data is sitting in the RVN1 server. We want to add RVN2 and RVN3 to the mix. For this post, we’ll assume that we have the notion of Customers and Invoices.

Previously, we accessed the database using a simple document store:

var documentStore = new DocumentStore
{
	Url = "http://RVN1:8080",
	DefaultDatabase = "Shop"
};

Now, we want to move to a sharded environment, so we want to write it like this. Existing data is going to stay where it is, and new data will be sharded according to geographical location.

var shards = new Dictionary<string, IDocumentStore>
{
	{"Origin", new DocumentStore {Url = "http://RVN1:8080", DefaultDatabase = "Shop"}},//existing data
	{"ME", new DocumentStore {Url = "http://RVN2:8080", DefaultDatabase = "Shop_ME"}},
	{"US", new DocumentStore {Url = "http://RVN3:8080", DefaultDatabase = "Shop_US"}},
};

var shardStrategy = new ShardStrategy(shards)
	.ShardingOn<Customer>(c => c.Region)
	.ShardingOn<Invoice>(i => i.Customer);

var documentStore = new ShardedDocumentStore(shardStrategy).Initialize();

This wouldn’t actually work. We are going to have to do a bit more. To start with, what happens when we don’t have a 1:1 match between region and shard? That is when the translator becomes relevant:

.ShardingOn<Customer>(c => c.Region, region =>
{
    switch (region)
    {
        case "Middle East":
            return "ME";
        case "USA":
        case "United States":
        case "US":
            return "US";
        default:
            return "Origin";
    }
})

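We basically say that we map several values to a single shard. But that isn’t enough. Newly saved documents are going to have the shard prefix in their ids, so a new customer and invoice saved in the US shard will show up with ids that start with the shard id:

[image: a new customer and invoice in the US shard, with shard-prefixed ids]

But existing data, which was created without sharding, doesn’t have this prefix:

[image: existing documents, with ids that have no shard prefix]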

So we need to make some extra effort to let RavenDB know about them. We do this using the following two functions:

 Func<string, string> potentialShardToShardId = val =>
 {
     var start = val.IndexOf('/');
     if (start == -1)
         return val;
     var potentialShardId = val.Substring(0, start);
     if (shards.ContainsKey(potentialShardId))
         return potentialShardId;
     // this is probably an old id, let us use it.
     return "Origin";

 };
 Func<string, string> regionToShardId = region =>
 {
     switch (region)
     {
         case "Middle East":
             return "ME";
         case "USA":
         case "United States":
         case "US":
             return "US";
         default:
             return "Origin";
     }
 };

We can then register our sharding configuration like so:

  var shardStrategy = new ShardStrategy(shards)
      .ShardingOn<Customer, string>(c => c.Region, potentialShardToShardId, regionToShardId)
      .ShardingOn<Invoice, string>(x => x.Customer, potentialShardToShardId, regionToShardId); 

That takes care of handling both new and old ids, and lets RavenDB understand how to query things in an optimal fashion. For example, a query on all invoices for ‘customers/1’ will only hit the RVN1 server.
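To check how ids resolve, here is the id-to-shard function ported to plain JavaScript as a minimal sketch. The `shards` object is a stand-in for the keys of the shards dictionary, and the sample ids are made up for illustration.

```javascript
// Stand-in for the keys of the shards dictionary in the C# configuration.
var shards = { Origin: true, ME: true, US: true };

function potentialShardToShardId(val) {
    var start = val.indexOf('/');
    if (start === -1)
        return val; // no separator, treat the value itself as the shard id

    var potentialShardId = val.substring(0, start);
    if (Object.prototype.hasOwnProperty.call(shards, potentialShardId))
        return potentialShardId;

    // no known shard prefix, so this is probably an old id
    return 'Origin';
}

console.log(potentialShardToShardId('US/customers/1')); // US
console.log(potentialShardToShardId('ME/invoices/7'));  // ME
console.log(potentialShardToShardId('customers/1'));    // Origin
```

An old id such as customers/1 has no known shard prefix, so it resolves to Origin, which is why a query for that customer's invoices only needs to go to RVN1.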

However, we aren’t done yet. New customers that don’t belong to the Middle East or USA will still go to the old server, and we don’t want any modification to the id there. We can tell RavenDB how to handle it like so:

var defaultModifyDocumentId = shardStrategy.ModifyDocumentId;
shardStrategy.ModifyDocumentId = (convention, shardId, documentId) =>
{
    if(shardId == "Origin")
        return documentId;

    return defaultModifyDocumentId(convention, shardId, documentId);
};
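The effect on generated ids can be sketched in plain JavaScript. This assumes that the default behavior is to prefix the document id with the shard id and a separator; `defaultModifyDocumentId` below is a stand-in written for this sketch, not the actual RavenDB implementation, and the separator is passed in directly instead of being read from the conventions.

```javascript
// Assumed stand-in for the default behavior: prefix the id with the shard id.
function defaultModifyDocumentId(separator, shardId, documentId) {
    return shardId + separator + documentId;
}

function modifyDocumentId(separator, shardId, documentId) {
    if (shardId === 'Origin')
        return documentId; // keep the unprefixed ids already used on the old server

    return defaultModifyDocumentId(separator, shardId, documentId);
}

console.log(modifyDocumentId('/', 'US', 'customers/1'));     // US/customers/1
console.log(modifyDocumentId('/', 'Origin', 'customers/1')); // customers/1
```

With this in place, new documents on the old server keep the same id shape as the documents that were there before sharding was enabled.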

That is almost the end. There is one final issue that we need to deal with: the old documents, created before we used sharding, don’t have the required sharding metadata. We can fix that using a store listener. So we have:

 var documentStore = new ShardedDocumentStore(shardStrategy);
 documentStore.RegisterListener(new AddShardIdToMetadataStoreListener());
 documentStore.Initialize();

Where the listener looks like this:

 public class AddShardIdToMetadataStoreListener : IDocumentStoreListener
 {
     public bool BeforeStore(string key, object entityInstance, RavenJObject metadata, RavenJObject original)
     {
         if (metadata.ContainsKey(Constants.RavenShardId) == false)
         {
             metadata[Constants.RavenShardId] = "Origin";// the default shard id
         }
         return false;
     }

     public void AfterStore(string key, object entityInstance, RavenJObject metadata)
     {
     }
 }

And that is it. I know that there seems to be quite a lot going on in here, but it basically can be broken down to three actions that we take:

  • Modify the existing metadata to add the sharding server id via the listener.
  • Modify the document id convention so documents on the old server won’t have a designation (optional).
  • Modify the sharding configuration so we’ll understand that documents without a shard prefix actually belong to the Origin shard.

And that is pretty much it.
