Distributed CQEngine

489 views
Skip to first unread message

Winarto Zhao

unread,
Apr 15, 2014, 12:56:20 AM4/15/14
to cqengine...@googlegroups.com
My project is currently using CQEngine and it's been performing really great and super fast. 
However as with any other in-memory query engine, there is a limit where CQEngine can hold the data, which is limitation in the allocated heap-size to JVM as well as the physical memory in the machine. To overcome this, I'm trying to create an implementation of distributed CQEngine where the underlying Set object is distributed across multiple JVMs/machines. This may not be ideal if performance is the top priority. However with the data stored in multiple JVM/machines, we can make CQEngine to perform with Tera-bytes or even peta-bytes of data.

My question is, has anyone ever encountered this kind of scenario? If yes, how do you solve the issue.

Niall

unread,
Apr 15, 2014, 10:46:56 AM4/15/14
to cqengine...@googlegroups.com
Hi Winarto,

I guess the options are to shard data in RAM over multiple machines, or to page data to disk on a single machine; or combinations thereof.

I posted some thoughts about what would be required to page data to disk with CQEngine in a previous discussion.

Re: sharding, the nice thing about dealing with sets, is that union/intersection/negation operations all result in a new set. So set theory is quite amenable to partitioning. So lets say you partitioned your data by some attribute like country. You could send the same query to the shards responsible for each country, and union the results lazily in the client. This is hardly "full featured" or transparent sharding of course, but if your queries are simple enough you could take that approach right now. (i.e. separate instance of IndexedCollection on each shard, each unaware of the other shards)

I do want to implement better support for off-heap collections. It is just a question of finding time to do this :)
The sharding aspect is probably an entirely separate effort. I'd support anyone who wants to help in that area, but I'm not sure if I'd have time myself for now :)

HTH,
Niall

John Smith

unread,
Apr 15, 2014, 11:56:48 AM4/15/14
to cqengine...@googlegroups.com
Personally I tried multiple "grid/in-memory" products (open source and commercial) and all but one fell short! They where all very slow with my query. Only the one grid product and CQ engine where fast enough. Unfortunately the project got shelved for now one reason being price. Disclaimer I may not have properly evaluated them all correctly and all tests where based on what I needed to do).

Though I can tell you I did achieve map/reduce style functionality with CQ Engine and the framework http://vertx.io and I was able to "shard" data across multiple server nodes fairly easily. But this didn't involve any kind of data availability/fail over. If one of my nodes went down then I would lose that data.

Note: With the below CQ query I was able to read through 30,000,000 records in under 3ms.

Query<TransactionSearchable> query1 = or(
equal(...),
equal(...),
equal(...),
equal(...),
equal(...),
equal(...),
equal(...),
equal(...),
equal(...));

Winarto Zhao

unread,
Apr 15, 2014, 12:25:35 PM4/15/14
to cqengine...@googlegroups.com
Hi Niall,

Before my current requirement to have distributed/multi-nodes, I only have 1 node and CQEngine performed super fast. However to make it even faster, my ex-colleague enhanced CQEngine implementation by creating partitioned CQEngine where by each data is distributed into multiple IndexedCollection based on various (defined) attributes (perhaps 2 or 3 highly queried attributes) and perform deduplication on the resultsets. That implementation could speedup the query up to 2-3x on very large amount of data.

However that implementation is not horizontally scalable because the underlying data is stored in multiple collections hence multiple indexes are created and requires higher memory footprint.

What I currently need is to support query data of up to few terabytes. If I fallback to disk, I'd rather fallback to DB than to raw file. But that means if the node is down, it is going to be single point of failure. The visible solution is to make CQEngine query on distributed data.

Winarto Zhao

unread,
Apr 15, 2014, 12:32:59 PM4/15/14
to cqengine...@googlegroups.com
Hi John,

I'm also using Vertx.io though for different purpose. Vertx.io itself achieve its seamless communication between nodes by relying on Hazelcast's distributed multimap.

Coming back to your CQEngine shard using Vertx.io, are you firing the same query to all nodes using publish method in Vertx's eventbus and let the node that has the corresponding to respond? If that's the case, as you said, it doesn't achieve any high availability and distributed CQEngine at all.
However if you use different implementation, please share it.

Niall Gallagher

unread,
Apr 15, 2014, 1:39:15 PM4/15/14
to cqengine...@googlegroups.com
By the sounds of that, you were replicating the same data to all nodes?

The alternative is to shard the data, where each node gets a partition of the data instead. (Partitioned by some attribute(s), like "country" or something)

Sharding can be as fast as replication, if you can partition the query in the same way and send the subqueries to each shard in parallel.

For fault tolerance, let's say you have 6 nodes to work with. If you divide your dataset into 3 shards and install each third of the data on two nodes, you could have a fault-tolerant pair of nodes behind a load balancer for each shard. This would allow you to triple the amount of data you can load into CQEngine while retaining query speed and it would remain fault tolerant.

I'm not super-familiar with Hazelcast but you could probably design something even better by running CQEngine on top of Hazelcast (as you are doing) or some other framework. I'm thinking about frameworks which might provide consistent hashing where you can add new shards and "re-balance" automatically etc.




--
-- You received this message because you are subscribed to the "cqengine-discuss" group.
http://groups.google.com/group/cqengine-discuss
---
You received this message because you are subscribed to the Google Groups "cqengine-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cqengine-discu...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Winarto Zhao

unread,
Apr 15, 2014, 1:51:15 PM4/15/14
to cqengine...@googlegroups.com
Hazelcast is not to share the exact same amount of data in multiple nodes (though you can do that as well). Hazelcast is used to distribute (not share) the data. So I can configure that an item is synchronously backed up by at least another one or two other nodes. However single node will not hold the entire data (unless if there is only 1 node left alive).

You mentioned "For fault tolerance, let's say you have 6 nodes to work with. If you divide your dataset into 3 shards and install each third of the data on two nodes, you could have a fault-tolerant pair of nodes behind a load balancer for each shard. This would allow you to triple the amount of data you can load into CQEngine while retaining query speed and it would remain fault tolerant."
That's exactly what I'm currently doing.

The problem that I'm currently having is:
1. I don't know if my implementation is correct the from CQEngine point of view
2. I don't know how to pause the query being performed if during that query period a node is down and rebalancing of data is happening.

John Smith

unread,
Apr 15, 2014, 1:58:54 PM4/15/14
to cqengine...@googlegroups.com
Basically I used vertx as follows...

1- Http module to receive the incoming request (send to step 2 below via event bus)
2- Vertx module to write/read to/from CQ-Engine. Deployed each module to it own address as adress.samemodule.1, adress.samemodule.2, adress.samemodule.3

The http module would calculate modulus to pass a flag to indicate which module would do the write (evenly distributed) for the incoming request and propagate the same read on all nodes and then reduce back in the http module. It easy to do with when.java for vertx.

I think the best implementation is what Winarto did because building your own sharding and replication is the worst idea ever really.

One reason I chose CQ was because it's querying is allot faster then Hazelcast. I think all these grid products are good at point queries but when it came to SQL like queries they fell short (again based on my query needs).



--
-- You received this message because you are subscribed to the "cqengine-discuss" group.
http://groups.google.com/group/cqengine-discuss
---
You received this message because you are subscribed to a topic in the Google Groups "cqengine-discuss" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/cqengine-discuss/yPH9buz9eUE/unsubscribe.
To unsubscribe from this group and all its topics, send an email to cqengine-discu...@googlegroups.com.
Message has been deleted

John Smith

unread,
Apr 15, 2014, 2:23:44 PM4/15/14
to cqengine...@googlegroups.com
Winarto are you able to share what you did?

Here is what I did...

Niall

unread,
Apr 15, 2014, 4:43:38 PM4/15/14
to cqengine...@googlegroups.com
Hi Winarto,

I have (finally) just now documented a mechanism I previously discussed here in the forum, to control TransactionIsolation.

If you need to pause queries while nodes are down or rebalancing is happening, the mechanism only requires a ReadWriteLock.

I don't know if Hazelcast provides a ReadWriteLock, but I know that Zookeeper does support that. You could probably adapt it to work with an optimistic locking method either (this is a hunch, I have not actually done that yet).

Winarto Zhao

unread,
Apr 16, 2014, 6:25:38 AM4/16/14
to cqengine...@googlegroups.com
Hi John,

I'm unable to share the code as it is, but I'll try to remove the confidential stuffs and will update this thread.

Cheers,
Winarto

Winarto Zhao

unread,
Apr 16, 2014, 6:28:52 AM4/16/14
to cqengine...@googlegroups.com
Niall,

Are you saying that I have to create Lock to block the entire CQEngine operation, that's not what I'm looking for.
What I'm looking for is when CQEngine is querying large data and I get notification that a node is down and rebalancing is happening, I want to interrupt the CQEngine such that it will throw a kind of InterruptedException so that it doesn't produce inconsistent resultset due to "moving" underlying data within the Set

Regards,
Winarto

Niall Gallagher

unread,
Apr 16, 2014, 7:36:58 AM4/16/14
to cqengine...@googlegroups.com
Hi Winarto,

I'm not really suggesting a complete solution. I figured that part of your problem is transaction isolation - how to ensure that queries return consistent results if certain portions of the data is unavailable, moving, or being modified.

So I'm just answering how you can control transaction isolation in individual CQEngine instances in general, and you can adapt this for whatever sharding solution you are using (Hazelcast I guess).

In a sharding solution, if you ensure that each shard replica can guarantee that its own results will be consistent, then overall you are guaranteed to have globally consistent results. So you could set up a read-write lock on each node (as discussed in TransactionIsolation).

If you are performing a write or update transaction on a particular replica of a shard, that transaction can acquire the write lock for that shard. Then requests for queries requiring READ_COMMITTED isolation reaching that shard replica, can call readLock.tryLock(). This will return false without blocking if data read would be inconsistent. You could then direct the (sub-)query to the other replica for that shard instead.

So you don't need a global lock but you need to ensure that the write lock on any particular replica is held while that replica is being rebalanced (or while it may be inconsistent in some way).



--

John Smith

unread,
Apr 16, 2014, 1:08:43 PM4/16/14
to cqengine...@googlegroups.com
Winarto just the CQ Hazlecats changes would be nice! :) Maybe we can make into a project.

Winarto Zhao

unread,
Apr 16, 2014, 8:34:39 PM4/16/14
to cqengine...@googlegroups.com
Niall,

The problem is that each node does not have a complete set of data, though from CQEngine perspective it assume so. So even if the query is performed at CQEngine node 1, the underlying data in java.util.Set may reside in node 3, 4, 5 (only distribution library knows, in this case is Hazelcast). So I want to interrupt the query that is being performed if I receive event that a node is down and rebalancing is happening because it may happen that the master data being queried resides in the failing node and by interrupting the query, I can requery after rebalancing is done and the data from backup node will be queried.

Message has been deleted
Message has been deleted

John Smith

unread,
Apr 16, 2014, 10:16:18 PM4/16/14
to cqengine...@googlegroups.com

Doesnt hazelcast have a status api that you check?

On Apr 16, 2014 8:41 PM, "Winarto Zhao" <win...@gmail.com> wrote:
Niall,

The problem is that each node does not have a complete set of data, though from CQEngine perspective it assume so. So even if the query is performed at CQEngine node 1, the underlying data in java.util.Set may reside in node 3, 4, 5 (only distribution library knows, in this case is Hazelcast). So I want to interrupt the query that is being performed if I receive event that a node is down and rebalancing is happening because it may happen that the master data being queried resides in the failing node and by interrupting the query, I can requery after rebalancing is done and the data from backup node will be queried.

--
-- You received this message because you are subscribed to the "cqengine-discuss" group.
http://groups.google.com/group/cqengine-discuss
---
You received this message because you are subscribed to a topic in the Google Groups "cqengine-discuss" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/cqengine-discuss/yPH9buz9eUE/unsubscribe.
To unsubscribe from this group and all its topics, send an email to cqengine-discu...@googlegroups.com.

Winarto Zhao

unread,
Apr 16, 2014, 10:42:15 PM4/16/14
to cqengine...@googlegroups.com
Hazelcast provides a callback to say that a member is down and rebalancing is happening. But if at that point there is a query within CQEngine, the result could be unexpected. So I want to interupt and re-query after rebalancing is done. That interruption method that I can't find in CQEngine.

Winarto Zhao

unread,
Apr 16, 2014, 10:44:06 PM4/16/14
to cqengine...@googlegroups.com
Hi John,

I've created a gist for the distributed CQEngine.


I'm happy if we can create a module for CQEngine that can perform distribution. We can start with Hazelcast but in the future we can abstract it so other grid-computing library could be used.

Cheers,

John Smith

unread,
Apr 17, 2014, 10:07:36 AM4/17/14
to cqengine...@googlegroups.com
Can we take this offline so we don't take over the thread?

Winarto Zhao

unread,
Apr 17, 2014, 11:52:54 AM4/17/14
to cqengine...@googlegroups.com
+1

Just drop me email

Niall

unread,
Apr 17, 2014, 1:27:29 PM4/17/14
to cqengine...@googlegroups.com
One option would be to create a wrapper around CQEngine's ResultSet, and have the wrapper throw exceptions whenever you detect that rebalancing is ongoing. This would interrupt ongoing queries as soon as it is detected that rebalancing has started.
Reply all
Reply to author
Forward
0 new messages