Order By Timestamp issue with Million objects in indexed collection

222 views
Skip to first unread message

Chakra Yadavalli

unread,
Aug 24, 2016, 7:16:26 PM8/24/16
to cqengine-discuss
Hi all,

I am new to CQEngine and trying to get good performance for a query with ORDER BY on timestamp field with 1 million objects. Filtering is fast. It's the ORDER BY that's killing.

public class Job {
 
private long type;  // notnull required field
 
private int priority;  // notnull required field
 
private long timestamp;  // notnull required field milliseconds
 
int status;  // notnull required field


 
// getter setters omitted here -- assume they are available.


 
// All fields have SimpleAttributes declared as static finals as follows: -- omitted here
 
// type - TYPE, priority - PRIORITY, timestamp - TIMESTAMP, status - STATUS.
}


IndexedCollection<Job> jobs = new ConcurrentIndexedCollection<Job>();
jobs
.addIndex(NavigableIndex.onAttribute(TYPE));
jobs
.addIndex(NavigableIndex.onAttribute(PRIORITY));
jobs
.addIndex(NavigableIndex.onAttribute(TIMESTAMP));


QueryOptions queryOptions = queryOptions(orderBy(descending(PRIORITY), ascending(TIMESTAMP)));


// for given STATUS s, and a list of TYPEs t, find all matching jobs
Query<Job> query = and(equal(STATUS, s), in(TYPE, t));
// Retrieve the matching jobs ordered by PRIORITY desc and TIMESTAMP asc.
ResultSet<Job> resultSet = jobs.retrieve(query, queryOptions);


// Iterate on the result set to return Top N jobs.


When I run this with 2 million jobs, it takes few seconds to retrieve top 10 jobs.

When I try the same without the queryOptions, then its works in < 100micros

I tested this with various combinations of one or more of the following but does not have any effect if I pass the queryOptions in retrieve method.

1. Setting queryOptions for TIMESTAMP with Order By Ascending
2. Setting queryOptions for PRIORITY with Order By Descending
3. Setting EngineThresholds.INDEX_ORDERING_SELECTIVITY = 1.0, 0.75. 0.5. 0.25 etc.
4. Tried quantizers on the TIMESTAMP index with compression of 1000 (seconds), 60000 (minutes), 600000 (10 minutes)

I also tried storing the timestamp compressed value in the Job's constructor (using integer division) directly as opposed to using a Quantized index as mentioned above. But no luck. 

I would appreciate any clues on how to make this faster.

Note that the jobs are inserted into indexed collection more or less in the same order timestamp would be sorted. So is there any way to tell the indexed collection to preserve the insertion order like in a linked hash map? I checked the code and I see that internally its a ConcurrentHashMap. If I implement my own ObjectStore with say a ConcurrentLinkedHashMap implementation, then could I remove the ORDER BY clauses and get better performance? I am going to try this with Guava or Caffeine or some other libraries and see how it works.

Are there any other partitioning strategies that would work in this case?

Thanks & Regard,
Chakra

Niall Gallagher

unread,
Aug 24, 2016, 8:18:01 PM8/24/16
to cqengine...@googlegroups.com
Hi Chandra,

It’s a good question, and thanks for providing an example. 
Time-series problems are always fun :)

The main thing which comes to mind when dealing with time-series - as you know - is the index-accelerated ordering strategy.
I note you’ve already tried it by setting INDEX_ORDERING_SELECTIVITY, and that you are using SimpleAttributes which means you should have the required indexes in place for it to work already.

*However* the main problem I see is that you are ordering by PRIORITY, and then by TIMESTAMP.

Index-accelerated ordering only applies to the first attribute in the orderBy clause, which in this case is PRIORITY, and not TIMESTAMP.

And I guess you have a very small number of possible values for PRIORITY, and a large number of possible values for TIMESTAMP? If so then the index on PRIORITY won’t help very much.

So you could try something like the following...
Retrieve the distinct values for priority in descending order from the PRIORITY index, and then retrieve results from the collection in batches in descending order of those priorities.

// Create the collection...
IndexedCollection<Job> jobs = new ConcurrentIndexedCollection<Job>();
// Add indexes on TYPE and TIMESTAMP as normal...
jobs.addIndex(NavigableIndex.onAttribute(TYPE));
jobs.addIndex(NavigableIndex.onAttribute(TIMESTAMP));

// Add an index on PRIORITY, but also keep a reference to it so we can access it directly later...
NavigableIndex<Integer, Job> priorityIndex = NavigableIndex.onAttribute(PRIORITY);
jobs.addIndex(priorityIndex);

QueryOptions queryOptions = queryOptions(
orderBy(ascending(TIMESTAMP)), // order by the time-series attribute only
applyThresholds(threshold(INDEX_ORDERING_SELECTIVITY, 1.0)) // use index-accelerated ordering
);

// Retrieve the distinct values for PRIORITY in descending order from the PRIORITY index...
try (CloseableIterator<Integer> prioritiesDescending = priorityIndex.getDistinctKeysDescending(queryOptions).iterator()) {
while (prioritiesDescending.hasNext()) {
// Retrieve results in batches in descending order of priority....
Query<Job> query = and(equal(STATUS, s), in(TYPE, t), equal(PRIORITY, prioritiesDescending.next()));
ResultSet<Job> resultSet = jobs.retrieve(query, queryOptions); // process this batch
}
}

Another suggestion is that, if you only have a small finite number of priorities, you could also experiment with adding a PartialNavigableIndex for each of the priorities.

Hope that helps!
Niall


--
-- You received this message because you are subscribed to the "cqengine-discuss" group.
http://groups.google.com/group/cqengine-discuss
---
You received this message because you are subscribed to the Google Groups "cqengine-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cqengine-discu...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Niall Gallagher

unread,
Aug 24, 2016, 8:22:42 PM8/24/16
to cqengine...@googlegroups.com
Sorry for the typo I meant to type *Chakra*!


On 25 Aug 2016, at 00:16, Chakra Yadavalli <chakra.y...@gmail.com> wrote:

Chakra Yadavalli

unread,
Aug 24, 2016, 8:51:49 PM8/24/16
to cqengine-discuss
Hi Niall,

Thanks for the quick response. You are right. The distinct values of PRIORITY are very few. So I did try what you suggested -- iterate on the distinct priorities and query with order by only on the TIMESTAMP. It did improve the performance by 10x. 
With ORDER BY PRIORITY DESC, TIMESTAMP ASC each query took about 1500millis
WITH ORDER BY TIMESTAMP ASC and looping over 10 distinct priority values, the query took about 150millis.
However without any ORDER BY the query only takes less than 100micros.

I would like to get the query to 10ms range. Is there anything else we could do? Can the following work? Sorry if its an absurd question. I cloned the CQEngine repository just few hours ago and going over the implementation. :-) Changing the collection backing the indexed collection makes any difference in sorting or can we avoid sorting?

Note that the jobs are inserted into indexed collection more or less in the same order timestamp would be sorted. So is there any way to tell the indexed collection to preserve the insertion order like in a linked hash map? I checked the code and I see that internally its aConcurrentHashMap. If I implement my own ObjectStore with say a ConcurrentLinkedHashMap implementation, then could I remove the ORDER BY clauses and get better performance? I am going to try this with Guava or Caffeine or some other libraries and see how it works.

Niall Gallagher

unread,
Aug 25, 2016, 5:17:31 AM8/25/16
to cqengine...@googlegroups.com
Hi Chakra,

I don’t think that changing the backing collection will make much difference. But you can tune the index configuration.

Basically at the moment, when the TIMESTAMP index is used to accelerate ordering, the engine will find objects like this:

t1 -> {p4, s1}
t2 -> {p3, s5}
t3 -> {p3, s4}
.. etc.

It can quickly locate objects in ascending or descending order of timestamp BUT it will still have to perform filtering to filter out the objects which don’t have the desired priority or status values each time.

So I think the next step then, if you have 10 priority values, is to add a PartialNavigableIndex on attribute TIMESTAMP with filterQuery equal(PRIORITY, p) for each of the 10 priority values.

Partial index on TIMESTAMP for priority 3
t2 -> {s5}
t3 -> {s4}
.. etc.

Partial index on TIMESTAMP for priority 4
t1 -> {s1}
.. etc.

This way, it will avoid having to filter by priority values. If the 10 priority values are evenly distributed in the collection then this should make it 10 times faster (YMMV).

Do you have many distinct values for status as well? If there are not many, you could consider having a partial index for each of the distinct *combinations* of priority AND status with filterQuery: and(equal(PRIORITY, p), equal(STATUS, s))
Essentially, that would eliminate all of the unnecessary filtering.

HTH,
Niall

Chakra Yadavalli

unread,
Aug 28, 2016, 1:35:39 PM8/28/16
to cqengine-discuss
Thanks Niall.

Yes, the unique combinations of PRIORITY and STATUS are between 20 and 30. How do I create a PartialNavigableIndex on two fields? Should I be using a CompoundAttribute? A code snippet will really help.
Couple of more questions: 
1. Once the Job is picked up, I will be updating its STATUS value. Should I be updating the index at that time? using IndexedCollection.update(Iterable<JobAssignment> objectsToRemove, Iterable<JobAssignment> objectsToAdd)? 
2. Is there any cost to deleting the objects from IndexedCollection? I am presently deleting the objects from one IndexedCollection and adding it to another when the STATUS changes. Is this correct way to do it?
3. At present only one thread is accessing the IndexedCollection. What are the effects of using multiple threads to access the IndexedCollection is there any risk of dirty-reads by some of the threads?

Chakra Yadavalli

unread,
Aug 28, 2016, 1:46:11 PM8/28/16
to cqengine-discuss
Hi Niall,

One more thing... The  unique values of PRIORITY, STATUS and TYPE as of today are 10, 3 and 500. However the values of TYPE could increase over time but the use case if infrequent. May be 10 more TYPEs will be added in 6 months. Give this is there anything that could be done to improve the query speed? Also, would a StandingQueryIndex help here? 

Niall Gallagher

unread,
Aug 28, 2016, 7:52:53 PM8/28/16
to cqengine...@googlegroups.com
Hi Chakra,

You can take a look at JavaDocs for PartialNavigableIndex and PartialIndex for some more info.

Long story short, you need something like this:
// Do this for all combinations of s and p...
jobs.addIndex(PartialNavigableIndex.onAttributeWithFilterQuery(TIMESTAMP, and(equal(STATUS, s), equal(PRIORITY, p))));
— 
…once that’s done, then whenever CQEngine sees a query such as "and(equal(STATUS, 5), equal(PRIORITY, 6))” it will use the relevant PartialNavigableIndex to accelerate ordering by TIMESTAMP.

Re (1): Just to clarify: you should not change the values of fields in your objects. Instead you should swap the old version of the object for a replacement object - using the update() method. 
Re (2): That should be fine. Removing objects from the collection does not have a specific performance penalty.
Re (3): There are three implementations of IndexedCollection, each with different concurrency/transaction isolation support. I guess you are using the Concurrent one. Read their JavaDocs for details. If you are concerned about transaction isolation, you should use the Transactional one.

HTH!
Niall

Niall Gallagher

unread,
Aug 28, 2016, 8:34:51 PM8/28/16
to cqengine...@googlegroups.com
Hi Chakra,

A StandingQueryIndex won’t help much with time-series, because it can’t accelerate the ordering of results.
However a PartialIndex is kind of an extension of the StandingQueryIndex concept and it can order results, which is useful for time-series.

To use database analogies…
  • I don’t think there is an exact database analogy for what a StandingQueryIndex is. But you can think of it as a canned response to a query, or to a fragment of a query. The results in this canned response are unsorted.
  • A PartialIndex is somewhat like a materialised view in that it is built on a query. Except that the view only contains a single column which contains foreign keys pointing to the rows in the original table which match the query. And that column is stored in sorted order of the attribute the PartialIndex is built on. So then at runtime if you had query like that, and results needed to be ordered by the attribute, you could write an SQL query to read foreign keys in sorted order from the materialised view and JOIN back with the main table. This is what CQEngine does.

To build optimal indexes (which don't incur any filtering) to accelerate time-series queries, on various subsets of your data (in this case combinations of status and priority), you basically need a different index for each subset. So your only option is PartialIndexes. Welcome to the fun of partitioned time-series queries!

Re: new TYPEs getting added over time: If new TYPEs will be added over time, then you will need to add more partial indexes to account for the new TYPEs over time. At startup (or whenever) if you add a regular NavigableIndex on TYPE though, before you add the partial indexes, then you can call typeIndex.getDistinctKeys() to find out what partial indexes or combinations thereof, you need to add subsequently. So you app could dynamically add the required indexes as the dataset changes.

HTH!
Niall

Chakra Yadavalli

unread,
Aug 29, 2016, 7:41:42 PM8/29/16
to cqengine-discuss
Thanks Niall for the detailed responses. I will try out partial indexes and see how it goes.
Reply all
Reply to author
Forward
0 new messages