Wednesday, April 9, 2014

Parallelising MongoDB Aggregation on a Sharded Database


EDIT 10-Dec-2021: See my far more up to date blog post which supersedes this: Achieving At Least An Order Of Magnitude Aggregation Performance Improvement By Scaling & Parallelism

In my last blog post, I explored how I could speed up a MapReduce-style aggregation job, by parallelising the work over subsets of a collection, using multiple threads, for a non-sharded MongoDB database. In this post, I look at how I took the same test case, and a similar approach, to see if I could achieve a speed up when aggregating the 10 million documents stored in a sharded MongoDB database.

Confession time....

After my success in speeding up an aggregation on a non-sharded database, I assumed this second phase of my investigation would be a walk in the park, and that I'd quickly march on to a conclusion exhibiting even greater gains in a sharded environment. However, things didn't quite turn out that way, and I ended up spending considerably more time on this investigation than I'd anticipated. More on that later.

First of all though, I needed to insert the 10 million randomly generated documents into a now-sharded collection deployment (3 shards, all running on a single Linux laptop), which of course was straightforward.


Identifying Data Subsets In A Sharded Environment


Once the sharded collection was populated, I had to consider what changes to make to my JavaScript/mongo Shell code to deal with the differences between non-sharded and sharded environments. It turned out that this was one of the more straightforward and intuitive changes to make.

In the sharded MongoDB environment, I didn't have to take an educated guess at how to split the data into ranges, or use the undocumented and time-consuming splitVector command, as I did for the non-sharded tests. MongoDB was already working hard on my behalf, managing the shard key and deciding how best to evenly distribute subsets (chunks) of the collection across each shard. Therefore, all the information I needed was already available to me in the 'config' database, accessible via the mongos router.

For the sharded test collection, a quick query of the config database using the mongo Shell (connected to a mongos) told me how the collection was currently distributed.

mongos> use config
mongos> db.chunks.find({ns: "mrtest.rawdata"}, {_id: 0, shard: 1, min: 1, max: 1}).sort({shard: 1})

{ "min" : { "dim0" : 635504 }, "max" : { "dim0" : 702334 }, "shard" : "s0" }
{ "min" : { "dim0" : 702334 }, "max" : { "dim0" : 728124 }, "shard" : "s0" }
{ "min" : { "dim0" : 728124 }, "max" : { "dim0" : 759681 }, "shard" : "s0" }
{ "min" : { "dim0" : 759681 }, "max" : { "dim0" : 827873 }, "shard" : "s0" }
{ "min" : { "dim0" : 827873 }, "max" : { "dim0" : 856938 }, "shard" : "s0" }
{ "min" : { "dim0" : 856938 }, "max" : { "dim0" : 893410 }, "shard" : "s0" }
{ "min" : { "dim0" : 893410 }, "max" : { "dim0" : 942923 }, "shard" : "s0" }
{ "min" : { "dim0" : 942923 }, "max" : { "dim0" : 999996 }, "shard" : "s0" }

{ "min" : { "dim0" : { "$minKey" : 1 } }, "max" : { "dim0" : 42875 }, "shard" : "s1" }
{ "min" : { "dim0" : 42875 }, "max" : { "dim0" : 91602 }, "shard" : "s1" }
{ "min" : { "dim0" : 91602 }, "max" : { "dim0" : 149588 }, "shard" : "s1" }
{ "min" : { "dim0" : 149588 }, "max" : { "dim0" : 197061 }, "shard" : "s1" }
{ "min" : { "dim0" : 197061 }, "max" : { "dim0" : 257214 }, "shard" : "s1" }
{ "min" : { "dim0" : 471994 }, "max" : { "dim0" : 520681 }, "shard" : "s1" }
{ "min" : { "dim0" : 569205 }, "max" : { "dim0" : 602306 }, "shard" : "s1" }

{ "min" : { "dim0" : 257214 }, "max" : { "dim0" : 305727 }, "shard" : "s2" }
{ "min" : { "dim0" : 305727 }, "max" : { "dim0" : 363357 }, "shard" : "s2" }
{ "min" : { "dim0" : 363357 }, "max" : { "dim0" : 412656 }, "shard" : "s2" }
{ "min" : { "dim0" : 412656 }, "max" : { "dim0" : 471994 }, "shard" : "s2" }
{ "min" : { "dim0" : 520681 }, "max" : { "dim0" : 569205 }, "shard" : "s2" }
{ "min" : { "dim0" : 602306 }, "max" : { "dim0" : 635504 }, "shard" : "s2" }
{ "min" : { "dim0" : 999996 }, "max" : { "dim0" : { "$maxKey" : 1 } }, "shard" : "s2" }

                   (I added a newline between the chunks belonging to each shard, for reader clarity;
                    some of these chunk boundary values are discussed in the next section of this post)

So I already had all the information available at my fingertips, defining an 'even-ish' spread of data subsets (chunks) of the collection. In this case, I could see there were 22 chunks spread over 3 shards. I therefore needed to change my JavaScript code to create multiple threads per shard, each targeting a portion of the chunks belonging to that shard. To build this list of shards and their owned chunks at runtime, my JavaScript code used an aggregation to group the chunks by shard.

// Query the config DB, grouping data on all the chunks for each shard
db = db.getSiblingDB('config');
var shardsWithChunks = db.chunks.aggregate([
    {$match: {
        "ns": "mrtest.rawdata"
    }},
    {$sort: {
        "min.dim0": 1
    }},
    {$group: {
        _id: "$shard",
        chunks: {$push: {
              min: "$min.dim0",
              max: "$max.dim0"
        }}
    }}
]);

My code then used a loop like the one below to cycle through each shard, kicking off multiple sub-aggregations in different threads, where each thread uses a range query that resolves to a portion of the chunks currently believed to live on that shard.

shardsWithChunks.forEach(function(shard) {
    // For each new thread, collect together a set of some of the chunks from the
    // shard.chunks array and perform the modified aggregation pipeline on it
});
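The body of that loop isn't shown here (the full version lives in the zip file), but the chunk-grouping step it performs can be sketched roughly as follows. This is my own illustrative helper, with assumed names, not the actual code from the zip file:

```javascript
// Hypothetical helper (my own naming, not from the zip file): split a
// shard's chunk list into roughly equal contiguous groups, one group per
// thread, so each thread can aggregate its own portion of the chunks.
function partitionChunks(chunks, numThreads) {
    var groups = [];
    var groupSize = Math.ceil(chunks.length / numThreads);

    // Walk the chunk list in steps of groupSize, slicing off one group
    // per thread (the final group may be smaller than the rest)
    for (var i = 0; i < chunks.length; i += groupSize) {
        groups.push(chunks.slice(i, i + groupSize));
    }

    return groups;
}

// e.g. the 7 chunks on shard 's1', split for 4 threads, yield
// groups of size 2, 2, 2 and 1
```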


Decorating a Pipeline to Enable a Thread to Target Just One Shard


The code I'd written to modify a pipeline, allowing a thread to operate on a subset of data in a non-sharded database, also had to be changed to allow for subtleties only present when the data is sharded. Taking the example of wanting to spawn 4 threads per shard, for shard 's1' (shown in the output from db.chunks.find() earlier in this post), I had to divide its 7 chunks between the 4 threads. So 3 of the threads would each need to target 2 chunks, and 1 thread would target the remaining chunk.

Notice that in the list of chunks returned by db.chunks.find() earlier, not all chunks on a particular shard are contiguous. For example, one chunk on shard 's1' ends at 257214, and the next chunk on 's1' starts at 471994; the chunk starting at 257214 actually lives on shard 's2'. This means that the code to modify the aggregation pipeline needs to query the start and end range of each chunk, rather than just a single range from the start of the first chunk to the end of the last chunk. Below you can see the naive, incorrect pipeline that my code could generate, followed by the correct pipeline that the code should (and does) generate.

// BAD
[
   {$match: {
      dim0: {
         $gte: 197061,
         $lt : 520681
      }
   }},
   {$group: {
      _id: "$dim0",
      value: {$sum: 1}
   }},
   {$out:
      "mraggout_s1_2"
   }
]

// GOOD
[
   {$match: {
      $or: [
         {
            dim0: {
               $gte: 197061,
               $lt : 257214
            }
         },
         {
            dim0: {
               $gte: 471994,
               $lt : 520681
            }
         }
      ]
   }},
   {$group: {
      _id: "$dim0",
      value: {$sum: 1}
   }},
   {$out:
      "mraggout_s1_2"
   }
]


To decorate the original aggregation pipeline in this way, I needed to modify my original aggregate_subset() JavaScript function to build up an $or expression of ranges, for the $match stage prepended to the pipeline. The fully modified JavaScript code I produced, to run in the mongo Shell and perform all these actions, can be viewed in the zip file available here.
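As a rough illustration of that decoration step, the sketch below builds the prepended $match (with one $or clause per chunk range) and the appended $out. The function name, arguments and shapes here are my own assumptions, not the actual aggregate_subset() code from the zip file:

```javascript
// Illustrative sketch (my own naming, not the real aggregate_subset()):
// given a thread's list of {min, max} chunk ranges on the dim0 shard key,
// prepend a $match of $or'd range clauses and append a per-thread $out.
function decoratePipeline(pipeline, chunkRanges, outCollName) {
    // One {dim0: {$gte: min, $lt: max}} clause per chunk, mirroring
    // MongoDB's min-inclusive/max-exclusive chunk boundaries
    var orClauses = chunkRanges.map(function(chunk) {
        return {dim0: {$gte: chunk.min, $lt: chunk.max}};
    });

    return [{$match: {$or: orClauses}}]
        .concat(pipeline)
        .concat([{$out: outCollName}]);
}
```

Fed the two 's1' chunk ranges from the earlier example, this would produce the "GOOD" pipeline shown above.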

It is worth mentioning that if, during the aggregation run, the cluster happened to migrate any chunks, it would not be a big deal. The modified aggregation pipelines would still function correctly and return correct results. However, a couple of the threads would just take longer to complete, as mongos and the query optimiser would end up executing them against two shards, not one, to collect the requested data.


Sub-optimal mongos Shard Routing For Some Edge-case Range Queries


One thing I noticed when running explain (e.g. using db.coll.aggregate(pipeline, {explain: true})) was that even when running the modified pipeline shown above, which should target chunks on only one shard, mongos actually chooses to target two shards (both 's1' and 's2'). The correct result is still generated, but it just takes a little longer, as twice as many shards are hit as necessary. Essentially, the query optimiser is not correctly identifying a range boundary as belonging to just one shard, when that boundary falls exactly at the intersection of two chunks that happen to reside on different shards. Specifically, the problem occurs for a sharded collection with a single (non-compound) shard key, when $lte or $gte is used in the range query. I created a JIRA ticket logging this bug, but with a low priority, as functionally an accurate result is returned, and a workaround is possible. My original section of code, and the modified version using this workaround, are shown below.

// mongos/optimiser targets 2 shards
{
   dim0: {
      $gte: 197061,
      $lt : 257214
   }
}

// mongos/optimiser targets 1 shard
{
   dim0: {
      $gte: 197061,
      $lte: 257213
   }
}

By modifying the range criteria to be '$lte: 257213' in my JavaScript code, only one shard ('s1') is then correctly routed to. However, this does mean that any generic query code in a more 'real-world' application has to be explicitly aware of field types (there being no explicit database schema) and know that the field actually contains an integer, which can safely be decremented by 1 to help encourage optimal sharded query routing.
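The workaround itself is a one-liner. A minimal sketch, assuming (as is true for dim0 here) that the shard-key field holds integers, so the exclusive upper bound can be converted to an inclusive one by subtracting 1 (the function name is my own, for illustration):

```javascript
// Sketch of the routing workaround (hypothetical helper name): convert an
// exclusive-upper-bound range on an integer shard key into an equivalent
// inclusive range, so mongos routes the query to a single shard.
// ONLY valid because dim0 holds integers: for integers,
// dim0 < max  is exactly equivalent to  dim0 <= max - 1.
function toInclusiveRange(min, max) {
    return {dim0: {$gte: min, $lte: max - 1}};
}
```

Each $or clause built for the $match stage can be passed through this conversion before the pipeline is dispatched.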


Initial Results


When generating my aggregation test results and capturing the execution time, I first wanted to establish the performance of the normal, unchanged aggregation on the sharded database. I felt it would be interesting to see if a regular, unchanged aggregation ran faster once sharded. When I ran it against the sharded database, I was a little surprised to see it execute in 27 seconds, compared with just 23 seconds against the non-sharded collection. Upon reflection, I came to realise why the aggregation took longer to complete, and I will discuss the reasons in the conclusion of this blog post. For the moment, though, my challenge was simply to beat the time of 27 seconds with my 'optimised parallel aggregations' approach.

I then ran my newly modified JavaScript code (see code in zip file), executing parallel aggregation threads targeted at groups of chunks on each shard (4 threads per shard). This test executed in 20 seconds. Again I was a little disappointed, because this yielded only a roughly 25% speed-up in the sharded environment, whereas before, in a non-sharded environment, I had achieved over a 40% speed-up.


Attempts on Improvement


At this point, I started to hypothesise on potential reasons why my 'optimisations' couldn't speed-up an aggregation as quickly for a sharded database. Below are some of the main hypotheses I came up with and what happened when I tested each one.
  1. Is running the test via JavaScript in the mongo Shell introducing a bottleneck? I re-wrote my application code, to decorate the pipeline and run separate aggregation threads, in Python instead, using the PyMongo driver (also included in the code zip file). However, there was no noticeable change in performance when re-running the test with the Python code.
  2. Is the single mongos router, used by all 12 aggregation threads, acting as a bottleneck? I started up 12 separate mongos instances and modified the Python version of the code to force each of the 12 spawned threads to connect to its own dedicated mongos. The resulting execution time was only marginally better, so I concluded that a single shared mongos was not really a major bottleneck.
  3. Is the role of mongos (i.e. shard routing/coordination and partial aggregation processing) the main cause of added latency? At this point I went a bit off-piste and did something no-one should ever do. I modified the Python code again, this time to make each spawned thread connect directly to the mongod of the shard, rather than going via mongos. My code achieved this by first querying the 'config' database to find the direct URL of each shard's mongod host. This time I did see a significant decrease in execution time, down to 15 seconds. This is only 2 seconds off the 13 seconds achieved on the non-sharded database. The 2 second difference can probably be explained by the fact that I was spawning 12 threads on a now overloaded host machine, running 3 mongod shards, 1 config mongod database and 1 mongos, versus the non-sharded test that ran just 4 threads against a single mongod. However, running the aggregations directly against shards is a bad thing to do, for two reasons: (1) the $match may miss a chunk, read an orphaned chunk, or read only part of a split chunk, if any chunk splitting/migration has occurred during the process, resulting in an incorrect result; (2) the aggregation $out operator will cause a collection to be written to the local shard database, but the config databases and all the mongos's will have no awareness of its existence (indeed, I witnessed a 'confused' system after trying this). Therefore, I have not included in the code zip file the modifications I made to pull this stunt, and I vow never to try such a thing again! Still, the results were interesting, even if unusable.  :-)

It is probably worth noting that, for all these test variations, I consistently observed the following characteristics:
  • top would show all the host CPUs running at around 99% for approximately the first 3/4 of the test run.
  • mongostat would show zero page faults (the inference being that all the working-set is memory-resident) and only single-figure locked database percentages (i.e. sub 10%).
  • iostat would show, at its peak, less than 15% disk usage for the host SSD.


Results Summary


So finally my prolonged theorising and testing came to an end. The table below shows the results I achieved for the different variations, ultimately demonstrating a ~30% performance speed-up (27 seconds down to 19 seconds) when using my parallel sub-aggregations optimisation, via multiple mongos's, against 3 shards. This was less than the 40+% performance increase I achieved in my non-sharded database tests.


Aggregation Description                        | Data-Subsets per Shard | Spawned Threads per Shard | Number of mongos's | Execution Time
Regular unchanged (single-threaded)            | 0                      | 0                         | 1                  | 27 secs
Serial sub-aggregations (single-threaded)      | 2                      | 0                         | 1                  | 32 secs
Serial sub-aggregations (single-threaded)      | 4                      | 0                         | 1                  | 32 secs
Parallel sub-aggregations, single mongos       | 2                      | 2                         | 1                  | 21 secs
Parallel sub-aggregations, single mongos       | 4                      | 4                         | 1                  | 20 secs
Parallel sub-aggregations, multiple mongos's   | 2                      | 2                         | 6                  | 20 secs
Parallel sub-aggregations, multiple mongos's   | 4                      | 4                         | 12                 | 19 secs
Parallel sub-aggregations, direct to mongod's  | 2                      | 2                         | 0                  | 16 secs
Parallel sub-aggregations, direct to mongod's  | 4                      | 4                         | 0                  | 15 secs


For interest, I've included in the table the results from running the parallel aggregations directly against the shard mongod's. However, as discussed earlier, doing this for real in a production system will result in bad things happening. You have been warned!


Conclusions


I've demonstrated that the execution time for certain MapReduce-style aggregation workloads on a sharded database can be reduced by parallelising the aggregation work over collection subsets, using multiple threads. However, for this particular test case, the speed-up is less pronounced than for the same test on a non-sharded database.

On reflection, it is probably naive to expect a reduction in aggregation execution time when moving a collection from a non-sharded database to a sharded one. If you think about the main benefits of sharding, sharding is NOT really about reducing individual operation latency. The main purpose of sharding is to enable a database to scale out, to cope with a significant increase in data volumes and/or a significant increase in read and/or write throughput demand. To that effect, I would expect a sharded database, spread out over multiple machines, to sustain a much higher throughput of concurrent aggregation operations than a non-sharded database confined to a single machine (perhaps that can be a test and blog post for another day!).

Lastly, I also suspect that with a combination of a much larger data-set, and with the shards, mongos's and client app spread out over separate dedicated host machines, the aggregation test on a sharded database might match or even beat the execution time achieved on a non-sharded database. I even feel this would be the case for an unchanged, non-optimised aggregation pipeline, due to MongoDB's built-in ability to run the first part of a pipeline directly on each shard, in parallel. Unfortunately, time has run out for me to try this at the moment, so perhaps it's something for me to revisit in the future.


Song for today: Perth by Bon Iver