Integrating KairosDB + Apache Spark?


Fernando Paladini

unread,
Sep 23, 2015, 7:46:54 AM9/23/15
to KairosDB
Hello guys, how're you?

I'm very new to the big data and machine learning world, so sorry if I have some misconceptions. Recently I dumped a SQL database into KairosDB (yes, the time series were stored inside a SQL database) and now I would like to integrate KairosDB with Apache Spark (later I'll use the Spark machine learning library over this data).

I've searched in this forum and on the Apache Spark mailing list too, but I didn't find anything useful on the matter: no clear instructions so far.

Currently I know of two ways to integrate Spark and Kairos:
  1. Using the KairosDB API (is this fast? I saw that it's hard to read JSON from Spark; any thoughts on that?)
  2. Integrating Spark and Cassandra directly (how can I read KairosDB data from Cassandra?)

But what are the pros and cons? Which one is easier? Which one is faster? Is there a clear way to implement either of these options?
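For reference, option 1 could look something like this minimal sketch, using the KairosDB REST endpoint `/api/v1/datapoints/query` (the host/port and metric name here are assumptions for a local install; the actual POST is kept in a separate function so the query-building part runs standalone):

```python
import json

# Assumed local KairosDB instance; adjust host/port for your setup.
KAIROS_URL = "http://localhost:8080/api/v1/datapoints/query"

def build_query(metric_name, hours_back=1):
    """Build a KairosDB JSON query body for the last `hours_back` hours
    of one metric."""
    return {
        "start_relative": {"value": hours_back, "unit": "hours"},
        "metrics": [{"name": metric_name}],
    }

def run_query(metric_name):
    """POST the query and return the [timestamp_millis, value] pairs.
    Requires the `requests` package and a running KairosDB."""
    import requests
    resp = requests.post(KAIROS_URL, data=json.dumps(build_query(metric_name)))
    resp.raise_for_status()
    # Response shape: {"queries": [{"results": [{"values": [[ts, v], ...]}]}]}
    return resp.json()["queries"][0]["results"][0]["values"]
```

The resulting list of `[timestamp, value]` pairs is plain Python data, so it can be handed to Spark without any JSON parsing on the Spark side.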

How can I integrate KairosDB and Apache Spark?


Before asking, I read the following topics:

Brian Hawkins

unread,
Sep 23, 2015, 9:32:00 PM9/23/15
to KairosDB
Here are some thoughts.  It would be a whole lot easier, I imagine, if Kairos used CQL, but it doesn't.  The reasons for moving to CQL keep growing and I'll probably start working on it before too long.

Working with Kairos data as it is: to do this you will have to use the code in CassandraDatastore to read from Cassandra.  I'm not really familiar with Spark (but I soon will be), so here is my best guess at how to go about it.  Spark takes a bunch of little data sets and then does interesting things with them.  Kairos lays data out into a bunch of different rows in C*.  If you could map each row to a data set for Spark, you would be on your way.

When Kairos does a query, we first do a lookup in the index to figure out which rows we need to fetch from C*; then we fetch the rows and combine/aggregate them.  I would think that Spark would do the same: query the index to figure out which rows to get, then farm off jobs to deal with each row and process it.
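The flow described above might be sketched like this. Both helper functions are hypothetical stand-ins for the real CassandraDatastore logic (the real index lives in KairosDB's row key index table); the Spark equivalent is shown in a comment so the sketch runs standalone:

```python
def lookup_row_keys(metric_name):
    """Stand-in for querying KairosDB's row key index: returns the C* row
    keys that hold data for this metric in the requested time range."""
    return ["rowkey-1", "rowkey-2", "rowkey-3"]

def fetch_and_process_row(row_key):
    """Stand-in for fetching one Cassandra row and decoding its columns
    into data points."""
    return [(row_key, i) for i in range(3)]

def query(metric_name):
    row_keys = lookup_row_keys(metric_name)
    # With Spark, each row would become one task, e.g.:
    #   sc.parallelize(row_keys).flatMap(fetch_and_process_row)
    results = []
    for key in row_keys:
        results.extend(fetch_and_process_row(key))
    return results
```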

After writing all that out, it may be easier to just do a CQL datastore.

Brian

Kevin Burton

unread,
Sep 24, 2015, 12:44:52 AM9/24/15
to KairosDB
Additionally, you're going to want to read the data contiguously off disk so you aren't getting many VFS page cache misses.  Even if the data is all in memory, it will be more efficient to read it contiguously...

You could use the kairosdb-client, but you aren't guaranteed sequential access that way...

Fernando Paladini

unread,
Sep 24, 2015, 2:19:58 PM9/24/15
to KairosDB
Thank you for the reply, really!

And sorry, but what's "C*"? 

Anyway, thank you. Your comment is very valuable :)

Fernando Paladini

unread,
Sep 24, 2015, 2:21:38 PM9/24/15
to KairosDB
So, should I not use the kairosdb-client? What does this really mean: that I shouldn't use the API, or that I shouldn't access data directly from Cassandra?

Brian Hawkins

unread,
Sep 24, 2015, 6:53:35 PM9/24/15
to KairosDB
C* = Cassandra, shorthand created in the Cassandra community.

I wouldn't use the client.  I would hack the code in CassandraDatastore to pull the bits of data out of Cassandra.

Brian

Fernando Paladini

unread,
Sep 25, 2015, 2:38:23 PM9/25/15
to KairosDB

Thank you for the explanation. Just one more thing: I'm reading the Cassandra Schema page in the KairosDB documentation and trying to understand what KairosDB does when inserting things into Cassandra (that's kind of obvious: I need to understand the database before accessing its data).

However, when I query the KairosDB schema with:


select * from kairosdb.data_points limit 10;


I got:



And when I query:

select * from kairosdb.string_index  limit 10;

I got:


Why is the data stored as a blob? How can I read this data? The data points generated by my sensors are VERY small, always between 0 and 1000. Why store them as blobs instead of integers? That's my last big question; I hope you can help me.




Erol Merdanović

unread,
Sep 26, 2015, 3:30:21 PM9/26/15
to KairosDB
Brian will explain it better, but the short answer is: it saves data as blobs to save space. I think there is a discussion about it (search for it), plus a short explanation in the docs: http://kairosdb.github.io/website/docs/build/html/CassandraSchema.html.


So now the idea is to talk directly to C*?

Loic Coulet

unread,
Sep 26, 2015, 4:58:43 PM9/26/15
to KairosDB
Actually, in Cassandra all data is byte arrays, and using blobs is the Thrift way (KairosDB uses the Thrift protocol).
By introducing and moving to CQL, Cassandra added typed data schemas, but that's probably less relevant for this use case.

Using blobs saves space (through variable-length encoding) and, last but not least, thanks to this approach KairosDB can store different data types: integers, floating points, strings, complex values, and any custom data type you can imagine!


Having a rigid CQL schema would reduce performance (Cassandra is already the bottleneck, and it would have to deserialize data), reduce storage efficiency, and lower the flexibility of KairosDB by orders of magnitude.
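To illustrate the space-saving idea mentioned above: KairosDB's exact packing lives in its own util code, but the general technique of variable-length long encoding (as used by, e.g., protobuf varints with ZigZag) looks roughly like this sketch. This is an illustration of the concept, not KairosDB's actual wire format:

```python
def pack_varint(value):
    """Variable-length encoding for 64-bit signed ints: small magnitudes
    take fewer bytes.  ZigZag maps signed ints to unsigned first, so small
    negative values stay small too."""
    zigzag = (value << 1) ^ (value >> 63)
    out = bytearray()
    while True:
        byte = zigzag & 0x7F
        zigzag >>= 7
        if zigzag:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def unpack_varint(data):
    """Inverse of pack_varint."""
    result, shift = 0, 0
    for byte in data:
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)  # undo ZigZag
```

A sensor value like 500 fits in 2 bytes here instead of a fixed 8-byte long, which is exactly why small data points between 0 and 1000 benefit from a blob column.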

Fernando Paladini

unread,
Sep 26, 2015, 6:00:42 PM9/26/15
to KairosDB
Thank you for the amazing explanations, everything is very clear now!

Now I finally understand the importance of using blobs in KairosDB. Nice architecture :) I will take a look at the discussion you linked; it seems nice for those looking into Java integrations (the problem is that I'm not using Java, so I think I can't call its classes).

That said, I'm still in doubt about how to "decode" these blobs directly from Cassandra or from a Python script. I would like to query data directly from Cassandra, not call Java methods from the KairosDB code (this way I can integrate with Spark more easily).

I was trying to use the blobAsBigint() and blobAsInt() [not sure if the last one exists] functions from Cassandra, but I had no success. After figuring out how to integrate KairosDB with Spark by querying Cassandra directly, I'll write a series of articles on how to do it (hopefully in English and in Portuguese too).

Thanks for the awesome support until now!

Brian Hawkins

unread,
Oct 1, 2015, 4:18:08 PM10/1/15
to KairosDB
To help you understand (in code) how things are encoded/decoded, have a look at:
This has the code to encode the row keys (the partition key in CQL).

Packing and unpacking of long values is done here:

The column name is a 4-byte integer offset from the row time.  The computation of this value is done here:

Sorry it's not very friendly, but that's the way of Thrift in C*.
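Putting the 4-byte-offset rule into a small sketch: reconstructing a data point's timestamp from its column name might look roughly like this. The one-bit shift with a low type-flag bit is an assumption about KairosDB's layout; check the actual code in CassandraDatastore before relying on it:

```python
import struct

def decode_column_timestamp(row_time_millis, column_name_bytes):
    """Recover a data point's timestamp from its 4-byte column name.
    ASSUMPTION: the millisecond offset is shifted left by one bit, with
    the low bit used as a legacy type flag -- verify against the
    authoritative logic in CassandraDatastore."""
    (packed,) = struct.unpack(">I", column_name_bytes)  # big-endian uint32
    offset_millis = packed >> 1  # drop the assumed flag bit
    return row_time_millis + offset_millis
```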

Brian

Fernando Paladini

unread,
Oct 3, 2015, 5:15:55 PM10/3/15
to KairosDB
Thank you! For now I'm using the KairosDB API to query data from Cassandra and store it as DataFrames in Spark.
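For anyone following along, that API-to-DataFrame step might look roughly like this. The response shape follows the KairosDB query API; the pyspark call is left as a comment so the parsing part runs standalone, and the sample response is made-up illustration data:

```python
def response_to_rows(kairos_response):
    """Flatten a KairosDB query response into (metric, timestamp_millis,
    value) tuples, ready for spark.createDataFrame."""
    rows = []
    for query in kairos_response["queries"]:
        for result in query["results"]:
            for ts, value in result["values"]:
                rows.append((result["name"], ts, value))
    return rows

# Made-up example response, trimmed to the fields used above:
sample = {
    "queries": [{
        "results": [{
            "name": "sensor.temperature",
            "values": [[1443715200000, 21.5], [1443715260000, 21.7]],
        }]
    }]
}

# With a SparkSession available:
# df = spark.createDataFrame(response_to_rows(sample), ["metric", "ts", "value"])
```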

I'll make some progress in the project and if everything goes well, I'll publish some articles or tips on how to integrate KairosDB and Spark.
Thank you for all the help! :)

Erol Merdanović

unread,
Oct 4, 2015, 2:28:44 PM10/4/15
to KairosDB
Please do. I would love to try it out!