Sunday, February 4, 2018

Run MongoDB Aggregation Facets In Parallel For Faster Insight


MongoDB version 3.4 introduced a new Aggregation stage, $facet, to enable developers to "create multi-faceted aggregations which characterize data across multiple dimensions, or facets, within a single aggregation stage". For example, you may run a clothes retail website and use this aggregation capability to characterise the choices across a set of filtered products, by the following facets, simultaneously:
  1. Size (e.g. S, M, L)
  2. Full-price vs On-offer
  3. Brand (e.g. Nike, Adidas)
  4. Average Rating (e.g. 1 - 5 stars)
In this blog post, I explore a way in which the response times for faceted aggregation workloads can be reduced, by leveraging parallel processing.

Parallelising Aggregated Facets

If an aggregation pipeline declares the use of the $facet stage, it defines multiple facets, where each facet is a "sub-pipeline" containing a series of actions specific to that facet. When a faceted aggregation is executed, the result of the aggregation will contain the combined output of all the facets' sub-pipelines. Below is an example of the structure of a "faceted" aggregation pipeline.
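As a rough sketch, such a pipeline could be expressed in Python as follows. The facet names (by_size, by_brand) and the field names (size, brand) are hypothetical, chosen to echo the retail example above rather than taken from a real schema.

```python
# A top-level pipeline with a single $facet stage holding two sub-pipelines.
# Facet and field names are illustrative assumptions only.
pipeline = [
    {"$facet": {
        # Facet 1: count products per size, most common first
        "by_size": [
            {"$sortByCount": "$size"},
        ],
        # Facet 2: count products per brand, most common first
        "by_brand": [
            {"$group": {"_id": "$brand", "count": {"$sum": 1}}},
            {"$sort": {"count": -1}},
        ],
    }},
]

# With PyMongo this would typically be run as: collection.aggregate(pipeline)
```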
In this example, there are two facets or dimensions, each containing a sub-pipeline. Each sub-pipeline is essentially a regular aggregation pipeline, with just a small handful of restrictions on what it can contain. Notable amongst these restrictions is the fact that the sub-pipeline cannot contain a $facet stage. Therefore you can't use this to go infinite levels deep!

The ability to define an aggregation containing different facets is not just useful for responding to online user interactions in real time. It is also useful for activities such as running a business's "internal reporting" workloads, where a report may need to analyse a full data set and then summarise the data in different dimensions.

A data set that I've been playing around with recently is the publicly available "MOT UK Annual Vehicle Test Result Data". An MOT is a UK annual safety check on a vehicle, and is mandatory for all cars over 3 years old. The UK government makes the data available to download, in anonymised form, for anyone to consume, through its platform. It's a rich data set, providing a lot of insight into the characteristics of cars that UK residents have been driving over the last ten years or so. As a result, it's a good data set for me to use to explore faceted aggregations.

2014-2016 MOT car data loaded into MongoDB - displayed in MongoDB Compass

To analyse the car data, I created a GitHub project at mongo-uk-car-data. This contains some Python scripts to load the data from the MOT data CSV files into MongoDB, and to perform various analytics on the data set using MongoDB's Aggregation Framework. One of the Python scripts I created uses a $facet stage to aggregate together summary information, in the following three different dimensions:
  1. Analyse the different car makes/brands (e.g. Ford, Vauxhall) and categorise them into a range of "buckets", based on how many different unique models each car make has.
  2. Summarise the number of tested cars that fall into each fuel type category (e.g. Petrol, Diesel, Electric). Note: "Petrol" is equivalent to "Gas" for my American friends, I believe.
  3. List the top 5 car makes/brands from the car tests, showing how many cars there are for each car make, plus each car make's most popular and least popular models.
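To make the three dimensions more concrete, here is a minimal sketch of how such a $facet stage might be written. It is not the script from the GitHub project: the field names (make, model, fuel_type), the facet names and the bucket boundaries are all assumptions made for illustration.

```python
# Hypothetical three-facet pipeline; field names, facet names and
# bucket boundaries are assumptions, not the project's actual code.
pipeline = [
    {"$facet": {
        # 1. Bucket car makes by how many distinct models each make has
        "makes_by_model_count": [
            {"$group": {"_id": "$make", "models": {"$addToSet": "$model"}}},
            {"$project": {"model_count": {"$size": "$models"}}},
            {"$bucket": {
                "groupBy": "$model_count",
                "boundaries": [1, 10, 100, 1000],
                "default": "1000+",
                "output": {"makes": {"$sum": 1}},
            }},
        ],
        # 2. Count tested cars per fuel type
        "cars_by_fuel_type": [
            {"$sortByCount": "$fuel_type"},
        ],
        # 3. Top 5 makes, each with its most and least popular model
        "top_5_makes": [
            {"$group": {"_id": {"make": "$make", "model": "$model"},
                        "count": {"$sum": 1}}},
            {"$sort": {"count": -1}},
            # Documents arrive sorted by count, so $first/$last pick
            # the most and least popular model within each make
            {"$group": {"_id": "$_id.make",
                        "total": {"$sum": "$count"},
                        "most_popular_model": {"$first": "$_id.model"},
                        "least_popular_model": {"$last": "$_id.model"}}},
            {"$sort": {"total": -1}},
            {"$limit": 5},
        ],
    }},
]
```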
The following shows the result of the aggregation when run against the data set for years 2014-16 (a data set of approximately 113 million records).

When I ran this test on my Linux laptop (hosting both a mongod server and the test Python script), the aggregation completed in about 5:20 minutes. In my test Python client code, a faceted pipeline is constructed, and the PyMongo Driver is used to send the aggregation command and pipeline payload to the MongoDB database. Significantly, the database's Aggregation Framework processes each facet's sub-pipeline serially. Therefore, for example, if the first facet takes 5 minutes to process, the second takes 2 minutes and the third takes 10 minutes, the client application will only receive a full response after just over 17 minutes.

It occurred to me that there was a way to potentially speed up the execution time of this analytics job. At the point of invoking collection.aggregate(pipeline) in the script, a custom function could be invoked instead, which internally breaks the pipeline up into separate pipelines, one for each facet. The function could then send each facet as a separate aggregation command, in parallel, to the MongoDB database to be processed, before merging the results into one and returning it. Functionally, the behaviour of this code and the content of the response would be identical, but I hoped the response time would be significantly shorter. So I replaced the line of code that directly invoked the MongoDB aggregation command, collection.aggregate(pipeline), with a call to my new function, aggregate_facets_in_parallel(collection, pipeline). The implementation of this function can be seen in the project's Python source. The function uses Python's multiprocessing.pool.ThreadPool class to send each facet's sub-pipeline in a separate client thread, and waits for all parallel aggregations to complete before returning the combined result.
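The approach just described can be sketched roughly as below. This is a minimal illustration rather than the code from the GitHub project: it assumes only that collection exposes a PyMongo-style aggregate(pipeline) method, and, as an assumption of this sketch, it returns the merged result as a one-element list (mirroring the single document a $facet stage yields) rather than a cursor.

```python
from multiprocessing.pool import ThreadPool


def aggregate_facets_in_parallel(collection, pipeline):
    """Run each sub-pipeline of a single-stage $facet pipeline concurrently."""
    # Only a top-level pipeline made up of exactly one $facet stage is handled
    if len(pipeline) != 1 or "$facet" not in pipeline[0]:
        raise ValueError("pipeline must contain exactly one stage, a $facet stage")

    facets = pipeline[0]["$facet"]
    if not facets:
        raise ValueError("$facet stage must define at least one facet")

    def run_facet(item):
        name, sub_pipeline = item
        # Each facet is sent as an ordinary aggregation; list() drains the cursor
        return name, list(collection.aggregate(sub_pipeline))

    # One client thread per facet; map() blocks until every facet has finished
    with ThreadPool(processes=len(facets)) as pool:
        results = pool.map(run_facet, list(facets.items()))

    # Merge into one document keyed by facet name, as $facet itself would return
    return [dict(results)]
```

In the calling script, the only change would then be to replace collection.aggregate(pipeline) with aggregate_facets_in_parallel(collection, pipeline).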

This time, when I ran the test Python script against the same data set, I received exactly the same result, but in just 3:30 minutes (versus the original time of 5:20 minutes), which is roughly 1.5 times faster. This is not a bad speed-up!  :-D

Some Observations

Some people may look at this and ask why, given that there were 3 facets, the aggregation didn't respond in just one third of the original time (i.e. in around 1:47 minutes). Well, there are many reasons, including:
  1. This would assume each separate facet sub-pipeline takes the same amount of time to execute, which is highly unlikely. The different facet sub-pipelines will each have different complexities and processing requirements. The overall response time cannot be any faster than the slowest of the 3 facet sub-pipelines.
  2. Just because the client code spawns 3 "concurrent" threads, it doesn't mean that these 3 threads are actually running completely in parallel. For example, my laptop has 2 CPU cores, which would be a cause of some resource contention. There will of course be many other potential causes of resource contention, such as multiple threads competing to retrieve different data from the same hard drive.
  3. For my simple tests, the client test Python script is running on the same machine as the MongoDB database, and thus will consume some of the compute capacity (albeit, for these tests, it will mostly just be blocking and waiting).
  4. In most real world cases (but not for my simple tests here), there may also be other workloads being processed by the MongoDB database simultaneously, consuming significant portions of the shared compute resources.
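The first observation can be illustrated with simple arithmetic, reusing the hypothetical facet durations from earlier in the post (5, 2 and 10 minutes). The helper name below is made up for this sketch, and the figures are a best-case model, not a measurement; real runs will be slower than the parallel bound because of the contention described above.

```python
def serial_and_parallel_bounds(facet_minutes):
    """Return (serial total, best-case parallel time) for the given facet durations."""
    serial_total = sum(facet_minutes)        # sub-pipelines run one after another
    parallel_best_case = max(facet_minutes)  # never faster than the slowest facet
    return serial_total, parallel_best_case


serial, parallel = serial_and_parallel_bounds([5, 2, 10])
print(serial, parallel)  # 17 10
```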
The other question people may ask is: if this is so simple, why doesn't the MongoDB server implement such parallelism itself for processing the different sections of an aggregation $facet stage? There are at least two reasons why this is not the case in MongoDB:
  1. My test scenario places some restrictions on the aggregation pipeline as a whole. Specifically, the top level pipeline must only contain one stage (the $facet stage) and my custom function throws an exception if this is not the case. This is fine and quite common where the workload is an analytical workload that needs to "full table scan" most or all of a data set. However, in the original retail example at the top of the post, the likelihood is that there would need to be a $match stage, before the $facet stage, to first restrict the multi-faceted clothes classifications based on a filter that the user has entered (e.g. product name contains "Black Trainers"). Thus, it may well be more efficient to perform the $match just once, to reduce the set of data to work with, before having this data passed on to a $facet stage. The workaround would be to duplicate the $match as the first stage of each of the $facet sub-pipelines, which could well turn out to be slower, as the same work would be repeated.
  2. For the most part, MongoDB's runtime architecture does not attempt to divide and process an individual client request's CRUD operations into parallel chunks, and instead processes the elements of an individual request serially. One reason why this is a good thing, is that typically a MongoDB database will be processing many requests in parallel from many clients. If one particular request was allowed to dominate the system's resources, by being parallelised "server-side" for a "burst of time", this may adversely affect other requests and cause the database to exhibit inconsistent performance as a whole. MongoDB's architecture generally encourages a fairer share of resources, spread across all clients' requests. For this reason, you may want to carefully consider how much you use the "client-side parallelism" tip in this blog post, in order to avoid abusing this "fair share" trust.
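The workaround mentioned in the first point, duplicating a leading $match into every facet's sub-pipeline, can be sketched as a small pipeline rewrite. The function name push_match_into_facets is hypothetical, and the sketch only handles the simple case of exactly one $match stage followed by one $facet stage.

```python
def push_match_into_facets(pipeline):
    """Rewrite [$match, $facet] into a single $facet whose sub-pipelines each start with the $match."""
    if (len(pipeline) != 2
            or "$match" not in pipeline[0]
            or "$facet" not in pipeline[1]):
        raise ValueError("expected exactly a $match stage followed by a $facet stage")

    match_stage = pipeline[0]
    facets = pipeline[1]["$facet"]

    # Build new lists so the caller's original pipeline is left untouched
    return [{"$facet": {
        name: [match_stage] + list(sub_pipeline)
        for name, sub_pipeline in facets.items()
    }}]
```

The rewritten pipeline has a single $facet stage, so it satisfies the restriction described above, at the cost of each facet repeating the same filtering work.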


I've shown an example in this blog post of how multi-faceted MongoDB aggregations can be sped up by encouraging parallelism, from the client application's perspective. The choice of Python to implement this was fairly arbitrary. I could have implemented it in any programming language for which there is a MongoDB Driver, using the appropriate multi-threading libraries for that language. The parallelism benefits discussed in this post are really aimed at analytics-type workloads that need to process a whole data set to produce multi-faceted insight. This is in contrast to the sorts of workloads that would first match a far smaller subset of records, against an index, before then aggregating on that small subset.

Song for today: Bella Muerte by The Patch of Sky