Wednesday, December 16, 2015

MongoDB's BI Connector and pushdown of SQL WHERE clauses

In previous posts I showed how to use SQL & ODBC to query data from MongoDB, via Windows clients and Linux clients. In this post, I want to explore what happens to the SQL statement when it is sent to the BI Connector and on to the MongoDB database. For example, is the SQL WHERE clause pushed down to the database to be resolved?

Again, for these tests, I've used the same MOT UK car test results data set as my last two posts.

As I wanted to get a better insight into what MongoDB is doing under the covers to process SQL queries, I used the Mongo Shell to enable profiling for the MongoDB database holding the MOT data.
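For reference, enabling profiling from the shell can be sketched roughly as below. This assumes profiling level 2 (record every operation); the post does not say which level was actually used, and enableFullProfiling is a hypothetical wrapper name chosen for illustration.

```javascript
// Sketch only: `db` is the mongo shell's handle for the database holding the
// MOT data, passed in explicitly. enableFullProfiling is a hypothetical name.
function enableFullProfiling(db) {
    // Level 2 asks MongoDB to record all operations in db.system.profile
    // (assumption: the post does not state the level that was used)
    return db.setProfilingLevel(2);
}
```

In the mongo shell, after switching to the database with "use mot", calling enableFullProfiling(db) would be the same as running db.setProfilingLevel(2) directly.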

Then on my Linux desktop client, using the ODBC settings I'd configured in my last post, I fired up isql ready to start issuing queries against the MOT data set, via the BI Connector.

I submitted a SQL statement to query all cars with a recorded mileage of over 500,000 miles, selecting specific columns only.

> SELECT make, model, test_mileage FROM testresults WHERE test_mileage > 500000;

The results were correctly returned in isql, but I was more interested in seeing what MongoDB had been asked to do on the server side to fulfil this request. So, using the Mongo Shell, I queried the system profile collection to show the last recorded entry, displaying the exact request that MongoDB had received as a result of the translated SQL query.

> db.system.profile.find({ns: "mot.testresults"}).sort({$natural: -1}).limit(1).pretty()

As the profiler output showed, the BI Connector had indeed pushed the WHERE clause down to the database, along with the projection to return only specific fields. The BI Connector achieved this by assembling an Aggregation Pipeline.

This is great to see. Most of the work to process the SQL query is being done at the database level, reducing the amount of data (fewer rows and fewer columns) that is returned to the ODBC client for final processing, and also enabling database indexes to be leveraged for maximum performance.
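To make the idea of pushdown more concrete, here is a minimal sketch of the kind of aggregation pipeline that the SQL statement above could map to. The exact pipeline generated by the BI Connector is not reproduced in this post, so the shape below is an assumption, and buildPushdownPipeline is a hypothetical helper, not part of the BI Connector.

```javascript
// Hypothetical helper (not part of the BI Connector): builds the sort of
// pipeline that "SELECT cols FROM t WHERE field > value" could translate to.
function buildPushdownPipeline(columns, field, minExclusive) {
    const projection = { _id: 0 };
    columns.forEach(function (col) { projection[col] = 1; });

    const filter = {};
    filter[field] = { $gt: minExclusive };

    return [
        { $match: filter },       // the WHERE clause, resolved by the database
        { $project: projection }  // only the selected columns are returned
    ];
}

const pipeline = buildPushdownPipeline(
    ['make', 'model', 'test_mileage'], 'test_mileage', 500000);
// In the mongo shell this could be run with: db.testresults.aggregate(pipeline)
```

Because the $match stage comes first, an index on test_mileage (if one exists) can be used by the database to satisfy the filter.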

Song for today: End Come Too Soon by Wild Beasts

Tuesday, December 15, 2015

Accessing MongoDB data using SQL / ODBC on Linux

In my previous post I showed how generic Windows clients can use ODBC to query data from MongoDB using the new BI Connector. I'm not really a Windows type person though and feel more at home with a Linux desktop. Therefore, in this post, I will show how easy it is to use ODBC from Linux (Ubuntu 14.04 in my case) to query MongoDB data using SQL. I've used the same MOT UK car test results data set as my last post.

First of all I needed to install the Linux ODBC packages plus the PostgreSQL ODBC driver from the package repository.

$ sudo apt-get install unixodbc-bin unixodbc odbc-postgresql

Then I googled how to use ODBC and the PostgreSQL driver to query an ODBC data source. Surprisingly, I didn't find much quality information out there. However, looking at the contents of the "odbc-postgresql" package....

$ apt-file list odbc-postgresql

....showed some bundled documents including...


Upon opening this text file I found pretty much everything I needed to know, to get going. I really should learn to RTFM more often!

The next step, as documented in that README, was to register the PostgreSQL ANSI & Unicode ODBC drivers in the /etc/odbcinst.ini file, using a pre-supplied template file that contained the common settings.

$ sudo odbcinst -i -d -f /usr/share/psqlodbc/odbcinst.ini.template

Then I needed to create the /etc/odbc.ini file where data sources can be registered. Here I used a skeleton template again.

$ sudo cat /usr/share/doc/odbc-postgresql/examples/odbc.ini.template >> ~/.odbc.ini
$ sudo mv ~/.odbc.ini /etc/odbc.ini

However, this particular template only includes dummy configuration settings, so I needed to edit this file to register the remote MongoDB BI Connector's details properly.

$ sudo vi /etc/odbc.ini

$ cat /etc/odbc.ini
[mot]
Description         = MOT Test Data
Driver              = PostgreSQL Unicode
Trace               = No
TraceFile           = /tmp/psqlodbc.log
Database            = mot 
Servername          =
UserName            = mot
Password            = mot
Port                = 27032
ReadOnly            = Yes
RowVersioning       = No
ShowSystemTables    = No
ShowOidColumn       = No
FakeOidIndex        = No
ConnSettings        =

(Forgive my insecure username/password of "mot/mot". ;) )

Now I was ready to launch the "isql" (Interactive SQL) tool, that is bundled with the Linux/UNIX ODBC package, to be able to issue SQL queries against my remote MongoDB BI Connector data source.

$ isql mot mot mot

( first param is data source name, second param is username, third param is password )

As you can see in the screenshot below, using "isql" I was then able to easily issue arbitrary SQL commands against MongoDB and see the results.

And that's it. Very simple, and I now have a powerful command line SQL tool at my disposal for further experiments in the future.  :-)

Song for today: Swallowtail by Wolf Alice

Friday, December 11, 2015

Accessing MongoDB data from SQL / ODBC on Windows, using the new BI Connector

The latest enterprise version of MongoDB (3.2) includes a new BI Connector to enable business intelligence, analytics and reporting tools, that only "speak" SQL, to access data in a MongoDB database, using ODBC. Most of the examples published so far show how to achieve this using rich graphical tools, like Tableau. Therefore, I thought it would be useful to show here that the data is accessible from any type of tool, that is capable of issuing SQL commands via an ODBC driver. Even from Microsoft's venerable Excel spreadsheet application. Believe it or not, I still come across organisations out there that are using Excel to report on the state of their business!

For my example, I loaded a MongoDB database with the anonymised MOT test results data that the UK government makes freely available to use. As an explanation for non-residents of the UK, MOT tests are the annual inspections that all UK road-going cars and other vehicles have to go through, to be legal and safe. There are millions of these car test records recorded every year, and they give a fascinating insight into the types and ages of cars people choose to drive in the UK.

First I loaded the CSV file based MOT data sets into a MongoDB 3.2 database, using a small Python script I wrote, with each document representing a test result for a specific owner's car for a specific year. Below is an example of what one test result document looks like in MongoDB:

I then followed the online MongoDB BI Connector documentation to configure a BI Connector server to listen for ODBC requests for the "mot" database and translate these to calls to the underlying MongoDB "testresults" collection. I just used the default DRDL ("Document Relational Definition Language") schema file that was automatically generated by the "mongodrdl" command line utility (a utility bundled with the BI Connector).

Then, on a separate desktop virtual machine running Windows 10, I downloaded the latest PostgreSQL ODBC driver installer for Windows and installed it.

With the ODBC driver installed, I then proceeded to define a Windows ODBC Data Source to reference the MOT database that I was exposing via the BI Connector (running on a remote machine).

By default the BI Connector (running on a machine with IP address, in my case), listens on port 27032. Before hitting the Save button, I hit the Test button to ensure that the Windows client could make a successful ODBC connection to the BI Connector.

With the new ODBC Data Source now configured (shown in screenshot above), I then launched Microsoft Excel so that I could use this new data source to explore the MOT test data.

Excel's standard query wizard was able to use the ODBC data source to discover the MongoDB collection's "schema". I chose to include all the "fields" in the query.

I thought it would be useful to ask for the MOT test results to be ordered by Test Year, followed by Car Make, followed by Car Model.

Finally, upon pressing OK, Excel presented me with the results of the SQL/ODBC query, run directly against the MOT test data, sourced from the MongoDB collection.

Excel then gave me many options, including settings to say whether to periodically refresh the data from source, and how often. I was also able to open Microsoft's built-in Query Builder tool to modify the query and execute it again.

That's pretty much it. It's straightforward to configure a Windows client to access data from MongoDB, via MongoDB's new BI Connector, using ODBC.

Song for today: Dust and Disquiet by Caspian

Friday, September 26, 2014

Tracking Versions in MongoDB

I'm honoured to have been asked by Asya to contribute a guest post to her series on Tracking Versions in MongoDB.

Here's Asya's full series on the topic, which I recommend reading in order:

  1. How to Track Versions with MongoDB
  2. How to Merge Shapes with Aggregation Framework
  3. Best Versions with MongoDB
  4. Further Thoughts on How to Track Versions with MongoDB  (my guest post)

Song for today: Someday I Will Treat You Good by Sparklehorse

Thursday, September 11, 2014

Java Using SSL To Connect to MongoDB With Access Control

In this post I've documented the typical steps you need to enable a Java application to connect to a MongoDB database over SSL. Specifically, I show the so-called "one-way SSL" pattern, where the server is required to present a certificate to the client but the client is not required to present a certificate to the server. Regardless of authentication, communication between client and server is encrypted in both directions. In this example, the client actually authenticates with the database, albeit using a username/password rather than presenting a certificate. In addition, I also show how the client application can run operations against a database that has access control rules defined for it.

Note: Ensure you are running the Enterprise version of MongoDB to be able to configure SSL.

1. Generate Key+Certificate and Configure With MongoDB For SSL

Create a key and certificate:

$ su -
$ cd /etc/ssl/
$ openssl req -new -x509 -days 365 -nodes -out mongodb-cert.crt -keyout mongodb-cert.key
$ cat mongodb-cert.key mongodb-cert.crt > mongodb.pem
$ exit

Add the following entries to mongodb.conf (or equivalent if setting command line options):


Start the mongod database server.

Test the SSL connection from the Mongo shell.

$ mongo --ssl

2. Configure Java Client For SSL

Create a new Java trust store in the local application's root directory, importing the certificate generated from the previous section. This is necessary because in this example a self-signed certificate is used.

$ keytool -import -alias "MongoDB-cert" -file "/etc/ssl/mongodb-cert.crt" -keystore truststore.ts -noprompt -storepass "mypasswd"
$ keytool -list -keystore truststore.ts -storepass mypasswd

In the client application's Java code, add the "ssl=true" parameter to the MongoDB URI to tell it to use an SSL connection, eg:

MongoClientURI uri = new MongoClientURI("mongodb://localhost:27017/test?ssl=true");

Modify the command line or script that runs the 'Java' executable for the client application, and add the following JVM command line property. This will allow the application to validate the server's certificate against the new trust store, when using SSL.

Run the Java client application to test that the SSL connection works.

3. Configure MongoDB database with Access Control Rules

Using the Mongo shell, connect to the database, and define an 'administrator user', plus a 'regular' user with an access control rule defined to enable it to have read/write access to a database (called 'test' in this example).

$ mongo --ssl
> use admin
> db.addUser({user: "admin", pwd: "admin", roles: [ "userAdminAnyDatabase"]})
> use test
> db.addUser({user: "paul", pwd: "password", roles: ["readWrite", "dbAdmin"]})

Add the following entry to mongodb.conf (or equivalent if setting command line options):


Restart the mongod database server to pick up this change.

Test the SSL connection to the 'test' database with username/password authentication, from the Mongo shell.

$ mongo test --ssl -u paul -p password

If the user specified doesn't have permissions to read collections in the database, an error similar to below will occur when trying to run db.mycollection.find(), for example.

error: { "$err" : "not authorized for query on test.mycollection", "code" : 13 }

4. Configure Java Client To Authenticate

Modify the client application code to include the username and password in the URL, eg:

MongoClientURI uri = new MongoClientURI("mongodb://paul:password@localhost:27017/test?ssl=true");

Run the Java client application to test that using SSL with username/password authentication works and has the rights to access the sample database.

If the user specified doesn't have correct permissions, an exception similar to below will occur.

Exception in thread "main" com.mongodb.MongoException: not authorized for query on test.system.namespaces
  at com.mongodb.MongoException.parse(
  at com.mongodb.DBApiLayer$MyCollection.__find(
  at com.mongodb.DBApiLayer$MyCollection.__find(
  at com.mongodb.DB.getCollectionNames(
  at MongoTest.main(

Note: In a real Java application, you would invariably build the URL dynamically, to avoid hard-coding a username and password in clear text in code.
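As a rough illustration of building the URL dynamically, the sketch below assembles a connection URI from values supplied at runtime. The client in this post is Java; the sketch is written in JavaScript only to stay consistent with the other code samples on this blog, and buildMongoUri and its option names are hypothetical.

```javascript
// Hypothetical helper: assemble a MongoDB connection URI from values supplied
// at runtime (for example from environment variables or a secrets store),
// rather than hard-coding the username and password in the source code.
function buildMongoUri(opts) {
    // Percent-encode the credentials so characters like '@' or '/' are safe
    const credentials = encodeURIComponent(opts.user) + ':' +
        encodeURIComponent(opts.password);
    return 'mongodb://' + credentials + '@' + opts.host + ':' + opts.port +
        '/' + opts.database + '?ssl=true';
}
```

The same approach carries over to Java by concatenating the runtime-supplied values into the string passed to the MongoClientURI constructor.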

Song for today: My Sister in 94 by The Paradise Motel

Friday, May 2, 2014

MongoDB Connector for Hadoop with Authentication - Quick Tip

If you are using the MongoDB Connector for Hadoop and you have enabled authentication on your MongoDB database (eg. auth=true) you may find that you are prevented from getting data in to or out of the database.

You may have provided the username and password to the connector (eg. mongo.input.uri = "mongodb://myuser:mypassword@host:27017/mytestdb.mycollctn"), for a Hadoop job that pulls data from the database. The connector will authenticate to the database successfully, but early in the job run, the job will fail with an error message similar to the following:

14/05/02 13:17:01 ERROR util.MongoTool: Exception while executing job... com.mongodb.hadoop.splitter.SplitFailedException: Unable to calculate input splits: need to login
        at com.mongodb.hadoop.MongoInputFormat.getSplits(
        at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(
        at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(
        at org.apache.hadoop.mapreduce.Job$

This is because the connector needs to run the MongoDB-internal splitVector DB command, under the covers, to work out how to split the MongoDB data up into sections ready to distribute across the Hadoop Cluster. However, by default, you are unlikely to have given sufficient privileges to the user, used by the connector, to allow this DB command to be run. This issue can be simulated easily by opening a mongo shell against the database, authenticating with your username and password and then running the splitVector command manually. For example:

> var result = db.runCommand({splitVector: 'mytestdb.mycollctn', keyPattern: {_id: 1}, 
                              maxChunkSizeBytes: 32000000})
> result
{
    "ok" : 0,
    "errmsg" : "not authorized on mytestdb to execute command {
        splitVector: "mytestdb.mycollctn", keyPattern: { _id: 1.0 },
        maxChunkSizeBytes: 32000000.0 }",
    "code" : 13
}

To address this issue, you first need to use the mongo shell, authenticated as your administration user, and run the updateUser command to give the connector user the clusterManager role, to enable the connector to run the DB commands it requires. For example:

use mytestdb
db.updateUser("myuser", {
    roles : [
        { role: "readWrite", db: "mytestdb" },
        { role : "clusterManager", db : "admin" }
    ]
})

After this, your Hadoop jobs with the connector should run fine.

Note: In my test, I ran Cloudera CDH version 5, MongoDB version 2.6 and Connector version 1.2 (built with target set to 'cdh4').

Song for today: Spanish Sahara by Foals

Wednesday, April 9, 2014

Parallelising MongoDB Aggregation on a Sharded Database

In my last blog post, I explored how I could speed up a MapReduce-style aggregation job, by parallelising the work over subsets of a collection, using multiple threads, for a non-sharded MongoDB database. In this post, I look at how I took the same test case, and a similar approach, to see if I could achieve a speed up when aggregating the 10 million documents stored in a sharded MongoDB database.

Confession time....

After my success in speeding up an aggregation on a non-sharded database, I assumed this second phase of my investigation would be a walk in the park and I'd quickly march on to a conclusion exhibiting even greater gains, in a sharded environment. However, things didn't quite turn out that way, and I ended up spending considerably more time on this investigation, than I'd anticipated. More on that later.

First of all though, I needed to insert the 10 million randomly generated documents into a now-sharded collection (in a deployment of 3 shards, all running on a single Linux laptop), which, of course, was straightforward.

Identifying Data Subsets In A Sharded Environment

Once the sharded collection was populated, I had to consider what changes to make to my JavaScript/mongo-Shell code, to deal with differences between non-sharded and sharded environments. It turned out that this was one of the more straightforward and intuitive changes to make.

In the sharded MongoDB environment, I didn't have to take an educated guess on how to split up the data into ranges or use the undocumented and time-consuming splitVector command, as I did for the non-sharded tests. MongoDB is already working hard to manage the shard-key for me and how best to evenly distribute subsets (chunks) across each shard. Therefore all the information I needed was already available to me in the 'config' database, accessible via the mongos router.

For the sharded test collection, a quick query of the config database using the mongo Shell (connected to a mongos), told me how the collection was currently distributed.

mongos> use config
mongos> db.chunks.find({ns: "mrtest.rawdata"}, {_id: 0, shard: 1, min: 1, max: 1}).sort({shard: 1})

{ "min" : { "dim0" : 635504 }, "max" : { "dim0" : 702334 }, "shard" : "s0" }
{ "min" : { "dim0" : 702334 }, "max" : { "dim0" : 728124 }, "shard" : "s0" }
{ "min" : { "dim0" : 728124 }, "max" : { "dim0" : 759681 }, "shard" : "s0" }
{ "min" : { "dim0" : 759681 }, "max" : { "dim0" : 827873 }, "shard" : "s0" }
{ "min" : { "dim0" : 827873 }, "max" : { "dim0" : 856938 }, "shard" : "s0" }
{ "min" : { "dim0" : 856938 }, "max" : { "dim0" : 893410 }, "shard" : "s0" }
{ "min" : { "dim0" : 893410 }, "max" : { "dim0" : 942923 }, "shard" : "s0" }
{ "min" : { "dim0" : 942923 }, "max" : { "dim0" : 999996 }, "shard" : "s0" }

{ "min" : { "dim0" : { "$minKey" : 1 } }, "max" : { "dim0" : 42875 }, "shard" : "s1" }
{ "min" : { "dim0" : 42875 }, "max" : { "dim0" : 91602 }, "shard" : "s1" }
{ "min" : { "dim0" : 91602 }, "max" : { "dim0" : 149588 }, "shard" : "s1" }
{ "min" : { "dim0" : 149588 }, "max" : { "dim0" : 197061 }, "shard" : "s1" }
{ "min" : { "dim0" : 197061 }, "max" : { "dim0" : 257214 }, "shard" : "s1" }
{ "min" : { "dim0" : 471994 }, "max" : { "dim0" : 520681 }, "shard" : "s1" }
{ "min" : { "dim0" : 569205 }, "max" : { "dim0" : 602306 }, "shard" : "s1" }

{ "min" : { "dim0" : 257214 }, "max" : { "dim0" : 305727 }, "shard" : "s2" }
{ "min" : { "dim0" : 305727 }, "max" : { "dim0" : 363357 }, "shard" : "s2" }
{ "min" : { "dim0" : 363357 }, "max" : { "dim0" : 412656 }, "shard" : "s2" }
{ "min" : { "dim0" : 412656 }, "max" : { "dim0" : 471994 }, "shard" : "s2" }
{ "min" : { "dim0" : 520681 }, "max" : { "dim0" : 569205 }, "shard" : "s2" }
{ "min" : { "dim0" : 602306 }, "max" : { "dim0" : 635504 }, "shard" : "s2" }
{ "min" : { "dim0" : 999996 }, "max" : { "dim0" : { "$maxKey" : 1 } }, "shard" : "s2" }

                   (I added a newline between the chunks belonging to each shard, for reader clarity; 
                    also the elements highlighted in bold are discussed in the next section of this post)

So I already had all the information available at my fingertips, which defined an 'even-ish' spread of data-subsets (chunks) of the collection. In this case, I could see there were 22 chunks, spread over 3 shards. So I needed to change my JavaScript code to create multiple threads per shard, each targeting a portion of the chunks belonging to a shard. To build this list of shards and their owned chunks, at runtime, in my JavaScript code I used an aggregation to group by shard.

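// Query the config DB, grouping data on all the chunks for each shard
db = db.getSiblingDB('config');
var shardsWithChunks = db.chunks.aggregate([
    // Only consider chunks belonging to the test collection
    {$match: {
        "ns": "mrtest.rawdata"
    }},
    // Order chunks by the lower bound of their shard key range
    {$sort: {
        "min.dim0": 1
    }},
    // Collect the ordered list of chunk ranges owned by each shard
    {$group: {
        _id: "$shard",
        chunks: {$push: {
            min: "$min.dim0",
            max: "$max.dim0"
        }}
    }}
]);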
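To show what shape of result this grouping produces, here is a plain-JavaScript sketch that mimics the $match, $sort and $group stages on a few of the chunk documents listed earlier. It is an illustration only, not the code from the zip file; groupChunksByShard is a hypothetical name, the sample documents carry an "ns" field as in the $match stage, and the sketch assumes numeric chunk bounds (it ignores the $minKey and $maxKey chunks).

```javascript
// Plain-JavaScript illustration of the $match/$sort/$group stages.
// `chunks` mimics documents from config.chunks, reduced to the fields used.
function groupChunksByShard(chunks, namespace) {
    const byShard = {};
    chunks
        .filter(function (c) { return c.ns === namespace; })        // $match
        .sort(function (a, b) { return a.min.dim0 - b.min.dim0; })   // $sort
        .forEach(function (c) {                                      // $group + $push
            if (!byShard[c.shard]) { byShard[c.shard] = []; }
            byShard[c.shard].push({ min: c.min.dim0, max: c.max.dim0 });
        });
    return Object.keys(byShard).map(function (shard) {
        return { _id: shard, chunks: byShard[shard] };
    });
}

// Three of the chunks from the db.chunks.find() output shown earlier
const sample = [
    { ns: 'mrtest.rawdata', shard: 's1', min: { dim0: 471994 }, max: { dim0: 520681 } },
    { ns: 'mrtest.rawdata', shard: 's2', min: { dim0: 257214 }, max: { dim0: 305727 } },
    { ns: 'mrtest.rawdata', shard: 's1', min: { dim0: 197061 }, max: { dim0: 257214 } }
];
const grouped = groupChunksByShard(sample, 'mrtest.rawdata');
```

Each element of the result has the shard name as _id and an ordered chunks array, which is the structure the forEach loop in the next snippet iterates over.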

My code then used a loop like below, to cycle through each shard, kicking off multiple sub-aggregations in different threads, where each thread uses a range query that resolves to a portion of chunks currently believed to live on that shard.

shardsWithChunks.forEach(function(shard) {
    // For each new thread, collect together a set of some of the chunks from the
    // shard.chunks array and perform the modified aggregation pipeline on it
});

Decorating a Pipeline to Enable a Thread to Target Just One Shard

The code I'd written to modify a pipeline, to allow a thread to operate on a subset of data in a non-sharded database, also had to be changed, to allow for subtleties only present when the data is sharded. Taking the example of wanting to spawn 4 threads per shard, for shard 's1' (shown in the output from db.chunks.find() earlier in this post), I had to divide out the 7 chunks it currently held, between the 4 threads. So 3 of the threads would need to target 2 chunks each, and 1 thread would target the remaining 1 chunk.
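The division of 7 chunks between 4 threads can be sketched as follows. This is a minimal illustration rather than the code from the zip file; partitionChunks is a hypothetical name, and null stands in for the $minKey lower bound of the first chunk.

```javascript
// Hypothetical helper: split a shard's chunk list into at most `threadCount`
// groups of near-equal size, keeping the chunks in their original order.
function partitionChunks(chunks, threadCount) {
    const groupSize = Math.ceil(chunks.length / threadCount);
    const groups = [];
    for (let i = 0; i < chunks.length; i += groupSize) {
        groups.push(chunks.slice(i, i + groupSize));
    }
    return groups;
}

// The 7 chunks held by shard 's1' (null stands in for $minKey in this sketch)
const s1Chunks = [
    { min: null,   max: 42875 },
    { min: 42875,  max: 91602 },
    { min: 91602,  max: 149588 },
    { min: 149588, max: 197061 },
    { min: 197061, max: 257214 },
    { min: 471994, max: 520681 },
    { min: 569205, max: 602306 }
];
const groups = partitionChunks(s1Chunks, 4);
```

With this data, three groups hold 2 chunks each and the last group holds the remaining chunk. The third group pairs the chunk starting at 197061 with the one starting at 471994, which is the non-contiguous case discussed next.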

Notice that in the list of chunks returned by db.chunks.find() earlier, not all chunks on a particular shard are contiguous. For example, in the results marked in bold, I highlighted that one chunk ends at 257214 on shard 's1', followed by a chunk on 's1' starting at 471994. However, the chunk starting at 257214 is on shard 's2'. This means that the code to modify the aggregation pipeline needs to query the start and end range for each chunk, rather than just a range from the start of the first chunk to the end of the last chunk. Below you can see first the naive, incorrect pipeline that my code could generate, and then the correct pipeline that the code should and will generate.

// BAD
[
   {$match: {
      dim0: {
         $gte: 197061,
         $lt : 520680
      }
   }},
   {$group: {
      _id: "$dim0",
      value: {$sum: 1}
   }}
]

// GOOD
[
   {$match: {
      $or: [
         {dim0: {
            $gte: 197061,
            $lt : 257214
         }},
         {dim0: {
            $gte: 471994,
            $lt : 520681
         }}
      ]
   }},
   {$group: {
      _id: "$dim0",
      value: {$sum: 1}
   }}
]

To decorate the original aggregation pipeline in this way, I needed to modify my original aggregate_subset() JavaScript function, to build up an $or expression of ranges, for the $match part prepended to the pipeline. The fully modified JavaScript code I produced, to run in the mongo Shell and perform all these actions can be viewed in the zip file available here.
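The real aggregate_subset() function lives in the zip file; as a rough sketch of the decoration step only, the hypothetical decoratePipeline helper below prepends a $match stage containing one $or branch per chunk range. It assumes numeric range bounds and does not handle $minKey or $maxKey.

```javascript
// Hypothetical helper: prepend a $match with one $or branch per chunk range,
// so that a thread only reads the chunks it has been assigned.
function decoratePipeline(pipeline, field, ranges) {
    const branches = ranges.map(function (range) {
        const clause = {};
        clause[field] = { $gte: range.min, $lt: range.max };
        return clause;
    });
    // concat() returns a new array, leaving the original pipeline untouched
    return [{ $match: { $or: branches } }].concat(pipeline);
}

const original = [{ $group: { _id: '$dim0', value: { $sum: 1 } } }];
const decorated = decoratePipeline(original, 'dim0', [
    { min: 197061, max: 257214 },
    { min: 471994, max: 520681 }
]);
```

For the two ranges used here, the decorated pipeline has the same shape as the "GOOD" example above.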

It is worth mentioning that, if during the aggregation run, the cluster happens to migrate any chunks, it would not be a big deal. The modified aggregation pipelines would still function correctly and return the correct results. However, a couple of the threads would just take longer to complete, as they would end up being executed against two shards, not one, by mongos and the query optimiser, to collect the requested data.

Sub-optimal mongos Shard Routing For Some Edge-case Range Queries

One thing I noticed when running explain (eg. using db.coll.aggregate(pipeline, {explain: true})), was that even when running the modified pipeline, shown above, which should be targeting chunks in only one shard, mongos actually chooses to target two shards (both 's1' and 's2'). The correct result is still generated, but it just takes a little longer, as twice as many shards are hit as necessary. Essentially the query optimiser is not correctly identifying the range boundary as belonging to just one shard, when the range boundary falls exactly at the intersection of two chunks that happen to reside on different shards. Specifically, the problem is for a sharded collection that has a single (non-compound) shard key, and $lte or $gte is used in the range query. I created a JIRA ticket logging this bug, but with a low priority, as functionally an accurate result is returned, and also, it is possible to use a workaround. My original section of code and the modified version, which uses this workaround, are shown below.

// mongos/optimiser targets 2 shards
   dim0: {
      $gte: 197061,
      $lt : 257214
   }

// mongos/optimiser targets 1 shard
   dim0: {
      $gte: 197061,
      $lte: 257213
   }

By modifying the range criteria to be '$lte: 257213', in my JavaScript code, only one shard ('s1') is then correctly routed to. However, this does mean that any generic query code, in a more 'real-world' application, has to be explicitly aware of field types (no explicit database schema) and the knowledge that the field actually contains an integer that can possibly be decremented by 1, to help encourage optimal sharded query routing.
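A minimal sketch of this workaround is shown below. The toInclusiveRange helper is hypothetical, and it makes the integer assumption explicit by refusing non-integer bounds.

```javascript
// Hypothetical helper: express the half-open chunk range [min, max) as a
// closed range. Only valid when the shard key field holds integer values.
function toInclusiveRange(min, maxExclusive) {
    if (!Number.isInteger(min) || !Number.isInteger(maxExclusive)) {
        throw new TypeError('integer bounds are required for this workaround');
    }
    return { $gte: min, $lte: maxExclusive - 1 };
}

const criteria = { dim0: toInclusiveRange(197061, 257214) };
```

For the chunk discussed above, criteria.dim0 becomes { $gte: 197061, $lte: 257213 }, matching the second code fragment.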

Initial Results

When generating my aggregation test results and capturing the execution time, I first wanted to establish the performance of the normal unchanged aggregation on the sharded database. I felt it would be interesting to see if a regular, unchanged aggregation ran faster once sharded. When I ran the unchanged aggregation against the sharded database, it executed in 27 seconds, compared with just 23 seconds when I ran it against the non-sharded collection. At first I was surprised that the aggregation took longer to complete. Later, upon reflection, I came to realise why, and I discuss these reasons in the conclusion of this blog post. However, for the moment my challenge was just to try to beat the time of 27 seconds with my 'optimised parallel aggregations' approach.

I then ran my newly modified JavaScript code (see code in zip file), executing parallel aggregation threads targeted at groups of chunks on each shard (4 threads per shard).  This test executed in 20 seconds. Again I was a little disappointed because this yielded a roughly 25% speed-up in the sharded environment, whereas before, in a non-sharded environment, I was achieving over a 40% speed-up.
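The percentages quoted here follow from the timings given in this post (27 and 20 seconds for the sharded tests, 23 and 13 seconds for the non-sharded tests). A small sketch of the arithmetic, using a hypothetical speedUpPercent helper:

```javascript
// Hypothetical helper: percentage reduction in execution time
function speedUpPercent(baselineSecs, improvedSecs) {
    return ((baselineSecs - improvedSecs) / baselineSecs) * 100;
}

const sharded = speedUpPercent(27, 20);     // about 25.9
const nonSharded = speedUpPercent(23, 13);  // about 43.5
```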

Attempts on Improvement

At this point, I started to hypothesise on potential reasons why my 'optimisations' couldn't speed-up an aggregation as quickly for a sharded database. Below are some of the main hypotheses I came up with and what happened when I tested each one.
  1. Is running the test via JavaScript in the mongo Shell introducing a bottleneck?  I re-wrote my application code, to decorate the pipeline and run separate aggregation threads, in Python instead, using the PyMongo driver (also included in the code zip file). However, there was no noticeable change in performance, when re-running the test with the Python code. 
  2. Is the single mongos router, used by all 12 aggregation threads, acting as a bottleneck?  So I started up 12 separate mongos instances and modified the Python version of the code, to force each of the spawned 12 threads to connect to its own dedicated mongos. The execution time that resulted was only marginally better, and so I concluded that a single shared mongos was not really a major bottleneck.
  3. Is the role of mongos (ie. shard routing/coordination and partial aggregation processing) the main cause of added latency?  So at this point I went a bit off-piste and did something no-one should ever do. I modified the Python code again, but this time to make each spawned thread connect directly to the mongod of the shard, rather than going via mongos. My code achieved this by first querying the 'config' database to find out the direct URL for each shard's mongod host. This time I did see a significant decrease in execution time, down to 15 seconds. This is only 2 seconds off the 13 seconds achieved on the non-sharded database. The 2 second difference can probably be explained by the fact that I am spawning 12 threads on a now overloaded host machine, running 3 mongod shards, 1 config mongod database and 1 mongos, versus the non-sharded test that runs just 4 threads against a single mongod. However, running the aggregations directly against shards is a bad thing to do, for two reasons: (1) The $match may miss a chunk, read an orphan chunk or read only part of a split chunk, if any chunk splitting/migration has occurred during the process, resulting in an incorrect result. (2) The aggregation $out operator will cause a collection to be written to the local shard database, but the config databases and all the mongos's will have no awareness of its existence (indeed, I witnessed a 'confused' system after trying this). Therefore, I have not included the code modifications in the code zip file, that I made to pull this stunt, and I vow never to try such a thing again! Still, the results were interesting, even if unusable.  :-)

It is probably worth noting that, for all these test variations, I consistently observed the following characteristics:
  • top would show all the host CPUs running at around 99% for approximately the first 3/4 of the test run.
  • mongostat would show zero page faults (inference being that all the working-set is memory-resident) and only single-figure locked database percentages (ie. sub 10%).
  • iostat would show, at its peak, less than 15% disk usage for the host SSD.

Results Summary

So finally my prolonged theorising and testing came to an end. The table below shows the results I achieved for the different variations, ultimately demonstrating a ~30% performance speed up (27 seconds down to 19 seconds), when using my parallel sub-aggregations optimisation via multiple mongos's against 3 shards. This was less than the 40+% performance increase I achieved in my non-sharded database tests.

Aggregation Description                       | Data-Subsets per Shard | Spawned Threads per Shard | Number of mongos's | Execution Time
----------------------------------------------+------------------------+---------------------------+--------------------+---------------
Regular unchanged (single-threaded)           | 0                      | 0                         | 1                  | 27 secs
Serial sub-aggregations (single-threaded)     | 2                      | 0                         | 1                  | 32 secs
Serial sub-aggregations (single-threaded)     | 4                      | 0                         | 1                  | 32 secs
Parallel sub-aggregations, single mongos      | 2                      | 2                         | 1                  | 21 secs
Parallel sub-aggregations, single mongos      | 4                      | 4                         | 1                  | 20 secs
Parallel sub-aggregations, multiple mongos's  | 2                      | 2                         | 6                  | 20 secs
Parallel sub-aggregations, multiple mongos's  | 4                      | 4                         | 12                 | 19 secs
Parallel sub-aggregations, direct to mongod's | 2                      | 2                         | 0                  | 16 secs
Parallel sub-aggregations, direct to mongod's | 4                      | 4                         | 0                  | 15 secs

For interest, I've included in the table, the results from running the parallel aggregations directly against shard mongod's (red for 'danger'!). However, as discussed earlier, doing this for real in a production system will result in bad things happening. You have been warned!


Conclusion

I've demonstrated that the execution time for certain MapReduce-style Aggregation workloads on a sharded database can be reduced, by parallelising the aggregation work over collection subsets, using multiple threads. However, for this particular test case, the speed-up is less pronounced, when compared to the same test on a non-sharded database.

On reflection, it is probably naive to expect a reduction in aggregation execution time, when moving a collection from a non-sharded database to a sharded one. If you think about the main benefits of sharding, sharding is NOT really about reducing individual operation latency. The main purpose of sharding is to enable a database to be able to scale-out to cope with a significant increase in data volumes and/or a significant increase in read and/or write throughput demand. To that effect, I would expect a sharded database, spread out over multiple machines, to be able to sustain a much higher throughput of concurrent aggregation operations, compared with a non-sharded database confined to a single machine (perhaps that can be a test and blog post for another day!).

Lastly, I suspect that with a much larger data-set, and with the shards, mongos's and client app spread out over separate dedicated host machines, the aggregation test on a sharded database may match or even beat the execution time achieved on a non-sharded database. I even feel this would be the case for an unchanged, non-optimised aggregation pipeline, due to MongoDB's built-in ability to run the first part of a pipeline directly on each shard, in parallel. Unfortunately, time has run out for me to try this at the moment, so perhaps it's something for me to revisit in the future.

Song for today: Perth by Bon Iver

Friday, March 14, 2014

How to speed up MongoDB Aggregation using Parallelisation

A little while ago, Antoine Girbal wrote a great blog post describing how to speed up a MongoDB MapReduce job by 20x. Now that MongoDB version 2.6 is nearly upon us, and prompted by an idea from one of my very smart colleagues, John Page, I thought I'd investigate whether MongoDB Aggregation jobs can be sped up using a similar approach. Here, I will summarise the findings from my investigation.

To re-cap, the original MapReduce tests looked for all the unique values in a collection, counting their occurrences. The following improvements were incrementally applied by Antoine against the 10 million documents in the collection:

  1. Define a 'sort' for the MapReduce job
  2. Use multiple-threads, each operating on a subset of the collection 
  3. Write out result subsets to different databases
  4. Specify that the job should use 'pure JavaScript mode'
  5. Run the job using MongoDB 2.6 (an 'in-development' version) rather than 2.4

With all these optimisations in place, Antoine was able to reduce a 1200 second MapReduce job to just 60 seconds! Very impressive.

I expect that one of the reasons Antoine was focussing on MapReduce in MongoDB, was because MongoDB limited the output size of the faster Aggregation framework to 16 MB. With MongoDB 2.6, this threshold is removed. Aggregations generate multi-document results, rather than a single document and these can be navigated by the client applications using a cursor. Alternatively, an output collection can be specified for the Aggregation framework to write directly to. Thus, version 2.6 presents the opportunity to try the MapReduce test scenario with the Aggregation framework instead, to compare performance. In this blog post I do just that, focussing purely on testing a single, non-sharded database.

Re-running the optimised MapReduce

Before I tried running the test with the Aggregation framework, I thought I'd quickly run and time Antoine's optimised MapReduce job on my test machine to normalise any performance difference that occurs due to hardware disparity (for reference, my host machine is a Linux x86-64 laptop with SSD, 2 cores + hyper-threading, running MongoDB 2.6.0rc1).

I first had to insert the 10 million documents, as Antoine did, using the mongo Shell, with each insert containing a single field (‘dim0’) whose value is a random number between 0 and 1 million:

> for (var i = 0; i < 10000000; ++i) {
      db.rawdata.insert({dim0: Math.floor(Math.random()*1000000)});
  }
> db.rawdata.ensureIndex({dim0: 1})

I then ran Antoine's fully optimised MapReduce job, which completed in 32 seconds, versus 63 seconds on Antoine's host machine.

Running a normal, non-optimised Aggregation

So the first thing I needed to establish is how much faster or slower the equivalent Aggregation job is, compared with the 'optimised' MapReduce job I'd just run on the same hardware. The Aggregation pipeline is actually very simple to implement for this test. Below you can see the MapReduce code and the Aggregation pipeline equivalent:

map: function() {
    emit(this.dim0, 1);
}

reduce: function(key, values) {
    return Array.sum(values);
}

var pipeline = [
    {$group: {
        _id: "$dim0",
        value: {$sum: 1}
    }}
];

The pipeline basically just groups on the field 'dim0', counting the number of occurrences for this value. So I ran this Aggregation against my database, called 'mrtest', containing the random generated 'rawdata' collection:

> db = db.getSiblingDB('mrtest');
> db['rawdata'].aggregate(pipeline);

The aggregation completed in 23 seconds, versus 32 seconds for running the 'optimised' equivalent MapReduce job on the same hardware. This shows just how quick the standard out-of-the-box Aggregation capability is in MongoDB!
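To make it concrete what the single $group stage computes, here is a plain JavaScript sketch of the same logic run over an in-memory array. It is only an illustration of the semantics (group by a key, count occurrences); the function name groupAndCount and the sample data are made up, and it says nothing about how MongoDB executes the stage internally.

```javascript
// Plain JavaScript equivalent of: {$group: {_id: "$dim0", value: {$sum: 1}}}
// (Illustrative only; groupAndCount is a made-up helper.)
function groupAndCount(docs, keyName) {
    const counts = new Map();
    for (const doc of docs) {
        const key = doc[keyName];
        counts.set(key, (counts.get(key) || 0) + 1);
    }
    // Shape each result like a document produced by the $group stage
    return Array.from(counts, ([key, value]) => ({_id: key, value: value}));
}

// Tiny made-up sample standing in for the 'rawdata' collection
const sampleDocs = [{dim0: 7}, {dim0: 3}, {dim0: 7}, {dim0: 7}];
console.log(groupAndCount(sampleDocs, "dim0"));
```

For the sample above, the result contains one document for the value 7 with a count of 3, and one for the value 3 with a count of 1.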

Running an optimised Aggregation

So my challenge was clear now: See if I can apply some of Antoine's tricks from speeding up MapReduce, to the Aggregation framework, to reduce this test completion time from 23 seconds.

I considered which of the original 5 optimisations could realistically be applied to optimising the test with the Aggregation framework:

  1. YES - Define a 'sort' for the job
  2. YES - Use multiple-threads, each operating on a subset of the collection 
  3. NO - Write out result subsets to different databases  (the Aggregation framework expects to write out a collection to the same database as the source collection being read)
  4. NO - Specify that the job should use 'pure JavaScript mode'  (doesn't apply here as Aggregations run within C++ code)
  5. NO - Move to using a newer version of MongoDB, 2.4 to 2.6  (we are already at the latest version with my test runs, which is undoubtedly helping achieve the 23 second result already)

So the main thing I needed, in addition to including a 'sort', was to somehow split up the aggregation into parts to be run in parallel, by different threads, on subsets of the data. This would require me to prepend the existing pipeline with operations to match a range of documents in the collection and to sort the matched subset of documents, before allowing the original pipeline operation(s) to be run. Also, I would need to append an operation to the pipeline, to specify that the result should go to a specific named collection. So for example, for one of the subsets of the collection (documents with 'dim0' values ranging from 524915 to 774848), the following table shows how I need to convert the original pipeline:

Original pipeline:

    [
        {$group: {
            _id: "$dim0",
            value: {$sum: 1}
        }}
    ]

Converted pipeline:

    [
        {$match: {
            dim0: {
                $gte: 524915,
                $lt : 774849
            }
        }},
        {$sort: {
            dim0: 1
        }},
        {$group: {
            _id: "$dim0",
            value: {$sum: 1}
        }},
        {$out: "aggdata524915"}
    ]

A similar pipeline, but with different range values and output collection name, needs to be produced for each thread I intend to run. Each of these pipelines needs to be invoked with db.coll.aggregate() in parallel, from different threads.

To achieve this in a generic way, from the Shell, I quickly wrote a small JavaScript function to 'decorate' a given pipeline with $match and $sort prepended and $out appended. The function ends by running MongoDB’s aggregate() function with the modified pipeline, against the subset of matching data.

// Define new wrapper aggregate function, to operate on subset of docs 
// in collection after decorating the pipeline with pre and post stages
var aggregateSubset = function(dbName, srcClltName, tgtClltName,
                                  singleMapKeyName, pipeline, min, max) { 

    var newPipeline = JSON.parse(JSON.stringify(pipeline)); //deep copy
    var singleKeyMatch = {};
    singleKeyMatch[singleMapKeyName] = (max < 0) ? {$gte: min} 
                                                 : {$gte: min, $lt: max};
    var singleKeySort = {};
    singleKeySort[singleMapKeyName] = 1;

    // Prepend pipeline with range match and sort stages
    newPipeline.unshift(
        {$match: singleKeyMatch},
        {$sort: singleKeySort}
    );

    // Append pipeline with output to a named collection
    newPipeline.push(
        {$out: tgtClltName}
    );

    // Execute the aggregation on the modified pipeline
    db = db.getSiblingDB(dbName);
    var result = db[srcClltName].aggregate(newPipeline);
    return {outCollection: tgtClltName, subsetResult: result};
}

This function takes the name of the database, source collection, and target collection, plus the original pipeline, plus the minimum and maximum values for the subset of the collection. The keen-eyed amongst you will have noticed the 'singleMapKeyName' parameter too. Basically, my example function is hard-coded to assume that the pipeline is mapping and sorting data anchored on a single key ('dim0' in my case, or this could be group-by 'customer', as another example). However, in other scenarios, a compound key may be being used (eg. group-by customer + year). My function would need to be enhanced, to be able to support compound keys.

Once I'd completed and tested my new aggregateSubset() JavaScript function, I was ready to run some test code in the Shell, to call this new function from multiple threads. Below is the code I used, specifying that 4 threads should be spawned. My host machine has 4 hardware threads, so this is probably not a bad number to use, even though I actually chose this number because it matched the thread count used in Antoine's original MapReduce tests.

// Specify test parameter values
var dbName = 'mrtest';
var srcClltName = 'rawdata';
var tgtClltName = 'aggdata';
var singleMapKeyName = 'dim0';
var numThreads = 4;

// Find an even-ish distributed set of sub-ranges in the collection
var singleKeyPattern = {};
singleKeyPattern[singleMapKeyName] = 1;
var splitKeys = db.runCommand({splitVector: dbName+"."+srcClltName, 
      keyPattern: singleKeyPattern, maxChunkSizeBytes: 32000000}).splitKeys;

// There are more sub-ranges than threads to run, so work out 
// how many sub-ranges to use in each thread
var inc = Math.floor(splitKeys.length / numThreads) + 1;
var threads = [];

// Start each thread, to execute aggregation on a subset of data 
for (var i = 0; i < numThreads; ++i) { 
    var min = (i == 0) ? 0 : splitKeys[i * inc].dim0;
    var max = (i * inc + inc >= splitKeys.length) ? -1 
                                                  : splitKeys[i * inc + inc].dim0;
    var t = new ScopedThread(aggregateSubset, dbName, srcClltName,
                 tgtClltName + min, singleMapKeyName, pipeline, min, max);
    threads.push(t);
    t.start();
}

// Wait for all threads to finish and print out their summaries
for (var i in threads) { 
    var t = threads[i];
    t.join();
    printjson(t.returnData());
}

This multi-threaded aggregation completed in 13 seconds, versus 23 seconds for running the normal 'non-optimised' version of the aggregation pipeline. This optimised pipeline involved writing to 4 different collections, each named 'aggdataN' (eg. 'aggdata374876').

Okay, this isn't the stunning 20x improvement, which Antoine achieved for MapReduce. However, given that the Aggregation framework is already a highly optimised MongoDB capability, I don't believe a 40% speed up is to be sniffed at. Also, I suspect that on host machines with more cores (hence requiring the thread count parameter used in my example to be increased), and with a much larger real-world data set, even more impressive results would be achieved, such as a 2x or 3x improvement.

Some observations

As you'll notice I use a similar pattern to Antoine's, whereby I use the undocumented splitVector command to get a good idea of what is a well balanced set of ranges in the source collection. This is especially useful if the distribution of values in a collection is uneven and hard to predict. In my case, the distribution is pretty even and for 4 threads I could have got away with hard-coding ranges of 0-250000, 250000-500000, 500000-750000 & 750000-1000000. However, more real world collections will invariably have a much more uneven spread of values for a given key. With a naïve approach to partitioning up the data, one thread could end up doing much more work than the other threads. In real world scenarios, you may have other ways of predicting balanced subset ranges, based on the empirical knowledge you have of your data. You would then use a best guess effort to specify the split points for partitioning your collection. If you do choose to use the splitVector utility for a real application, I would suggest caching the value and only re-evaluating it intermittently, to avoid the latency it would otherwise add to the execution time of your aggregations.
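To illustrate how a list of split points is turned into one range per thread, here is a small standalone sketch of that step. It assumes the split points have already been unwrapped into a sorted array of plain numbers (splitVector itself returns documents such as {dim0: 524915}), and it keeps the same convention as my test code: a lower bound of 0 for the first range and a max of -1 meaning "no upper bound". The function name rangesForThreads and the sample split points are made up for the example.

```javascript
// Turn a sorted list of split points into up to numThreads contiguous
// [min, max) ranges. A max of -1 means "no upper bound".
// (Illustrative helper; not part of the original Shell script.)
function rangesForThreads(splitPoints, numThreads) {
    // Number of split points to step over for each thread
    const inc = Math.floor(splitPoints.length / numThreads) + 1;
    const ranges = [];
    for (let i = 0; i < numThreads; ++i) {
        const lowerIdx = i * inc;
        const upperIdx = lowerIdx + inc;
        if (i > 0 && lowerIdx >= splitPoints.length) {
            break; // ran out of split points, so no more ranges to hand out
        }
        const min = (i === 0) ? 0 : splitPoints[lowerIdx];
        const max = (upperIdx >= splitPoints.length) ? -1 : splitPoints[upperIdx];
        ranges.push({min: min, max: max});
    }
    return ranges;
}

// Made-up split points, standing in for the output of splitVector
const exampleSplitPoints = [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100];
console.log(rangesForThreads(exampleSplitPoints, 4));
```

With these 11 made-up split points and 4 threads, the ranges come out as 0 to 400, 400 to 700, 700 to 1000, and 1000 upwards. Each range starts where the previous one ends, so every document falls into exactly one thread's subset.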

Also, you'll notice I followed Antoine's approach of using the undocumented ScopedThread Shell utility object, to spawn new threads. If you are writing a real application to run an aggregation, you would instead use the native threading API of your chosen programming language/platform, alongside the appropriate MongoDB language driver.
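As a rough sketch of what that might look like outside the Shell, the example below uses JavaScript on Node.js. Node.js has no direct equivalent of ScopedThread, so the closest analogue is to issue the sub-aggregations concurrently as promises and let the database server do the work in parallel. The runSubset callback is hypothetical: in a real application it would wrap the driver's aggregate call for one range. Here it is replaced by a stand-in (fakeRunSubset) so the example is self-contained.

```javascript
// Run one sub-aggregation per range concurrently and wait for all of them.
// runSubset is a hypothetical callback that would, in a real application,
// run the decorated pipeline for the given range and return a promise.
async function aggregateInParallel(ranges, runSubset) {
    const started = ranges.map((range) => runSubset(range.min, range.max));
    return Promise.all(started);
}

// Stand-in for a real sub-aggregation, used only to show the call shape
function fakeRunSubset(min, max) {
    return new Promise((resolve) => {
        setTimeout(() => resolve({outCollection: "aggdata" + min, min: min, max: max}), 10);
    });
}

// Two made-up ranges; -1 again means "no upper bound"
aggregateInParallel([{min: 0, max: 500000}, {min: 500000, max: -1}], fakeRunSubset)
    .then((summaries) => console.log(summaries));
```

Promise.all resolves once every sub-aggregation has finished, which plays the same role as the join() loop in the Shell version.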

One optimisation I wish I could have made, but couldn't, was 'write out result subsets to different databases'. In MongoDB 2.6, one can specify an output collection name, but not an output database name, when calling aggregate(). My belief that different threads outputting to different databases would increase performance is based on some observations I made during my tests. When running 'top', 'mongostat' and 'iostat' on my host machine whilst the optimised aggregation was running, I initially noticed that 'top' was reporting all 4 'CPUs' as maxing out at around 95% CPU utilisation, coming down to a lower amount in the later stages of the run. At the point when CPU usage came down, 'mongostat' was reporting that the database 'mrtest' was locked for around 90% of the time. At the same point, disk utilisation was reported by 'iostat' as being a modest 35%. Because MongoDB 2.6 takes its write lock at the database level, this tells me that for part of the aggregation run, the various threads are all queuing for the write lock, to be able to insert result documents into their own collections in the same database. By allowing different databases to be used, the write lock bottleneck for such parallel jobs would be diminished considerably, thus allowing the document inserts, and hence the overall aggregation, to complete even quicker. I have created a JIRA enhancement request, asking for the ability for the Aggregation framework's $out operator to specify a target database, in addition to a target collection.


So I've managed to show that certain MapReduce-style workloads can be executed faster if using Aggregation in MongoDB. I showed the execution time can be reduced even further, by parallelising the aggregation work over subsets of the collection, using multiple threads. I demonstrated this for a single non-sharded database. In the future, I intend to investigate how a sharded database can be used to drive even better parallelisation and hence lower execution times, which I will then blog about.

Song for today: The Rat by The Walkmen

Wednesday, April 3, 2013

Load Balancing T3 InitialContext Retrieval for WebLogic using Oracle Traffic Director



The T3 protocol is WebLogic's fast native binary protocol that is used for most inter-server communication, and by default, communication to a WebLogic server from client applications using RMI, EJB or JMS, for example. If a JMS distributed queue or an RMI/EJB based application is deployed to a WebLogic cluster, for high availability, the client application needs a way of addressing the "virtual endpoint" of the remote clustered service. Subsequent client messages sent to the service then need to be load-balanced and failed-over, when necessary. For WebLogic, the client application achieves this in two phases:

  1. Addressing the virtual endpoint of the clustered service. The client Java code populates a "Provider URL" property with the address of the WebLogic cluster, to enable the "InitialContext" to be bootstrapped. As per the WebLogic JNDI documentation, "this address may be either a DNS host name that maps to multiple IP addresses or a comma separated list of single address host names or IP addresses". Essentially this virtual endpoint URL is only ever used by the client, to connect to a random server in the cluster, for creating the InitialContext. The URL is not used for any further interaction between client and clustered servers and does not influence how any of the subsequent T3 messages are routed.
  2. Load-balancing JNDI/RMI/EJB/JMS messaging to the clustered service. Once the InitialContext is obtained by the client application, all JNDI lookups and subsequent RMI/EJB/JMS remote invocations, use WebLogic generated "cluster-aware" stubs, under the covers. These stubs are populated with the list of clustered managed server direct connection URLs and the stub directly manages load-balancing and failover of T3 requests across the cluster. When a particular server is down, the stub will route new requests to another server, from the cluster list it maintains. Under certain circumstances, such as cases where no servers in the stub's current cluster list are reachable any more, the stub will be dynamically refreshed with a new version of the cluster list, to try. The stub's list of cluster membership URLs is unrelated to the fixed URL that is used for phase 1 above.
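The two phases can be pictured with a small conceptual simulation. This is not WebLogic code and none of the names below are WebLogic APIs: bootstrap, ClusterAwareStub and the addresses are all made up, and round-robin routing is simply assumed for the sake of the example. The point it illustrates is that the provider address is consulted once, in phase 1, and the stub then routes using its own member list.

```javascript
// Conceptual simulation only: these names are illustrative, NOT WebLogic APIs.

// Phase 2: the stub holds the cluster's own member list and routes over it
class ClusterAwareStub {
    constructor(members) {
        this.members = members;
        this.next = 0;
    }
    // Round-robin over the member list (assumed), skipping unreachable servers
    invoke(reachableEndpoints) {
        for (let attempt = 0; attempt < this.members.length; ++attempt) {
            const target = this.members[this.next];
            this.next = (this.next + 1) % this.members.length;
            if (reachableEndpoints.has(target)) {
                return target;
            }
        }
        throw new Error("No cluster members reachable");
    }
}

// Phase 1: the provider address is used once, to obtain the stub
function bootstrap(providerAddress, reachableEndpoints, clusterMembers) {
    if (!reachableEndpoints.has(providerAddress)) {
        throw new Error("Destination unreachable: " + providerAddress);
    }
    // The stub is populated with direct member addresses, not the provider address
    return new ClusterAwareStub(clusterMembers.slice());
}

// Made-up addresses: one virtual endpoint in front of two managed servers
const members = ["server1:7001", "server2:7011"];
const reachable = new Set(["lb.example:9001", "server1:7001", "server2:7011"]);

const stub = bootstrap("lb.example:9001", reachable, members);
reachable.delete("lb.example:9001");   // virtual endpoint goes away after phase 1
console.log(stub.invoke(reachable));   // server1:7001
reachable.delete("server1:7001");      // first managed server goes down
console.log(stub.invoke(reachable));   // server2:7011
```

In the simulation, losing the virtual endpoint after bootstrapping has no effect on later invocations, and losing a managed server simply causes the stub to route to the remaining one.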

For bootstrapping the InitialContext (i.e. phase 1 above), the WebLogic Clustering documentation recommends that, for production environments, a DNS entry containing a list of clustered server addresses is used. This avoids the client application needing to "hard-code" a static set of cluster member addresses. However, sometimes it may be more convenient to use an external load balancer to virtualise this clustered T3 endpoint. In this blog entry, I will examine how the Oracle Traffic Director product can be used to virtualise such a cluster endpoint address, when bootstrapping the InitialContext.

It is neither required nor supported to use an External Load Balancer for load-balancing the subsequent JNDI/RMI/EJB/JMS interaction over T3 (i.e. phase 2 above). The WebLogic generated stubs already perform this role and are far more adept at it. Additionally, many of these T3 based interactions are stateful, which the stubs are aware of and help manage. So attempting to also introduce an External Load Balancer in the message path will break this stateful interaction. These rules are explicitly stated in the WebLogic EJB/RMI documentation which concludes "when using the t3 protocol with external load balancers, you must ensure that only the initial context request is routed through the load balancers, and that subsequent requests are routed and controlled using WebLogic Server load balancing".

Oracle Traffic Director

Oracle Traffic Director (OTD) is a layer-7 software load-balancer for use on Exalogic. It includes many performance related characteristics, plus high availability features to prevent the load-balancer from becoming a "single point of failure". OTD has been around for approximately 2 years and has typically been used to virtualise HTTP endpoints for WebLogic deployed Web Applications and Web Services. A new version of OTD has recently been released (see download site + online documentation), that adds some interesting new features, including the ability to load-balance generic TCP based requests. An example use case for this is to provide a single virtual endpoint to client applications, to access a set of replicated LDAP servers as a single logical LDAP instance. Another example use case is to virtualise the T3 endpoint for WebLogic client applications to bootstrap the InitialContext, which is precisely what I will demonstrate in the rest of this blog entry.

Configuring OTD to Load Balance the T3 Bootstrap of the InitialContext 

I used my Linux x86-64 laptop to test OTD based TCP load-balancing. I generated and started up a simple WebLogic version 10.3.6 domain containing a cluster of two managed servers with default listen addresses set to "localhost:7001" and "localhost:7011" respectively. I also deployed a simple EJB application targeted to the cluster (see next section).

I then installed the new OTD version, created a simple OTD admin-server and launched its browser based admin console. In the admin console, I could see a new section ready to list "TCP Proxies", in addition to the existing section to list "Virtual Servers" for HTTP(S) end-points, as shown in the screenshot below.

To create the TCP Proxy configuration I required, to represent a virtual endpoint for my WebLogic cluster, I chose the 'New Configuration' option and was presented with Step 1 of a wizard, as shown in the screenshot below.

In this Step 1, I provided a configuration name and an OS user name to run the listener instance under, and I selected the TCP radio button, rather than the usual HTTP(S) ones. In Step 2 (shown below), I defined the new listener host and port for this TCP Proxy, which in this case I nominated as port 9001 using the loopback address of my laptop.

In Step 3 (shown below), I then provided the details of the listen addresses and ports for my two running WebLogic managed servers that needed to be proxied to, which listen on localhost:7001 and localhost:7011 respectively.

In the remaining two wizard steps (not shown), I selected the local OTD node to deploy to and reviewed the summary of the proposed configuration before hitting the 'Create Configuration' button. Once created, I went to the "Instances" view in the OTD admin console and hit the "Start/Restart" button to start my configured TCP Proxy up, listening on port 9001 of localhost.

At this point, I assumed that I had correctly configured an OTD TCP Proxy to virtualise a WebLogic clustered T3 InitialContext endpoint, so I then needed to prove it worked....

Example Deployed Stateless Session Bean for the Test

I created a simple stateless session bean (including home interface) and deployed it as an EJB-JAR to WebLogic, targeted to my running cluster. The EJB interface for this is shown below and the implementation simply receives a text message from a client application and prints this message to the system-out, before sending an acknowledgement text message back to the client.

  public interface Example extends EJBObject {
    public String sendReceiveMessage(String msg) throws RemoteException;
  }

Example EJB Client Code for the Test

I then coded a simple standalone Java test client application in a single main class, as shown below, to invoke the remote EJB's sole business method (I've not listed the surrounding class & method definition and try-catch-finally exception handling code, for the sake of brevity).

  Hashtable env = new Hashtable();
  env.put(Context.PROVIDER_URL, "t3://localhost:9001");
  System.out.println("1: Obtaining initial context");
  ctx = new InitialContext(env);
  System.out.println("2: Sleeping for 10 secs having got initial context");
  Thread.sleep(10 * 1000);
  System.out.println("3: Obtaining EJB home interface using JNDI lookup");
  ExampleHome home = (ExampleHome) PortableRemoteObject.narrow(
                 ctx.lookup("ejb.ExampleEJB"), ExampleHome.class);
  System.out.println("4: Sleeping for 10 secs having retrieved EJB home interface");
  Thread.sleep(10 * 1000);
  System.out.println("5: Creating EJB home");
  Example exampleEJB = home.create();
  System.out.println("6: Sleeping for 10 secs having got EJB home");
  Thread.sleep(10 * 1000);
  System.out.println("7: Start of indefinite loop");

  while (true) {
    System.out.println("7a: Calling EJB remote method");
    String msg = exampleEJB.sendReceiveMessage("Hello from client");
    System.out.println("7b: Received response from EJB call: " + msg);
    System.out.println("7c: Sleeping for 10 secs before looping again");
    Thread.sleep(10 * 1000);
  }

I included Java code to bootstrap the InitialContext, perform the JNDI lookup of the EJB home, create the EJB remote client instance and invoke the EJB's sendReceiveMessage() method, which are all shown in bold. Additionally, I included various debug print statements, a few 10 second pauses and a while loop (shown in non-bold) to space out the important client-to-server interactions, for reasons that I'll allude to later.

As can be seen in the code, the Provider URL address I used for the InitialContext was "localhost:9001", which is the OTD TCP Proxy endpoint I'd configured, rather than the direct address of the clustered managed servers (which would have been hard-coded to "localhost:7001,localhost:7011", for example). I compiled this client application using a JDK 1.6 "javac" compiler, and ensured that whenever I ran the client application with a Java 1.6 JRE, I included the relevant WebLogic JAR on the classpath.

Test Runs and WebLogic Configuration Change

With my WebLogic clustered servers running (ports 7001 & 7011), and my OTD TCP Proxy listener running (port 9001), I ran the test client application and hit a problem immediately. In the system-out for one of the managed servers, the following error was shown:

  <02-Apr-2013 15:53:00 o'clock GMT> <Error> <RJVM> <BEA-000572> <The server rejected a connection attempt JVMMessage from:
 '-561770856250762161C:,localhost:7011:MyDemoSystemDomain:MyDemoSystemServer2' to: '0B:[9001,-1,-1,-1,-1,-1,-1]' cmd: 'CMD_IDENTIFY_REQUEST', QOS: '101', responseId: '-1', invokableId: '-1', flags: 'JVMIDs Sent, TX Context Not Sent, 0x1', abbrev offset: '105' probably due to an incorrect firewall configuration or admin command.> 

In the client application's terminal, the following error was subsequently shown:

  javax.naming.CommunicationException [Root exception is t3://localhost:9001: Bootstrap to: localhost/' over: 't3' got an error or timed out]
  Caused by: t3://localhost:9001: Bootstrap to: localhost/' over: 't3' got an error or timed out

After some investigation, I found that the My Oracle Support knowledge base contained a document (ID 860340.1) which defines this expected WebLogic behaviour and provides the solution. By default, WebLogic expects any remote T3 access to have been initiated by the client using the same port number as the one on which the proxied request actually arrives at the WebLogic server. In my case, because I was running this all on one machine, the port referenced by the client was port 9001 but the port hit on the WebLogic Server was 7001. As recommended in the knowledge base document, I was able to avoid this benign server-side check from being enforced, by including a JVM-level parameter (see below) in the start-up command line for my WebLogic servers. I then re-started both managed servers in the cluster to pick up this new parameter.


This time, when I ran the client application, it successfully invoked the remote EJB. No errors were shown in the output of the clustered managed servers, and instead the system-out of one of the managed servers showed the following, confirming that the EJB had been called.

  class example.ExampleEJB_ivy2sm_Impl EJB received message: Hello from client
    (output repeats indefinitely)

On the client side, the expected output was also logged, as shown below.

  1: Obtaining initial context
  2: Sleeping for 10 secs having got initial context
  3: Obtaining EJB home interface using JNDI lookup
  4: Sleeping for 10 secs having retrieved EJB home interface
  5: Creating EJB home
  6: Sleeping for 10 secs having got EJB home
  7: Start of indefinite loop
  7a: Calling EJB remote method
  7b: Received response from EJB call: Request successfully received by EJB
  7c: Sleeping for 10 secs before looping again
    (output 7a to 7c repeats indefinitely)

Proving OTD is not in the Network Path for T3 Stub Load-Balancing

The reason why my test client code includes 10 second pauses and debug system-out logging, is to enable me to observe what happens when I kill various WebLogic and/or OTD servers, part way through client application test runs.

One of the main scenarios I was interested in was to allow the client to bootstrap the InitialContext via the OTD configured TCP Proxy listening on localhost:9001, then terminate the OTD instance/listener process. OTD would be stopped whilst the client application was still in the first 10 second pause, and before the client application had a chance to perform any JNDI lookup or EJB operations. By doing this I wanted to prove that once a client application bootstraps an InitialContext, the Provider URL (eg. "localhost:9001") would subsequently be ignored and the cluster-aware T3 stub, dynamically generated for the client application, would take over the role of routing messages, directly to the clustered managed servers.

Upon re-running the client and stopping the OTD listener, after the client application logged the message "2: Sleeping for 10 secs having got initial context", it continued to work normally. After 10 seconds, it woke up and correctly performed the JNDI lookup of the EJB home, called the remote EJB home create() method and then called the remote EJB business method, logging each of these steps to system-out on the way. This proved that the InitialContext Provider URL is not used for the JNDI lookups or EJB invocations.

To further explore this, I also re-ran the scenario, after first starting Wireshark to sniff all local loopback network interface related traffic. I inspected the content of the T3 messages in the Wireshark captured network traffic logs. I observed that in some of the T3 messages, the responding managed server was indicating the current cluster address as being "localhost:7001,localhost:7011". Upon terminating the first of the two managed servers in the cluster (whilst the client application was still running in its continuous loop), I observed that subsequent cluster address metadata in the T3 responses from the second managed server, back to the client, showed the current cluster address as just "localhost:7011".

I ran one final test. With the WebLogic cluster of two managed servers running, but the OTD TCP Proxy Listener instance not running, I re-ran the client application again from start to end. As soon as the application started and attempted to bootstrap the InitialContext it failed, as expected, with the following logged to system-out.

  1: Obtaining initial context
  javax.naming.CommunicationException [Root exception is t3://localhost:9001: Destination unreachable; nested exception is: Connection refused; No available router to destination]
at weblogic.jndi.internal.ExceptionTranslator.toNamingException(
at weblogic.jndi.WLInitialContextFactoryDelegate.toNamingException(
at weblogic.jndi.WLInitialContextFactoryDelegate.getInitialContext(
at weblogic.jndi.Environment.getContext(
at weblogic.jndi.Environment.getContext(
at weblogic.jndi.WLInitialContextFactory.getInitialContext(
at javax.naming.spi.NamingManager.getInitialContext(
at javax.naming.InitialContext.getDefaultInitCtx(
at javax.naming.InitialContext.init(
at javax.naming.InitialContext.<init>(
at client.RunEJBClient.main(

This showed that the client application was indeed attempting to bootstrap the InitialContext using the Provider URL of "localhost:9001". In this instance, the TCP proxy endpoint, that I'd configured in OTD, was not available, so the URL was not reachable. This proved that the client application was not magically discovering the cluster address from a cold-start and does indeed rely on a suitable Provider URL being provided, to bootstrap the InitialContext.


In this blog entry, I have shown how an external load balancer can be used to provide a virtualised endpoint for WebLogic client applications to reference, to bootstrap the InitialContext from a WebLogic cluster. In these particular tests, the load balancer product used was the latest version of OTD, with its new "TCP Proxy" capability. However, most of the findings are applicable to environments that employ other types of load balancers, including hardware load balancers.

I have also shown that, other than for InitialContext bootstrapping, an external load balancer will not be used for subsequent T3 load-balancing. Instead, the cluster-aware T3 stubs, that are dynamically loaded into client applications, automatically take on this role.

For production environments, system administrators still have the choice of mapping a DNS hostname to multiple IP addresses, to provide a single logical hostname address representing a WebLogic cluster. However, in some data-centres, it may be more convenient for a system administrator to re-use an existing load balancer technology, that is already in place, to virtualise the endpoint and provide a single logical address for a cluster. This may be the case if it is much quicker for a system administrator to make frequent and on-demand configuration changes to a load balancer, rather than continuously raising tickets with the network team, to update DNS entries.

Song for today: Pace by Sophia