Friday, April 13, 2018

MongoDB Graph Query Example, Inspired by Designing Data-Intensive Applications Book


People who have worked with me recently are probably bored by me raving about how good this book is: Designing Data-Intensive Applications by Martin Kleppmann (O'Reilly, 2016). Suffice to say, if you are in IT and have any sort of interest in databases and/or data-driven applications, you should read this book. You will be richly rewarded for the effort.

In the second chapter of the book ('Data Models and Query Languages'), Martin has a section called 'Graph-Like Data Models', which explores 'graph use cases' where many-to-many relationships are typically modelled with tree-like structures, with indeterminate numbers of inter-connections. The book section shows how a specific 'graph problem' can be solved by using a dedicated graph database technology with its associated query language (Cypher) and by using an ordinary relational database with its associated query language (SQL). One thing that quickly becomes evident, when reading this section of the book, is how difficult it is to model complex many-to-many relationships in a relational database. This may come as a surprise to some people. However, it is consistent with something I've subconsciously learnt over 20 years of using relational databases: in the world of RDBMS, relationships ≠ relations.

The graph scenario illustrated in the book shows an example of two people, Lucy and Alain, who are married to each other, who are born in different places and who now live together in a third place. For clarity, I've included the diagram from the book, below, to best illustrate the scenario (annotated with the book's details, in red, for reference).

Throughout the book, numerous types of databases and data-stores are illustrated, compared and contrasted, including MongoDB in many places. However, the book's section on graph models doesn't show how MongoDB can be used to solve the example graph scenario. Therefore, I thought I would take this task on myself. Essentially, the premise is that there is a data-set of many people, with data on the place each person was born in and the place each person now lives in. Of course, any given place may be within a larger named place, which may in turn be within a larger named place, and so on, as illustrated in the diagram above. In the rest of this blog post I show one way that such data structures and relationships can be modelled in MongoDB and then leveraged by MongoDB's graph query capabilities (specifically using the graph lookup feature of MongoDB's Aggregation Framework). What will be demonstrated is how to efficiently answer the exam question posed by the book, namely: 'Find People Who Emigrated From US To Europe'.

Solving The Book's Graph Challenge With MongoDB

To demonstrate the use of MongoDB's Aggregation 'graph lookup' capability to answer the question 'Find People Who Emigrated From US To Europe', I've created the following two MongoDB collections, populated with data:
  1. 'persons' collection. Contains around one million randomly generated person records, where each person has 'born_in' and 'lives_in' attributes, which each reference a 'starting' place record in the places collection.
  2. 'places' collection. Contains hierarchical geographical places data, with the graph structure of: SUBDIVISIONS-->COUNTRIES-->SUBREGIONS-->CONTINENTS. Note: The granularity and hierarchy of the data-set is slightly different than illustrated in the book, due to the sources of geographical data I had available to cobble together.
Similar to the book's example, amongst the many 'persons' records stored in the MongoDB data-set are the following two records relating to 'Lucy' and 'Alain'.

{fullname: 'Lucy Smith', born_in: 'Idaho', lives_in: 'England'}
{fullname: 'Alain Chirac', born_in: 'Bourgogne-Franche-Comte', lives_in: 'England'}

Below is an excerpt of some of the records from the 'places' collection, which illustrates how a place record may refer to another place record, via its 'part_of' attribute.

{name: 'England', type: 'subdivision', part_of: 'United Kingdom of Great Britain and Northern Ireland'}
{name: 'United Kingdom of Great Britain and Northern Ireland', type: 'country', part_of: 'Northern Europe'}
{name: 'Northern Europe', type: 'subregion', part_of: 'Europe'}
{name: 'Europe', type: 'continent', part_of: ''}
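
To make the 'part_of' chain concrete, here is a minimal Python sketch that walks it in memory for the four records above. The function name ancestors and the dictionary keyed by place name are illustrative assumptions; the sketch only mimics, in plain Python, the kind of walk a graph lookup performs, and says nothing about how MongoDB implements it internally.

```python
# The four 'places' records from the excerpt above, keyed by name (illustration only).
PLACES = {
    'England': {
        'name': 'England', 'type': 'subdivision',
        'part_of': 'United Kingdom of Great Britain and Northern Ireland'},
    'United Kingdom of Great Britain and Northern Ireland': {
        'name': 'United Kingdom of Great Britain and Northern Ireland',
        'type': 'country', 'part_of': 'Northern Europe'},
    'Northern Europe': {
        'name': 'Northern Europe', 'type': 'subregion', 'part_of': 'Europe'},
    'Europe': {'name': 'Europe', 'type': 'continent', 'part_of': ''},
}


def ancestors(places, start):
    """Return the place names from `start` up to the top of the hierarchy."""
    chain = []
    seen = set()  # guards against an accidental cycle in the data
    current = start
    while current and current in places and current not in seen:
        seen.add(current)
        chain.append(current)
        current = places[current]['part_of']  # hop to the enclosing place
    return chain


for name in ancestors(PLACES, 'England'):
    print(name)
```

Starting from 'England', the walk visits the country, the subregion and finally the continent, stopping when 'part_of' is empty.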

If you want to access this data yourself and load it into the two MongoDB database collections, I've created JSON exports of both collections and made these available in a GitHub project (see the project's README for more details on how to load the data into MongoDB and then how to actually run the example's 'graph lookup' aggregation pipeline).

The MongoDB aggregation pipeline I created, to process the data across these two collections and to answer the question 'Find People Who Emigrated From US To Europe', has the following stages:
  1. $graphLookup: For every record in the 'persons' collection, using the person's 'born_in' attribute, locate the matching record in the 'places' collection and then walk the chain of ancestor place records building up a hierarchy of 'born in' place names.
  2. $match: Only keep 'persons' records, where the 'born in' hierarchy of discovered place names includes 'United States of America'.
  3. $graphLookup: For each of these remaining 'persons' records, using each person's 'lives_in' attribute, locate the matching record in the 'places' collection and then walk the chain of ancestor place records building up a hierarchy of 'lives in' place names.
  4. $match: Only keep the remaining 'persons' records where the 'lives in' hierarchy of discovered place names includes 'Europe'.
  5. $project: For the resulting records to be returned, just show the attributes 'fullname', 'born_in' and 'lives_in'.

The actual MongoDB Aggregation Pipeline for this is:

    // 'born' and 'lives' must be declared before this is run
    db.persons.aggregate([
        {$graphLookup: {
            from: 'places',
            startWith: '$born_in',
            connectFromField: 'part_of',
            connectToField: 'name',
            as: 'born_hierarchy'
        }},
        {$match: {'born_hierarchy.name': born}},
        {$graphLookup: {
            from: 'places',
            startWith: '$lives_in',
            connectFromField: 'part_of',
            connectToField: 'name',
            as: 'lives_hierarchy'
        }},
        {$match: {'lives_hierarchy.name': lives}},
        {$project: {
            _id: 0,
            fullname: 1,
            born_in: 1,
            lives_in: 1
        }}
    ])

When this aggregation is executed, after first declaring values for the variables 'born' and 'lives'...

var born = 'United States of America', lives = 'Europe'

...the following is an excerpt of the output that is returned by the aggregation:

{fullname: 'Lucy Smith', born_in: 'Idaho', lives_in: 'England'}
{fullname: 'Bobby Mc470', born_in: 'Illinois', lives_in: 'La Massana'}
{fullname: 'Sandy Mc1529', born_in: 'Mississippi', lives_in: 'Karbinci'}
{fullname: 'Mandy Mc2131', born_in: 'Tennessee', lives_in: 'Budapest'}
{fullname: 'Gordon Mc2472', born_in: 'Texas', lives_in: 'Tyumenskaya oblast'}
{fullname: 'Gertrude Mc2869', born_in: 'United States of America', lives_in: 'Planken'}
{fullname: 'Simon Mc3087', born_in: 'Indiana', lives_in: 'Ribnica'}
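
For anyone driving this from Python rather than the mongo shell, the same five stages can be expressed as plain dictionaries. This is a minimal sketch under stated assumptions: the function name build_emigration_pipeline is made up for illustration, the '.name' match paths are assumed from the 'as' fields and the 'name' attribute of the place records, and the connection details in the comments are placeholders.

```python
def build_emigration_pipeline(born, lives):
    """Build the five-stage pipeline as a list of plain Python dicts."""

    def hierarchy_lookup(start_field, output_field):
        # One $graphLookup stage that follows 'part_of' links through 'places'.
        return {'$graphLookup': {
            'from': 'places',
            'startWith': '$' + start_field,
            'connectFromField': 'part_of',
            'connectToField': 'name',
            'as': output_field,
        }}

    return [
        hierarchy_lookup('born_in', 'born_hierarchy'),
        {'$match': {'born_hierarchy.name': born}},      # assumed field path
        hierarchy_lookup('lives_in', 'lives_hierarchy'),
        {'$match': {'lives_hierarchy.name': lives}},    # assumed field path
        {'$project': {'_id': 0, 'fullname': 1, 'born_in': 1, 'lives_in': 1}},
    ]


pipeline = build_emigration_pipeline('United States of America', 'Europe')
print(len(pipeline), 'stages')

# With a running MongoDB and the PyMongo driver installed, it could be run
# like this (connection string and database name are assumptions):
#   from pymongo import MongoClient
#   db = MongoClient('mongodb://localhost:27017')['test']
#   for person in db.persons.aggregate(pipeline):
#       print(person)
```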

On my laptop, using the data-set of a million person records, the aggregation takes about 45 seconds to complete. However, if I first define the index...

db.places.createIndex({name: 1})

...and then run the aggregation, it only takes around 2 seconds to execute. This shows just how efficiently the 'graphLookup' capability is able to walk a graph of relationships, by leveraging an appropriate index.
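
As a loose analogy for why the index matters, each hop of the walk has to find a place record by its 'name'. The sketch below contrasts scanning a list with looking the name up in a Python dictionary. It is only an analogy under simplifying assumptions: MongoDB's index is a B-tree rather than a hash table, and the generated place names are invented for the example.

```python
def find_by_scan(places, name):
    """Without an index: examine records one by one until the name matches."""
    examined = 0
    for place in places:
        examined += 1
        if place['name'] == name:
            return place, examined
    return None, examined


def build_name_index(places):
    """With an 'index': map each name straight to its record."""
    return {place['name']: place for place in places}


# Invented data: 10,000 places with made-up names.
places = [{'name': 'Place%d' % i, 'part_of': ''} for i in range(10000)]
name_index = build_name_index(places)

found, examined = find_by_scan(places, 'Place9999')
print('scan examined', examined, 'records')            # the last record is the match
print('index lookup found', name_index['Place9999']['name'])
```

Multiply the cost of the scan by every hop, for every one of a million persons, and the gap between the two approaches becomes easy to picture.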


I've shown the expressiveness and power of MongoDB's aggregation framework, combined with 'graphLookup' pipeline stages, to perform a query of a graph of relationships across many records. A 'graphLookup' stage is efficient as it avoids the need to develop client application logic to programmatically navigate each hop of a graph of relationships, and thus avoids the network round trip latency that a client, traversing each hop, would otherwise incur. The 'graphLookup' stage can and should leverage an index, to enable the 'tree-walk' process to be even more efficient.

Although MongoDB may not be as rich in terms of the number of graph processing primitives it provides, compared with 'dedicated' graph databases, it possesses some key advantages for 'graph' use cases:
  1. Business Critical Applications. MongoDB is designed for, and invariably deployed as, a realtime operational database, with built-in high availability and enterprise security capabilities to support realtime business critical uses. Dedicated graph databases tend to be built for 'back-office' and 'offline' analytical uses, with less focus on high availability and security. If there is a need to leverage a database to respond to graph queries in realtime for applications sensitive to latency, availability and security, MongoDB is likely to be a great fit.
  2. Cost of Ownership & Timeliness of Insight. Often, there may be requirements to satisfy CRUD random realtime operations on individual data records and satisfy graph-related analysis of the data-set as a whole. Traditionally, this would require an ecosystem containing two types of database, an operational database and a graph analytical database. A set of ETL processes would then need to be developed to keep the duplicated data synchronised between the two databases. By combining both roles in a single MongoDB distributed database, with appropriate workload isolation, the financial cost of this complexity can be greatly reduced, due to a far simpler deployment. Additionally, and as a consequence, there will be no lag that arises when keeping one copy of data in one system, up to date with the other copy of the data in another system. Rather than operating on stale data, the graph analytical workloads operate on current data to provide more accurate business insight.

Song for today: Cosmonauts by Quicksand

Sunday, February 4, 2018

Run MongoDB Aggregation Facets In Parallel For Faster Insight


MongoDB version 3.4 introduced a new Aggregation stage, $facet, to enable developers to "create multi-faceted aggregations which characterize data across multiple dimensions, or facets, within a single aggregation stage". For example, you may run a clothes retail website and use this aggregation capability to characterise the choices across a set of filtered products, by the following facets, simultaneously:
  1. Size (e.g. S, M, L)
  2. Full-price vs On-offer
  3. Brand (e.g. Nike, Adidas)
  4. Average Rating (e.g. 1 - 5 stars)
In this blog post, I explore a way in which the response times for faceted aggregation workloads can be reduced, by leveraging parallel processing.

Parallelising Aggregated Facets

If an aggregation pipeline declares the use of the $facet stage, it defines multiple facets where each facet is a "sub-pipeline" containing a series of actions specific to its facet. When a faceted aggregation is executed, the result of the aggregation will contain the combined output of all the facets' sub-pipelines. Below is an example of the structure of a "faceted" aggregation pipeline.
In this example, there are two facets or dimensions, each containing a sub-pipeline. Each sub-pipeline is essentially a regular aggregation pipeline, with just a small handful of restrictions on what it can contain. Notable amongst these restrictions is the fact that the sub-pipeline cannot contain a $facet stage. Therefore you can't use this to go infinite levels deep!
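
As a rough illustration of that shape, here is a sketch of a two-facet pipeline written as Python dictionaries, in the form the PyMongo driver accepts. The facet names and the 'size' and 'brand' fields are hypothetical, loosely based on the retail example earlier, and the small helper only checks the one restriction mentioned above.

```python
# A single $facet stage holding two named sub-pipelines (hypothetical fields).
faceted_pipeline = [
    {'$facet': {
        # Facet 1: number of products per size, most common first
        'by_size': [
            {'$group': {'_id': '$size', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
        ],
        # Facet 2: number of products per brand, most common first
        'by_brand': [
            {'$sortByCount': '$brand'},
        ],
    }},
]


def has_nested_facet(pipeline):
    """Return True if any $facet sub-pipeline itself contains a $facet stage."""
    for stage in pipeline:
        for sub_pipeline in stage.get('$facet', {}).values():
            if any('$facet' in sub_stage for sub_stage in sub_pipeline):
                return True
    return False


print(sorted(faceted_pipeline[0]['$facet']))
print(has_nested_facet(faceted_pipeline))
```

The result of such an aggregation is a single document with one field per facet, each holding the output of that facet's sub-pipeline.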

The ability to define an aggregation containing different facets is not just useful for responding to online user interactions, in realtime. It is also useful for activities such as running a business's "internal reporting" workloads, where a report may need to analyse a full data set, and then summarise the data in different dimensions.

A data set that I've been playing around with recently is the publicly available "MOT UK Annual Vehicle Test Result Data". An MOT is a UK annual safety check on a vehicle, and is mandatory for all cars over 3 years old. The UK government makes the data available to download, in anonymised form, for anyone to consume, through its platform. It's a rich data set, providing a lot of insight into the characteristics of cars that UK residents have been driving over the last ten years or so. As a result, it's a good data set for me to use to explore faceted aggregations.

2014-2016 MOT car data loaded into MongoDB - displayed in MongoDB Compass

To analyse the car data, I created a GitHub project at mongo-uk-car-data. This contains some Python scripts to load the data from the MOT data CSV files into MongoDB, and to perform various analytics on the data set using MongoDB's Aggregation Framework. One of the Python scripts I created uses a $facet stage to aggregate together summary information in the following three dimensions:
  1. Analyse the different car makes/brands (e.g. Ford, Vauxhall) and categorise them into a range of "buckets", based on how many different unique models each car make has.
  2. Summarise the number of tested cars that fall into each fuel type category (e.g. Petrol, Diesel, Electric). Note: "Petrol" is equivalent to "Gas" for my American friends, I believe.
  3. List the top 5 car makes/brands from the car tests, showing how many cars there are for each car make, plus each car make's most popular and least popular models.
The following shows the result of the aggregation when run against the data set for years 2014-16 (a data set of approximately 113 million records).

When I ran this test on my Linux Laptop (hosting both a mongod server and the test Python script), the aggregation was completed in about 5:20 minutes. In my test Python client code, a faceted pipeline is constructed, which uses the PyMongo Driver to send the aggregation command and pipeline payload to the MongoDB database. Significantly, the database's Aggregation framework processes each facet's sub-pipeline serially. Therefore, for example, if the first facet takes 5 minutes to process, the second takes 2 minutes and the third facet takes 10 minutes, the client application will only receive a full response in just over 17 minutes.

It occurred to me that there was a way to potentially speed up the execution time of this analytics job. At the point of invoking collection.aggregate(pipeline) in the script, a custom function could be invoked instead, that internally breaks the pipeline up into separate pipelines, one for each facet. The function could then send each facet as a separate aggregation command, in parallel, to the MongoDB database to be processed, before merging the results into one, and returning it. Functionally, the behaviour of this code and the content of the response would be identical, but I hoped the response time would be significantly less. So I replaced the line of code that directly invoked the MongoDB aggregation command, collection.aggregate(pipeline), with a call to my new function, aggregate_facets_in_parallel(collection, pipeline). The implementation of this function can be seen in the project's Python code. The function uses Python's multiprocessing.pool.ThreadPool class to send each facet's sub-pipeline in a separate client thread, and waits for all parallel aggregations to complete before returning the combined result.
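
A minimal sketch of how such a function might look is shown here. It is not the project's actual implementation: the single-stage check mirrors the restriction described later in this post, while the StubCollection class and the 'fuel_type' and 'make' field names are assumptions added so the example runs without a database.

```python
from multiprocessing.pool import ThreadPool


def aggregate_facets_in_parallel(collection, pipeline):
    """Run each sub-pipeline of a single-stage $facet pipeline in its own thread.

    `collection` only needs an aggregate(pipeline) method returning an
    iterable of documents, which is what a PyMongo collection provides.
    """
    if len(pipeline) != 1 or '$facet' not in pipeline[0]:
        raise ValueError('Pipeline must contain exactly one stage, a $facet')

    facets = pipeline[0]['$facet']
    names = list(facets)

    def run_facet(name):
        # Each facet becomes an ordinary, separate aggregation command.
        return list(collection.aggregate(facets[name]))

    # One client thread per facet; map() blocks until every facet has finished.
    with ThreadPool(processes=max(1, len(names))) as pool:
        results = pool.map(run_facet, names)

    # Merge into the same shape $facet returns: one document, one field per facet.
    return [dict(zip(names, results))]


class StubCollection:
    """Stand-in for a real collection (an assumption, for demonstration only)."""

    def aggregate(self, pipeline):
        # Echo the name of the first stage instead of doing real work.
        return [{'ran': list(pipeline[0])[0]}]


pipeline = [{'$facet': {
    'fuel_types': [{'$sortByCount': '$fuel_type'}],            # hypothetical field
    'top_makes': [{'$sortByCount': '$make'}, {'$limit': 5}],   # hypothetical field
}}]
print(aggregate_facets_in_parallel(StubCollection(), pipeline))
```

Threads rather than processes are a reasonable fit here because each thread spends nearly all of its time blocked, waiting on the database to respond.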

This time, when I ran the test Python script against the same data set, I received the exact same result, but in a time of just 3:30 minutes (versus the original time of 5:20 minutes). This is not a bad speed up!  :-D

Some Observations

Some people may look at this and ask why, given that there were 3 facets, the aggregation didn't respond in just one third of the original time (i.e. in around 1:47 minutes). Well, there are many reasons, including:
  1. This would assume each separate facet sub-pipeline takes the same amount of time to execute, which is highly unlikely. The different facet sub-pipelines will each have different complexities and processing requirements. The overall response time cannot be any faster than the slowest of the 3 facet sub-pipelines.
  2. Just because the client code spawns 3 "concurrent" threads, it doesn't mean that these 3 threads are actually running completely in parallel. For example, my laptop has 2 CPU cores, which would be a cause of some resource contention. There will of course be many other potential causes of resource contention, such as multiple threads competing to retrieve different data from the same hard drive.
  3. For my simple tests, the client test Python script is running on the same machine as the MongoDB database, and thus will consume some of the compute capacity (albeit, for these tests, it will mostly just be blocking and waiting).
  4. In most real world cases (but not for my simple tests here), there may also be other workloads being processed by the MongoDB database simultaneously, consuming significant portions of the shared compute resources.
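
The arithmetic behind these figures can be checked with a few lines of Python. The measured timings are the ones quoted in this post; the 5, 2 and 10 minute facets are the hypothetical example used earlier, not measurements.

```python
def seconds(minutes, secs=0):
    """Convert a minutes:seconds timing into seconds."""
    return minutes * 60 + secs


serial = seconds(5, 20)      # single $facet command: 320 seconds
parallel = seconds(3, 30)    # one command per facet: 210 seconds

ideal_third = serial / 3     # what a perfect three-way split would give
speedup = serial / parallel  # what was actually observed

print('ideal third: about', round(ideal_third), 'seconds')
print('observed speed up:', round(speedup, 2), 'times')

# Hypothetical facets taking 5, 2 and 10 minutes each.
facet_minutes = [5, 2, 10]
serial_total = sum(facet_minutes)    # processed one after another
parallel_floor = max(facet_minutes)  # bounded by the slowest facet

print('serial:', serial_total, 'minutes; parallel at best:', parallel_floor, 'minutes')
```

The last two numbers illustrate the first point in the list: however many threads are used, the combined response cannot arrive before the slowest facet has finished.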
The other question people may ask is, if this is so simple, why doesn't the MongoDB server implement such parallelism itself for processing the different sections of an aggregation $facet stage. There are at least two reasons why this is not the case in MongoDB:
  1. My test scenario places some restrictions on the aggregation pipeline as a whole. Specifically, the top level pipeline must only contain one stage (the $facet stage) and my custom function throws an exception if this is not the case. This is fine and quite common where the workload is an analytical workload that needs to "full table scan" most or all of a data set. However, in the original retail example at the top of the post, the likelihood is that there would need to be a $match stage, before the $facet stage, to first restrict the multi-faceted clothes classifications based on a filter that the user has entered (e.g. product name contains "Black Trainers"). Thus, it may well be more efficient to perform the $match just once, to reduce the set of data to work with, before having this data passed on to a $facet stage. The workaround would be to duplicate the $match as the first stage of each of the $facet sub-pipelines, which could well turn out to be slower, as the same work would be repeated.
  2. For the most part, MongoDB's runtime architecture does not attempt to divide and process an individual client request's CRUD operations into parallel chunks, and instead processes the elements of an individual request serially. One reason why this is a good thing, is that typically a MongoDB database will be processing many requests in parallel from many clients. If one particular request was allowed to dominate the system's resources, by being parallelised "server-side" for a "burst of time", this may adversely affect other requests and cause the database to exhibit inconsistent performance as a whole. MongoDB's architecture generally encourages a fairer share of resources, spread across all clients' requests. For this reason, you may want to carefully consider how much you use the "client-side parallelism" tip in this blog post, in order to avoid abusing this "fair share" trust.
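
The workaround mentioned in the first point, duplicating the $match into each sub-pipeline, could be sketched as follows. The function name push_match_into_facets and the 'name', 'size' and 'brand' fields are hypothetical; the output is a single-stage $facet pipeline of the kind the client-side parallelism in this post expects.

```python
import copy


def push_match_into_facets(pipeline):
    """Rewrite [$match, $facet] as one $facet whose sub-pipelines start with the $match."""
    if (len(pipeline) != 2 or '$match' not in pipeline[0]
            or '$facet' not in pipeline[1]):
        raise ValueError('Expected exactly a $match stage followed by a $facet stage')

    match_stage = pipeline[0]
    facets = pipeline[1]['$facet']
    return [{'$facet': {
        # Every facet gets its own copy of the filter, so the same
        # filtering work is repeated once per facet.
        name: [copy.deepcopy(match_stage)] + list(sub_pipeline)
        for name, sub_pipeline in facets.items()
    }}]


original = [
    {'$match': {'name': {'$regex': 'Black Trainers'}}},   # hypothetical filter
    {'$facet': {
        'by_size': [{'$sortByCount': '$size'}],
        'by_brand': [{'$sortByCount': '$brand'}],
    }},
]
rewritten = push_match_into_facets(original)
for facet_name, sub_pipeline in rewritten[0]['$facet'].items():
    print(facet_name, [list(stage)[0] for stage in sub_pipeline])
```

Whether this pays off depends on how selective the filter is: the repeated $match work has to be outweighed by the time saved from running the facets concurrently.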


I've shown an example in this blog post of how multi-faceted MongoDB aggregations can be sped up by encouraging parallelism, from the client application's perspective. The choice of Python to implement this was fairly arbitrary. I could have implemented it in any programming language for which there is a MongoDB driver, using the appropriate multi-threading libraries for that language. The parallelism benefits discussed in this post are really aimed at analytics type workloads that need to process a whole data-set to produce multi-faceted insight. This is in contrast to the sorts of workloads that would first match a far smaller subset of records, against an index, before then aggregating on the small data subset.

Song for today: Bella Muerte by The Patch of Sky