EDIT 10-Dec-2021: See my far more up-to-date blog post, which supersedes this one: Achieving At Least An Order Of Magnitude Aggregation Performance Improvement By Scaling & Parallelism
A little while ago, Antoine Girbal wrote a great blog post describing how to speed up a MongoDB MapReduce job by 20x. Now that MongoDB version 2.6 is nearly upon us, and prompted by an idea from one of my very smart colleagues, John Page, I thought I'd investigate whether MongoDB Aggregation jobs can be sped up using a similar approach. Here, I will summarise the findings from my investigation.
To re-cap, the original MapReduce tests looked for all the unique values in a collection, counting their occurrences. The following improvements were incrementally applied by Antoine against the 10 million documents in the collection:
- Define a 'sort' for the MapReduce job
- Use multiple-threads, each operating on a subset of the collection
- Write out result subsets to different databases
- Specify that the job should use 'pure JavaScript mode'
- Run the job using MongoDB 2.6 (an 'in-development' version) rather than 2.4
With all these optimisations in place, Antoine was able to reduce a 1200-second MapReduce job to just 60 seconds! Very impressive.
I expect that one of the reasons Antoine focussed on MapReduce in MongoDB was that MongoDB limited the output of the faster Aggregation framework to a single 16 MB document. With MongoDB 2.6, this limit is removed: aggregations generate multi-document results, rather than a single document, and client applications can navigate these using a cursor. Alternatively, an output collection can be specified for the Aggregation framework to write to directly. Thus, version 2.6 presents the opportunity to run the MapReduce test scenario through the Aggregation framework instead, to compare performance. In this blog post I do just that, focussing purely on a single, non-sharded database.
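To illustrate the two new 2.6 options just described, here is a minimal sketch; the collection and field names ('mycoll', 'myfield', 'myresults') are hypothetical and not part of the test that follows:

// Sketch: in 2.6, aggregate() returns a cursor, so large result sets can be iterated
var cursor = db.mycoll.aggregate([ {$group: {_id: "$myfield", count: {$sum: 1}}} ]);
cursor.forEach(printjson);
// Alternatively, write the result straight to a named collection with a $out stage
db.mycoll.aggregate([
    {$group: {_id: "$myfield", count: {$sum: 1}}},
    {$out: "myresults"}    // $out must be the final stage in the pipeline
]);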
Re-running the optimised MapReduce
Before trying the test with the Aggregation framework, I quickly ran and timed Antoine's optimised MapReduce job on my test machine, to normalise for any performance difference due to hardware disparity (for reference, my host machine is a Linux x86-64 laptop with an SSD and 2 cores + hyper-threading, running MongoDB 2.6.0rc1).
I first had to insert the 10 million documents, as Antoine did, using the mongo Shell, with each insert containing a single field ('dim0') whose value is a random number between 0 and 1 million:
> for (var i = 0; i < 10000000; ++i) {
      db.rawdata.insert({dim0: Math.floor(Math.random()*1000000)});
  }
> db.rawdata.ensureIndex({dim0: 1})
I then ran Antoine's fully optimised MapReduce job, which completed in 32 seconds, versus 63 seconds on Antoine's host machine.
Running a normal, non-optimised Aggregation
The first thing I needed to establish was how much faster or slower the equivalent Aggregation job would be, compared with the 'optimised' MapReduce job I'd just run on the same hardware. The Aggregation pipeline for this test is actually very simple to implement. Below you can see the MapReduce code and its Aggregation pipeline equivalent:
MapReduce:

map: function() { emit(this.dim0, 1); }
reduce: function(key, values) { return Array.sum(values); }

Aggregation pipeline equivalent:

var pipeline = [
    {$group: { _id: "$dim0", value: {$sum: 1} }}
];
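For reference, here is roughly how the map/reduce pair above could be invoked from the shell. This is a minimal sketch, not Antoine's actual optimised invocation (which also split the work across threads and output databases), and 'mrdata' is just an example output collection name:

// Minimal sketch of invoking the map/reduce shown above (not the optimised version)
db.rawdata.mapReduce(
    function() { emit(this.dim0, 1); },
    function(key, values) { return Array.sum(values); },
    { sort: {dim0: 1}, out: "mrdata" }    // 'mrdata' is a hypothetical output collection
);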
The pipeline simply groups on the field 'dim0', counting the number of occurrences of each value. So I ran this aggregation against my database, called 'mrtest', containing the randomly generated 'rawdata' collection:
> db = db.getSiblingDB('mrtest');
> db['rawdata'].aggregate(pipeline);
The aggregation completed in 23 seconds, versus 32 seconds for running the 'optimised' equivalent MapReduce job on the same hardware. This shows just how quick the standard out-of-the-box Aggregation capability is in MongoDB!
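For anyone reproducing these numbers, a simple way to capture rough wall-clock timings from the shell is shown below; this is just a sketch, not necessarily how the timings above were captured:

// Sketch: crude wall-clock timing of the aggregation from the shell
var start = new Date();
db['rawdata'].aggregate(pipeline);
print('Aggregation took ' + ((new Date() - start) / 1000) + ' seconds');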
Running an optimised Aggregation
So my challenge was now clear: see if I could apply some of the tricks Antoine used to speed up MapReduce to the Aggregation framework, to bring this test's completion time down from 23 seconds.
I considered which of the original 5 optimisations could realistically be applied to optimising the test with the Aggregation framework:
- YES - Define a 'sort' for the job
- YES - Use multiple-threads, each operating on a subset of the collection
- NO - Write out result subsets to different databases (the Aggregation framework expects to write out a collection to the same database as the source collection being read)
- NO - Specify that the job should use 'pure JavaScript mode' (doesn't apply here as Aggregations run within C++ code)
- NO - Move from MongoDB 2.4 to 2.6 (my test runs are already on the latest version, which is undoubtedly helping achieve the 23 second result)
So the main thing I needed, in addition to including a 'sort', was to somehow split the aggregation into parts that could be run in parallel, by different threads, on subsets of the data. This would require me to prepend the existing pipeline with operations to match a range of documents in the collection and to sort that matched subset, before the original pipeline operation(s) run. I would also need to append an operation to the pipeline, specifying that the result should go to a specific named collection. For example, for one of the subsets of the collection (documents with 'dim0' values ranging from 524915 to 774848), the comparison below shows how I needed to convert the original pipeline:
Original pipeline:

[
    {$group: { _id: "$dim0", value: {$sum: 1} }}
]

Modified pipeline for one subset:

[
    {$match: { dim0: {$gte: 524915, $lt: 774849} }},
    {$sort: { dim0: 1 }},
    {$group: { _id: "$dim0", value: {$sum: 1} }},
    {$out: "aggdata524915"}
]
A similar pipeline, with different range values and a different output collection name, needs to be produced for each thread I intend to run. Each of these pipelines then needs to be invoked with db.coll.aggregate() in parallel, from a different thread.
To achieve this in a generic way, from the Shell, I quickly wrote a small JavaScript function to 'decorate' a given pipeline with $match and $sort prepended and $out appended. The function ends by running MongoDB’s aggregate() function with the modified pipeline, against the subset of matching data.
// Define new wrapper aggregate function, to operate on subset of docs
// in collection after decorating the pipeline with pre and post stages
var aggregateSubset = function(dbName, srcClltName, tgtClltName,
                               singleMapKeyName, pipeline, min, max) {
    var newPipeline = JSON.parse(JSON.stringify(pipeline)); // deep copy
    var singleKeyMatch = {};
    singleKeyMatch[singleMapKeyName] = (max < 0) ? {$gte: min}
                                                 : {$gte: min, $lt: max};
    var singleKeySort = {};
    singleKeySort[singleMapKeyName] = 1;
    // Prepend pipeline with range match and sort stages
    newPipeline.unshift(
        {$match: singleKeyMatch},
        {$sort: singleKeySort}
    );
    // Append pipeline with output to a named collection
    newPipeline.push(
        {$out: tgtClltName}
    );
    // Execute the aggregation on the modified pipeline
    db = db.getSiblingDB(dbName);
    var result = db[srcClltName].aggregate(newPipeline);
    return {outCollection: tgtClltName, subsetResult: result};
}
This function takes the names of the database, source collection and target collection, plus the original pipeline, plus the minimum and maximum values for the subset of the collection. The keen-eyed amongst you will have noticed the 'singleMapKeyName' parameter too. My example function is hard-coded to assume that the pipeline is mapping and sorting data anchored on a single key ('dim0' in my case, or a group-by 'customer', as another example). In other scenarios, however, a compound key may be in use (eg. group-by customer + year), and my function would need to be enhanced to support compound keys.
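As a rough illustration of what that enhancement could look like, the hypothetical helper below builds the $match and $sort stages from an array of key names, range-partitioning on the leading key only. This is an untested sketch; 'buildMatchAndSort' is not part of the code used in my tests:

// Hypothetical sketch: build the $match/$sort stages for a compound key,
// range-partitioning on the leading key only
var buildMatchAndSort = function(keyNames, min, max) {
    var match = {};
    match[keyNames[0]] = (max < 0) ? {$gte: min} : {$gte: min, $lt: max};
    var sort = {};
    for (var i = 0; i < keyNames.length; ++i) {
        sort[keyNames[i]] = 1;    // sort ascending on every key in the compound key
    }
    return [{$match: match}, {$sort: sort}];
};
// Example: stages for a group-by 'customer' + 'year' pipeline, for one range of customers
var preStages = buildMatchAndSort(['customer', 'year'], 'A', 'M');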
Once I'd completed and tested my new aggregateSubset() JavaScript function, I was ready to run some test code in the Shell to call it from multiple threads. Below is the code I used, specifying that 4 threads should be spawned. My host machine has 4 hardware threads, so this is probably not a bad number to use, although I actually chose it because it matched the thread count used in Antoine's original MapReduce tests.
// Specify test parameter values
var dbName = 'mrtest';
var srcClltName = 'rawdata';
var tgtClltName = 'aggdata';
var singleMapKeyName = 'dim0';
var numThreads = 4;
// Find an even-ish distributed set of sub-ranges in the collection
var singleKeyPattern = {};
singleKeyPattern[singleMapKeyName] = 1;
var splitKeys = db.runCommand({splitVector: dbName+"."+srcClltName,
    keyPattern: singleKeyPattern, maxChunkSizeBytes: 32000000}).splitKeys;
// There are more sub-ranges than threads to run, so work out
// how many sub-ranges to use in each thread
var inc = Math.floor(splitKeys.length / numThreads) + 1;
var threads = [];
// Start each thread, to execute aggregation on a subset of data
for (var i = 0; i < numThreads; ++i) {
    var min = (i == 0) ? 0 : splitKeys[i * inc].dim0;
    var max = (i * inc + inc >= splitKeys.length) ? -1
                                                  : splitKeys[i * inc + inc].dim0;
    var t = new ScopedThread(aggregateSubset, dbName, srcClltName,
        tgtClltName + min, singleMapKeyName, pipeline, min, max);
    threads.push(t);
    t.start();
}
// Wait for all threads to finish and print out their summaries
for (var i in threads) {
    var t = threads[i];
    t.join();
    printjson(t.returnData());
}
This multi-threaded aggregation completed in 13 seconds, versus 23 seconds for the normal 'non-optimised' version of the aggregation pipeline. This optimised run wrote to 4 different collections, one per thread, each named 'aggdata' plus the lower bound of that thread's range (eg. 'aggdata374876').
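As a quick sanity check of the outcome, the per-thread output collections can be totalled up from the shell. This is a small sketch, assuming that every collection whose name starts with 'aggdata' is one of the outputs:

// Sketch: sum the document counts across the per-thread output collections
var total = 0;
db.getCollectionNames().forEach(function(name) {
    if (name.indexOf('aggdata') == 0) {
        total += db[name].count();
    }
});
print('Total grouped values across output collections: ' + total);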
Okay, this isn't the stunning 20x improvement that Antoine achieved for MapReduce. However, given that the Aggregation framework is already a highly optimised MongoDB capability, a 40% speed-up is not to be sniffed at. Also, I suspect that on host machines with more cores (with the thread count parameter in my example increased to match), and with a much larger real-world data set, even more impressive results could be achieved, such as a 2x or 3x improvement.
Some observations
As you'll notice, I use a similar pattern to Antoine's, whereby I use the undocumented splitVector command to get a good idea of a well-balanced set of ranges in the source collection. This is especially useful when the distribution of values in a collection is uneven and hard to predict. In my case the distribution is pretty even, and for 4 threads I could have got away with hard-coding the ranges 0-250000, 250000-500000, 500000-750000 and 750000-1000000. However, most real-world collections will have a much more uneven spread of values for a given key, and with a naïve approach to partitioning the data, one thread could end up doing much more work than the others. In real-world scenarios you may have other ways of predicting balanced subset ranges, based on the empirical knowledge you have of your data, and you would then make a best-guess effort to specify the split points for partitioning your collection. If you do choose to use the splitVector utility in a real application, I would suggest caching its result and only re-evaluating it intermittently, to avoid the latency it would otherwise add to the execution time of your aggregations (see the sketch below).
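As a sketch of that caching suggestion, the hypothetical helper below stores the split points in a 'splitpoints' collection and only refreshes them when they are older than a given age; the collection name, helper name and refresh interval are purely illustrative and not part of the test code above:

// Hypothetical sketch: cache splitVector output and only refresh it periodically
var getCachedSplitKeys = function(namespace, keyPattern, maxAgeMillis) {
    var cached = db.splitpoints.findOne({_id: namespace});
    if (cached && (new Date() - cached.when) < maxAgeMillis) {
        return cached.splitKeys;    // recent enough - reuse the cached split points
    }
    var splitKeys = db.runCommand({splitVector: namespace,
        keyPattern: keyPattern, maxChunkSizeBytes: 32000000}).splitKeys;
    db.splitpoints.save({_id: namespace, when: new Date(), splitKeys: splitKeys});
    return splitKeys;
};
// Example: re-evaluate the split points at most once per hour
var splitKeys = getCachedSplitKeys('mrtest.rawdata', {dim0: 1}, 60 * 60 * 1000);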
Also, you'll notice I followed Antoine's approach of using the undocumented ScopedThread Shell utility object, to spawn new threads. If you are writing a real application to run an aggregation, you would instead use the native threading API of your chosen programming language/platform, alongside the appropriate MongoDB language driver.
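For example, from Node.js one could fire off the per-range aggregations concurrently using the official MongoDB Node.js driver, letting the server do the parallel work. The code below is a rough sketch only: driver APIs vary by version, and 'pipelines' is assumed to be an array of decorated per-range pipelines built along the lines shown earlier:

// Hypothetical sketch: issue the per-range aggregations concurrently from Node.js
var MongoClient = require('mongodb').MongoClient;

MongoClient.connect('mongodb://localhost:27017', function(err, client) {
    if (err) throw err;
    var coll = client.db('mrtest').collection('rawdata');
    // 'pipelines' is assumed to hold one decorated pipeline per range,
    // each ending in its own $out stage, as shown earlier
    Promise.all(pipelines.map(function(pipeline) {
        return coll.aggregate(pipeline).toArray();
    })).then(function() {
        console.log('All range aggregations completed');
        client.close();
    });
});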
One optimisation I wish I could have made, but couldn't, was 'write out result subsets to different databases'. In MongoDB 2.6, one can specify an output collection name, but not an output database name, when calling aggregate(). My belief that having different threads output to different databases would increase performance is based on some observations I made during my tests. Running 'top', 'mongostat' and 'iostat' on my host machine whilst the optimised aggregation was running, I initially noticed that 'top' reported all 4 'CPUs' maxing out at around 95% utilisation, coming down to a lower level in the later stages of the run. At the point when CPU usage came down, 'mongostat' reported that the database 'mrtest' was locked for around 90% of the time, while 'iostat' reported a modest 35% disk utilisation. This tells me that, for part of the aggregation run, the various threads are all queuing for the write lock, waiting to insert result documents into their own collections in the same database. Allowing different databases to be used would diminish this write lock bottleneck considerably for such parallel jobs, allowing the document inserts, and hence the overall aggregation, to complete even quicker. I have created a JIRA enhancement request asking for the Aggregation framework's $out operator to be able to specify a target database, in addition to a target collection.
Conclusion
So I've shown that certain MapReduce-style workloads can be executed faster using Aggregation in MongoDB, and that the execution time can be reduced even further by parallelising the aggregation work over subsets of the collection, using multiple threads. I demonstrated this for a single non-sharded database. In the future, I intend to investigate how a sharded database can be used to drive even better parallelisation, and hence lower execution times, and will then blog about it.
Song for today: The Rat by The Walkmen