Cassandra as an MQTT retained message store

554 views
Skip to first unread message

Laing, Michael

unread,
Dec 11, 2014, 5:13:02 PM12/11/14
to rabbitm...@googlegroups.com
Following up on a previous thread re enhancing RabbitMQ MQTT, I wanted to let people know that I have been working on the above.

The current nytfabrik persists messages in Cassandra by topic. This is very handy, as we also route by topic :) We have many millions of messages at any one point in time.

However, unlike the ease of wildcard routing in RabbitMQ, wildcard queries are difficult in Cassandra. Effectively, we allow only the last level to be used as a '*' or '#' wildcard for the purpose of query. Cassandra has a very limited query syntax.

MQTT has a similar topic structure; the standard defines how topic queries should work. Basically the '+' wildcard (equivalent to the AMQP '*') can appear in any level. And a conforming implementation must support query of "retained" messages.

Since we are considering adding MQTT to the nytfabrik, we wanted to solve this query problem, as it comprised the primary technical obstacle to MQTT use in our environment.

We now have a topic store and query proof-of-concept for Cassandra that solves that problem. 

Currently it handles up to 10 levels of wildcards or explicit values - it is O(1) up to 3 or 4 levels increasing to ~O(5) at 10 levels. Internally, Cassandra starts responding in ~100 microseconds, and a small cluster can sustain thousands of queries per second.

At any rate, we are eager now to find a way to integrate this high-capacity retained message store into RabbitMQ MQTT, as well as other features such as ACLs.

Michael L

Michael Klishin

unread,
Dec 11, 2014, 5:23:59 PM12/11/14
to Laing, Michael, rabbitm...@googlegroups.com
Michael,

This sounds very interesting. Multiple MQTT plugin improvements are on the table for next year.

Can you share a bit more details about how your topic lookup algorithm works on C* and what specifically
you think RabbitMQ plugin can learn from it?

Thank you.
--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Laing, Michael

unread,
Dec 11, 2014, 9:42:50 PM12/11/14
to Michael Klishin, rabbitm...@googlegroups.com
I think the more promising approach may be to provide a somewhat standardized way in which persistence mechanisms can be connected for the purpose of providing the standard "retained" capability for MQTT, since not everyone will want to use Cassandra and, in fact, this sort of thing is lots easier to do with Postgres or maybe mongo and other nosql database systems.

In the case of "big table" databases like Cassandra, my solution is, in retrospect, pretty simple: come up with strategies, which can be combined, which result in a single query that retrieves all results. The query must specify partition key (the row), a row slice, and possibly filtering.

With Cassandra you first look at inversions. I came up with a way of identifying "useful" inversions and associating them with the number of levels in the data being stored: more levels means more inversions. This resulted in an inversion strategy which is significantly better than all "valid" combinations and which yields O(1). Beyond 4 levels it gets onerous, however. At 4 levels the "push" cost exceeds the "pull" cost by less than a magnitude, beyond that it quickly increases.

The inversion strategy algorithm I employ is significantly better than a naive permutation strategy. For "push" the strategy yields a table which tells the driver which inversions to perform for a particular input topic pattern. The table is invariant for the "span" of the data: how many contiguous wildcard levels are allowed for the dataset.

For "pull", the inversion strategy abstracts a pattern which in turn determines an inversion and a mapping of levels to keys in the inversion. The topic pattern also determines a Cassandra query pattern. The topic query, the inversion, the mapping, and the query pattern combine to make an actual query to Cassandra. The Cassandra query pattern is cached as a "prepared" query in my driver. The topic query pattern is also cached, so similar subsequent queries are very fast, requiring only mapping and no analysis or preparation.

The inversion strategy always executes in O(1) regardless of query complexity because it fully determines a partition key and a contiguous set of cluster keys (a row slice).

The alternate index strategy algorithm employs Cassandra's alternate indices to first refine the slice and, if the query is more complex, to apply filtering to the slice. Alternate indices are sparse and only used as needed. 5 or 6 levels is a reasonable max for this strategy.

But the strategies can be combined to yield more levels. I have come up with more possible strategies as well, but actually 10 is enough for our uses.

However there is the problem of achieving a bigger range in the partition key. I do this by using a 'fixed' strategy: strategy "0". Basically the first topic level gives you metadata which tells you how many of the leading levels are "fixed", i.e. cannot have wildcards. These can be "promoted" to the partition key, providing more complexity.

We may also employ a strategy "N": designate levels which may have wildcards, but usually don't, and use those to compute a shard which becomes part of the partition key - most queries will go to 1 partition in that case but those with wildcards will use "IN" logic and go to multiple shards (still a single query but more work).

Anyway, I plan to give a presentation in NYC about this in Feb - perhaps I'll put it up as a blog post as well. And maybe the python algorithms will be useful. I am trying to document them so I can remember how and why they work :)

Regarding RabbitMQ MQTT - I am thinking erlang is in my future so we would not be shy about diving in to make this work. It would be good to have "retained" hooks to hit with either Cassandra or something else.

Actually, I am creating a POC asynch full duplex non-blocking interface to mosquitto for the Cassandra driver (and also for the Postgres driver I created to benchmark and keep myself honest.) I am using zmq for this, hooked into the main event loop of mosquitto, and, of course, that might be a way hook into erlang/OTP as well. My C is rusty so I am a little slow on this one (I am old enough to have been programming for years before C was created...).

ml
Reply all
Reply to author
Forward
0 new messages