Financial price data aggregation performance optimisation

Alexandros Ioannou

Feb 13, 2015, 9:50:48 AM
to mongod...@googlegroups.com
Hello,

Story:
We are using MongoDB 2.6.0 on Windows Server 2008 with an Intel Xeon E3-1230 v3 3.3GHz CPU, 16GB of RAM and normal hard drives (non-SSD), in a replica set with two other machines (three in total) running the same MongoDB version: one on Windows Server 2008 and the other on Ubuntu 14.04, with the same specs.
We have three collections for storing the price data of 600 financial instruments: one stores minutes, another stores hours, and the third stores days. The hours and days collections are fairly small and perform very well. However, the minutes collection, now around 11GB with one third of the instruments having history as old as 2010, suffers when performing aggregation. In all three collections we store the data pre-aggregated to the specified period. In the days collection each document represents a day, with the instrument id, the open, high, low and close of the day, an ISODate and of course the MongoDB _id; likewise, in the hours collection each document represents an hour and in the minutes collection each document represents a minute. Using these three collections we can aggregate into "bigger" periods (open, high, low, close values for longer periods). For example, we use the minutes collection to create periods from 1 minute (basically returning the documents as they are) up to 59 minutes; from hours we can build 1-hour to 4-hour periods, and from days 1-29 days, 1-2 weeks and 1 month.

We store new documents in these collections when the period has elapsed. This means that in the minutes collection we insert around 600 documents every minute, in the hours collection around 600 documents every hour, and in the days collection around 600 every midnight. Initially we used a different collection for each financial instrument (we had only 24 at the time), but when we added the rest of the instruments we noticed that inserting new documents became slow, probably because our server had to perform many more round trips across the 600 collections, in contrast to the single multi-document insert we do now.
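For illustration, the single multi-document insert looks roughly like this in the mongo shell (the collection name "minutes" and the zero values are placeholders; the field names match our documents):

// build one array with the just-elapsed minute bar for all ~600 instruments,
// then send it as a single insert instead of one round trip per instrument
var docs = [];
for (var i = 1; i <= 600; i++) {
    docs.push({
        instrument : NumberLong(i),
        time : ISODate("2015-02-13T09:50:00Z"),   // start of the elapsed minute
        open : NumberLong(0),                      // placeholder values
        high : NumberLong(0),
        low : NumberLong(0),
        close : NumberLong(0),
        volume : NumberLong(0)
    });
}
db.minutes.insert(docs);                           // one bulk insert per minute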

Problem:
When performing an aggregation of, let's say, 59-minute periods, we basically aggregate 59 days' worth of minutes. A day has around 1,440 minutes, so for 59 days we read around 84,960 documents. Running the aggregation for the first time takes from half a minute to a whole minute; consecutive runs are very fast. During the first aggregation I can see in the Windows Task Manager that the memory of the mongod process slowly increases by around 20MB and drops back after it finishes. On consecutive runs it increases almost instantly and falls back very quickly, as the aggregation finishes much faster.

I understand that running a long aggregation for the first time can take more time, since MongoDB does not have these documents in memory yet, but it still seems to be taking quite a lot of time.

Suggestions?

Questions: 
- Does the way we store the price documents affect the time the first aggregation takes? Because we save all the financial instruments at the same time, the price history of a single instrument is not stored consecutively on disk, at least in a naive storage layout. Is this something we have to consider, or does MongoDB not suffer from such an issue?
- Using MongoDB 2.6+ means, I think, that all our collections are stored on disk using usePowerOf2Sizes. However, we almost never modify documents in these three collections; would changing to exact-fit allocation help their performance in any way (insert/read)?
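For reference, I believe the per-collection switch we would flip is collMod, something like this (collection name here is just an example):

// disable power-of-2 record allocation for one collection (MongoDB 2.6 / MMAPv1);
// only newly allocated records are affected, existing records are not rewritten
db.runCommand({ collMod : "minutes", usePowerOf2Sizes : false })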

Thanks in advance!



Thomas Mølgaard Hjorslev

Feb 16, 2015, 8:24:57 AM
to mongod...@googlegroups.com
Hi Alexandros

Have you read this blog post: http://blog.mongodb.org/post/65517193370/schema-design-for-time-series-data-in-mongodb ?

I don't know your data model, but from your description, it sounds like you store one document per observation.

I was faced with a similar challenge about a year ago, and found the post to be a good inspiration.
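The core idea in that post is to bucket many observations into a single document that is updated in place, e.g. one document per instrument per hour with one entry per minute - roughly like this (field names are only illustrative):

{
    instrument : 1,
    hour : ISODate("2014-04-28T13:00:00Z"),
    minutes : {
        "0"  : { open : 92813, high : 92865, low : 92444, close : 92579, volume : 130 },
        "1"  : { open : 92580, high : 92610, low : 92544, close : 92601, volume :  98 },
        // ... one sub-document per minute, updated in place as the hour progresses
        "59" : { open : 92600, high : 92644, low : 92590, close : 92612, volume :  75 }
    }
}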

/Thomas

Alexandros Ioannou

Feb 17, 2015, 3:15:19 AM
to mongod...@googlegroups.com
Hello Thomas,

Thank you for the reply!

When we were designing our price collection schema we did test this approach too. However, because we aggregate these documents to create longer-period documents, and the period is chosen by the user from many options, we found that using $unwind in the aggregation was slower than aggregating with our current schema (one document per minute/hour/day). The issue we are facing occurs when we try to aggregate old data that is, as it seems, outside the working set. That's why my questions are oriented, I think, towards how we can make mongod read data from disk faster.
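To illustrate, the bucketed variant we tested kept a day's minutes as an array inside one document per instrument, so the aggregation needed an $unwind before grouping - roughly like this (collection name, field names and stages are only a simplified sketch, not our production code):

// one bucket document per instrument per day, with a "bars" array of minute bars
db.minute_buckets.aggregate([
    { $match : { instrument : 1, day : { $lt : ISODate("2015-02-13T00:00:00Z") } } },
    { $unwind : "$bars" },                 // one output document per minute bar
    { $sort : { "bars.time" : 1 } },
    { $group : {
        _id : null,                        // grouping key per output period omitted here
        open :   { $first : "$bars.open" },
        high :   { $max   : "$bars.high" },
        low :    { $min   : "$bars.low" },
        close :  { $last  : "$bars.close" },
        volume : { $sum   : "$bars.volume" }
    } }
])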

Thanks in advance!

Asya Kamsky

Feb 18, 2015, 10:52:16 AM
to mongod...@googlegroups.com
Your schema sounds fine for your pattern of access.

Two issues: what are your indexes, and why are you on 2.6.0 rather than the latest 2.6 (.7 today, though .8 is about to come out)? You are missing a lot of bug fixes...

Asya


--
{ "name" : "Asya Kamsky",
  "place" : [ "New York", "Palo Alto", "Everywhere else" ],
  "email" : "as...@mongodb.com",
  "blog" : "http://www.askasya.com/",
  "twitter": "@asya999" }

Alexandros Ioannou

Feb 18, 2015, 11:03:35 AM
to mongod...@googlegroups.com
Hello Asya,

Thank you for the reply!

Our current document looks like this:
{
    "_id" : ObjectId("535e32ff874189a3cb48f13a"),
    "time" : ISODate("2014-04-28T00:00:00.000Z"),
    "open" : NumberLong(92813),
    "high" : NumberLong(92865),
    "low" : NumberLong(92444),
    "close" : NumberLong(92579),
    "volume" : NumberLong(13216),
    "instrument" : NumberLong(1)
}
and our indexes are:
{
    "instrument" : 1,
    "time" : 1
}
and
{
    "time" : 1
}

I say current because we are thinking of changing it to:
{
    "_id" : ObjectId("535e32ff874189a3cb48f13a"),
    "t" : ISODate("2014-04-28T00:00:00.000Z"),
    "o" : NumberLong(92813),
    "h" : NumberLong(92865),
    "l" : NumberLong(92444),
    "c" : NumberLong(92579),
    "v" : NumberLong(13216),
    "i" : NumberLong(1)
}
as it seems we would save almost 50% of storage. (Does this mean queries will be faster, since less data has to be read from the hard drive if it is not cached already?)
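(A quick way to sanity-check the per-document saving in the shell is Object.bsonsize, using the sample values from the document above:)

// compare the raw BSON size of the two shapes for the sample values above
var longNames = {
    _id : ObjectId(), time : ISODate("2014-04-28T00:00:00Z"),
    open : NumberLong(92813), high : NumberLong(92865), low : NumberLong(92444),
    close : NumberLong(92579), volume : NumberLong(13216), instrument : NumberLong(1)
};
var shortNames = {
    _id : ObjectId(), t : ISODate("2014-04-28T00:00:00Z"),
    o : NumberLong(92813), h : NumberLong(92865), l : NumberLong(92444),
    c : NumberLong(92579), v : NumberLong(13216), i : NumberLong(1)
};
Object.bsonsize(longNames)    // roughly 128 bytes
Object.bsonsize(shortNames)   // roughly 99 bytes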

And we will change the index to:
{
    "t" : 1,
    "i" : 1
}
in order to take advantage of index prefixes and to make writes faster (since we would maintain one index instead of two?)
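i.e. in the 2.6 shell something like this (collection name here is just an example):

// build the single proposed index on the renamed fields...
db.minutes.ensureIndex({ t : 1, i : 1 })
// ...and drop the two current ones once it is ready
db.minutes.dropIndex({ instrument : 1, time : 1 })
db.minutes.dropIndex({ time : 1 })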

We did not update to the latest MongoDB version because we didn't see anything in your JIRA that we were suffering from. We are really excited about the new 3.0 release though, and are eager to try it out.

Thanks in advance!


Asya Kamsky

Feb 18, 2015, 3:21:04 PM
to mongodb-user
A few comments:

> We did not update to the latest mongodb version because we didn't see something in your jira that we were suffering from.

How do you know?  :)   There were over 250 bugs fixed for 2.6.[1-8]...

> [ changing the schema ] as it seems that we will save almost 50% of storage
> (Does this means that will make queries faster since less data has to be read from the hard drive, if not cached already?)

First, let's consider that the storage it saves you will be a pretty small part of your total "storage": only the documents themselves will be smaller, and they are only part of the story. The more important part is the indexes, and they will be exactly the same size regardless of the document size (field names are not stored inside the indexes, just values, since every entry has the same fields). You are on 2.6, so you are correct that turning off power-of-two sizes can help you, since that allocation strategy is meant to optimize situations where document sizes change over time, which yours don't.
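You can see the relative sizes for yourself from the collection stats, e.g. (collection name is just an example):

var s = db.minutes.stats()
s.size             // size of the documents themselves
s.storageSize      // storage allocated on disk for the collection's data
s.totalIndexSize   // total size of all indexes - unchanged by shorter field names
s.indexSizes       // per-index breakdown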

Let's consider your aggregation instead. What does your aggregation pipeline look like? Is it filtering unneeded data as early as possible in the pipeline? Does it use only the minimum necessary stages (with no useless work)? Can you post an example of an aggregation that takes a long time? (Note that if the data and/or indexes it needs aren't in RAM, there will always be a first run that's slower; I'm interested in both first and subsequent runs.)

Asya




Alexandros Ioannou

Feb 24, 2015, 12:01:54 PM
to mongod...@googlegroups.com
Hello,

Well, as I mentioned before, we do check the JIRA and the change log. In any case, we are testing our application against the latest MongoDB so we can update our production soon.

After making changes to the index from:
{
    "instrument" : 1,
    "time" : 1
}
to
{
    "time" : 1,
    "instrument" : 1
}
we noticed a huge performance hit, so we will keep the current index.


The first run of the aggregation is always slow: on the current production, which uses mongod 2.6.0 and a normal spinning disk, it varies from 5 to 40 seconds, while consecutive runs are always in the range of 700 milliseconds.
On the test machine that we have set up locally, running mongod 2.6.7 with a solid-state disk, the first run varies from 3 to 20 seconds; consecutive runs are again in the range of 700 milliseconds, the same as production.

I do not think there is an issue with the aggregation itself, since the problem is with the first run and not consecutive ones. Of course, if 700 milliseconds to aggregate 85,000 documents is slow, then I will gladly post our aggregation code here so we can get some feedback.

So I think our main question is how do we improve the initial aggregation?

Asya Kamsky

Feb 27, 2015, 5:04:31 PM
to mongodb-user
I wouldn't expect that index change to help at all... but I still haven't seen your aggregation pipeline - did I miss it?

It's hard to say why it may be slow without knowing what it's doing.

Asya



Alexandros Ioannou

Mar 2, 2015, 3:28:48 AM
to mongod...@googlegroups.com
Hello Asya,

Our aggregation pipeline looks like this:
Please note that this aggregation produces 59-minute bars, which is why we limit to 84961 documents (59 days * 1440 minutes in a day).
pipeline : [{
    $match : {
        instrument : 1,
        time : { $lt : new Date(7260876000000) }
    }
}, {
    $sort : { time : -1 }
}, {
    $limit : 84961
}, {
    $project : {
        open : 1,
        high : 1,
        low : 1,
        close : 1,
        volume : 1,
        time : 1,
        day : { $dayOfYear : "$time" },
        year : { $year : "$time" },
        bar : {
            $divide : [{
                $subtract : [{
                    $add : [{ $multiply : [{ $hour : "$time" }, 60] }, { $minute : "$time" }]
                }, {
                    $mod : [{
                        $add : [{ $multiply : [{ $hour : "$time" }, 60] }, { $minute : "$time" }]
                    }, 59]
                }]
            }, 59]
        }
    }
}, {
    $sort : { time : 1 }
}, {
    $group : {
        _id : { year : "$year", day : "$day", bar : "$bar" },
        open : { $first : "$open" },
        high : { $max : "$high" },
        low : { $min : "$low" },
        close : { $last : "$close" },
        volume : { $sum : "$volume" },
        time : { $first : "$time" }
    }
}, {
    $sort : { "_id.year" : 1, "_id.day" : 1, "_id.bar" : 1 }
}, {
    $group : {
        _id : 0,
        open_array : { $push : "$open" },
        high_array : { $push : "$high" },
        low_array : { $push : "$low" },
        close_array : { $push : "$close" },
        time_array : { $push : "$time" },
        volume_array : { $push : "$volume" }
    }
}
]
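
We run this against the minutes collection with aggregate() (collection name here is illustrative); on 2.6, allowDiskUse can also be passed in case a $sort or $group stage ever exceeds the 100MB per-stage memory limit:

db.minutes.aggregate(pipeline)
// or, letting large $sort/$group stages spill to disk:
db.minutes.aggregate(pipeline, { allowDiskUse : true })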

Thanks in advance!

Asya Kamsky

Mar 2, 2015, 7:10:15 PM
to mongodb-user
I may be missing something here - you are taking minute-granularity data and you seem to be splitting it into bars/blocks of 59-minute chunks - is that what you intend to do? I'm not sure exactly what this is for. In any case, you can see that the index { "instrument" : 1, "time" : 1 } would be used for the first three stages (filtering to a single instrument's most recent 84961 entries).

The rest of your pipeline can be simplified significantly; I'm not actually sure why you are doing some of the stages, in particular the extra sorts and the last group. But regardless of that, you can test how long the first stage of the aggregation takes - that's what is fetching documents from disk if they are not in memory already.

Run this:

db.collection.find({ instrument : 1, time : { $lt : new Date(7260876000000) } }).sort({ time : -1 }).limit(84961).explain()

(Please substitute appropriate values for the collection name and instrument.)

Running it a couple of times (with explain) for a particular instrument will show you how much time is being spent fetching data from disk and how much doing the query. It should show the index you have being used.

Asya



Alexandros Ioannou

Mar 3, 2015, 4:45:16 AM
to mongod...@googlegroups.com
Hello Asya,

The logic is this: having 1-minute aggregated prices in the database (open, high, low, close, volume and time), we can use the above aggregation to create 59-minute period documents, i.e. documents that contain the open, high, low, close price and volume of a 59-minute period. Because the client application expects these values to be delivered in blocks, we have decided to use a day's worth of minutes as a block, which is 1,440 minutes (documents). This means that for the client application to display 5 days of 1-minute data it has to make 5 requests. For the 1-minute period there is no need to aggregate, since the data is already pre-aggregated into 1-minute documents in the minutes collection. But for 2- to 59-minute periods we aggregate over more days' worth of documents in order to keep the resulting "block" approximately the same size: for 5-minute periods we use 5 days' worth of 1-minute documents, which is 7,200 (5*1440) documents, for 30-minute periods we use 43,200 (30*1440), and for 59-minute periods we use 84,960 (59*1440).
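A small sketch of that block-size calculation (the function name is only illustrative):

// number of 1-minute documents to read for an N-minute period, so that the
// resulting block always holds roughly one day's worth (~1440) of aggregated bars
function minuteDocsNeeded(periodMinutes) {
    return periodMinutes * 1440;   // 5 -> 7200, 30 -> 43200, 59 -> 84960
}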

Here is the pipeline explained:

The first $match finds all the documents of the particular instrument that are before the specified date; then we sort them by time descending and apply the limit so that we get the number of documents we need.

pipeline : [{
    $match : {                      // find all documents of the instrument before the specified date
        instrument : 1,
        time : { $lt : new Date(7260876000000) }
    }
}, {
    $sort : { time : -1 }
}, {
    $limit : 84961                  // get the most recent documents of the instrument for the specified time
}, {
    $project : { ... }              // compute the keys (year, day, bar) for grouping into 59-minute documents
}, {                                // (same $project stage as in the full pipeline above)
    $sort : { time : 1 }            // sort before grouping because of the $first and $last operations
}, {
    $group : {                      // create the 59-minute documents
        _id : { year : "$year", day : "$day", bar : "$bar" },
        open : { $first : "$open" },
        high : { $max : "$high" },
        low : { $min : "$low" },
        close : { $last : "$close" },
        volume : { $sum : "$volume" },
        time : { $first : "$time" }
    }
}, {
    $sort : {                       // sort before grouping because we need the result arrays in the correct order
        "_id.year" : 1,
        "_id.day" : 1,
        "_id.bar" : 1
    }
}, {
    $group : {                      // the client gets the results in this format
        _id : 0,
        open_array : { $push : "$open" },
        high_array : { $push : "$high" },
        low_array : { $push : "$low" },
        close_array : { $push : "$close" },
        time_array : { $push : "$time" },
        volume_array : { $push : "$volume" }
    }
}]

How can the aggregation be improved?

Here are the results when running:

db.collection.find({ instrument : 20, time : { $lt : new Date(7260876000000) } }).sort({ time : -1 }).limit(84961).explain()

The numbers that differ greatly between the first and the consecutive queries are the nYields and needFetch fields. The online documentation explains nYields as "a number that reflects the number of times this query yielded the read lock to allow waiting writes to execute", but we are having trouble understanding exactly what that means - can you please explain it? needFetch is not documented, or at least I can't seem to find its documentation.

Can you spot any other issues?

/* First run */
{
    "cursor" : "BtreeCursor instrument_1_time_1 reverse",
    "isMultiKey" : false,
    "n" : 84961,
    "nscannedObjects" : 84961,
    "nscanned" : 84962,
    "nscannedObjectsAllPlans" : 85063,
    "nscannedAllPlans" : 85065,
    "scanAndOrder" : false,
    "indexOnly" : false,
    "nYields" : 7022,
    "nChunkSkips" : 0,
    "millis" : 7562,
    "indexBounds" : {
        "instrument" : [ 
            [ 
                20, 
                20
            ]
        ],
        "time" : [ 
            [ 
                ISODate("2200-02-01T22:00:00.000Z"), 
                true
            ]
        ]
    },
    "server" : "loft4161:27017",
    "filterSet" : false,
    "stats" : {
        "type" : "LIMIT",
        "works" : 91655,
        "yields" : 7022,
        "unyields" : 7022,
        "invalidates" : 0,
        "advanced" : 84961,
        "needTime" : 0,
        "needFetch" : 6694,
        "isEOF" : 1,
        "children" : [ 
            {
                "type" : "FETCH",
                "works" : 91655,
                "yields" : 7022,
                "unyields" : 7022,
                "invalidates" : 0,
                "advanced" : 84961,
                "needTime" : 0,
                "needFetch" : 6694,
                "isEOF" : 0,
                "alreadyHasObj" : 0,
                "forcedFetches" : 0,
                "matchTested" : 0,
                "children" : [ 
                    {
                        "type" : "IXSCAN",
                        "works" : 84961,
                        "yields" : 7022,
                        "unyields" : 7022,
                        "invalidates" : 0,
                        "advanced" : 84961,
                        "needTime" : 0,
                        "needFetch" : 0,
                        "isEOF" : 0,
                        "keyPattern" : "{ instrument: 1, time: 1 }",
                        "boundsVerbose" : "field #0['instrument']: [20.0, 20.0], field #1['time']: (new Date(7260876000000), true)",
                        "isMultiKey" : 0,
                        "yieldMovedCursor" : 0,
                        "dupsTested" : 0,
                        "dupsDropped" : 0,
                        "seenInvalidated" : 0,
                        "matchTested" : 0,
                        "keysExamined" : 84962,
                        "children" : []
                    }
                ]
            }
        ]
    }
}

/* Second run */
{
    "cursor" : "BtreeCursor instrument_1_time_1 reverse",
    "isMultiKey" : false,
    "n" : 84961,
    "nscannedObjects" : 84961,
    "nscanned" : 84962,
    "nscannedObjectsAllPlans" : 85062,
    "nscannedAllPlans" : 85064,
    "scanAndOrder" : false,
    "indexOnly" : false,
    "nYields" : 1909,
    "nChunkSkips" : 0,
    "millis" : 200, <-- best time here was 129
    "indexBounds" : {
        "instrument" : [ 
            [ 
                20, 
                20
            ]
        ],
        "time" : [ 
            [ 
                ISODate("2200-02-01T22:00:00.000Z"), 
                true
            ]
        ]
    },
    "server" : "loft4161:27017",
    "filterSet" : false,
    "stats" : {
        "type" : "LIMIT",
        "works" : 86271,
        "yields" : 1909,
        "unyields" : 1909,
        "invalidates" : 0,
        "advanced" : 84961,
        "needTime" : 0,
        "needFetch" : 1310,
        "isEOF" : 1,
        "children" : [ 
            {
                "type" : "FETCH",
                "works" : 86271,
                "yields" : 1909,
                "unyields" : 1909,
                "invalidates" : 0,
                "advanced" : 84961,
                "needTime" : 0,
                "needFetch" : 1310,
                "isEOF" : 0,
                "alreadyHasObj" : 0,
                "forcedFetches" : 0,
                "matchTested" : 0,
                "children" : [ 
                    {
                        "type" : "IXSCAN",
                        "works" : 84961,
                        "yields" : 1909,
                        "unyields" : 1909,
                        "invalidates" : 0,
                        "advanced" : 84961,
                        "needTime" : 0,
                        "needFetch" : 0,
                        "isEOF" : 0,
                        "keyPattern" : "{ instrument: 1, time: 1 }",
                        "boundsVerbose" : "field #0['instrument']: [20.0, 20.0], field #1['time']: (new Date(7260876000000), true)",
                        "isMultiKey" : 0,
                        "yieldMovedCursor" : 0,
                        "dupsTested" : 0,
                        "dupsDropped" : 0,
                        "seenInvalidated" : 0,
                        "matchTested" : 0,
                        "keysExamined" : 84962,
                        "children" : []
                    }
                ]
            }
        ]
    }
}

/* n consecutive run */
{
    "cursor" : "BtreeCursor instrument_1_time_1 reverse",
    "isMultiKey" : false,
    "n" : 84961,
    "nscannedObjects" : 84961,
    "nscanned" : 84962,
    "nscannedObjectsAllPlans" : 85062,
    "nscannedAllPlans" : 85064,
    "scanAndOrder" : false,
    "indexOnly" : false,
    "nYields" : 663,
    "nChunkSkips" : 0,
    "millis" : 141,
    "indexBounds" : {
        "instrument" : [ 
            [ 
                20, 
                20
            ]
        ],
        "time" : [ 
            [ 
                ISODate("2200-02-01T22:00:00.000Z"), 
                true
            ]
        ]
    },
    "server" : "loft4161:27017",
    "filterSet" : false,
    "stats" : {
        "type" : "LIMIT",
        "works" : 84961,
        "yields" : 663,
        "unyields" : 663,
        "invalidates" : 0,
        "advanced" : 84961,
        "needTime" : 0,
        "needFetch" : 0,
        "isEOF" : 1,
        "children" : [ 
            {
                "type" : "FETCH",
                "works" : 84961,
                "yields" : 663,
                "unyields" : 663,
                "invalidates" : 0,
                "advanced" : 84961,
                "needTime" : 0,
                "needFetch" : 0,
                "isEOF" : 0,
                "alreadyHasObj" : 0,
                "forcedFetches" : 0,
                "matchTested" : 0,
                "children" : [ 
                    {
                        "type" : "IXSCAN",
                        "works" : 84961,
                        "yields" : 663,
                        "unyields" : 663,
                        "invalidates" : 0,
                        "advanced" : 84961,
                        "needTime" : 0,
                        "needFetch" : 0,
                        "isEOF" : 0,
                        "keyPattern" : "{ instrument: 1, time: 1 }",
                        "boundsVerbose" : "field #0['instrument']: [20.0, 20.0], field #1['time']: (new Date(7260876000000), true)",
                        "isMultiKey" : 0,
                        "yieldMovedCursor" : 0,
                        "dupsTested" : 0,
                        "dupsDropped" : 0,
                        "seenInvalidated" : 0,
                        "matchTested" : 0,
                        "keysExamined" : 84962,
                        "children" : []
                    }
                ]
            }
        ]
    }
}


Thanks in advance!

Asya Kamsky

Mar 7, 2015, 4:44:52 PM
to mongodb-user
Two things I'm still really unclear about: why 59-minute periods? Why not one hour? What happens to the extra minute? For some reason it really bugs me that I don't understand this :)

The other is the "new Date(7260876000000)" boundary: you are saying that you get all data points before a particular date, but your date boundary is in the year 2200, so it's not actually reducing your data set at all!
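You can verify that boundary directly in the shell:

new Date(7260876000000)
// ISODate("2200-02-01T22:00:00.000Z") - the same year-2200 bound shown in the
// indexBounds of your explain output, so the $lt filter excludes nothing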

Since you are fetching a large set into memory, it appears that the
data gets displaced from RAM during normal operations and doing a new
aggregation needs to page it back in again - I don't see how you can
avoid it other than limiting your data set to smaller chunks, or
getting more RAM.

This part shows you that the index is being used optimally:
"n" : 84961,
"nscannedObjects" : 84961,
"nscanned" : 84962,

If you need to process 84,961 documents, that's how many have to get
fetched, and there is no way around that...

Asya
