Adding Parallelism
I then wrote a small Python test utility called Mongo Parallel Agg (mongo-parallel-agg) to add parallelism and test the outcome quickly. The utility analyses the movie data set, divides it into subsections (e.g. 8 parts), and then spawns multiple sub-processes. Each sub-process runs in parallel, targeting one subsection of the data.
MongoDB has a handy tool for working out the approximate “natural split points” for a given data set: the $bucketAuto aggregation stage. The mongo-parallel-agg app uses $bucketAuto in a pipeline similar to the following to understand the shape of the movie data set and identify its subsections (in this case, asking for 8 approximately balanced subsections of movie titles):
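pipeline = [
  // Divide the titles into 8 buckets holding roughly equal numbers of documents
  {"$bucketAuto": {
    "groupBy": "$title",
    "buckets": 8
  }},
  // Collect the lower boundary of each bucket into a single array
  {"$group": {
    "_id": "",
    "splitPoints": {
      "$push": "$_id.min",
    },
  }},
  // Drop the placeholder _id from the result
  {"$unset": [
    "_id",
  ]},
];
db.movies.aggregate(pipeline);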
[{
  splitPoints: [
    '!Women Art Revolution',
    'Boycott',
    'Exotica',
    'Ishqiya',
    'Mr Perfect',
    'Salesman',
    'The Counterfeiters',
    "The Strange History of Don't Ask, Don't Tell"
  ]
}]
As you can see from these results, it's important to analyse the spread of values in a collection to determine its natural divisions. A naïve approach would be to manually divide all the movie titles by their initial letter in the English alphabet (e.g. A-Z). With 26 letters, if you need 8 subsections, you might naïvely split at every 3rd or 4th letter (26 ÷ 8 = 3.25). You would come up with subsections such as "A-C", "S-U", etc. However, as you can tell from the $bucketAuto output above, each subsection would not be evenly balanced for the number of documents it covers. Many more movies begin with the letter "T" than other letters due to titles like "The …". Using uneven subsections of documents for parallel aggregations results in some sub-processes taking far longer than others, prolonging the overall response time of the entire aggregation workload.
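To make the idea of subsections concrete, here is a minimal Python sketch of how a list of split points could be turned into half-open title ranges and then into query filters. It is not taken from mongo-parallel-agg; the function names `split_points_to_ranges` and `range_to_match_filter` are hypothetical, and the sketch assumes the first split point is the smallest title in the collection and that the last range has no upper bound.

```python
from typing import Dict, List, Optional, Tuple


def split_points_to_ranges(split_points: List[str]) -> List[Tuple[str, Optional[str]]]:
    """Pair each split point with the next one to form [lower, upper) ranges.

    The final range has no upper bound (None), so it covers every value
    from the last split point onwards.
    """
    ranges = []
    for i, lower in enumerate(split_points):
        upper = split_points[i + 1] if i + 1 < len(split_points) else None
        ranges.append((lower, upper))
    return ranges


def range_to_match_filter(lower: str, upper: Optional[str], field: str = "title") -> Dict:
    """Build a filter document selecting values in [lower, upper)."""
    condition = {"$gte": lower}
    if upper is not None:
        condition["$lt"] = upper
    return {field: condition}


# Split points copied from the $bucketAuto output shown earlier
split_points = [
    "!Women Art Revolution",
    "Boycott",
    "Exotica",
    "Ishqiya",
    "Mr Perfect",
    "Salesman",
    "The Counterfeiters",
    "The Strange History of Don't Ask, Don't Tell",
]

for lower, upper in split_points_to_ranges(split_points):
    print(range_to_match_filter(lower, upper))
```

Using a half-open range ($gte on the lower bound, $lt on the upper bound) means that a title equal to a split point falls into exactly one subsection, so no document is counted twice.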
The other main 'trick' the mongo-parallel-agg app pulls is to use the identified split points to produce multiple aggregation pipelines, one per sub-process, each targeting just a subset of the collection. The excerpt below shows an example of a pipeline dynamically generated by mongo-parallel-agg to target one subsection of the data as part of calculating the average rating:
pipeline = [
  // Restrict this sub-process to one subsection of titles
  {"$match": {
    "title": {"$gte": "Boycott", "$lt": "Exotica"},
  }},
  // Emit a total and a count rather than an average
  {"$group": {
    "_id": "",
    "total": {"$sum": "$metacritic"},
    "count": {"$sum": {"$cond": {
      "if": {"$eq": ["$metacritic", null]},
      "then": 0,
      "else": 1
    }}},
  }},
];
db.movies.aggregate(pipeline);
[{ _id: '', total: 732084631, count: 12497379}]
As you can see, the pipeline uses a $match stage to target a subsection of the data for analysis. The other main change is that the pipeline no longer uses the $avg operator to calculate an average. Instead, it computes two fields, a total and a count, each using the $sum operator. This change is required because averaging the per-subsection averages only gives the correct overall average when every subsection contributes exactly the same number of ratings, which approximately balanced subsections do not guarantee. In this solution, the mongo-parallel-agg Python code performs the "last-mile" computation: it sums the totals produced by the sub-processes and divides this grand total by the sum of the counts. You'll also notice that the count must explicitly skip documents in which the metacritic field doesn't exist (the previously used $avg operator ignored such documents automatically, so no check was needed).
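The "last-mile" step can be sketched in a few lines of Python. This is an illustration rather than the actual mongo-parallel-agg code: the function name `combine_partial_results` is hypothetical, and the two partial results are made-up numbers chosen to show how an average of averages goes wrong when the counts differ.

```python
from typing import Dict, List, Optional


def combine_partial_results(partials: List[Dict[str, float]]) -> Optional[float]:
    """Combine per-subsection {"total", "count"} results into one average.

    Returns None when no document contributed a rating at all.
    """
    grand_total = sum(p["total"] for p in partials)
    grand_count = sum(p["count"] for p in partials)
    if grand_count == 0:
        return None
    return grand_total / grand_count


# Made-up partial results from two sub-processes (not real movie data)
partials = [
    {"total": 90, "count": 1},   # subsection average: 90.0
    {"total": 150, "count": 3},  # subsection average: 50.0
]

correct = combine_partial_results(partials)
naive = sum(p["total"] / p["count"] for p in partials) / len(partials)

print(correct)  # 60.0, i.e. (90 + 150) / (1 + 3)
print(naive)    # 70.0, the average of averages over-weights the smaller subsection
```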
To optimise each sub-process pipeline, the mongo-parallel-agg app first ensures a compound index exists on title and metacritic. This enables the $match part of the pipeline to use the index. It also allows the aggregation to be covered by the index, for increased efficiency, because the only other field analysed (metacritic) belongs to the same index.
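With PyMongo, ensuring such an index could look something like the sketch below. The helper name `ensure_title_metacritic_index` is hypothetical, and the connection string and database name in the usage comment are assumptions rather than details taken from the app. `create_index` does nothing if an identical index already exists, so calling it on every run is safe.

```python
def ensure_title_metacritic_index(collection):
    """Create a compound index on title, then metacritic, if it is missing.

    `collection` is expected to be a pymongo Collection. The value 1
    requests ascending order, the same as pymongo.ASCENDING. Returns the
    name of the index.
    """
    return collection.create_index([("title", 1), ("metacritic", 1)])


# Example usage (assumed connection string and database name):
#
#   from pymongo import MongoClient
#   movies = MongoClient("mongodb://localhost:27017")["sample_mflix"]["movies"]
#   ensure_title_metacritic_index(movies)
```

Placing title first matters here: the $match stage filters on a range of titles, so title needs to be the leading key of the index for that range scan to use it.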
Purely for convenience, the mongo-parallel-agg demo app performs these two actions (the $bucketAuto analysis and ensuring the index exists) every time it executes, before running the main aggregation workload against the collection. In a real production system, both actions would be performed once or infrequently, not every time the aggregation workload runs. For this reason, the app doesn't start its aggregation execution timer until both actions have completed.
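The timing approach described above, where one-off preparation is kept outside the measured window, can be sketched as follows. This is not the app's actual code: `timed_parallel_run` and its parameters are hypothetical names, and the usage example swaps in a thread pool with a stand-in work function so that it runs without a database, whereas the app itself spawns sub-processes.

```python
import time
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Callable, Iterable, List, Tuple


def timed_parallel_run(
    setup: Callable[[], None],
    run_one: Callable,
    work_items: Iterable,
    executor: Executor,
) -> Tuple[List, float]:
    """Run setup untimed, then time the parallel execution of run_one."""
    setup()  # e.g. split-point analysis and index creation; deliberately not timed

    start = time.perf_counter()
    # executor.map preserves the order of work_items in the results
    results = list(executor.map(run_one, work_items))
    elapsed = time.perf_counter() - start
    return results, elapsed


# Stand-in workload: doubling numbers instead of running aggregations
with ThreadPoolExecutor(max_workers=4) as pool:
    results, elapsed = timed_parallel_run(
        setup=lambda: None,
        run_one=lambda x: x * 2,
        work_items=[1, 2, 3, 4],
        executor=pool,
    )

print(results)
print(f"{elapsed:.6f} seconds")
```

To mirror the sub-process approach more closely, a `concurrent.futures.ProcessPoolExecutor` could be passed in instead, provided `run_one` is a module-level function so that it can be pickled and sent to the worker processes.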
Results
The following table shows the execution times, in seconds, for an aggregation computing the average movie rating when run against different host environment topologies with varying levels of parallelism: