Apache Kafka as journal - retention times/PersistentView and partitions


Andrzej Dębski

Aug 26, 2014, 3:15:34 AM
to akka...@googlegroups.com
Hello

Lately I have been reading about the possibility of using Apache Kafka as a journal/snapshot store for akka-persistence.

I am aware of the plugin created by Martin Krasser: https://github.com/krasserm/akka-persistence-kafka/ and I also read another topic about Kafka as a journal: https://groups.google.com/forum/#!searchin/akka-user/kakfka/akka-user/iIHmvC6bVrI/zeZJtW0_6FwJ.

In both sources I linked, two ideas were presented:

1. Set log retention to 7 days, take snapshots every 3 days (example values)
2. Set log retention to unlimited.

Here is the first question: in the first case, wouldn't it mean that persistent views would receive a skewed view of the PersistentActor state (only events from the last 7 days)? Is that really a viable solution? As far as I know, a PersistentView can only receive events - it can't receive snapshots from the corresponding PersistentActor (which is good in the general case).

Second question (directed more at Martin): in the thread I linked you wrote:

 I don't go into Kafka partitioning details here but it is possible to implement the journal driver in a way that both a single persistent actor's data are partitioned *and* kept in order

I am very interested in this idea. AFAIK it is not yet implemented in the current plugin, but I was wondering if you could share the high-level idea of how you would achieve that (one persistent actor, multiple partitions, ordering ensured)?

Martin Krasser

Aug 26, 2014, 9:28:47 AM
to akka...@googlegroups.com
Hi Andrzej,


On 26.08.14 09:15, Andrzej Dębski wrote:
Hello

Lately I have been reading about the possibility of using Apache Kafka as a journal/snapshot store for akka-persistence.

I am aware of the plugin created by Martin Krasser: https://github.com/krasserm/akka-persistence-kafka/ and I also read another topic about Kafka as a journal: https://groups.google.com/forum/#!searchin/akka-user/kakfka/akka-user/iIHmvC6bVrI/zeZJtW0_6FwJ.

In both sources I linked, two ideas were presented:

1. Set log retention to 7 days, take snapshots every 3 days (example values)
2. Set log retention to unlimited.

Here is the first question: in the first case, wouldn't it mean that persistent views would receive a skewed view of the PersistentActor state (only events from the last 7 days)? Is that really a viable solution? As far as I know, a PersistentView can only receive events - it can't receive snapshots from the corresponding PersistentActor (which is good in the general case).

PersistentViews can create their own snapshots which are isolated from the corresponding PersistentActor's snapshots.
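For example, a minimal sketch (against the akka-persistence 2.3 API; the ids and the counter state are made up) of a view that snapshots its own derived state independently of the actor it follows:

import akka.persistence.{PersistentView, SnapshotOffer}

// Hypothetical view that counts events of some PersistentActor and
// snapshots that count on its own schedule.
class CounterView extends PersistentView {
  override def persistenceId: String = "counter-1"      // stream written by the PersistentActor
  override def viewId: String        = "counter-1-view" // the view's own snapshots live under this id

  private var eventCount = 0L

  def receive = {
    case SnapshotOffer(_, count: Long) =>
      eventCount = count                                 // recover from the view's own snapshot
    case _ if isPersistent =>
      eventCount += 1                                    // replayed or live event from the actor's journal
      if (eventCount % 1000 == 0) saveSnapshot(eventCount)
  }
}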



Second question (directed more at Martin): in the thread I linked you wrote:

 I don't go into Kafka partitioning details here but it is possible to implement the journal driver in a way that both a single persistent actor's data are partitioned *and* kept in order

I am very interested in this idea. AFAIK it is not yet implemented in the current plugin, but I was wondering if you could share the high-level idea of how you would achieve that (one persistent actor, multiple partitions, ordering ensured)?

The idea is to

- first write events 1 to n to partition 1
- then write events n+1 to 2n to partition 2
- then write events 2n+1 to 3n to partition 3
- ... and so on

This works because a PersistentActor is the only writer to a partitioned journal topic. During replay, you first replay partition 1, then partition 2, and so on. This should be rather easy to implement in the Kafka journal; I just haven't had time so far - pull requests are welcome :) Btw, the Cassandra journal follows the very same strategy for scaling with data volume (by using different partition keys).
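A minimal sketch of that scheme (not part of the current plugin; eventsPerPartition is an assumed configuration value, and partitions are numbered from 0 as in Kafka):

object PartitionedJournalSketch {
  // Events 1..n go to partition 0, events n+1..2n to partition 1, and so on.
  // Because the persistent actor is the single writer, each partition is
  // internally ordered and partitions can be replayed one after the other.
  val eventsPerPartition = 1000000L // "n", assumed

  def partitionFor(sequenceNr: Long): Int =
    ((sequenceNr - 1) / eventsPerPartition).toInt

  // Replay visits partitions in ascending order to preserve event order.
  def partitionsToReplay(highestSequenceNr: Long): Range =
    0 to partitionFor(highestSequenceNr)
}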

Cheers,
Martin


-- 
Martin Krasser

blog:    http://krasserm.blogspot.com
code:    http://github.com/krasserm
twitter: http://twitter.com/mrt1nz

Andrzej Dębski

Aug 26, 2014, 10:44:13 AM
to akka...@googlegroups.com
My mind must have filtered out the possibility of making snapshots using Views - thanks.

About partitions: I suspected as much. The only thing I am wondering now is whether it is possible to dynamically create partitions in Kafka. AFAIK the number of partitions is set during topic creation (be it programmatically using the API or CLI tools), and there is a CLI tool you can use to modify an existing topic: https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-5.AddPartitionTool. To keep the invariant "a PersistentActor is the only writer to a partitioned journal topic", you would have to create those partitions dynamically, on a per-PersistentActor basis (usually you don't know up front how many PersistentActors your system will have).

On the other hand, maybe you are assuming that each actor is writing to a different topic - but I think this solution is not viable because the number of topics is limited by ZooKeeper and other factors: http://grokbase.com/t/kafka/users/133v60ng6v/limit-on-number-of-kafka-topic.

Greg Young

Aug 26, 2014, 1:37:15 PM
to akka...@googlegroups.com
This is definitely an issue, as many people with event-sourced systems have millions of topics.

Martin Krasser

Aug 26, 2014, 1:44:05 PM
to akka...@googlegroups.com

On 26.08.14 16:44, Andrzej Dębski wrote:
My mind must have filtered out the possibility of making snapshots using Views - thanks.

About partitions: I suspected as much. The only thing I am wondering now is whether it is possible to dynamically create partitions in Kafka. AFAIK the number of partitions is set during topic creation (be it programmatically using the API or CLI tools), and there is a CLI tool you can use to modify an existing topic: https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-5.AddPartitionTool. To keep the invariant "a PersistentActor is the only writer to a partitioned journal topic", you would have to create those partitions dynamically, on a per-PersistentActor basis (usually you don't know up front how many PersistentActors your system will have).

You're right. If you want to keep all data in Kafka without ever deleting them, you'd need to add partitions dynamically (which is currently possible with APIs that back the CLI). On the other hand, using Kafka this way is the wrong approach IMO. If you really need to keep the full event history, keep old events on HDFS or wherever and only the more recent ones in Kafka (where a full replay must first read from HDFS and then from Kafka) or use a journal plugin that is explicitly designed for long-term event storage.
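Conceptually, such a tiered replay could look like the sketch below (nothing like this exists in the plugin; the archive and Kafka readers are hypothetical stubs):

object TieredReplaySketch {
  // A stored event with its journal sequence number.
  case class Stored(sequenceNr: Long, event: Any)

  // Hypothetical readers: old events from an archive (e.g. HDFS), recent ones from Kafka.
  def readFromArchive(persistenceId: String): Vector[Stored] = Vector.empty
  def readFromKafka(persistenceId: String, fromSequenceNr: Long): Vector[Stored] = Vector.empty

  // A full replay drains the archive first and then continues from Kafka at the
  // next sequence number, so per-actor ordering is preserved.
  def fullReplay(persistenceId: String): Vector[Stored] = {
    val archived = readFromArchive(persistenceId)
    val from = archived.lastOption.map(_.sequenceNr + 1).getOrElse(1L)
    archived ++ readFromKafka(persistenceId, from)
  }
}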

The main reason why I developed the Kafka plugin was to integrate my Akka applications into unified log processing architectures as described in Jay Kreps' excellent article. Also mentioned in this article is a snapshotting strategy that fits typical retention times in Kafka.



On the other hand, maybe you are assuming that each actor is writing to a different topic

yes, and the Kafka plugin is currently implemented that way.


- but I think this solution is not viable because the number of topics is limited by ZooKeeper and other factors: http://grokbase.com/t/kafka/users/133v60ng6v/limit-on-number-of-kafka-topic.

A more in-depth discussion about these limitations is given at http://www.quora.com/How-many-topics-can-be-created-in-Apache-Kafka with a detailed comment from Jay. I'd say that if you designed your application to run more than a few hundred persistent actors, then the Kafka plugin is probably the wrong choice. I tend to design my applications to have only a small number of persistent actors (which is in contrast to many other discussions on akka-user), which makes the Kafka plugin a good candidate.

To recap, the Kafka plugin is a reasonable choice if

- frequent snapshotting is done by persistent actors (every day or so)
- you don't have more than a few hundred persistent actors and
- your application is a component of a unified log processing architecture (backed by Kafka)

The most interesting next Kafka plugin feature for me to develop is an HDFS integration for long-term event storage (and full event history replay). WDYT?

Greg Young

Aug 26, 2014, 1:56:33 PM
to akka...@googlegroups.com
I'm curious how you would model, say, bank accounts with only a few hundred actors. Can you go into a bit of detail?


--
Studying for the Turing test

Martin Krasser

Aug 26, 2014, 2:04:04 PM
to akka...@googlegroups.com

On 26.08.14 19:56, Greg Young wrote:
I'm curious how you would model, say, bank accounts with only a few hundred actors. Can you go into a bit of detail?

persistent-actor : bank-account = 1:n (instead of 1:1)
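For illustration, a minimal 1:n sketch (all names hypothetical): one PersistentActor owns a single journal stream and keys the state of many accounts by account id.

import akka.persistence.PersistentActor

// Hypothetical commands/events; many bank accounts handled by one persistent actor.
case class Deposit(accountId: String, amount: BigDecimal)
case class Deposited(accountId: String, amount: BigDecimal)

class AccountsActor extends PersistentActor {
  override def persistenceId = "accounts" // one journal stream for all accounts

  private var balances = Map.empty[String, BigDecimal].withDefaultValue(BigDecimal(0))

  private def updateState(e: Deposited): Unit =
    balances += e.accountId -> (balances(e.accountId) + e.amount)

  def receiveRecover = { case e: Deposited => updateState(e) }

  def receiveCommand = {
    case Deposit(id, amount) =>
      persist(Deposited(id, amount)) { e =>
        updateState(e)
        sender() ! balances(e.accountId)
      }
  }
}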

Greg Young

Aug 26, 2014, 2:06:09 PM
to akka...@googlegroups.com
Love to see an example

Martin Krasser

Aug 26, 2014, 2:08:29 PM
to akka...@googlegroups.com
See my eventsourced example(s) that I published 1-2 years ago; the others are closed source.

Greg Young

Aug 26, 2014, 2:12:58 PM
to akka...@googlegroups.com
In particular, I am interested in the associated state that's needed. I can see keeping it in a single actor, but this does not turn out well at all for most production systems, in particular as changes happen over time.

Martin Krasser

Aug 26, 2014, 2:15:27 PM
to akka...@googlegroups.com

On 26.08.14 20:12, Greg Young wrote:
In particular, I am interested in the associated state that's needed. I can see keeping it in a single actor, but this does not turn out well at all for most production systems, in particular as changes happen over time.

I don't get your point. Please elaborate.

Andrzej Dębski

Aug 26, 2014, 2:24:56 PM
to akka...@googlegroups.com

You're right. If you want to keep all data in Kafka without ever deleting them, you'd need to add partitions dynamically (which is currently possible with APIs that back the CLI). On the other hand, using Kafka this way is the wrong approach IMO. If you really need to keep the full event history, keep old events on HDFS or wherever and only the more recent ones in Kafka (where a full replay must first read from HDFS and then from Kafka) or use a journal plugin that is explicitly designed for long-term event storage. 

That is what was worrying me about using Kafka in a situation where I would want to keep the events all the time (or at least for an unknown amount of time). The thing that seemed nice is that I would have a journal/event store and a pub-sub solution implemented in one technology - basically I want to work around the current limitation of PersistentView. I wanted to use a Kafka topic and replay all events from the topic to dynamically added read models in my cluster. Maybe in this situation I should stick to distributed publish-subscribe in the cluster for current event delivery and Cassandra as the journal/snapshot store. I have not read that much about Cassandra and the way it stores data, so I do not know if reading all events would be easy.

The main reason why I developed the Kafka plugin was to integrate my Akka applications into unified log processing architectures as described in Jay Kreps' excellent article. Also mentioned in this article is a snapshotting strategy that fits typical retention times in Kafka.

Thanks for the link. 

The most interesting next Kafka plugin feature for me to develop is an HDFS integration for long-term event storage (and full event history replay). WDYT?

That would be an interesting feature - it would certainly make the akka + Kafka combination viable for more use cases.

Greg Young

Aug 26, 2014, 2:34:44 PM
to akka...@googlegroups.com
OK, for bank accounts there is some amount of state needed to verify a transaction. Let's propose that for now it's the branch you opened your account at, your current balance, your address, and a risk classification, as well as a customer profitability/loyalty score (these are all reasonable things to track in terms of deciding if a transaction should be accepted or not).

I could keep millions of these inside of a single actor. 

A few problems come up though: 

1. Replaying this actor from events is very painful (millions, possibly hundreds of millions, of events, and they must be processed serially). Solution -> snapshots?
2. Snapshots have all the same versioning issues people are used to with keeping state around. What happens when the state I am keeping changes - say now I also need to keep avg+stddev of transaction amount, or we found a bug in how we were maintaining the loyalty score (back to #1)? This will invalidate my snapshot, requiring a full replay, or else you run into another whole series of hokey problems trying to do "from here forward" type things (imagine a new feature that relies on a 6-month moving average).




Martin Krasser

Aug 27, 2014, 12:31:15 AM
to akka...@googlegroups.com
Whether to go for a 1:1 approach or a 1:n approach (or a partitioned m:n approach where m << n) really depends on the concrete use case and non-functional requirements. Your example might be a good candidate for a 1:1 approach (see also further comments inline) but there are also examples for which a 1:n or m:n approach is a better choice. Here are some general influencing factors:

- length of event history required to recover state: bank accounts need the full event history to be recovered, but order management is an example where this is often not the case. Orders (trade orders in finance, lab orders during medical treatments, ...) usually have a limited validity, so you can recover active orders from a limited event history (the last 10 days, for example), which should make migrations after code changes rather painless. BTW, having only a single persistent actor (or a few) that maintains state is comparable to the role of a "Business Logic Processor" in the LMAX architecture, which originated from the high-frequency trading domain.

- latency requirements: creating a new persistent actor has some overhead, not only memory but also bootstrap, as its creation requires a roundtrip to the backend store. Re-activation of passivated actors that have been designed around a 1:1 approach may also be in conflict with low-latency requirements. Good compromises can often be found by following an m:n approach in this case.

- write throughput: high write throughput can only be achieved by batching writes, and batching is currently implemented on a per-persistent-actor basis. Throughput therefore scales better when having a small(er) number of actors. A large number of actors will create more but smaller batches, reducing throughput. This is however more a limitation of the current implementation of akka-persistence. Maybe a switch to batching at the journal level is a good idea, so that a single write batch can contain events from several actors.

- ...

Even if you need to replay a long event history (for example after a code change), you can always do that in the background on a separate node until the new version of the persistent actor has caught up, and switch the application over to it when done. You could even have both versions running at the same time, for A/B testing for example. With a replay rate of 100k/sec you can replay a billion events within a few hours.

Further comments inline ...


On 26.08.14 20:34, Greg Young wrote:
OK, for bank accounts there is some amount of state needed to verify a transaction. Let's propose that for now it's the branch you opened your account at, your current balance, your address, and a risk classification, as well as a customer profitability/loyalty score (these are all reasonable things to track in terms of deciding if a transaction should be accepted or not).

When validating commands, you only need to keep the part of application state within persistent actors for which you have strict consistency requirements. In the context of bank accounts, this is for sure the case for the balance, but not necessarily for customer profitability, loyalty score or whatever. These metrics may be calculated in the background; hence, having eventual read consistency for them should be sufficient. Consequently, this state can be maintained elsewhere (as part of a separate read model) and requested from persistent actors during transaction validation. If you need further metrics in the future, new read models can be added and included into the validation workflow initiated by a persistent actor.
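A rough sketch of that flow (all message and actor names are made up, and the score threshold is arbitrary): the strictly consistent balance check stays inside the persistent actor, while the eventually consistent risk score is asked from a separate read-model actor and the decision is routed back through the mailbox.

import akka.actor.ActorRef
import akka.pattern.ask
import akka.persistence.PersistentActor
import akka.util.Timeout
import scala.concurrent.duration._

case class SubmitTx(accountId: String, amount: BigDecimal)
case class GetRiskScore(accountId: String)
case class RiskScore(value: Int)
case class RiskChecked(tx: SubmitTx, score: Int)
case class TxAccepted(accountId: String, amount: BigDecimal)

class AccountActor(riskReadModel: ActorRef) extends PersistentActor {
  import context.dispatcher
  private implicit val askTimeout = Timeout(1.second)

  override def persistenceId = "account-1"

  private var balance = BigDecimal(1000)

  def receiveRecover = { case TxAccepted(_, amount) => balance -= amount }

  def receiveCommand = {
    case tx @ SubmitTx(id, amount) if amount <= balance =>
      // strict consistency only where needed: the balance is checked locally,
      // the risk score comes from an eventually consistent read model
      (riskReadModel ? GetRiskScore(id)).mapTo[RiskScore]
        .map(score => RiskChecked(tx, score.value))
        .foreach(self ! _)

    case RiskChecked(SubmitTx(id, amount), score) if score < 50 && amount <= balance =>
      persist(TxAccepted(id, amount)) { e => balance -= e.amount }
  }
}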


I could keep millions of these inside of a single actor. 

A few problems come up though: 

1. Replaying this actor from events is very painful (millions, possibly hundreds of millions, of events, and they must be processed serially). Solution -> snapshots?
2. Snapshots have all the same versioning issues people are used to with keeping state around. What happens when the state I am keeping changes - say now I also need to keep avg+stddev of transaction amount, or we found a bug in how we were maintaining the loyalty score (back to #1)? This will invalidate my snapshot

See above, there's no need to keep all of that inside the persistent actor for strict read consistency. Allowing eventual consistency during command validation where possible not only makes the validation process more flexible (by just including new read models if required) but also reduces snapshot migration efforts (by simplifying the state structure inside persistent actors).

Furthermore, ensuring strict consistency for persistent actor state requires usage of persist() instead of persistAsync(), which reduces throughput by at least a factor of 10. That may again be in conflict with write throughput requirements.
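A small illustration of that trade-off (hypothetical events; API as of akka-persistence 2.3): persist() stashes new commands until the write and its handler complete, persistAsync() does not and lets the journal batch writes.

import akka.persistence.PersistentActor

case class OrderPlaced(orderId: String) // needs strict consistency
case class MetricSampled(value: Long)   // eventual consistency is fine

class ThroughputExample extends PersistentActor {
  override def persistenceId = "throughput-example"

  private var openOrders = Set.empty[String]
  private var samples = 0L

  def receiveRecover = {
    case OrderPlaced(id)  => openOrders += id
    case MetricSampled(_) => samples += 1
  }

  def receiveCommand = {
    // persist(): commands arriving while the write is in flight are stashed,
    // so this validation always sees up-to-date state - strict, but lower throughput.
    case ("place", id: String) if !openOrders(id) =>
      persist(OrderPlaced(id)) { e => openOrders += e.orderId }

    // persistAsync(): incoming commands are not stashed, so the journal can batch
    // many events - much higher throughput, weaker consistency during validation.
    case ("sample", v: Long) =>
      persistAsync(MetricSampled(v)) { _ => samples += 1 }
  }
}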

To conclude, I think there are use cases where a 1:1 approach makes sense but this shouldn't be a general recommendation IMO. It really depends on the specific functional and non-functional requirements for finding the best compromise.

Martin Krasser

Aug 27, 2014, 12:53:45 AM
to akka...@googlegroups.com

On 26.08.14 20:24, Andrzej Dębski wrote:

You're right. If you want to keep all data in Kafka without ever deleting them, you'd need to add partitions dynamically (which is currently possible with APIs that back the CLI). On the other hand, using Kafka this way is the wrong approach IMO. If you really need to keep the full event history, keep old events on HDFS or wherever and only the more recent ones in Kafka (where a full replay must first read from HDFS and then from Kafka) or use a journal plugin that is explicitly designed for long-term event storage. 

That is what was worrying me about using Kafka in a situation where I would want to keep the events all the time (or at least for an unknown amount of time). The thing that seemed nice is that I would have a journal/event store and a pub-sub solution implemented in one technology - basically I want to work around the current limitation of PersistentView. I wanted to use a Kafka topic and replay all events from the topic to dynamically added read models in my cluster. Maybe in this situation I should stick to distributed publish-subscribe in the cluster for current event delivery and Cassandra as the journal/snapshot store. I have not read that much about Cassandra and the way it stores data, so I do not know if reading all events would be easy.

That's a single table in Cassandra (some details about ordering here). One could derive further tables with a user-defined ordering/filtering/... from which multiple readers/subscribers could consume and derive read models. These derived tables are comparable to user-defined topics in the Kafka journal. Whether they are populated by the plugin during write transactions or later, by running separate transformation processes, is an implementation detail. The Kafka journal does the former; the latter gives more flexibility regarding new read model requirements (as no upfront knowledge is required about what to write to user-defined tables/topics).

Greg Young

Aug 27, 2014, 9:28:31 AM
to akka...@googlegroups.com
I have used 1:n; it's a fairly common pattern. I just wanted to point out that it's not a panacea that works everywhere and has some rather large downsides if applied in the wrong place. I agree the discussion should be around when each is applicable. My original point was that you can't really do 1:1 with many of the backends, as they don't support millions of streams.

Btw, for performance what many do is use an identity map and caching in memory (assuming the whole set does not fit in memory).

There is a side bit to this as well in terms of probability. If you use one actor for n, it's next to impossible to show that transactions do not interfere with each other (while this is rather trivial with 1:1, as they would need messages between each other).

Cheers,

Greg

Ashley Aitken

Aug 27, 2014, 10:25:58 AM
to akka...@googlegroups.com

Martin,

Thank you very much for your most interesting and useful post.  

I think many of us benefit considerably from you sharing your practical experience with Akka Persistence and CQRS.

It's probably too early for "best practices" but knowing what may work well (or not) in real-world applications is invaluable.

Of course, thanks to the others in the Akka team and Greg as well.

Cheers,
Ashley.


On Wednesday, 27 August 2014 12:31:15 UTC+8, Martin Krasser wrote:
Whether to go for a 1:1 approach or a 1:n approach (or a partitioned m:n approach where m << n) really depends on the concrete use case and non-functional requirements. Your example might be a good candidate for a 1:1 approach (see also further comments inline) but there are also examples for which a 1:n or m:n approach is a better choice. Here are some general influencing factors:
...

Greg Young

Aug 27, 2014, 11:19:18 AM
to akka...@googlegroups.com
Provability not probability (iPad helping)

Martin Krasser

Aug 27, 2014, 1:52:51 PM
to akka...@googlegroups.com
Thanks for your kind words, Ashley. Glad you find it useful.