Sunday, September 17, 2023

New Paper Version Of Practical MongoDB Aggregations Book Now Available

Just over 2 years ago, I self-published the Practical MongoDB Aggregations eBook, which is also referenced by parts of the MongoDB Manual


Now, there is a MongoDB Inc. officially endorsed paper and electronic version of the book, published by Packt. The Packt version of the book includes extra information on some topics and two additional example chapters.


Practical MongoDB Aggregations book front cover


You can purchase the book from the Packt website, Amazon, or other book retailers.


This Practical MongoDB Aggregations book helps you unlock the full potential of the MongoDB aggregation framework, including the latest features of MongoDB 7.0. It arms you with practical, easy-to-digest principles and approaches for increasing your effectiveness in developing aggregation pipelines, supported by examples for building pipelines to solve complex data manipulation and analytical tasks.


This book is tailored to developers, architects, data analysts, data engineers, and data scientists with some familiarity with the aggregation framework (it’s not for aggregation beginners). It starts by explaining the framework’s architecture and then shows you how to build pipelines optimized for productivity and scale. Given the critical role arrays play in MongoDB’s document model, the book delves into best practices for optimally manipulating arrays. The latter part of the book equips you with examples to solve common data processing challenges so you can apply the lessons you’ve learned to practical situations.


What You Will Learn

  • Develop dynamic aggregation pipelines tailored to changing business requirements

  • Eliminate the performance penalties of processing data externally by filtering, grouping, and calculating aggregated values directly within the database

  • Master essential techniques to optimize aggregation pipelines for rapid data processing

  • Achieve optimal efficiency for applying aggregations to vast datasets with effective sharding strategies

  • Employ MongoDB expressions to transform data and arrays for deeper insights

  • Secure your data access and distribution with the help of aggregation pipelines


 I hope you enjoy the book!

Monday, December 6, 2021

Achieving At Least An Order Of Magnitude Aggregation Performance Improvement By Scaling & Parallelism

Introduction

When I started my MongoDB Inc career 8 years ago, the 'bootcamp' project topic I chose for self-learning was to investigate how to speed up aggregations via parallelism. Specifically, I investigated the benefits of splitting an aggregation into parts, each running against a subset of the data concurrently. At the time, my study yielded a positive outcome by reducing the response time of a "full collection scan" style aggregation (see my original unsharded and sharded results and write-ups). However, in hindsight, running everything inside a single laptop dulled the impact. In reality, I was probably hitting host machine resource contention (e.g. CPU, RAM, Storage IOPS limits) in a way that wouldn't occur in a real distributed environment. 


I thought I’d take the opportunity to revisit this topic:


Can I improve the performance of a 'full-table-scan' type of aggregation workload by splitting the aggregation into parallel jobs, each operating on a subset of the data?


This time, I chose to test against a database containing all the movies ever catalogued to calculate the average movie rating across all movies. 


I decided to use a remote MongoDB cluster deployment for the test environment, with separate host machines for each replica/shard. Just because it is so easy to rapidly create a production-like environment, I used MongoDB Atlas to provision and host the database cluster. I’d expect to see similar results if I was to run the tests on equivalent hardware for any self-managed version of MongoDB.


The executed tests analyse how the aggregation workload completion time changes when adding more hardware (e.g. CPUs), more shards plus more parallelisation of the aggregation's pipeline, in various combinations.

Data

I created a collection of 100 million ‘movie’ documents by duplicating records from the smaller sample_mflix database sourced from the Atlas sample data set. Approximately ⅓ of the documents in the movies collection have a field called metacritic which provides an aggregated movies rating across many reviews collated by the Metacritic website.


Below is an example of a movie document, from the collection, for one of my favourite films, Drive:

Aggregation

I wanted to compute the average metacritic score across every movie in the database collection. Using the MongoDB Shell (mongosh), I am able to execute the following aggregation pipeline to calculate the average rating across all movies:


use sample_mflix;


pipeline = [

    {"$group": {

        "_id": "",

        "average_rating": {"$avg": "$metacritic"},

    }},

];


db.movies.aggregate(pipeline);


[{_id: '', average_rating: 59.387109125717934}]

Adding Parallelism

I then wrote a small Python test utility called Mongo Parallel Agg to achieve parallelism and test the outcome quickly. This utility analyses the movie data set, divides the data set into sections (e.g. 8 parts), and then spawns multiple sub-processes. Each sub-process runs in parallel, targeting one of the subsections of data.


MongoDB has a handy tool for working out the approximate “natural split points” for a given data set, which is the $bucketAuto aggregation operator. The mongo-parallel-agg app uses $bucketAuto to perform an operation similar to the following to understand the shape of the movie data set and identify its subsections (in this case asking for 8 approximately balanced subsections of the titles of movies):


pipeline = [

    {"$bucketAuto": {

        "$group": "$title",

        "buckets": 8

    }},


    {"$group": {

        "_id": "",

        "splitPoints": {

            "$push": "$_id.min",

        },

    }},


    {"$unset": [

        "_id",

    ]},

];


db.movies.aggregate(pipeline);


[{

    splitPoints: [

      '!Women Art Revolution',

      'Boycott',

      'Exotica',

      'Ishqiya',

      'Mr Perfect',

      'Salesman',

      'The Counterfeiters',

      "The Strange History of Don't Ask, Don't Tell"

    ]

}]


As you can see from these results, it's important to analyse the spread of values in a collection to determine its natural divisions. A naïve approach would be to manually divide all the movie titles by their initial letter in the English alphabet (e.g. A-Z). With 26 letters, if you need 8 subsections, you might naïvely split at every 3rd or 4th letter (26 ÷ 8 = 3.25). You would come up with subsections such as "A-C", "S-U", etc. However, as you can tell from the $bucketAuto output above, each subsection would not be evenly balanced for the number of documents it covers. Many more movies begin with the letter "T" than other letters due to titles like "The …". Using uneven subsections of documents for parallel aggregations results in some sub-processes taking far longer than others, prolonging the overall response time of the entire aggregation workload.


The other main 'trick' the mongo-parallel-agg app pulls is to use the identified subsections information to produce multiple aggregation pipelines, one for each sub-process, targeting just a subset of the collection. The aggregation excerpt below provides an example of the pipeline dynamically generated by mongo-parallel-agg to target a subsection of data to help with calculating the average rating:


pipeline = [

    {"$match": {

        "title": {"$gte": "Boycott", "$lt": "Exotica"}}

    },

 

    {"$group": {

        "_id": "",

        "total": {"$sum": "$metacritic"},

        "count": {"$sum": {"$cond": {

                    "if": {"$eq": ["$metacritic", null]},

                    "then": 0,

                    "else": 1

        }}},

    }},

];

    

db.movies.aggregate(pipeline);


[{ _id: '', total: 732084631, count: 12497379}]


As you can see, the pipeline uses a $match stage to target a subsection of the data for analysis. The other main change is that the pipeline no longer uses the $avg operator to calculate an average. Instead, the pipeline includes two computed fields, one for the total and one for the count, which each use the $sum operator. This change is required because, mathematically, calculating an average of averages will yield an invalid result. In this solution, the mongo-parallel-agg Python code performs the "last-mile" average computation by summing the totals produced by each sub-process. It then divides this grand total by the sum of all the counts calculated by each sub-process to determine the average. Also, you'll notice that the pipeline must handle ignoring documents for the count field if the field (metacritic) doesn't exist in a document (the previously used $avg operator automatically did this and so there was no need for a check).


To optimise each sub-process pipeline, the mongo-parallel-agg app first ensures a compound index exists for title & metacritic. This enables the $match part of the pipeline to target the index. It also enables the aggregation to be covered for increased efficiency because the only other field analysed (metacritic) belongs to the same index.


Purely for convenience, the mongo-parallel-agg demo app performs these two actions every time it executes before running the main aggregation workload against the collection (i.e. performing $bucketAuto analysis and ensuring an index exists). In a real production system, both these actions would be performed once or infrequently and not every time the aggregation workload runs. For this reason, the app doesn't start its aggregation execution timer until after it completes these two actions.

Results

The following table shows the execution times, in seconds, for an aggregation computing the average movie rating when run against different host environment topologies with varying levels of parallelism:


Per replica host specification:  Intel Xeon processor with a maximum speed of 3.1 GHz, 512 TB storage with 3000 non-provisioned IOPS

Observations
  • Scalability. The results show the solution is able to achieve scalability in multiple individual dimensions:

    • Vertical Scaling. The execution time is reduced by adding more vCPUs and making no other changes. Note, not evident in the displayed results is the effect of adding more RAM. In some cases, the addition of RAM does actually have an impact. See the later bullet-point titled "Increasing RAM For Analytics May Help" for more detail.

    • Horizontal Scaling. The execution time is reduced by adding more shards and making no other changes. 

    • Parallel-processing Scaling. The execution time is reduced by splitting the aggregation into parallel sub-processes, each acting on a subset of records and making no other changes. 

  • Combined Scaling. Overall, by combining the benefits of all three scaling dimensions, the solution manifests two orders of magnitude of reduction in execution time - from 908 seconds (over 15 minutes) down to 9 seconds.

  • Non-Linear Scaling. The scaling exhibited isn't linear, but I wouldn't expect this because map-reduce style workloads, such as calculating averages, will never scale 100% linearly. Such workloads must serialise parts of the computation to accumulate partial results together in one place and sum them together.

  • Unexpected Degree Of Speed-Up From 1 to 2 Sub-Processes. In each situation where there is a transition from a single aggregation process to two parallel sub-processes, there is at least a 6x speed-up. This difference is far more significant than the typical "best-case" linear (2x) speed-up that could have realistically been hoped for. This puzzled me for quite a while, but I eventually realised why this occurs. See the later section titled "The Slow Single-Threaded Result Conundrum" for the reason why.

  • Optimal Sub-Process to CPU Mapping. The optimal number of sub-processes to execute for this particular workload appears to be roughly 1 to 2 times the total vCPUs available. For example, in the first result row in the table (M40, 1 shard), for a total of 4 vCPUs, 8 sub-processes yields the quickest result. Another example is visible in the penultimate table row (M60, 2 shards). Here, the solution yields the quickest result for a total of 32 vCPUs when running either 32 or 64 sub-processes. Overall, we can infer that at some point, between 1 and 2 sub-processes per vCPU, the benefits of multiprocessing are outweighed by the overhead of facilitating so many processes.

  • Increasing RAM For Analytics Can Help. The table's results for the M60 two-shards and four-shards configurations, for the single sub-process tests, do not capture the full picture. In both cases, the response time significantly decreases (not shown in the table) when running the test configuration for the second time with the same aggregation pipeline. For the M60 two-shard single-process test (with a combined RAM total of 128GB), the response time was reduced from 557 seconds to 438 seconds. For the M60 four-shard single-process test (with a combined RAM total of 256GB), the response time was reduced from 340 seconds to 102 seconds. This latency drop occurs because, for these configurations, the size of RAM available across the shards is approaching the size of the full collection. Consequently, a significant portion of the data is fetched directly from RAM rather than from disk, because the data is already present in memory following the first test run. The remaining four smaller test configurations exhibited no noticeable difference in execution times between first and second runs.

  • Increasing Storage IOPS Was Not Tested. I employed the tests to analyse the effects of scaling parallel processing factors such as the number of CPU cores, shards and sub-processes. I expect the results to be even better when increasing the storage IOPS allocated to the host machines. This increased storage bandwidth should enable an aggregation to pull the scanned collection data from storage quicker. However, this aspect wasn't under consideration here and wasn’t tested, and so I've left it as an exercise for the reader.

The Slow Single-Threaded Result Conundrum

So why is there a jump down of at least 6x lower latency when going from a single process to two parallel sub-processes to calculate the average movie rating for all movies?


Having run all the tests, I investigated deeper and realised why this phenomenon occurred. I'd initially optimised the mongo-parallel-agg app when splitting an aggregation into multiple pipelines, to each performing a $match for a subset of data. I’d realised that each pipeline was no longer performing a "full collection scan" and would benefit from an index on the title field being used by the $match filter. Additionally, I’d realised that the only other field used by these "split aggregations" was the metacritic field from which the average is calculated. Therefore, rather than using a simple index on title, I’d employed a compound index on title & metacritic to cover the query part of the aggregation. Consequently, each aggregation sub-process was able to fully leverage an index and didn’t need to scan each full document (so full documents were not pulled from disk). Also, the index is far smaller than the full collection's size, and therefore, the database can rapidly serve the index’s data from RAM. 


During the tests, I’d not considered whether I could apply some of these same benefits when just running the entire aggregation pipeline as a single process. It was only later that I realised I could also optimise the original “full-collection” aggregation with a small “hack”, inspired by how I’d originally optimised the divided pipelines for sub-processing. Upon testing this hack for just the M60 two-shard configuration (single process), I obtained the following results:


557 seconds (mostly on disk)  → 448 seconds (significantly in RAM)  → 92 seconds (with the pipeline hack)


My hack involved refactoring the aggregation pipeline that the application generates for the single-threaded processing option, as shown below:


Using the “filter documents greater than MinKey” hack, I saw that the aggregation performs an index scan rather than a full-collection scan. The runtime invariably pulls this [far smaller] index from RAM rather than disk. The aggregation is covered, locating all the required title and metacritic values from the index with no need to examine each underlying document in the collection.


Interestingly, before including my hack, I first tried defining just a simple index on metacritic and using a hint to force the “full aggregation” to use this index for a covered query. However, this didn’t yield any performance benefits and it was even slower.


Unfortunately, I’d run out of time to re-test all the deployment scenarios with this improved single-process pipeline. Therefore, the results table reflects the original results before this final improvement. I have since folded the code refactoring into the Github codebase for the mongo-parallel-agg project so that others can leverage this optimisation in the future.


Also, upon reflection, instead of using the “match documents greater than MinKey” filter in a new $match stage, I suspect I could probably use something like the $exists operator instead in the new $match stage. My guess is this would also ensure the aggregation pipeline leverages the compound index, and as a covered query, rather than performing a full collection scan. Additionally, it may also be the case that using a sparse index or compound index provides further improvement for all the aggregations executed, whether parallelised or not (to be determined). I will leave these two elements for the reader to test.


EDIT: 7-Dec-2021: Prompted by a colleague, Chris Harris, I tried employing the hint mechanism again since publishing this blog post. This time, using a hint did work, and the single-threaded aggregation leveraged an index as a covered query. On the previous occasion I'd tried this, I suspect I'd introduced a typo when referencing the index from the hint. In conclusion, there appears to be a few different options for inducing an index scan and covered query to occur. For the background on why you explicitly have to force an index scan to be used, rather than a collection scan when there is no find()/$match filter defined, review the server ticket 20066.

Summary

I've shown here that, for a specific aggregation, I can reduce the time taken to calculate the average rating across 100 million movies on "mid-range" hardware by two orders of magnitude. I achieved this by scaling vertically (adding more CPUs), scaling horizontally (adding more shards), and by increasing the number of processes run in parallel. Also, some pipeline tweaking and index tuning helped. 


These findings don't guarantee that every type of aggregation workload will scale to the same degree, or even at all when applying similar configuration changes. The impact will depend on the nature of the aggregation pipeline.


I'd be remiss not to recommend that before trying the scaling optimisations outlined here, you should first ensure you have optimised your aggregation pipeline more generally. Scaling comes with an increased infrastructure cost which can often be avoidable. Also, the act of running more sub-processes for each aggregation could drain computation power you’d previously “allocated” to other types of workloads running against the database. My book, Practical MongoDB Aggregations, and specifically the following sections, outlines some of the techniques to use to optimise your aggregations without applying scaling changes:



Lastly, is the obvious question about whether I believe it is possible for the movie average rating calculation to take less than one second to complete for 100 million movies? The answer is absolutely yes, with sufficiently increased CPUs, RAM, storage IOPS, shards and parallel sub-processes. However, I suspect it may be a while before I find out for sure because the required uplift in hardware is likely to be cost-prohibitive for me at least.




Song for today: Carol by The Peep Tempel