Question about DataConsumer.DataEvent Version and Partitions

Mark Williams

Jun 24, 2013, 11:24:38 PM
to sensei...@googlegroups.com
I've been looking at how the data event version number is used in Sensei/Zoie but wanted to get clarification from someone.

Is the version number used to represent the last event for the entire sensei cluster or does it just apply to a particular partition/shard? i.e. does this version number represent the total ordering of all events that the Sensei cluster can receive?

Since we have a fully distributed system, we do not have any field that represents the total order of events. Instead we are currently relying on the Kafka offset of the topic that Sensei consumes from. To achieve scalability we would like to create more Kafka partitions. The problem with Kafka partitions, however, is that the offset is different for each partition.

If the data event version is relevant only for a specific shard then this is not a problem; we can partition our Kafka messages using the same key as we use to partition the data across the shards. However, if the data event version represents the global version for all nodes and all shards, then we are stuck with a single Kafka partition and will ultimately hit scalability issues (since a Kafka partition can only be served by a single thread from a single broker).
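
To illustrate the co-partitioning idea above, here is a minimal sketch in Python. It assumes the number of Kafka partitions equals the number of Sensei shards and that both sides route on the same key; the names `NUM_SHARDS`, `shard_for`, and `kafka_partition_for` are hypothetical and are not part of Sensei or Kafka.

```python
import zlib

# Assumption for this sketch: one Kafka partition per Sensei shard.
NUM_SHARDS = 4


def shard_for(key: str, num_shards: int = NUM_SHARDS) -> int:
    """Map a routing key to a shard id with a hash that is stable across processes."""
    # zlib.crc32 is deterministic, unlike Python's built-in hash() for strings.
    return zlib.crc32(key.encode("utf-8")) % num_shards


def kafka_partition_for(key: str, num_partitions: int = NUM_SHARDS) -> int:
    """Pick the Kafka partition with the same function, so both sides always agree."""
    return shard_for(key, num_partitions)


if __name__ == "__main__":
    for key in ("doc-1", "doc-2", "doc-3"):
        print(key, shard_for(key), kafka_partition_for(key))
```

With this arrangement every event for a given shard arrives through one Kafka partition, so that partition's offset is a usable per-shard version.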

Volodymyr Zhabiuk

Jun 25, 2013, 11:31:02 AM
to sensei...@googlegroups.com
Hi Mark,

The version number represents the last consumed event at the node
level. The Sensei gateway consumes events for the whole node and is
responsible for assigning the version. The pair of data event and
version is then routed to the appropriate partition, or discarded if
it doesn't belong to a partition located on the current node. Versions
don't have to be incremental, and Kafka doesn't ensure the ordering of
incoming messages. But we use the ordering of versions to determine the
largest version persisted in Zoie, so that the gateway is aware of it
if the node is restarted.

In general it makes sense to use the Kafka offset as the version only
if we are using the Kafka simple consumer, meaning that we've connected
to a single Kafka broker and are pulling messages manually from some
partitions:
https://github.com/senseidb/sensei/blob/master/sensei-gateways/src/main/java/com/senseidb/gateway/kafka/SimpleKafkaStreamDataProvider.java
The high-level Kafka consumer hides this from the client, and we rely
either on the Kafka autocommit interval or on the commitCurrentOffsets
method. That's why at
https://github.com/senseidb/sensei/blob/master/sensei-gateways/src/main/java/com/senseidb/gateway/kafka/KafkaStreamDataProvider.java
the version is just the local timestamp. The problem with the second
approach is that the Kafka commit is not synchronized with the Zoie
persist event, so we might lose some data if the node crashes.

As far as I know, for Kafka 0.7 and earlier, message offsets might be
different on different brokers for the same partition/topic. This has
been fixed in Kafka 0.8, so we can implement a new Kafka gateway
that keeps track of offsets per partition and prevents possible data
loss.

Hope this answers your question :) Just to reiterate: using the Kafka
offset as the version doesn't give us anything if we use the Kafka
high-level consumer API.

Thanks,
Volodymyr

Mark Williams

Jun 25, 2013, 11:43:51 PM
to sensei...@googlegroups.com
Hi Volodymyr,

Thank you for your clear explanation. I never realized that the version number was used solely for resetting the gateway offset in case of failure.

We are currently using Kafka 0.8 and have made (and contributed back) a modification that allows us to reset the offset for the high level consumer. In addition we have made some improvements so we can commit immediately. It's still possible for data loss to occur on a Sensei crash but on startup the offset would be restored to the last known saved point. So I think for now as long as we use a single Kafka partition we can continue to use the high level consumer and Kafka offset.

If we want to support reading from multiple partitions, then using a single Kafka offset will not work. I don't think it's enough to keep track of the offsets in the gateway alone, because we still don't know whether the events with those offsets were actually flushed to disk. Instead we would need to persist the offset for each partition during the Zoie flush, and that would require changes to the internal APIs (the gateway API would need a method to receive the starting offset for each partition). I will think about this some more later and propose some enhancements to support this.
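
The flush-time persistence described above could look roughly like the following Python sketch. It is only an illustration under stated assumptions: `FlushAwareOffsets`, `record`, `on_flush`, and `starting_offsets` are hypothetical names, not Sensei, Zoie, or Kafka APIs, and the caller is assumed to store the snapshot returned by `on_flush` together with the index flush.

```python
from typing import Dict, Optional


class FlushAwareOffsets:
    """Tracks per-partition Kafka offsets, treating them as durable only after a flush."""

    def __init__(self, durable: Optional[Dict[int, int]] = None) -> None:
        # Offsets known to be on disk (for example, loaded at startup).
        self._durable: Dict[int, int] = dict(durable or {})
        # Offsets of events that were indexed but possibly not flushed yet.
        self._pending: Dict[int, int] = dict(self._durable)

    def record(self, partition: int, offset: int) -> None:
        """Note that an event from this partition was handed to the indexer."""
        self._pending[partition] = max(offset, self._pending.get(partition, -1))

    def on_flush(self) -> Dict[int, int]:
        """Call when the index flush completes; returns the snapshot to persist."""
        self._durable = dict(self._pending)
        return dict(self._durable)

    def starting_offsets(self) -> Dict[int, int]:
        """Offsets to resume from after a restart: one past the last flushed event."""
        return {p: o + 1 for p, o in self._durable.items()}
```

Events recorded after the last flush are not reflected in `starting_offsets()`, so after a crash they would be consumed again rather than lost.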

Thanks again.

--Mark

Lei Wang

Jun 26, 2013, 12:12:07 AM
to sensei...@googlegroups.com
If you changed the Kafka consumer to allow managing the offset on the consumer side, you can construct your version string from a list of partition=>offset pairs and restore the offset for each partition after a restart.
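
A minimal Python sketch of such a composite version string follows. The `partition:offset` comma-separated format and the function names `encode_version` and `decode_version` are assumptions chosen for illustration; the thread does not prescribe a format.

```python
from typing import Dict


def encode_version(offsets: Dict[int, int]) -> str:
    """Serialize partition -> offset pairs, sorted by partition for a stable string."""
    return ",".join(f"{p}:{o}" for p, o in sorted(offsets.items()))


def decode_version(version: str) -> Dict[int, int]:
    """Parse the string back into partition -> offset pairs (empty string means none)."""
    if not version:
        return {}
    offsets: Dict[int, int] = {}
    for pair in version.split(","):
        partition, offset = pair.split(":")
        offsets[int(partition)] = int(offset)
    return offsets


if __name__ == "__main__":
    version = encode_version({1: 20, 0: 10})
    print(version)
    print(decode_version(version))
```

After a restart, the gateway would decode the last persisted version and seek each partition to its stored offset.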

Mark Williams

Jun 26, 2013, 12:31:02 AM
to sensei...@googlegroups.com
That's a great idea. I keep forgetting that the version string is just that...an arbitrary data packet to send back and forth between sensei and the gateway consumer to maintain state.

Yonghui Zhao

Jun 26, 2013, 1:41:50 AM
to sensei...@googlegroups.com
I think Kafka doesn't know the partition info, and you can only get the offset of one consumer group for one topic.
So if you make sure one consumer group has only one node, you can set the offset in Kafka. The offset is the minimum of all partitions' versions.
I have implemented this feature in my branch, but it doesn't work in the general case.
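
The minimum-version rule can be sketched in Python as below. The duplicate filter in `should_apply` is an assumption added for illustration (the message does not describe how replayed events are handled), and both function names are hypothetical.

```python
from typing import Dict


def restart_offset(partition_versions: Dict[int, int]) -> int:
    """Single offset to hand back to Kafka: the smallest version any partition persisted."""
    return min(partition_versions.values())


def should_apply(partition: int, offset: int, partition_versions: Dict[int, int]) -> bool:
    """Assumed filter: skip replayed events a partition has already persisted."""
    return offset > partition_versions[partition]


if __name__ == "__main__":
    versions = {0: 120, 1: 95, 2: 130}
    print(restart_offset(versions))
    print(should_apply(0, 100, versions), should_apply(1, 96, versions))
```

Restarting from the minimum means partitions that were further ahead see some events twice, which is why a filter of this kind would be needed.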

