Improving Akka Persistence wrt CQRS/ES/DDD

3,051 views

ahjohannessen

unread,
Jul 25, 2014, 11:05:40 AM7/25/14
to akka...@googlegroups.com
I think Akka Persistence falls short in some areas that are essential for using CQRS
with DDD aggregates effectively (without a lot of pain):

 1) Ability to read persistent actor messages by type of Actor. This can be as simple
    as a key prefix that represents a "category/tag/topic",
    e.g. <topic>-<persistenceId>. Essentially an embedded secondary index.

 2) There is no global sequence number for events across all persistent actors in the
    same way there is for the messages of a single persistent actor; there should be
    a way to track the logical positions of messages. GetEventStore uses a similar
    approach.

 1: Gives us the ability to read all events from a persistent actor family, e.g. customers.

 2: Gives us the ability, by way of 1, to do a catch-up subscription of all events from
    persistent actors of the same type, say customers, and keep track of our position.
    This is very useful for maintaining a read model and for integration with other systems.
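
To make 1) and 2) concrete, here is a minimal sketch in plain Scala; the names (TopicIndex, CatchUpSubscription) are made up for illustration and are not part of any existing Akka Persistence API:

```scala
// Hypothetical sketch of points 1) and 2); none of these names exist in
// Akka Persistence - they only illustrate the requested capabilities.
object TopicIndex {
  // 1) Derive the "category/tag/topic" from a <topic>-<persistenceId> key,
  //    e.g. "customer-42" -> Some("customer").
  def topicOf(persistenceId: String): Option[String] =
    persistenceId.split("-", 2) match {
      case Array(topic, _) => Some(topic)
      case _               => None
    }
}

// 2) A catch-up subscription tracks a logical position in the joined
//    event stream of one topic, so a read model can resume where it left off.
final class CatchUpSubscription(events: IndexedSeq[String]) {
  def readFrom(position: Long): (Seq[String], Long) = {
    val batch = events.drop(position.toInt)
    (batch, position + batch.size) // the events plus the new position
  }
}
```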

I have tried many approaches; to mention a few:

 a) One persistent actor for a "topic".
 b) Join streams using AtLeastOnceDelivery in individual persistent actors to
    a "topic" aggregator.
 c) Using a persistent actor as a journal for a "topic" and replaying that
    into views, filtering events produced by "topic" siblings.

All of the above introduce unnecessary complications and work-around gymnastics, such as
dealing with state size and archival (a), worrying about out-of-order delivery (b), and
using background workers that move views forward by way of snapshotting (c). All of this
because AP does not provide a way to achieve 1) and 2).

From my point of view it is evident that AP lacks some fundamental and essential
capabilities needed for a decent CQRS/DDD/ES setup.

Konrad 'ktoso' Malawski

unread,
Jul 25, 2014, 12:20:02 PM7/25/14
to akka...@googlegroups.com, ahjohannessen
Hello there,
Ah, much better to be able to communicate in full sentences without the 140-char limit! ;-)

So, now that it’s spelled out as full sentences, I’ll gladly dig into your points:

1) 
Has been already proposed and accepted in https://github.com/akka/akka/issues/15004,
including your +1, so I guess you’re aware that it’s in our backlog.

The module is experimental and published “early” exactly in order to gather
and implement these features before stabilising the APIs.

So it’s coming; we simply have not yet gotten to implementing it - it’s holiday season, which isn’t helping development speed :-)

2) 
For the benefit of the discussion, example in EventStore: http://geteventstore.com/blog/20130707/catch-up-subscriptions-with-the-event-store/

One thing to keep in mind here is that some journals would have no problem implementing this, such as Kafka or EventStore - because “subscribe to something” is a built-in mechanism in them… See Martin’s Kafka journal and how one can subscribe to an event stream there: https://github.com/krasserm/akka-persistence-kafka#journal-topics On the other hand, implementing this in other journals would be pretty painful / inefficient (Cassandra, HBase, …).

We have been playing around with some ideas to expose optional, DB-specific journal functionality. This would be a good candidate for that.

This request seems to depend on these things, by the way:
* changes in the journal plugins (we need some changes there anyway https://github.com/krasserm/akka-persistence-kafka#journal-topics ),
* views over “tags” (as this would essentially be a view over “all”),
* and lastly reactive streams (views exposed as streams of events).


Thanks for your feedback, and keep in mind that no-one said that this module is “done”.
It’s still experimental, and these kinds of feature requests are exactly what we need, and will have to provide for, to make it stable and usable in all kinds of apps.

Lastly, would you mind creating a ticket for the 2) feature?
Thanks in advance, have a nice weekend :-)

-- 

Konrad 'ktoso' Malawski
hAkker @ typesafe

Martin Krasser

unread,
Jul 26, 2014, 4:42:53 AM7/26/14
to akka...@googlegroups.com

On 25.07.14 18:19, Konrad 'ktoso' Malawski wrote:
2) 
For the benefit of the discussion, example in EventStore: http://geteventstore.com/blog/20130707/catch-up-subscriptions-with-the-event-store/

One thing to keep in mind here is that some journals would have no problem implementing this, such as Kafka or EventStore - because “subscribe to something” is a built-in mechanism in them… See Martin’s Kafka journal and how one can subscribe to an event stream there: https://github.com/krasserm/akka-persistence-kafka#journal-topics On the other hand, implementing this in other journals would be pretty painful / inefficient (Cassandra, HBase, …).

A general approach for supporting this in other journals would require:

- a materialized view (on journal entries) in the backend store (compare to user-defined topics in the Kafka journal). Updating this view (or several) can either be done by the plugin itself or by backend store triggers, for example. In the case of the Kafka journal, the plugin updates the materialized view (= user-defined topic). Alternatively, user-defined views don't necessarily need to be materialized, if a backend store can generate them on-the-fly in a very efficient way.

- a consumer that keeps track of what has already been consumed from the materialized view. Kafka consumers, for example, do this automatically (the consumer position can optionally be written to Zookeeper for fault-tolerance). PersistentViews (in Akka Persistence) also track the consumer position on the client side. Storing that position is only possible via snapshotting at the moment. Extending PersistentViews to track the position in a more efficient way (instead of snapshotting) could make sense.

IMO, materialized views together with pull-based consumers that can write checkpoints shouldn't be a big deal for any backend store to support. What Akka Persistence should offer in the future is an abstraction over how custom (materialized) views can be defined and over pull-based consumers. Furthermore, an a.p.PersistentView could then abstract over processor journals and custom views (which would finally cover issue #15004).
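
A rough sketch of such a pull-based consumer with checkpointing (plain Scala; the names and the in-memory checkpoint map are assumptions - a real implementation would persist the checkpoint durably):

```scala
import scala.collection.mutable

// Sketch of a pull-based consumer over a materialized view that stores
// its position as a checkpoint instead of via snapshotting. All names
// are assumptions for illustration, not a real journal/plugin API.
final class CheckpointedConsumer[E](
    viewId: String,
    readView: (Long, Int) => Seq[E],          // (fromOffset, maxEvents) => batch
    checkpoints: mutable.Map[String, Long]) { // stand-in for a durable store

  def poll(max: Int)(handle: E => Unit): Int = {
    val from  = checkpoints.getOrElse(viewId, 0L)
    val batch = readView(from, max)
    batch.foreach(handle)
    checkpoints(viewId) = from + batch.size   // resume point after a restart
    batch.size
  }
}
```

After a crash, a consumer constructed over the same checkpoint store resumes from the saved offset rather than replaying the whole view.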

Thoughts?


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

-- 
Martin Krasser

blog:    http://krasserm.blogspot.com
code:    http://github.com/krasserm
twitter: http://twitter.com/mrt1nz

Greg Young

unread,
Jul 26, 2014, 12:03:13 PM7/26/14
to akka...@googlegroups.com, ahjoha...@gmail.com
I think what's being missed here is that Event Store doesn't only support this on a single stream/topic.

It's very useful when building read models, for example, to be able to read events from many streams (or all streams) joined in a way that gives global ordering (if on a single replication group) or deterministic (historical) ordering if on multiple replication groups.

As an example, imagine that when writing you have 500,000 streams (one per inventory item).
When building a projection you are interested in 5 event types:

InventoryItemCreated
InventoryItemAudited
InventoryItemRenamed
InventoryCheckedIn
InventoryCheckedOut

Regardless of the initial stream they were written to. This is very common in these types of systems.
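
A sketch of such a projection in plain Scala (event type names follow Greg's list, spelling normalized; the fold itself is illustrative):

```scala
// Event types from Greg's example; a single projection folds them into a
// read model regardless of which inventory-item stream each was written to.
sealed trait InventoryEvent
final case class InventoryItemCreated(id: String, name: String) extends InventoryEvent
final case class InventoryItemAudited(id: String)               extends InventoryEvent
final case class InventoryItemRenamed(id: String, to: String)   extends InventoryEvent
final case class InventoryCheckedIn(id: String, qty: Int)       extends InventoryEvent
final case class InventoryCheckedOut(id: String, qty: Int)      extends InventoryEvent

// Read model: current quantity per item, folded from the joined stream.
def project(events: Seq[InventoryEvent]): Map[String, Int] =
  events.foldLeft(Map.empty[String, Int]) {
    case (acc, InventoryItemCreated(id, _)) => acc.updated(id, 0)
    case (acc, InventoryCheckedIn(id, q))   => acc.updated(id, acc.getOrElse(id, 0) + q)
    case (acc, InventoryCheckedOut(id, q))  => acc.updated(id, acc.getOrElse(id, 0) - q)
    case (acc, _)                           => acc // audit/rename: no quantity change
  }
```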

Cheers,

Greg

Ashley Aitken

unread,
Jul 27, 2014, 12:28:55 PM7/27/14
to akka...@googlegroups.com

Thank you @ahjohannessen for your post. I have had this feeling as well, but as I am a novice here I've been trying to learn by asking questions (rather ineptly, I might add), in contrast to your intelligent post.

I would like, however, to express some further (probably confused) thoughts. I understand these APIs are experimental and I am not criticising the amazing work of the Akka team, just trying to help and to learn more.  

Some rambling thoughts:

A. I don't know how Akka currently stores its messages for each (Persistent) Actor, but it seems to me there may be a possibility to store these messages and persistent events in a combined message/event store (like Kafka, either temporarily or permanently).

B. There needs to be some way for events (or messages) to activate a passivated Persistent View (or Persistent Actor) with these events (or messages) being filtered from one or many Persistent (or regular) Actors (e.g. a set or type). 

I know the second part of B. has been discussed and an issue raised for it, but here I am suggesting it may be broadened to include messages, enabling broadcast messages of a sort - it seems a bit odd to me that Actor messaging is only one-to-one.

C. There needs to be some way for other applications to easily interface with this message / event store even if they are not Akka or Scala or even JVM applications.  As suggested elsewhere, this is something the store itself can perhaps enable via APIs.

I think that article about logs by the LinkedIn architect strongly supports the idea that logs need to be at the centre of enterprise applications / systems.  Logs can store persistent events (not just for persistence of Actors) and store message (i.e. data) streams as well.

As Greg's post seems to be alluding, I don't see how we can effectively set up Akka Streams for all the possible processing needed for CQRS Views. Persistent Views need to be subscription-based and need a powerful ability to filter what they receive and from which Persistent Actor(s).

In summary, I imagine a (distributed) message store that is used for sending messages to one or more Actors using prescribe or subscribe where some of these messages may be persistent events (used to restore a passivated Persistent Actor or update a Persistent View).

[ I suggest that prescribe is where the actor indicates which actor or actors the messages are to be sent to whereas subscribe is like in traditional pub-sub where other actors indicate they wish to receive messages from one or more actors].

The producer of the messages / events can decide how long the messages / events are kept for.  So for example, for persistence events they can be kept forever, but for regular messaging they may be kept for a shorter period.  

Again, please remember I know very little about what I am trying to talk sensibly about so I apologise if none of this makes sense :-)

Cheers,
Ashley.


Suggested Generalised Actor / Messaging Model (for a laugh)

All messages (some of which may contain events) are persisted in a store (that may be distributed) for a duration that is customisable.  There is some sort of partial ordering of these messages.

An actor may prescribe to which other actor(s) they are sending a message (or messages).  The destination actor(s) will process the messages in their virtual mailbox one after the other (and be activated if a message arrives whilst they are passivated).

An actor may subscribe to receive all or a subset of the messages from one or more other actors.  The receiving actor will process the messages one after the other (and be activated if messages arrive whilst they are passivated).

Some mechanism may exist to allow the pace of the production of messages by an actor(s) to be controlled by the consumer(s) of those messages.  Actors may replay any set of messages that they have been sent or subscribed to (as far back as the messages are stored).



ahjohannessen

unread,
Jul 28, 2014, 10:08:12 AM7/28/14
to akka...@googlegroups.com, ahjoha...@gmail.com

Lastly, would you mind creating a ticket for the 2) feature?
Thanks in advance, have a nice weekend :-)

Sure Konrad, however I think I'll wait a little bit, because getting input from the likes of Greg and Martin helps formulate the issue more clearly.
It would be awesome to get the opinions of people like Vaughn, Roland, Patrik, Viktor and so on as well - please share your thoughts guys :).
I believe this "missing link" is very important to Akka Persistence being *generally* useful for CQRS/DDD/ES.

Martin, your suggestion makes very much sense in my book, clever and creative as usual.

ahjohannessen

unread,
Jul 28, 2014, 10:31:30 AM7/28/14
to akka...@googlegroups.com, ahjoha...@gmail.com
Greg, exactly. I think Akka Persistence having such capabilities would make it even more awesome and useful. 
Thanks for chiming in, your opinion on this is very much appreciated.

Ashley, thanks for the kind words. Glad to learn that my worries are not completely mental and just my own nitpicking :)

Konrad 'ktoso' Malawski

unread,
Jul 28, 2014, 4:49:00 PM7/28/14
to akka...@googlegroups.com, ahjohannessen, ahjoha...@gmail.com
Hi everyone,
thanks for your feedback and ideas.

So the stream / view on multiple persistenceIds (or “tags” - which would solve Greg’s example case) is coming, we just have not yet had the time to work on it.
One thing that ties into them is reactive streams. We would like to expose these event streams as akka streams.
Especially since they provide things like merge / filter / tee, which I believe would help a lot with these kinds of event streams :-)

From the streams point of view, abstracting over whether it’s polling or DB-side initiated events, the APIs won’t have to change.
I do agree with / like Martin’s suggestion that in “normal DBs” (no events when someone does an insert) we should be able to implement this with some housekeeping done by the plugins.

One question about EventStore: in the case of reading from multiple replication groups, is the ordering based simply on write-timestamp in non-descending order?
The timestamp is obviously skewed a bit (multiple servers/clocks do writes), but in the apps you work with, would this be OK as the source of ordering in the case of the “all events” stream?


PS: Most of the team is on holiday this week, it’s reasonable to expect they’ll chime in some time next week.

Konrad Malawski

unread,
Jul 28, 2014, 5:40:41 PM7/28/14
to akka...@googlegroups.com, ahjoha...@gmail.com
Rephrasing my ordering question actually (it started out as something else and ended up weirdly worded):
I was thinking about whether the guarantees should be "time in system" or happens-before as known from sequence numbers of concrete ids (A-1 before A-2, and B-1 before B-2, but I don't care about the relation between A and B).
Curious about your real-world use cases, in other words.
Less caring about ordering makes way for faster replays, of course - so that's what I'm after here (perhaps thinking too far ahead though).

-- k

Martin Krasser

unread,
Jul 29, 2014, 2:25:50 AM7/29/14
to akka...@googlegroups.com

On 28.07.14 23:40, Konrad Malawski wrote:
Rephrasing my ordering question actually (it started out as something else and ended up weirdly worded):
I was thinking about whether the guarantees should be "time in system" or happens-before as known from sequence numbers of concrete ids (A-1 before A-2, and B-1 before B-2, but I don't care about the relation between A and B).

- a total order per persistenceId based on sequence numbers (= partial ordering in the "all events" stream) is a must have IMO.
- ordering based on timestamps should be an application level concern (= timestamps in application-defined events and (re-)ordering done by application)
- mid/long-term goal: causal ordering (allows moving from eventual consistency to causal consistency). See also Don't Settle For Eventual Consistency.



Konrad 'ktoso' Malawski

unread,
Jul 29, 2014, 4:47:07 AM7/29/14
to akka...@googlegroups.com, Martin Krasser

- a total order per persistenceId based on sequence numbers (= partial ordering in the "all events" stream) is a must have IMO.
- ordering based on timestamps should be an application level concern (= timestamps in application-defined events and (re-)ordering done by application)

Agreed, seqNrs are at our core and we’ll stick to them (partial ordering will be in for sure; I was wondering if more was needed by app implementors). Bringing in timestamps “in some cases” would be inconsistent with the rest of persistence anyway, so pushing it into user land sounds good.

Actually, since we want to expose this as reactive streams the timestamp ordering could be expressed as `merge(streams, userLandProvidedOrdering)` (we don’t have this yet)… Tempting idea, looking forward to trying out different things there.


- mid/long-term goal: causal ordering (allows moving from eventual consistency to causal consistency). See also Don't Settle For Eventual Consistency.
Thanks! Have not read that one yet - looks very interesting. 
Will catch up with it today :-)

— k

Patrik Nordwall

unread,
Aug 5, 2014, 9:30:43 AM8/5/14
to akka...@googlegroups.com
Hi all,

I fully agree that it is a valid feature.

Would the following sketch work?
- Keep PersistentView poll-based, but let it query for events with a specified "tag" instead of a single persistenceId. This requires a new query in the journal API.
- Add an optional tags parameter to persist and persistAsync, i.e. when storing an event it can be marked with zero or more tags. This requires adding a tags field to PersistentRepr.
- Streams on top of PersistentView

The new view query would have to include a Map of persistenceId -> seqNr. The journal actor is supposed to be local, so the size of this Map should not be a problem.

Is it overkill to tag individual events? Should the tags be per PersistentActor instance instead?

As a second step (or immediately?), we could let the journal push events to the view. Most journal implementations will have to implement this with polling anyway, but then we make it possible for a journal to take advantage of native push from the store.
The view would have to request a number of events from the journal to handle backpressure (similar to reactive streams).
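
An in-memory model of this sketch (illustrative names, not the actual journal API): events carry tags, and the view query passes the highest seqNr it has already seen per persistenceId:

```scala
// In-memory model - illustrative only, not the real journal API.
// Events carry zero or more tags; the view's query passes the highest
// sequence number it has already seen per persistenceId.
final case class TaggedEvent(persistenceId: String, seqNr: Long,
                             payload: String, tags: Set[String])

def replayTagged(journal: Seq[TaggedEvent], tag: String,
                 seen: Map[String, Long]): Seq[TaggedEvent] =
  journal.filter { e =>
    e.tags.contains(tag) && e.seqNr > seen.getOrElse(e.persistenceId, 0L)
  }
```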

Regards,
Patrik






--

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

Ashley Aitken

unread,
Aug 5, 2014, 10:59:24 AM8/5/14
to akka...@googlegroups.com

My use-case is for denormalised PersistentViews (possibly stored in a Document Store).

Will it be possible for a persistent view to follow all persistent actors of a specific type?  For example, CustomerViewManager to follow all events for all Customers so that when an event comes in for a Customer it can rehydrate (if needed) that CustomerView and update it?

Will it be possible for a persistent view to follow a set of persistent actors?  For example, a denormalised CustomerView following events for the set of songs the customer has purchased so that if details for one of them changes the denormalised view can be updated?

I can't follow the logic for tagging specific events rather than using types of events, or how using streams can scale for any of these sorts of requirements. I'm thinking of using Kafka for the journal and having Actors that subscribe to topics to do some of the above (is that plausible?).

Finally, I am all for anything that makes views more push-based (even if it is implemented through polling in some journals) because I believe this is central to a system being reactive.  Standard disclaimers apply (re my lack of knowledge and understanding ;-).

Cheers,
Ashley.

PS I enjoyed the paper on causal consistency.

ahjohannessen

unread,
Aug 5, 2014, 11:18:36 AM8/5/14
to akka...@googlegroups.com
Hi Patrik,

I think it is enough to use the same "tag" for all events of a particular type of persistent actor, instead of allowing different tags per event. What is important is that it is possible to track the logical position (offset) of all events with the same "tag".

Patrik Nordwall

unread,
Aug 5, 2014, 12:57:10 PM8/5/14
to akka...@googlegroups.com



> On Aug 5, 2014, at 17:18, ahjohannessen <ahjoha...@gmail.com> wrote:
>
> Hi Patrik,
>
> I think it is enough to use same "tag" for all events of a particular type of persistent actor instead of allowing different tags per event.
Ok, noted
> What is important is that it is possible to track the logical position (offset) for all events with the same "tag".
I'm not sure I understand this requirement. Do you ask for full ordering among all events with the same tag? That is not possible for scalability reasons.

Time stamps, logical clocks, or such, have to be added to your own event data.

The events will be in order per persistenceId (and there is a sequence number per persistenceId).

/Patrik

ahjohannessen

unread,
Aug 5, 2014, 1:33:55 PM8/5/14
to akka...@googlegroups.com
"I'm not sure I understand this requirement. Do you ask for full ordering among all events with the same tag? That is not possible for scalability reasons."

So, you do not think it is possible for a journal to maintain a logical position per topic / tag ?

"Time stamps, logical clocks, or such, have to be added to your own event data."

That requirement is exactly what makes akka persistence painful, almost useless, in a DDD / ES / CQRS setup, because the read side / catch-up subscriptions currently require maintaining a seqNr per persistenceId.

"The events will be in order per persistenceId (and there is a sequence number per persistenceId)."

That I am aware of and that is not enough in a DDD/ES/CQRS setup.

I see no value in tags without the ability to track an offset per topic / tag.

Patrik Nordwall

unread,
Aug 5, 2014, 3:11:34 PM8/5/14
to akka...@googlegroups.com


> On Aug 5, 2014, at 19:33, ahjohannessen <ahjoha...@gmail.com> wrote:
>
> "I'm not sure I understand this requirement. Do you ask for full ordering among all events with the same tag? That is not possible for scalability reasons."
>
> So, you do not think it is possible for a journal to maintain a logical position per topic / tag ?

How do you define the order? Is it based on time stamps in the persistent actors? Is it based on some feature in the backend store?

/Patrik

ahjohannessen

unread,
Aug 5, 2014, 3:25:13 PM8/5/14
to akka...@googlegroups.com
"How do you define the order? Is it based on time stamps in the persistent actors? Is it based on some feature in the backend store?"

I do not think timestamp precision is important for this; I would imagine a logical position / offset as EventStore/Kafka have. I imagine those are based on integers / longs.

I think it depends on the journal: something simple like leveldb would need help from the journal, whereas something like Kafka / EventStore would probably have something one could adapt, making it easier to implement.

Martin Krasser

unread,
Aug 6, 2014, 4:08:14 AM8/6/14
to akka...@googlegroups.com

On 05.08.14 21:25, ahjohannessen wrote:
> "How do you define the order? Is it based on time stamps in the persistent actors? Is it based on some feature in the backend store?"
>
> I do not think time stamp precision is important for this, I would imagine a logical position / offset as EventStore/Kafka do.

Kafka maintains an offset for each partition separately and a partition
is bound to a single node (disregarding replication). For example, if a
Kafka topic is configured to have 2 partitions, each partition starts
with offset=0, and, if you consume from that topic you only obtain a
partially ordered stream because Kafka doesn't define any ordering
across partitions (see Kafka docs for details). This situation is
comparable to other distributed datastores. For example, Cassandra only
maintains an ordering for entries with the same partition key (i.e. for
entries that reside on the same node).

In general, if you want to maintain an ordering of entries, you either
have to use

- a single writer in the whole cluster (which is the case for persistent
actors) or
- keep entries (that are generated by multiple producers) on a single
node so that the server is able to maintain a local counter (which is
what Kafka does with offsets for each partition separately)
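
The resulting guarantee can be stated as a small predicate; a sketch in plain Scala (names are mine):

```scala
// Per-source order is preserved; nothing is promised across sources.
// An "all events" stream merged from several partitions/persistenceIds
// is valid iff each source's offsets appear in ascending order within it.
def respectsPartialOrder(merged: Seq[(String, Long)]): Boolean =
  merged.groupBy { case (source, _) => source }.values.forall { perSource =>
    val offsets = perSource.map(_._2)
    offsets == offsets.sorted
  }
```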

Both limit scalability (as already mentioned by Patrik), for both write
throughput and data volume. It may well be that some applications are
fine with these limitations and benefit from a total ordering of entries
per "tag", but this should not be the default in akka-persistence. IMO,
it would make sense if akka-persistence allowed applications to configure
an optional ordering per "tag", so that users can decide to sacrifice
scalability if total ordering is needed for a given tag (and it is up to
journal implementations how to implement that ordering).

As already mentioned in a previous post, causal ordering could be a
later extension to akka-persistence that goes beyond the limits of a
single writer or co-located storage *and* allows for better scalability.
I wish I had more time for hacking on a prototype that tracks causalities :)


ahjohannessen

unread,
Aug 7, 2014, 2:01:26 PM8/7/14
to akka...@googlegroups.com
Interesting idea, especially since not all of us think in terms of scalability.
I agree with your opinion wrt optional ordering per tag. It would be nice to
find a middle ground that makes sense wrt scalability *and* the need to be
able to use an offset per group of persistent actors of the same type.
 
As already mentioned in a previous post, causal ordering could be a 
later extension to akka-persistence that goes beyond the limits of a 
single writer or co-located storage *and* allows for better scalability. 
I wish I had more time for hacking on a prototype that tracks causalities :) 

That seems to be a great solution, but something tells me that this is 
not going to happen soon.

So, my conclusion is that I will go with using a single persistent actor as a journal,
thus getting a "tag" + seqNr, and use persistent views for what I otherwise would
use a normal persistent actor for, e.g. as in this sketch: 
and use snapshotting to reduce recovery time of persistent views.

Patrik, on second thought, perhaps tags per event would not be that silly; at
least it gives more flexibility. However, I suppose it all depends on what you guys
want to use a "tag" for.

Vaughn Vernon

unread,
Aug 7, 2014, 2:34:15 PM8/7/14
to akka...@googlegroups.com
Can someone (Martin?) please post some rough performance and scalability numbers per backing storage type? I see these DDD/ES/CQRS discussions lead to consumer-developer limitations based on performance and scalability, but I have not seen any actual numbers. So please post numbers in events per second, as I would prefer not to hunt down such numbers in old posts.

I keep saying this, it seems without much success: akka-persistence, even at its slowest, probably still performs 5x-10x better than most relational stores, and at its best perhaps 500x better. I often poll my IDDD Workshop students for realistic transactions-per-second numbers, and most of the time it is at or below 100 tps. Some of the higher ones are around 1000 tps. A few students identify with 10,000 tps. As far as I know, akka-persistence can regularly perform in the 30,000 tps range, with some stores topping out at, what, 500,000 tps? The point I am trying to make is, even if you put a singleton sequence in front of your slowest possible store, let's assume that it could cost 30%. That would still leave performance at around 20,000 tps on your slowest store, which is 20x faster than many, many enterprise applications. (There are faster ways of producing incremental sequences than using a singleton atomic long.)

I vote that you need to have a single sequence across all events in an event store. This is going to cover probably 99% of all actor persistence needs and it is going to make using akka-persistence way easier.

A suggestion: rather than looking so carefully at akka-persistence for performance and scalability increases, I think a lot could be gained by looking at false sharing analysis and padding solutions.

Vaughn

ahjohannessen

unread,
Aug 7, 2014, 2:57:10 PM8/7/14
to akka...@googlegroups.com
On Thursday, August 7, 2014 7:34:15 PM UTC+1, Vaughn Vernon wrote:

I vote that you need to have a single sequence across all events in an event store. This is going to cover probably 99% of all actor persistence needs and it is going to make using akka-persistence way easier.

If that were made optional, plus a tag facility, then those who think it hurts scalability could opt out, and others could opt in and pay the extra penalty.

Patrik Nordwall

unread,
Aug 7, 2014, 3:29:33 PM8/7/14
to akka...@googlegroups.com
Ok, I think it's a good idea to leave it to the journal plugins to implement the full ordering as well as is possible with the specific data store. We will only require exact order of events per persistenceId.

Any other feedback on the requirements or proposed solution of the improved PersistentView?

/Patrik

Vaughn Vernon

unread,
Aug 7, 2014, 6:21:14 PM8/7/14
to akka...@googlegroups.com
I am sure you have already thought of this, Patrik, but if you leave full ordering to the store implementation, it could still have unnecessary limitations if the implementor chooses to support a sequence only per persistenceId. One very big limitation is: if the store doesn't support a single sequence, you still can't play catch-up over the entire store if you are dependent on interleaved events across types. You can only replay all events properly if using a global sequence. Well, you could also do so using causal consistency, but (a) that's kinda difficult, and (b) it's not supported at this time.

Vaughn

Gary Malouf

unread,
Aug 7, 2014, 8:53:14 PM8/7/14
to akka...@googlegroups.com
I don't see it mentioned on this particular thread, but I feel creating reliable sagas across processors (Aggregates) is a real challenge right now as well.  Having a clearly documented way to do this is critical IMO to creating more complex and reliable CQRS-based apps.

Patrik Nordwall

unread,
Aug 8, 2014, 4:45:30 AM8/8/14
to akka...@googlegroups.com
On Fri, Aug 8, 2014 at 12:21 AM, Vaughn Vernon <vve...@shiftmethod.com> wrote:
I am sure you have already thought of this, Patrik, but if you leave full ordering to the store implementation, it could still have unnecessary limitations if the implementor chooses to support sequence only for persistenceId.

As a user you would have to pick a journal that supports your needs in this regard.

/Patrik
 

Ashley Aitken

unread,
Aug 8, 2014, 6:00:13 AM8/8/14
to akka...@googlegroups.com

On Friday, 8 August 2014 08:53:14 UTC+8, Gary Malouf wrote:
I don't see it mentioned on this particular thread, but I feel creating reliable sagas across processors (Aggregates) is a real challenge right now as well.  Having a clearly documented way to do this is critical IMO to creating more complex and reliable CQRS-based apps.

+1  I have mentioned this before in another thread.  IMHO, CQRS depends heavily on an event queue for the read model, for sagas, and for integration with other systems that subscribe to the events, etc.  A journal just for processor persistence is not enough, again in my opinion.

This is why I am excited about Kafka.  It seems able to act both as a journal (with some restrictions, or configured to retain events forever) and as an event queue that other processes and systems can subscribe to.

Ashley Aitken

unread,
Aug 8, 2014, 6:10:42 AM8/8/14
to akka...@googlegroups.com


On Friday, 8 August 2014 16:45:30 UTC+8, Patrik Nordwall wrote:

On Fri, Aug 8, 2014 at 12:21 AM, Vaughn Vernon <vve...@shiftmethod.com> wrote:
I am sure you have already thought of this, Patrik, but if you leave full ordering to the store implementation, it could still have unnecessary limitations if the implementor chooses to support sequence only for persistenceId.

As a user you would have to pick a journal that supports your needs in this regard.

I agree with you both.  With Vaughn I agree that we need a global sequence (although I understand this is very impractical within distributed systems) and with Patrik that it should be up to the store implementation (with the possibility of store configuration determining this).  It would be up to the store (and the developer's choice in configuring that store) to determine how close to causal or total ordering the sequence will be.

So, for example, with general use of Kafka the store could provide events from each partition of a topic (if I understand correctly how Kafka works) in a round-robin fashion, which wouldn't be properly sequenced, but may be manageable for some requirements.  If a developer wanted stricter "global" sequencing, they could configure the store to have a single partition, with the scaling implications that would have.
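A toy illustration of that round-robin point (assuming, as above, that ordering is only guaranteed within a partition): merging partitions round-robin preserves each partition's internal order but loses the original global write order:

```scala
// Toy model of reading a multi-partition topic round-robin. Elements here
// are global write-order numbers; events land in partitions independently.
object PartitionMerge {
  def roundRobin[A](partitions: Vector[Vector[A]]): Vector[A] = {
    val maxLen = if (partitions.isEmpty) 0 else partitions.map(_.length).max
    // take element i from every partition, then element i+1, and so on
    (0 until maxLen).toVector.flatMap(i => partitions.flatMap(_.lift(i)))
  }
}
```

With writes 1, 2, 5 in one partition and 3, 4 in another, the merged read is 1, 3, 2, 4, 5: per-partition order holds, global order does not. Collapsing to a single partition trivially restores global order, at the cost of scalability.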


Gary Malouf

unread,
Aug 8, 2014, 7:15:28 AM8/8/14
to akka...@googlegroups.com
One of the arguments for the CQRS/Event Sourcing combo has been that it allows you to optimize reads and writes independently for high throughput.  Many people, however (including us), want the command/query separation plus the sequence of events just for the design benefits.  Sagas are one of the critical pieces of this, but there need to be guarantees that if one aggregate/processor emits an event and three other aggregates/processors are listening for it, they will get it, barring a catastrophe.

Unless one simply polls all of the processor persistent views manually today, this guarantee just is not there out of the box.


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/SL5vEVW7aTo/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.

Prakhyat Mallikarjun

unread,
Aug 10, 2014, 9:20:21 AM8/10/14
to akka...@googlegroups.com
How will the performance of event sourcing be impacted in a high-volume OLTP application that generates millions of events?

There is a big overhead in storing these events, and also the state of the system, in durable DBs for reads or other purposes. Also, don't you think event-sourcing apps will consume huge amounts of disk space, and consume it faster, in a high-volume OLTP application?

Prakhyat Mallikarjun

unread,
Aug 10, 2014, 10:29:02 AM8/10/14
to akka...@googlegroups.com
Hi Gary/akka team,

I have a requirement in my app where changes to one aggregate root affect many other aggregate roots, and all have to stay in sync. I keep seeing sagas referred to in discussions. Will sagas really help resolve this? Can I find any articles on this?

Are there any other design approaches?

Gary Malouf

unread,
Aug 10, 2014, 11:36:13 AM8/10/14
to akka...@googlegroups.com
Hi Prakhyat,

We are building a CQRS/DDD-oriented configuration system based on akka persistence and are running into the same modeling issues.  A few characteristics of our specific case:

1) We do not expect a high volume of commands to be submitted (they are generated via a task-based user interface that will have on the order of 30-50 users).

2) We have a number of cases where the output events of one aggregate must eventually trigger a change on another aggregate.  This use case is what I am referring to as 'sagas'.  There are two concerns that need to be addressed: guarantee that the messages will eventually get delivered in the event of system error/failure and the ability of the receiving aggregates to be able to order/handle them.

3) We use the cassandra connector for akka persistence with a 'quorum' consistency level for writing and reading.


Since we are not dealing with high throughputs, a less performant but safer solution to the concerns in (2) is possible for us without introducing another system to an already complicated infrastructure.  We can have the aggregates that may receive events from others reliably query the views for the aggregates they depend on (reading from Cassandra) directly, to ensure messages are not missed and come in order.

In our view, putting the weight on the consumer to deal with out-of-order messaging was painful.  I've read the blogs arguing for being able to deal with this, but it just felt like something the framework should handle for you in the end.

The reliable, in-order messaging concern also extends to 'stream consumers' in general.  For this, we are looking at building a service that reads from all views (ordering across processors/aggregates by timestamp), assigns a 'global' sequence number to the event, and persists this in a stream.  We then can have our consumers read from this stream with confidence that events will arrive in order and not be missing.  That service could run as a singleton in an akka cluster for reliability - performance is not a concern for us at our expected traffic.
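The resequencing service described above could be sketched roughly like this (all names are illustrative; ordering purely by timestamp assumes sufficiently synchronized clocks, which is part of why this is only viable at low traffic):

```scala
// Illustrative sketch of a resequencer: merge per-aggregate view streams by
// timestamp and stamp each event with a global sequence number, producing a
// single stream consumers can read in a stable total order.
object Resequencer {
  final case class ViewEvent(persistenceId: String, timestamp: Long, payload: Any)
  final case class Sequenced(globalSeqNr: Long, event: ViewEvent)

  def resequence(streams: Seq[Seq[ViewEvent]]): Vector[Sequenced] =
    streams.flatten
      .sortBy(e => (e.timestamp, e.persistenceId)) // persistenceId breaks timestamp ties
      .zipWithIndex
      .map { case (e, i) => Sequenced(i + 1L, e) }
      .toVector
}
```

Running this as a cluster singleton, as suggested above, trades throughput for the guarantee that every consumer sees the same numbered stream.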

Both of these cases highlight the need for a reliable messaging integration, to avoid the hoops we will be jumping through.



ahjohannessen

unread,
Aug 10, 2014, 11:37:23 AM8/10/14
to akka...@googlegroups.com
Prakhyat, please stay on topic or start a new thread.

Thanks.

Prakhyat

unread,
Aug 10, 2014, 12:20:25 PM8/10/14
to akka...@googlegroups.com
Hi,

I understand.

But take my point of view: this thread had sagas as a point of discussion. I felt it was the perfect place to pitch my query, as the context was set.

If you have an answer to my query, please point me to the relevant thread.

Sent from my iPhone

Prakhyat

unread,
Aug 10, 2014, 12:36:10 PM8/10/14
to akka...@googlegroups.com
Hi Gary,

Thanks for your points.

I am also working on solving this, and am thinking of doing it via synchronous communication between aggregate roots.

Is there an implementation of your idea that you are planning to put on GitHub? Please share the details on the thread.

Thanks a lot again ...it helped.

Sent from my iPhone

Ashley Aitken

unread,
Aug 10, 2014, 1:20:52 PM8/10/14
to akka...@googlegroups.com

A few things I have noted when re-reading "Exploring CQRS and Event Sourcing" http://msdn.microsoft.com/en-us/library/jj554200.aspx :

A. Events can play two different roles in a CQRS implementation: 1) Event Sourcing, as akka-persistence provides, to persist the state of an aggregate root, and 2) Communication and Integration, between bounded contexts and with other systems.

B. Not all events in a CQRS system are related to an Aggregate Root: "Your event store must have a way to store events that are not associated with an aggregate." (page 283), so there needs to be a way to inject events independently of AR persistence.

C. There needs to be enough default information associated with events in the event store to allow (re)construction of various different projections after the events were published, i.e. without requiring a priori "labelling" of events for specific projections.

These points suggests to me that:

1. To do AR persistence Akka can get by with a "hidden journal" but for full CQRS it may also need to provide access to an Event Store for actors (and other entities) to publish events and to filter the whole event stream directly as needed. 

That said, perhaps a PersistentActor could publish events not related to its persistence and just ignore them when rebuilding its state.

2. There needs to be more transparent metadata associated with events, beyond a (possibly optional) persistenceId, that isn't just specified with specific projections in mind.  I suggest the full type of the event would be a good start (to help with the filtering in 1.). 

Perhaps also something to help with partial / causal ordering could be included.




Vaughn Vernon

unread,
Aug 10, 2014, 5:57:05 PM8/10/14
to akka...@googlegroups.com
None of this stuff is easy to do, and even harder to do right. Your post gives away the main problem with getting this to work correctly, because the Actor Model and akka-persistence currently support the first half of A, but not the second half. In other words, to make the interface rich we not only need a new set of abstractions, we also need to overcome the direct messaging nature of actors, because it can be limiting in some use cases.

With the messaging library I am building, currently named Number 9, which includes both persistence and a process manager, this problem is handled as follows. Any actor that sends a message may:

1. send a persistent message to another actor
2. send a persistent message to a topic
3. send a persistent message primarily to another actor, but also to a topic

If you have watched any of my presentations on this subject you have heard this before. I am presenting most of this to the DDD Denver meetup this Monday night. The title of the talk is "Building a Reactive Process Manager, Twice". The twice part is because I will demonstrate this working both in Scala with Akka and also in C# with Dotsero:



I am not sure if the presentation will be recorded.

Vaughn

Prakhyat Mallikarjun

unread,
Aug 11, 2014, 3:21:39 AM8/11/14
to akka...@googlegroups.com
Hi Vaughn,

When can we expect the "Building a Reactive Process Manager, Twice" implementation, working in Scala with Akka, on GitHub?

Ashley Aitken

unread,
Aug 12, 2014, 12:10:42 PM8/12/14
to akka...@googlegroups.com

Thanks for your post Vaughn.


On Monday, 11 August 2014 05:57:05 UTC+8, Vaughn Vernon wrote:
None of this stuff is easy to do, and even harder to do right.

I am the first to agree with that.
 
Your post gives away the main problem with getting this to work correctly, because the Actor Model and akka-persistence currently support the first half of A, but not the second half. In other words, to make the interface rich we not only need a new set of abstractions, we also need to overcome the direct messaging nature of actors, because it can be limiting in some use cases.  
With the messaging library I am building, currently named Number 9, which includes both persistence and a process manager, this problem is handled as follows. Any actor that sends a message may:

1. send a persistent message to another actor
2. send a persistent message to a topic
3. send a persistent message primarily to another actor, but also to a topic

That is very interesting.  

It seems to me that CQRS commands should be sent as messages (persistent or not), your (1), and changes of state (AR or application) should be published as events (to topics or more generally), your (2), but I can't see a need for (3)?

Further, a process manager for a bank account transfer could be implemented with a command to the source account (withdrawForTransfer) that would be acknowledged by a published persistent event (WithdrawnForTransfer).  Similar for deposit into target account.

Pawel Kaczor in his DDD-Leaven-Akka series (Lesson 3) includes projections from aggregated streams of events and a process manager / saga using Akka Persistence by having the ARs persisting their events and also publishing their events.



The only shortcomings (not his fault or a criticism) seem to be: 1) the use of two event infrastructures (one for persistence and one for pub/sub), 2) the limited ability for complex projections (like Greg mentioned and available in Event Store), and 3) lack of persistence for pub/sub events.

The latter makes reconstruction of a read model or construction of a new read model after the events have been published more difficult.  
 
If you have watched any of my presentations on this subject you have heard this before. I am presenting most of this to the DDD Denver meetup this Monday night. The title of the talk is "Building a Reactive Process Manager, Twice". The twice part is because I will demonstrate this working both in Scala with Akka and also in C# with Dotsero:

Thank you, I will look out for that (please share the video link if it is recorded and put on the Web).  I have seen (but not watched) some of your videos, because I am unsure as to who is leading here, and the videos I saw seemed to be from a few years ago.

I've just got your book so I will get on with reading that (for DDD and CQRS enlightenment).

Roland Kuhn

unread,
Aug 15, 2014, 1:39:45 PM8/15/14
to akka-user
Dear hakkers,

unfortunately it took me a long time to catch up with akka-user to this point after the vacation, but on the other hand this made for a very interesting and stimulating read, thanks for this thread!

If I may, here’s what I have understood so far:
  1. In order to support not only actor persistence but also full CQRS we need to adjust our terminology: events are published to topics, where each persistenceId is one such topic but others are also allowed.
  2. Common use-cases of building projections or denormalized views require the ability to query the union of a possibly large number of topics in such a fashion that no events are lost. This union can be viewed as a synthetic or logical topic, but issues arise in that true topics provide total ordering while these synthetic ones have difficulties doing so.
  3. Constructing Sagas is hard.

AFAICS 3 is not related to the other two; the mentions in this thread have only alluded to the problems, so I assume that the difficulty is primarily to design a process that has the right eventual consistency properties (i.e. rollbacks, retries, …). This is an interesting topic, but let’s concentrate on the original question first.

The first point is a rather simple one, we just need to expose the necessary API for writing to a given topic instead of the local Actor’s persistenceId; I’d opt for adding variants of the persist() methods that take an additional String argument. Using the resulting event log is then done as for the others (i.e. Views and potentially queries should just work). The only concern is that the Journal needs to be prepared to receive events concurrently from multiple sources instead of just the same Actor, but since each topic needs to be totally ordered this will not be an additional hassle beyond just routing to the same replica, just like for persistenceIds.
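A hypothetical sketch of what such persist() variants taking a topic argument could look like; these signatures are illustrative of the proposal, not an existing Akka Persistence API:

```scala
// Sketch of the proposed API shape: the existing persist() writes to the
// actor's own topic (its persistenceId), while a new variant takes an
// explicit topic name. All names here are illustrative.
trait TopicPersistence {
  def persistenceId: String

  // existing style: event goes to the actor's own topic
  def persist[A](event: A)(handler: A => Unit): Unit =
    persistTo(persistenceId, event)(handler)

  // proposed variant: event goes to an explicitly named topic
  def persistTo[A](topic: String, event: A)(handler: A => Unit): Unit
}

// minimal in-memory implementation, only to make the sketch concrete
final class Recording(val persistenceId: String) extends TopicPersistence {
  var log = Vector.empty[(String, Any)] // (topic, event) in write order
  def persistTo[A](topic: String, event: A)(handler: A => Unit): Unit = {
    log = log :+ (topic -> event)
    handler(event)
  }
}
```

Under this shape, only the events written under the actor's own persistenceId would be replayed during its recovery, while both kinds remain consumable by Views.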

The second point is the contentious one, since a feature request (consistent iteration over a query) clashes with a design choice (scalability). First it is important to note that this clash is genuine: scalability means that we do not want to limit the size of a topic to always fit one unit of consistency, our default assumption is that everything should be prepared for distribution. We all know that in a distributed system linearizability is not generally achievable, meaning that a distributed (synthetic) topic that receives events from concurrent sources will not be able to provide a global ordering. A non-distributed Journal, OTOH, is a single point of failure which is not desirable for many applications (i.e. your business will go down while the Journal has issues—true replication requires the ability to fail independently and hence is distributed in the CAP sense).

As I see it, a query (like “all events of this type” etc.) should be configured for the given Journal and should then be available as a (synthetic) topic for normal consumption—but not for being written to. The Journal is then free to implement this in any way it sees fit, but barring fundamental advances in CS or errors on my part this will always require that the synthetic topic is not scalable in the way we usually define that (i.e. distributable). As Vaughn points out this may not be an issue at all, actual benchmarks would help settle this point. Journal backends that already implement a global order can make use of that, for others the synthetic topic would work just like any other non-PersistentActor topic with manual duplication of those events that match the query (akin to (a) in the first post of this thread); this duplication does not necessarily need to double the memory consumption, it could also only persist the events by reference (depending on the storage engine).
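The persist-by-reference idea at the end of that paragraph could look roughly like this (illustrative names; the synthetic topic keeps only (persistenceId, seqNr) pointers into the primary per-actor logs and dereferences them on replay, so matching events gain a total order without duplicating payloads):

```scala
// Sketch of a synthetic topic that stores references, not event copies.
object SyntheticTopic {
  final case class Ref(persistenceId: String, seqNr: Long)

  private var primary = Map.empty[String, Vector[Any]] // per-persistenceId logs
  private var index = Vector.empty[Ref]                // total order of matches

  def append(persistenceId: String, event: Any, matches: Any => Boolean): Unit = {
    val log = primary.getOrElse(persistenceId, Vector.empty) :+ event
    primary += persistenceId -> log
    if (matches(event)) index = index :+ Ref(persistenceId, log.size.toLong)
  }

  // dereference on read: replay the synthetic topic in index order
  def replay(): Vector[Any] =
    index.map(r => primary(r.persistenceId)(r.seqNr.toInt - 1))
}
```

The single append path here is exactly the non-distributable part Roland describes: the index must be written from one place to stay totally ordered.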

When it comes to providing queries in a way that does not have a global ordering, my current opinion is that we should not do this because it would be quite pointless (a.k.a. unusable). A compromise would be to provide eventually linearizable queries based on the premise that the application of events should be idempotent in any case and overlapping replay (i.e. where necessary from the last known-linear point instead of the requested one) must be tolerated. AFAIK this is the topic of ongoing research, though, so I’d place that lower on the priority list.

Does this sound like a fair summary? Please let me know in case I misrepresent or misunderstand something, once we reach consensus on what we need we’ll ticket and solve it, as usual ;-)

Regards,

Roland




Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn


Vaughn Vernon

unread,
Aug 16, 2014, 4:11:06 PM8/16/14
to akka...@googlegroups.com
Hi Roland,

Welcome back to the discussions :)

I think you stated most of this very thoroughly. There are probably a few points lost in translation between your more general actor-based or CS terminology and what others would use to define ES/CQRS. Would it be possible for me to ask for just a few clarifications? I will comment inline with yours below.

One other thing, however, regarding my points 1-3 above about sending messages to both single actors and to topics. In actuality, the simplest way to think about this is that you have the ability to tell a vector/collection of actors something. This is easier to reason about than the requirements stated in my 1-3:

   val processCollaborator: ActorRef = nextProcessingStep
   . . .
   val interestTopic: ActorRef = viewInterest
   . . .
   val actors: VectorRef = processCollaborator alongWith interestTopic

   actors ! SomethingHappened(...)

Here the vector could tell any number of actors, not just two.

I have actually solved this a bit differently, but perhaps you feel that this aligns a bit better with your way of thinking. (I don't know, but it is based on our conversation from a few weeks ago.) What I have done is add another abstraction named EntityRef, which does not mean DDD entity and in no way forces thinking about DDD Aggregates. It is just an Entity in the sense that Gul Agha would probably use. With that you could also have an EntitiesRef that supports safely telling any number of interested entities about what happened. I think this is very important, because the syntax for communicating retains the same awesome explicit readability of simple ActorRef receiving a tell message:

   val processCollaborator: EntityRef = nextProcessingStep
   . . .
   val interestTopic: EntityRef = viewInterest
   . . .
   val entities: EntitiesRef = processCollaborator alongWith interestTopic

   entities ! SomethingHappened(...)

The main reason for adding EntityRef and EntitiesRef is to tag the underlying actors as reliably receiving a message. It also gives library implementors a specific extension point to do things within the EntityRef/EntitiesRef that you will not permit them to do with ActorRef. (As you and the team have made clear, ActorRef is reserved exclusively for sending with at-most-once delivery semantics.)

My inline comments are below...

Vaughn


On Friday, August 15, 2014 11:39:45 AM UTC-6, rkuhn wrote:
Dear hakkers,

unfortunately it took me a long time to catch up with akka-user to this point after the vacation, but on the other hand this made for a very interesting and stimulating read, thanks for this thread!

If I may, here’s what I have understood so far:
  1. In order to support not only actor persistence but also full CQRS we need to adjust our terminology: events are published to topics, where each persistenceId is one such topic but others are also allowed.
  2. Common use-cases of building projections or denormalized views require the ability to query the union of a possibly large number of topics in such a fashion that no events are lost. This union can be viewed as a synthetic or logical topic, but issues arise in that true topics provide total ordering while these synthetic ones have difficulties doing so.
  3. Constructing Sagas is hard.

AFAICS 3. is not related to the other two, the mentions in this thread have only alluded to the problems so I assume that the difficulty is primarily to design a process that has the right eventual consistency properties (i.e. rollbacks, retries, …). This is an interesting topic but let’s concentrate on the original question first.

The first point is a rather simple one, we just need to expose the necessary API for writing to a given topic instead of the local Actor’s persistenceId; I’d opt for adding variants of the persist() methods that take an additional String argument. Using the resulting event log is then done as for the others (i.e. Views and potentially queries should just work). The only concern is that the Journal needs to be prepared to receive events concurrently from multiple sources instead of just the same Actor, but since each topic needs to be totally ordered this will not be an additional hassle beyond just routing to the same replica, just like for persistenceIds.

Is point one for providing a sequence number from a single ordering source? Or do you mean topic in the sense that I cover above with EntitiesRef? In other words, what is the String argument and how does it work?  If you would show a few sample persist() APIs, that might help clarify. And if you are referring to a global ordering sequence, who must maintain that? Is it the store implementation or the developer?
 

The second point is the contentious one, since a feature request (consistent iteration over a query) clashes with a design choice (scalability). First it is important to note that this clash is genuine: scalability means that we do not want to limit the size of a topic to always fit one unit of consistency, our default assumption is that everything should be prepared for distribution. We all know that in a distributed system linearizability is not generally achievable, meaning that a distributed (synthetic) topic that receives events from concurrent sources will not be able to provide a global ordering. A non-distributed Journal, OTOH, is a single point of failure which is not desirable for many applications (i.e. your business will go down while the Journal has issues—true replication requires the ability to fail independently and hence is distributed in the CAP sense).

I think I understand this to mean that if you decide to implement a store using MySQL/Postgres/Oracle/LevelDB or whatever, then you live with what you get and what you don't get from those stores. If so, that's okay with me because we already live with those trade offs all the time anyway. I think this is far better than trying to make the whole world step up to Availability and Partition tolerance when all they want to do is write a business app using akka-persistence. This allows teams to decide for themselves which of the two CAP attributes they want, and note that even Amazon would choose C over A or P in some cases.
 

As I see it, a query (like “all events of this type” etc.) should be configured for the given Journal and should then be available as a (synthetic) topic for normal consumption—but not for being written to. The Journal is then free to implement this in any way it sees fit, but barring fundamental advances in CS or errors on my part this will always require that the synthetic topic is not scalable in the way we usually define that (i.e. distributable). As Vaughn points out this may not be an issue at all, actual benchmarks would help settle this point. Journal backends that already implement a global order can make use of that, for others the synthetic topic would work just like any other non-PersistentActor topic with manual duplication of those events that match the query (akin to (a) in the first post of this thread); this duplication does not necessarily need to double the memory consumption, it could also only persist the events by reference (depending on the storage engine).

I think these are the typical kinds of queries:

- All newly persisted events that I have not yet processed since the last time I asked for them (because I always process all new events in some specific way)
- All persisted events that constitute the state of my actor
- All persisted events from the beginning of time because I just redesigned 20 user interface views and I have to delete and rebuild all my view states from day-1, and the events must be delivered in the same order that they originally happened, or my generated views' state will be wrong
- All persisted events from the beginning of time because a new system is coming on line and needs to be seeded with what happened from the time I was deployed until now, and the events must be delivered in the same order that they originally happened, or the state of my newly deployed system will be wrong
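The first query style above amounts to a consumer that remembers its position and asks only for newer events; a minimal sketch with illustrative names (the journal is modeled as a map from a global sequence number to an event):

```scala
// Sketch of a catch-up consumer: it tracks the last global sequence number
// it processed and, on each poll, takes only events with a higher number.
final class CatchUpConsumer {
  private var lastSeen = 0L
  var processed = Vector.empty[(Long, Any)]

  def poll(journal: Map[Long, Any]): Unit = {
    val fresh = journal.collect { case (nr, e) if nr > lastSeen => (nr, e) }
      .toVector.sortBy(_._1)
    processed ++= fresh
    if (fresh.nonEmpty) lastSeen = fresh.last._1
  }
}
```

The last three query styles are the same loop started from position 0, which is exactly why they all hinge on a stable, gap-tolerant total order of the underlying stream.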

 

When it comes to providing queries in a way that does not have a global ordering, my current opinion is that we should not do this because it would be quite pointless (a.k.a. unusable). A compromise would be to provide eventually linearizable queries based on the premise that the application of events should be idempotent in any case and overlapping replay (i.e. where necessary from the last known-linear point instead of the requested one) must be tolerated. AFAIK this is the topic of ongoing research, though, so I’d place that lower on the priority list.

Are you here referring to Causal Consistency? Otherwise, I am not sure I follow. If you refer to Causal Consistency, I agree that this should be lower on the to-support priority list than global ordering, because it is just too hard compared to the needs of most teams that want to use ES/CQRS.

Patrik Nordwall

unread,
Aug 18, 2014, 4:27:13 AM8/18/14
to akka...@googlegroups.com
Hi Roland,

A few more questions for clarification...


Does that mean that a PersistentActor can emit events targeted to its persistenceId and/or targeted to an external topic and it is only the events targeted to the persistenceId that will be replayed during recovery of that PersistentActor?

Both these two types of events can be replayed by a PersistentView.
 
The only concern is that the Journal needs to be prepared to receive events concurrently from multiple sources instead of just the same Actor, but since each topic needs to be totally ordered this will not be an additional hassle beyond just routing to the same replica, just like for persistenceIds.

Replica as in data store replica, or as in journal actor? 
 

Is point one for providing a sequence number from a single ordering source?

Yes, that is also what I was wondering. Do we need such a sequence number? A PersistentView should be able to define a replay starting point. (Right now I think that is missing; it is only supported by saving snapshots.)
 
Or do you mean topic in the sense that I cover above with EntitiesRef? In other words, what is the String argument and how does it work?  If you would show a few sample persist() APIs that might help clarify. And if you are referring to a global ordering sequence, who must maintain that? Is it the store implementation or the developer? 
 

The second point is the contentious one, since a feature request (consistent iteration over a query) clashes with a design choice (scalability). First it is important to note that this clash is genuine: scalability means that we do not want to limit the size of a topic to always fit one unit of consistency, our default assumption is that everything should be prepared for distribution. We all know that in a distributed system linearizability is not generally achievable, meaning that a distributed (synthetic) topic that receives events from concurrent sources will not be able to provide a global ordering. A non-distributed Journal, OTOH, is a single point of failure which is not desirable for many applications (i.e. your business will go down while the Journal has issues—true replication requires the ability to fail independently and hence is distributed in the CAP sense).

I think I understand this to mean that if you decide to implement a store using MySQL/Postgres/Oracle/LevelDB or whatever, then you live with what you get and what you don't get from those stores. If so, that's okay with me because we already live with those trade offs all the time anyway. I think this is far better than trying to make the whole world step up to Availability and Partition tolerance when all they want to do is write a business app using akka-persistence. This allows teams to decide for themselves which of the two CAP attributes they want, and note that even Amazon would choose C over A or P in some cases.

I agree, I think the ordering quality of service should be provided by the journal implementation and not enforced by akka persistence. If you use MySQL/Postgres/Oracle/LevelDB the total ordering is a no-brainer, but if you use Cassandra or Kafka it is not. 
 
 

As I see it, a query (like “all events of this type” etc.) should be configured for the given Journal and should then be available as a (synthetic) topic for normal consumption—but not for being written to. The Journal is then free to implement this in any way it sees fit, but barring fundamental advances in CS or errors on my part this will always require that the synthetic topic is not scalable in the way we usually define that (i.e. distributable). As Vaughn points out this may not be an issue at all, actual benchmarks would help settle this point. Journal backends that already implement a global order can make use of that, for others the synthetic topic would work just like any other non-PersistentActor topic with manual duplication of those events that match the query (akin to (a) in the first post of this thread); this duplication does not necessarily need to double the memory consumption, it could also only persist the events by reference (depending on the storage engine).

I think these are very typical kinds of queries:

- All newly persisted events that I have not yet processed since the last time I asked for them (because I always process all new events in some specific way)
- All persisted events that constitute the state of my actor
- All persisted events from the beginning of time because I just redesigned 20 user interface views and I have to delete and rebuild all my view states from day-1, and the events must be delivered in the same order that they originally happened, or my generated views' state will be wrong
- All persisted events from the beginning of time because a new system is coming on line and needs to be seeded with what happened from the time I was deployed until now, and the events must be delivered in the same order that they originally happened, or the state of my newly deployed system will be wrong

We have still not really defined what this *order* is.

"in the same order that they originally happened" sounds like a wall clock timestamp in the PersistentActor, is that what we mean? -- and then we all know that it is not perfect in a distributed system, and events may have exactly the same timestamp.

Or do we mean the insert order in the data store? There is often no such thing in a distributed store.

Or do we mean that the replay of these events should be deterministic, i.e. always replayed in the same order?


I tried to understand what is supported by EventStore. Found this page: https://github.com/EventStore/EventStore/wiki/Projections-fromStreams

It is clear that the total order of a projection from multiple streams is not perfect, but probably good enough for practical purposes.

Regards,
Patrik

Roland Kuhn

unread,
Aug 18, 2014, 9:38:36 AM8/18/14
to akka-user
18 aug 2014 kl. 10:27 skrev Patrik Nordwall <patrik....@gmail.com>:

Hi Roland,

A few more questions for clarification...


On Sat, Aug 16, 2014 at 10:11 PM, Vaughn Vernon <vve...@shiftmethod.com> wrote:
Hi Roland,

Welcome back to the discussions :)

I think you stated most of this very thoroughly. There are probably a few points lost in translation between your more general actor-based or CS terminology and what others would use to define ES/CQRS. Would it be possible for me to ask for just a few clarifications? I will comment inline with yours below.

One other thing, however, from my points 1-3 above regarding sending messages to both single actors and to topics. In actuality the simplest way to think about this is that you have the ability to tell a vector/collection of actors something. This ability makes it easier to think about than the requirements stated in my 1-3:

   val processCollaborator: ActorRef = nextProcessingStep
   . . .
   val interestTopic: ActorRef = viewInterest
   . . .
   val actors: VectorRef = processCollaborator alongWith interestTopic

   actors ! SomethingHappened(...)

Here the vector could tell any number of actors, not just two.

I have actually solved this a bit differently, but perhaps you feel that this aligns a bit better with your way of thinking. (I don't know, but it is based on our conversation from a few weeks ago.) What I have done is add another abstraction named EntityRef, which does not mean DDD entity and in no way forces thinking about DDD Aggregates. It is just an Entity in the sense that Gul Agha would probably use. With that you could also have an EntitiesRef that supports safely telling any number of interested entities about what happened. I think this is very important, because the syntax for communicating retains the same awesome explicit readability of simple ActorRef receiving a tell message:

   val processCollaborator: EntityRef = nextProcessingStep
   . . .
   val interestTopic: EntityRef = viewInterest
   . . .
   val entities: EntitiesRef = processCollaborator alongWith interestTopic

   entities ! SomethingHappened(...)

The main reason for adding EntityRef and EntitiesRef is to tag the underlying actors as reliably receiving a message. It also gives library implementors a specific extension point to do things within the EntityRef/EntitiesRef that you will not permit them to do with ActorRef. (As you and the team have made clear, ActorRef is reserved exclusively for sending with at-most-once delivery semantics.)
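The shape Vaughn proposes could be sketched in plain Scala as follows. Note that EntityRef/EntitiesRef do not exist in Akka; the delivery guarantees are elided here, and `alongWith` simply fans the tell out to every member.

```scala
// Sketch of the proposed EntityRef/EntitiesRef shape (hypothetical API).
trait EntityRef {
  def !(msg: Any): Unit
  def alongWith(other: EntityRef): EntitiesRef =
    new EntitiesRef(Vector(this, other))
}

final class EntitiesRef(members: Vector[EntityRef]) extends EntityRef {
  def !(msg: Any): Unit = members.foreach(_ ! msg)
  override def alongWith(other: EntityRef): EntitiesRef =
    new EntitiesRef(members :+ other)
}

// A recording stub standing in for a reliably-receiving recipient.
final class Recorder extends EntityRef {
  val received = scala.collection.mutable.Buffer.empty[Any]
  def !(msg: Any): Unit = received += msg
}

val processCollaborator = new Recorder
val interestTopic = new Recorder
val entities = processCollaborator alongWith interestTopic
entities ! "SomethingHappened"
```

The readability argument holds: the call site keeps the familiar `ref ! msg` form while the tagged type signals the stronger delivery contract.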

Telling a collection of recipients something can have two different semantics:
  • everyone gets or loses it individually; this is already possible using `actors foreach (_ ! msg)`
  • everyone gets or loses it as a group
The second one needs a bit more thought due to its transactional nature, and your use-case of reliably sending to a group also implies persistence. Both constraints are straight-forward to solve by sending into a single persistent “channel” that all recipients read from, which directly relates to the discussion about topics (more below). OTOH the transactional constraint is not solvable without collaboration from the recipients, and the persistence aspect plays into this as well: the sender will need to commit the message to storage atomically and the recipients will need to retrieve it reliably from there (which is another way to look at resends and acknowledgements: the recipient asks for the data by not sending an ACK). In any case, this collaboration between sender and recipient does not match up with the normal semantics of the `tell` operator, hence it would be confusing to expose such syntax to users.


My inline comments are below...

Vaughn


On Friday, August 15, 2014 11:39:45 AM UTC-6, rkuhn wrote:
Dear hakkers,

unfortunately it took me a long time to catch up with akka-user to this point after the vacation, but on the other hand this made for a very interesting and stimulating read, thanks for this thread!

If I may, here’s what I have understood so far:
  1. In order to support not only actor persistence but also full CQRS we need to adjust our terminology: events are published to topics, where each persistenceId is one such topic but others are also allowed.
  2. Common use-cases of building projections or denormalized views require the ability to query the union of a possibly large number of topics in such a fashion that no events are lost. This union can be viewed as a synthetic or logical topic, but issues arise in that true topics provide total ordering while these synthetic ones have difficulties doing so.
  3. Constructing Sagas is hard.

AFAICS 3. is not related to the other two, the mentions in this thread have only alluded to the problems so I assume that the difficulty is primarily to design a process that has the right eventual consistency properties (i.e. rollbacks, retries, …). This is an interesting topic but let’s concentrate on the original question first.

The first point is a rather simple one, we just need to expose the necessary API for writing to a given topic instead of the local Actor’s persistenceId; I’d opt for adding variants of the persist() methods that take an additional String argument. Using the resulting event log is then done as for the others (i.e. Views and potentially queries should just work).

Does that mean that a PersistentActor can emit events targeted to its persistenceId and/or targeted to an external topic and it is only the events targeted to the persistenceId that will be replayed during recovery of that PersistentActor?

Yes.

Both these two types of events can be replayed by a PersistentView.

Yes; they are not different types of events, just how they get to the Journal is slightly different.

 
The only concern is that the Journal needs to be prepared to receive events concurrently from multiple sources instead of just the same Actor, but since each topic needs to be totally ordered this will not be an additional hassle beyond just routing to the same replica, just like for persistenceIds.

Replica as in data store replica, or as in journal actor? 

The Journal must implement this in whatever way is suitable for the back-end. A generic solution would be to shard the topics as Actors across the cluster (internal to the Journal), or the Journal could talk to the replicated back-end store such that a topic always is written to one specific node (if that helps).

 

Is point one for providing a sequence number from a single ordering source?

Yes, that is also what I was wondering. Do we need such a sequence number? A PersistentView should be able to define a replay starting point. (right now I think that is missing, it is only supported by saving snapshots)
 
Or do you mean topic in the sense that I cover above with EntitiesRef? In other words, what is the String argument and how does it work?  If you would show a few sample persist() APIs that might help clarify. And if you are referring to a global ordering sequence, who must maintain that? Is it the store implementation or the developer? 

#1 is not about sequence numbers per se (although it has consequences of that kind): it is only about allowing persistenceIds that are not bound to a single PersistentActor and that all PersistentActors can publish to. Mock code:

def apply(evt: Event) = state = evt(state)

def receiveCommand = {
  case c: Cmd =>
    if (isValid(c)) {
      persist(Event1(c))(apply)
      persistToTopic("myTopic", Event2(c)) { evt =>
        apply(evt)
        sender() ! Done
      }
    }
}

Everyone who listens to "myTopic" will then (eventually) get Event2.
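How "everyone who listens will eventually get Event2" could work can be modeled with an in-memory stand-in for the topic journal (this is not the real Journal SPI): the topic is a totally ordered per-topic log, and a subscriber catches up from an offset and then continues from where it left off.

```scala
// Sketch: a topic as an append-only log serving catch-up subscriptions.
// Offsets are positions in the topic's total order.
final class TopicLog[E] {
  private val entries = scala.collection.mutable.ArrayBuffer.empty[E]
  def append(e: E): Long = { entries += e; entries.size - 1L }
  /** Replay everything from `fromOffset` (inclusive): catch-up subscription. */
  def read(fromOffset: Long): Vector[E] =
    entries.drop(fromOffset.toInt).toVector
}

val topic = new TopicLog[String]
topic.append("Event2(a)")
topic.append("Event2(b)")
val caughtUp = topic.read(0)                 // a late subscriber sees full history
topic.append("Event2(c)")
val tail = topic.read(caughtUp.size.toLong)  // then continues from its offset
```

This also shows why a live subscriber and a later historical read agree: both consume the same log in the same order.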

 

The second point is the contentious one, since a feature request (consistent iteration over a query) clashes with a design choice (scalability). First it is important to note that this clash is genuine: scalability means that we do not want to limit the size of a topic to always fit one unit of consistency, our default assumption is that everything should be prepared for distribution. We all know that in a distributed system linearizability is not generally achievable, meaning that a distributed (synthetic) topic that receives events from concurrent sources will not be able to provide a global ordering. A non-distributed Journal, OTOH, is a single point of failure which is not desirable for many applications (i.e. your business will go down while the Journal has issues—true replication requires the ability to fail independently and hence is distributed in the CAP sense).

I think I understand this to mean that if you decide to implement a store using MySQL/Postgres/Oracle/LevelDB or whatever, then you live with what you get and what you don't get from those stores. If so, that's okay with me because we already live with those trade offs all the time anyway. I think this is far better than trying to make the whole world step up to Availability and Partition tolerance when all they want to do is write a business app using akka-persistence. This allows teams to decide for themselves which of the two CAP attributes they want, and note that even Amazon would choose C over A or P in some cases.

I agree, I think the ordering quality of service should be provided by the journal implementation and not enforced by akka persistence. If you use MySQL/Postgres/Oracle/LevelDB the total ordering is a no-brainer, but if you use Cassandra or Kafka it is not. 

My point (perhaps not well articulated) was that in order to offer the feature of listening to arbitrary Queries the Journal MUST provide the resulting event stream in a consistent order (see more below). Providing them in random order for each replay is worse than useless as far as I can see.

 
 

As I see it, a query (like “all events of this type” etc.) should be configured for the given Journal and should then be available as a (synthetic) topic for normal consumption—but not for being written to. The Journal is then free to implement this in any way it sees fit, but barring fundamental advances in CS or errors on my part this will always require that the synthetic topic is not scalable in the way we usually define that (i.e. distributable). As Vaughn points out this may not be an issue at all, actual benchmarks would help settle this point. Journal backends that already implement a global order can make use of that, for others the synthetic topic would work just like any other non-PersistentActor topic with manual duplication of those events that match the query (akin to (a) in the first post of this thread); this duplication does not necessarily need to double the memory consumption, it could also only persist the events by reference (depending on the storage engine).

I think these are very typical kinds of queries:

- All newly persisted events that I have not yet processed since the last time I asked for them (because I always process all new events in some specific way)

This needs a total order, and it must be consistent across runs (i.e. eventual consistency is not good enough, otherwise you will lose events or double-process them).

- All persisted events that constitute the state of my actor

This is defined to be the Actor’s own topic, hence it is not a Query (and therefore not a Problem ;-) ).

- All persisted events from the beginning of time because I just redesigned 20 user interface views and I have to delete and rebuild all my view states from day-1, and the events must be delivered in the same order that they originally happened, or my generated views' state will be wrong

The order that they originally happened in is ill-defined unless your Journal globally serializes events—which is not a given.

- All persisted events from the beginning of time because a new system is coming on line and needs to be seeded with what happened from the time I was deployed until now, and the events must be delivered in the same order that they originally happened, or the state of my newly deployed system will be wrong

Same here. But it is difficult to imagine things going wrong between unrelated (concurrent & distributed) entities; their order did not matter the first time around either, right? An exception here is causal ordering (BTW: casual ordering is not known to me).


We have still not really defined what this *order* is.

"in the same order that they originally happened" sounds like a wall clock timestamp in the PersistentActor, is that what we mean? -- and then we all know that it is not perfect in a distributed system, and events may have exactly the same timestamp.

Or do we mean the insert order in the data store? There is often no such thing in a distributed store.

Or do we mean that the replay of these events should be deterministic, i.e. always replayed in the same order?

This is the only one we can aim for AFAICS. As usual, I’d love to be proven wrong.
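One way a journal could manufacture such a deterministic order is to sort merged streams on a composite key; ties on wall-clock time are then broken arbitrarily but stably, so every replay agrees. A minimal sketch, assuming per-actor timestamps are monotonic (the Persisted envelope here is illustrative, not an Akka type):

```scala
// Sketch: a deterministic (if not "true") total order over merged streams,
// obtained by sorting on (timestamp, persistenceId, seqNr).
final case class Persisted(persistenceId: String, seqNr: Long, ts: Long, payload: String)

def replayOrder(events: Seq[Persisted]): Seq[Persisted] =
  events.sortBy(e => (e.ts, e.persistenceId, e.seqNr))

val merged = Seq(
  Persisted("cart-2", 1L, 100L, "ItemAdded"),
  Persisted("cart-1", 2L, 100L, "ItemRemoved"), // same timestamp as above
  Persisted("cart-1", 1L, 90L,  "CartCreated")
)
val ordered = replayOrder(merged)
```

The caveat: this preserves each actor's own event order only if its timestamps never go backwards, which is exactly the clock-skew problem discussed above.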

I tried to understand what is supported by EventStore. Found this page: https://github.com/EventStore/EventStore/wiki/Projections-fromStreams

It is clear that the total order of a projection from multiple streams is not perfect, but probably good enough for practical purposes.

What Greg describes there is exactly what I mean with Eventually Linearizable: while things are happening it takes a while for the system to sync up and agree on a replay order. Once that is set, everything is deterministic.

The alternative would be to strictly obey causal order by tracking who sent what in response to which message. Causality only defines a partial order, but that should by definition be enough because those event pairs which are unordered also do not care about each other (i.e. the global ordering might be different during each replay but that is irrelevant since nobody can observe it anyway).
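The causal-order alternative is classically implemented with vector clocks, which yield exactly the partial order described: pairs that are causally related compare, concurrent pairs do not. A plain-Scala sketch (a real implementation would piggy-back the clock on each persisted event):

```scala
// Sketch: vector clocks giving a causal partial order over events.
type VClock = Map[String, Long]

def tick(c: VClock, node: String): VClock =
  c.updated(node, c.getOrElse(node, 0L) + 1)

def merge(a: VClock, b: VClock): VClock =
  (a.keySet ++ b.keySet)
    .map(k => k -> math.max(a.getOrElse(k, 0L), b.getOrElse(k, 0L))).toMap

/** a happened-before b iff a <= b pointwise and a != b. */
def before(a: VClock, b: VClock): Boolean =
  a != b && a.forall { case (k, v) => v <= b.getOrElse(k, 0L) }

def concurrent(a: VClock, b: VClock): Boolean =
  !before(a, b) && !before(b, a) && a != b

val e1 = tick(Map.empty, "A")            // A persists an event
val e2 = tick(merge(e1, Map.empty), "B") // B reacts to it (causally after e1)
val e3 = tick(Map.empty, "C")            // C acts independently
```

Replay then only needs to respect `before`; concurrent events may legitimately appear in different global orders on different replays, which is the "nobody can observe it anyway" point.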

Regards,

Roland

Patrik Nordwall

unread,
Aug 18, 2014, 10:50:31 AM8/18/14
to akka...@googlegroups.com
What has been requested is "all events for an Aggregate type", e.g. all shopping carts, and this will not scale. It can still be useful, and with some careful design you could partition things when scalability is needed. I'm just saying that it is a big gun, that can be pointed in the wrong direction.
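The careful partitioning alluded to could look like this sketch (names are illustrative): hash each persistenceId onto a fixed number of per-type partitions, so each partition remains a totally ordered topic while ordering across partitions is deliberately given up.

```scala
// Sketch: partitioning an "all shopping carts" topic so no single totally
// ordered log has to carry every event of the aggregate type.
def partitionFor(persistenceId: String, partitions: Int): Int =
  ((persistenceId.hashCode % partitions) + partitions) % partitions

def topicFor(aggregateType: String, persistenceId: String, partitions: Int): String =
  s"$aggregateType-p${partitionFor(persistenceId, partitions)}"

val t1 = topicFor("shopping-cart", "cart-42", partitions = 8)
// all events of cart-42 land in the same partition, preserving its own order
val t2 = topicFor("shopping-cart", "cart-42", partitions = 8)
```

A consumer that needs "all shopping carts" then runs one catch-up subscription per partition, accepting that cross-partition order is undefined.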

 

 

Is point one for providing a sequence number from a single ordering source?

Yes, that is also what I was wondering. Do we need such a sequence number? A PersistentView should be able to define a replay starting point. (right now I think that is missing, it is only supported by saving snapshots)
 
Or do you mean topic in the sense that I cover above with EntitiesRef? In other words, what is the String argument and how does it work?  If you would show a few sample persist() APIs that might help clarify. And if you are referring to a global ordering sequence, who must maintain that? Is it the store implementation or the developer? 

#1 is not about sequence numbers per se (although it has consequences of that kind): it is only about allowing persistenceIds that are not bound to a single PersistentActor and that all PersistentActors can publish to. Mock code:

def apply(evt: Event) = state = evt(state)

def receiveCommand = {
  case c: Cmd =>
    if (isValid(c)) {
      persist(Event1(c))(apply)
      persistToTopic("myTopic", Event2(c)) { evt =>
        apply(evt)
        sender() ! Done
      }
    }
}


Looks good, but to make it clear, there is no transaction that spans over these two persist calls.
 
Everyone who listens to "myTopic" will then (eventually) get Event2.

 

The second point is the contentious one, since a feature request (consistent iteration over a query) clashes with a design choice (scalability). First it is important to note that this clash is genuine: scalability means that we do not want to limit the size of a topic to always fit one unit of consistency, our default assumption is that everything should be prepared for distribution. We all know that in a distributed system linearizability is not generally achievable, meaning that a distributed (synthetic) topic that receives events from concurrent sources will not be able to provide a global ordering. A non-distributed Journal, OTOH, is a single point of failure which is not desirable for many applications (i.e. your business will go down while the Journal has issues—true replication requires the ability to fail independently and hence is distributed in the CAP sense).

I think I understand this to mean that if you decide to implement a store using MySQL/Postgres/Oracle/LevelDB or whatever, then you live with what you get and what you don't get from those stores. If so, that's okay with me because we already live with those trade offs all the time anyway. I think this is far better than trying to make the whole world step up to Availability and Partition tolerance when all they want to do is write a business app using akka-persistence. This allows teams to decide for themselves which of the two CAP attributes they want, and note that even Amazon would choose C over A or P in some cases.

I agree, I think the ordering quality of service should be provided by the journal implementation and not enforced by akka persistence. If you use MySQL/Postgres/Oracle/LevelDB the total ordering is a no-brainer, but if you use Cassandra or Kafka it is not. 

My point (perhaps not well articulated) was that in order to offer the feature of listening to arbitrary Queries the Journal MUST provide the resulting event stream in a consistent order (see more below). Providing them in random order for each replay is worse than useless as far as I can see.

 
 

As I see it, a query (like “all events of this type” etc.) should be configured for the given Journal and should then be available as a (synthetic) topic for normal consumption—but not for being written to. The Journal is then free to implement this in any way it sees fit, but barring fundamental advances in CS or errors on my part this will always require that the synthetic topic is not scalable in the way we usually define that (i.e. distributable). As Vaughn points out this may not be an issue at all, actual benchmarks would help settle this point. Journal backends that already implement a global order can make use of that, for others the synthetic topic would work just like any other non-PersistentActor topic with manual duplication of those events that match the query (akin to (a) in the first post of this thread); this duplication does not necessarily need to double the memory consumption, it could also only persist the events by reference (depending on the storage engine).

I think these are very typical kinds of queries:

- All newly persisted events that I have not yet processed since the last time I asked for them (because I always process all new events in some specific way)

This needs a total order, and it must be consistent across runs (i.e. eventual consistency is not good enough, otherwise you will lose events or double-process them).

- All persisted events that constitute the state of my actor

This is defined to be the Actor’s own topic, hence it is not a Query (and therefore not a Problem ;-) ).

- All persisted events from the beginning of time because I just redesigned 20 user interface views and I have to delete and rebuild all my view states from day-1, and the events must be delivered in the same order that they originally happened, or my generated views' state will be wrong

The order that they originally happened in is ill-defined unless your Journal globally serializes events—which is not a given.

- All persisted events from the beginning of time because a new system is coming on line and needs to be seeded with what happened from the time I was deployed until now, and the events must be delivered in the same order that they originally happened, or the state of my newly deployed system will be wrong

Same here. But it is difficult to imagine things going wrong between unrelated (concurrent & distributed) entities; their order did not matter the first time around either, right? An exception here is causal ordering (BTW: casual ordering is not known to me).


We have still not really defined what this *order* is.

"in the same order that they originally happened" sounds like a wall clock timestamp in the PersistentActor, is that what we mean? -- and then we all know that it is not perfect in a distributed system, and events may have exactly the same timestamp.

Or do we mean the insert order in the data store? There is often no such thing in a distributed store.

Or do we mean that the replay of these events should be deterministic, i.e. always replayed in the same order?

This is the only one we can aim for AFAICS.

I agree, but that is impossible to achieve with a fully available system (AFAIK).
 
As usual, I’d love to be proven wrong.

I tried to understand what is supported by EventStore. Found this page: https://github.com/EventStore/EventStore/wiki/Projections-fromStreams

It is clear that the total order of a projection from multiple streams is not perfect, but probably good enough for practical purposes.

What Greg describes there is exactly what I mean with Eventually Linearizable: while things are happening it takes a while for the system to sync up and agree on a replay order. Once that is set, everything is deterministic.

Yes, the problem is that a PersistentView is querying live data, and if that "replay" order is supposed to be the same as a later historical query, the data must be stored in total order, or some kind of best-effort buffering/sorting must be used.
 

The alternative would be to strictly obey causal order by tracking who sent what in response to which message. Causality only defines a partial order, but that should by definition be enough because those event pairs which are unordered also do not care about each other (i.e. the global ordering might be different during each replay but that is irrelevant since nobody can observe it anyway).

I think this ties in to what is confusing me most about all this. The only consistency boundary in my world is the DDD Aggregate instance, i.e. one PersistentActor instance. What has been requested is something that requires consistency (ordering) across Aggregate instances.

Why not model the topic as a separate PersistentActor? Problems with that have been raised, such as how to reliably deliver the events from the Aggregate PersistentActor to the topic PersistentActor. Is that impossible to solve?

Don't we have a similar problem with the two calls to persist and persistToTopic? They are not atomic.
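The bookkeeping behind reliably delivering events from an aggregate to a topic actor (the AtLeastOnceDelivery pattern) can be modeled without Akka, as a sketch of the state involved: unconfirmed deliveries are kept until acknowledged and are candidates for redelivery after a timeout or crash recovery.

```scala
// Sketch: unconfirmed-delivery bookkeeping for at-least-once delivery.
// Plain Scala; AtLeastOnceBuffer is an illustrative name.
final class AtLeastOnceBuffer[E] {
  private var nextId = 0L
  private val unconfirmed = scala.collection.mutable.LinkedHashMap.empty[Long, E]

  /** Record a delivery attempt; returns the deliveryId carried with the message. */
  def deliver(e: E): Long = { val id = nextId; nextId += 1; unconfirmed += id -> e; id }
  /** Called when the recipient acknowledges the deliveryId. */
  def confirm(id: Long): Unit = unconfirmed -= id
  /** Everything not yet acknowledged, in delivery order, for redelivery. */
  def pendingRedelivery: Vector[E] = unconfirmed.values.toVector
}

val buf = new AtLeastOnceBuffer[String]
val id1 = buf.deliver("Event2(a)")
val id2 = buf.deliver("Event2(b)")
buf.confirm(id1) // topic actor acknowledged the first event
```

Since redelivery can duplicate events, the topic-side recipient still has to be idempotent, which loops back to the non-atomicity concern above: the guarantee is at-least-once, not exactly-once.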

/Patrik

Ashley Aitken

unread,
Aug 18, 2014, 12:01:36 PM8/18/14
to akka...@googlegroups.com

Hi Roland (and everyone),


Welcome back Roland - I hope you had a great vacation.


Thank you for your post.  



Here’s my response summary:


I believe Akka needs to allow actors to:


(i) persist events with as much information as possible, as efficiently as possible, on the write side, so that the store can let the read side extract them according to whatever criteria are needed,


(ii) persist events that don’t relate to a change in state of the actor per se, which I assume is already achievable since an actor can just ignore them on replay, 


(iii) read from (and replay) streams of events on the read and write side according to a range of criteria supported and defined within the store or via the store API (e.g. using a DSL), and


(iv) reliably (at least once) deliver information to other read side store(s) and systems above and beyond the store used for persisting the events.


I believe each of these is readily achievable with Akka but:


(i) doesn’t mean explicitly persisting the events to specific topics as you suggest in your (1) (although this may be how some stores implement the required functionality on the read side). Instead it means transparently including information like the actorId, event type, actor type, probably the time and possibly information to help with causal ordering (see my next post).


(iii) with (i) would enable the read side (if the store supports it) to read all events from a particular actor(s), of particular event types, to read events from a particular type(s) of actors, and to read all events.  It would also need to allow the read side to read from where it last finished reading, from now, and from the start again.  (iv) is necessary for projections.  



If you are interested, here’s my detailed explanation:


I think some of the confusion surrounding these issues is caused by the fact that we seem to be discussing, and (if I may suggest) Akka appears to be trying to implement, three quite different (but also somewhat related) pieces of functionality within this domain.  These are:


A. Actor Persistence


The ability to persist actor state changes incrementally (or wholly) and reconstruct that state at a later time, which we know as event sourcing.  I think Akka provides a great distributed and scalable mechanism for doing this with the current akka.persistence.


B. Publish/Subscribe to Persistent Queues/Topics


This functionality would allow actors to write data/events/messages to one (or more) topics and to subscribe to receive similar from one or more topics.  These differ from normal publish/subscribe queues in that they are persistent and the consumer can reread from the topic.


This is what I think of as the LogProducer and LogConsumer, of which PersistentActor and PersistentView can be thought of as specialisations, i.e. a single topic for each actor.  The current and popular example of a store for this sort of functionality, as you know, is Kafka. 


C. CQRS with Event Sourcing


And finally, there is CQRS with Event Sourcing, which I believe is much more than (A) and (B) and particularly doesn't necessarily require (B) for all event stores.  So if Akka were to implement (B), which I think would be very useful for other reasons, it would not specifically be for CQRS.


Please consider this diagram overviewing CQRS with Event Sourcing:


<https://www.dropbox.com/s/z2iu0xi4ki42sl7/annotated_cqrs_architecture.jpg>


adapted from 


<http://www.gridshore.nl/wp-content/uploads/cqrs_architecture.jpg>


As I understand it, CQRS separates the write model and store from one or *more* read models and stores, with each model and store being optimised for their particular role.  CQRS says nothing specific about the types of store (e.g. SQL or NOSQL, event sourced or not) and how consistency is achieved.


As you know, when using event sourcing the changes to the write model entities (e.g. Aggregate Roots) are stored as events and the write model is reconstructed by replaying those events.  This is (A) above and what akka.persistence has achieved very well in a distributed and scalable way.  


This is the dashed area labelled [1] in the diagram.


Further, CQRS uses commands to initiate changes to the write model and signals these changes with events (whether the events are used for event sourcing or not).  These events are what allow sagas and other systems to track and respond to changes in the write model.  


This is the dashed area labelled [2] in the diagram.


For example, a saga could be waiting for an event indicating funds had been withdrawn from a bank account after it had issued a command requesting that be done.  The saga could subscribe to events from the bank account before issuing the command and watch for a specific event from that time on.
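
Stripped of messaging concerns, such a saga is a small state machine. The sketch below (plain Scala, hypothetical event and type names, no Akka API) records what it is waiting for before the command is issued, so no matching event can slip past unnoticed:

```scala
// Sketch of a transfer saga as a plain state machine (hypothetical
// types, no Akka API). It starts out waiting for a specific event,
// so it can be pointed at the event stream *before* the command is
// issued and no matching event can be missed.
sealed trait SagaState
case object AwaitingWithdrawal extends SagaState
case object Completed extends SagaState

final case class AccountEvent(account: String, kind: String, amount: BigDecimal)

class TransferSaga(account: String, amount: BigDecimal) {
  var state: SagaState = AwaitingWithdrawal

  // Feed every event from the subscription through here; the saga
  // completes when the withdrawal it asked for is confirmed.
  def onEvent(e: AccountEvent): Unit =
    if (state == AwaitingWithdrawal &&
        e.account == account && e.kind == "FundsWithdrawn" && e.amount == amount)
      state = Completed
}
```

A crash-safe saga would additionally persist `state`, e.g. as a PersistentActor.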


This event notification system is conceptually independent of how the read store(s) will eventually become consistent (i.e. there are other means of achieving this consistency without events).  However, it just so happens that these events are also a great way to notify the read store(s) of changes.


So conceptually, the read side receives all the events from the write side independent of the write side event store. I believe we should logically think of this as one big stream of events since the write side doesn’t (and cannot) know what the read store(s) needs from these events in advance.  


This is the dashed area labelled [3] in the diagram.


Akka seems to distribute the event store used for persistence of ARs [1] on the write side to the read side, which is an interesting idea.  But I don’t believe this is enough for CQRS.  One event store cannot provide all the required read models.


Because the read side *must* support a number of different stores (not just the one used for actor persistence and to distribute the events to the read side) it is also necessary for there to be some reliable way to get event information into the other read store(s).


For example, there are very good reasons for some read sides having the read model stored in an SQL database, or a graph database, or a NOSQL database, or some or all of these at the same time.  As you mentioned, this needs to be idempotent to handle at least once delivery. 
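
As a sketch of what that idempotency amounts to (plain Scala, hypothetical names, a Vector standing in for the real read store): the projector remembers the highest sequence number applied per persistenceId and ignores redeliveries:

```scala
// Sketch of an idempotent read-side projector (hypothetical names).
// At-least-once delivery means an event may arrive twice; the
// projector tracks the highest sequence number applied for each
// persistenceId and silently drops anything it has already seen.
final case class Delivered(persistenceId: String, seqNr: Long, payload: String)

class IdempotentProjector {
  private var lastApplied = Map.empty[String, Long] // persistenceId -> last seqNr
  private var rows = Vector.empty[String]           // stand-in for SQL/graph/NoSQL store

  // Returns true if the event was applied, false if it was a duplicate.
  def handle(e: Delivered): Boolean =
    if (lastApplied.getOrElse(e.persistenceId, 0L) >= e.seqNr) false
    else {
      rows :+= e.payload
      lastApplied += e.persistenceId -> e.seqNr
      true
    }

  def state: Vector[String] = rows
}
```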



I hope that Akka can really fully support CQRS with Event Sourcing and become the premier platform for its use in highly scalable, reliable and efficient systems.  I also hope this explanation can help that happen sooner rather than later, since I think it is a great opportunity for Akka.  


Please anyone, correct me if I am misunderstanding things or have left anything out. 


Cheers,

Ashley.

Ashley Aitken

unread,
Aug 18, 2014, 12:15:42 PM8/18/14
to akka...@googlegroups.com

Sorry about that formatting, something happened (with the EOLs) when I pasted from a text editor.

Ashley Aitken

unread,
Aug 18, 2014, 12:30:46 PM8/18/14
to akka...@googlegroups.com

When information is persisted with the event (e.g. actorId, EventType, ActorType), the event store / plugin can determine how it provides functionality for the read side.  For example, the Kafka plugin could take each event and post it to three actual separate topics.  On the other hand, something like EventStore and its plugin could persist just the event and related information and later construct a synthetic topic(s) if requested by the read side for particular projections.
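
A minimal sketch of that fan-out (hypothetical names, not an actual plugin API): the topic keys are derived purely from the metadata persisted with the event, so a Kafka-style plugin can write to them eagerly while an EventStore-style plugin can build the same groupings lazily as synthetic streams:

```scala
// Sketch: topic keys derived from the metadata persisted with each
// event (hypothetical names). A Kafka-style plugin could physically
// write the event under each key; an EventStore-style plugin could
// instead build the same groupings later as synthetic streams.
object Topics {
  final case class Persisted(actorId: String, actorType: String, eventType: String)

  def topicKeys(p: Persisted): Set[String] =
    Set(
      s"id-${p.actorId}",     // the actor's own stream
      s"type-${p.actorType}", // all events of one aggregate type
      s"event-${p.eventType}" // all events of one event type
    )
}
```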

Greg Young

unread,
Aug 19, 2014, 1:18:35 AM8/19/14
to akka...@googlegroups.com
everything you list here is available today via akka.persistence + event store adapter & a durable subscription (akka event store client) on the read model side.

Ashley Aitken

unread,
Aug 19, 2014, 4:28:05 AM8/19/14
to akka...@googlegroups.com

Thank you Greg, I hadn't thought of the Event Store JVM Client for the read model <https://github.com/EventStore/EventStore.JVM>

So I assume one would generally have a ConnectionActor for each custom event stream that is required to keep a particular query store up-to-date on the read side, and that these could be in the same or different applications?

It would be nice if PersistentView could just specify an identifier for a custom event stream (e.g. from Event Store) and process those events appropriately (start after previous last event, restart as needed, etc.)

Also, I still can't see a solution for sagas that can maintain their state over crashes, e.g. as a PersistentActor, but also track or replay events after a particular time from another PersistentActor(s).  But this is on the write side.

Roland Kuhn

unread,
Aug 19, 2014, 8:49:59 AM8/19/14
to akka-user
18 aug 2014 kl. 16:49 skrev Patrik Nordwall <patrik....@gmail.com>:

On Mon, Aug 18, 2014 at 3:38 PM, Roland Kuhn <goo...@rkuhn.info> wrote:

18 aug 2014 kl. 10:27 skrev Patrik Nordwall <patrik....@gmail.com>:

Hi Roland,

A few more questions for clarification...


On Sat, Aug 16, 2014 at 10:11 PM, Vaughn Vernon <vve...@shiftmethod.com> wrote:

On Friday, August 15, 2014 11:39:45 AM UTC-6, rkuhn wrote:
Dear hakkers,

unfortunately it took me a long time to catch up with akka-user to this point after the vacation, but on the other hand this made for a very interesting and stimulating read, thanks for this thread!

If I may, here’s what I have understood so far:
  1. In order to support not only actor persistence but also full CQRS we need to adjust our terminology: events are published to topics, where each persistenceId is one such topic but others are also allowed.
  2. Common use-cases of building projections or denormalized views require the ability to query the union of a possibly large number of topics in such a fashion that no events are lost. This union can be viewed as a synthetic or logical topic, but issues arise in that true topics provide total ordering while these synthetic ones have difficulties doing so.
  3. Constructing Sagas is hard.

AFAICS 3. is not related to the other two, the mentions in this thread have only alluded to the problems so I assume that the difficulty is primarily to design a process that has the right eventual consistency properties (i.e. rollbacks, retries, …). This is an interesting topic but let’s concentrate on the original question first.

The first point is a rather simple one, we just need to expose the necessary API for writing to a given topic instead of the local Actor’s persistenceId; I’d opt for adding variants of the persist() methods that take an additional String argument. Using the resulting event log is then done as for the others (i.e. Views and potentially queries should just work).

Does that mean that a PersistentActor can emit events targeted to its persistenceId and/or targeted to an external topic and it is only the events targeted to the persistenceId that will be replayed during recovery of that PersistentActor?

Yes.

Both these two types of events can be replayed by a PersistentView.

Yes; they are not different types of events, just how they get to the Journal is slightly different.

 
The only concern is that the Journal needs to be prepared to receive events concurrently from multiple sources instead of just the same Actor, but since each topic needs to be totally ordered this will not be an additional hassle beyond just routing to the same replica, just like for persistenceIds.

Replica as in data store replica, or as in journal actor? 

The Journal must implement this in whatever way is suitable for the back-end. A generic solution would be to shard the topics as Actors across the cluster (internal to the Journal), or the Journal could talk to the replicated back-end store such that a topic always is written to one specific node (if that helps).

What has been requested is "all events for an Aggregate type", e.g. all shopping carts, and this will not scale. It can still be useful, and with some careful design you could partition things when scalability is needed. I'm just saying that it is a big gun that can be pointed in the wrong direction.

Mixed-up context: #1 is about predefined topics to which events are emitted, not queries. We need to strictly keep these separate.


 

 

Is point one for providing a sequence number from a single ordering source?

Yes, that is also what I was wondering. Do we need such a sequence number? A PersistentView should be able to define a replay starting point. (right now I think that is missing, it is only supported by saving snapshots)
 
Or do you mean topic in the sense that I cover above with EntitiesRef? In other words, what is the String argument and how does it work?  If you would show a few sample persist() APIs that might help clarify. And if you are referring to a global ordering sequence, whose must maintain that? Is it the store implementation or the developer? 

#1 is not about sequence numbers per se (although it has consequences of that kind): it is only about allowing persistenceIds that are not bound to a single PersistentActor and that all PersistentActors can publish to. Mock code:

def apply(evt: Event) = state = evt(state)

def receiveCommand = {
  case c: Cmd =>
    if (isValid(c)) {
      persist(Event1(c))(apply)
      persistToTopic("myTopic", Event2(c)) { evt =>
        apply(evt)
        sender() ! Done
      }
    }
}


Looks good, but to make it clear, there is no transaction that spans over these two persist calls.

Of course.

 
Everyone who listens to "myTopic" will then (eventually) get Event2.

 

The second point is the contentious one, since a feature request (consistent iteration over a query) clashes with a design choice (scalability). First it is important to note that this clash is genuine: scalability means that we do not want to limit the size of a topic to always fit one unit of consistency, our default assumption is that everything should be prepared for distribution. We all know that in a distributed system linearizability is not generally achievable, meaning that a distributed (synthetic) topic that receives events from concurrent sources will not be able to provide a global ordering. A non-distributed Journal, OTOH, is a single point of failure which is not desirable for many applications (i.e. your business will go down while the Journal has issues—true replication requires the ability to fail independently and hence is distributed in the CAP sense).

I think I understand this to mean that if you decide to implement a store using MySQL/Postgres/Oracle/LevelDB or whatever, then you live with what you get and what you don't get from those stores. If so, that's okay with me because we already live with those trade offs all the time anyway. I think this is far better than trying to make the whole world step up to Availability and Partition tolerance when all they want to do is write a business app using akka-persistence. This allows teams to decide for themselves which of the two CAP attributes they want, and note that even Amazon would choose C over A or P in some cases.

I agree, I think the ordering quality of service should be provided by the journal implementation and not enforced by akka persistence. If you use MySQL/Postgres/Oracle/LevelDB the total ordering is a no-brainer, but if you use Cassandra or Kafka it is not. 

My point (perhaps not well articulated) was that in order to offer the feature of listening to arbitrary Queries the Journal MUST provide the resulting event stream in a consistent order (see more below). Providing them in random order for each replay is worse than useless as far as I can see.

 
 

As I see it, a query (like “all events of this type” etc.) should be configured for the given Journal and should then be available as a (synthetic) topic for normal consumption—but not for being written to. The Journal is then free to implement this in any way it sees fit, but barring fundamental advances in CS or errors on my part this will always require that the synthetic topic is not scalable in the way we usually define that (i.e. distributable). As Vaughn points out this may not be an issue at all, actual benchmarks would help settle this point. Journal backends that already implement a global order can make use of that, for others the synthetic topic would work just like any other non-PersistentActor topic with manual duplication of those events that match the query (akin to (a) in the first post of this thread); this duplication does not necessarily need to double the memory consumption, it could also only persist the events by reference (depending on the storage engine).

I think these are the typical kinds of queries:

- All newly persisted events that I have not yet processed since the last time I asked for them (because I always process all new events in some specific way)

This needs a total order, and it must be consistent across runs (i.e. eventual consistency is not good enough, otherwise you will lose events or double-process them).

- All persisted events that constitute the state of my actor

This is defined to be the Actor’s own topic, hence it is not a Query (and therefore not a Problem ;-) ).

- All persisted events from the beginning of time because I just redesigned 20 user interface views and I have to delete and rebuild all my view states from day-1, and the events must be delivered in the same order that they originally happened, or my generated views' state will be wrong

The order that they originally happened in is ill-defined unless your Journal globally serializes events—which is not a given.

- All persisted events from the beginning of time because a new system is coming on line and needs to be seeded with what happened from the time I was deployed until now, and the events must be delivered in the same order that they originally happened, or the state of my newly deployed system will be wrong

Same here. But it is difficult to imagine things going wrong between unrelated (concurrent & distributed) entities, their order did not matter the first time around as well, right? An exception here is causal ordering (BTW: casual ordering is not known to me).


We have still not really defined what this *order* is.

"in the same order that they originally happened" sounds like a wall clock timestamp in the PersistentActor, is that what we mean? -- and then we all know that it is not perfect in a distributed system, and events may have exactly the same timestamp.

Or do we mean the insert order in the data store? There is often no such thing in a distributed store.

Or do we mean that the replay or these events should be deterministic, i.e. always replayed in the same order?

This is the only one we can aim for AFAICS.

I agree, but that is impossible to achieve with a fully available system (AFAIK).

Yes. Queries cannot scale, I have yet to see someone contesting this conclusion.

 
As usual, I’d love to be proven wrong.

I tried to understand what is supported by EventStore. Found this page: https://github.com/EventStore/EventStore/wiki/Projections-fromStreams

It is clear that the total order of a projection from multiple streams is not perfect, but probably good enough for practical purposes.

What Greg describes there is exactly what I mean with Eventually Linearizable: while things are happening it takes a while for the system to sync up and agree on a replay order. Once that is set, everything is deterministic.

Yes, the problem is that a PersistentView is querying live data, and if that "replay" order is supposed to be the same as a later historical query the data must be stored in total order, or some kind of buffering/sorting best effort must be used.

Yes.

 

The alternative would be to strictly obey causal order by tracking who sent what in response to which message. Causality only defines a partial order, but that should by definition be enough because those event pairs which are unordered also do not care about each other (i.e. the global ordering might be different during each replay but that is irrelevant since nobody can observe it anyway).
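
To make the partial-order point concrete, here is a minimal vector-clock comparison (plain Scala, illustrative only): causally related events compare as Before/After, while unrelated ones come out Concurrent and may legitimately be replayed in either order:

```scala
// Sketch: comparing vector clocks yields only a *partial* order.
// Before/After pairs must keep their causal order on replay;
// Concurrent pairs are unordered and may be replayed either way
// without any observable difference.
object Causal {
  sealed trait Causality
  case object Before extends Causality
  case object After extends Causality
  case object Concurrent extends Causality

  def compare(a: Map[String, Long], b: Map[String, Long]): Causality = {
    val keys = a.keySet ++ b.keySet
    val aLeB = keys.forall(k => a.getOrElse(k, 0L) <= b.getOrElse(k, 0L))
    val bLeA = keys.forall(k => b.getOrElse(k, 0L) <= a.getOrElse(k, 0L))
    if (aLeB && !bLeA) Before
    else if (bLeA && !aLeB) After
    else Concurrent // incomparable (or identical) clocks
  }
}
```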

I think this ties in to what is confusing me most about all this. The only consistency boundary in my world is the DDD Aggregate instance, i.e. one PersistentActor instance. What has been requested is something that requires consistency (ordering) across Aggregate instances.

Why not model the topic as a separate PersistentActor? Problems with that has been raised, such as how to reliably deliver the events from the Aggregate PersistentActor to the topic PersistentActor. Is that impossible to solve?

Again: we need to keep Topics and Queries separated. What you describe is a Topic, and that works fine. What people asked for are Queries, and they are difficult. The use-case that was initially presented was not about consistency between different PersistentActors, it was only about the capability to deterministically replay all events in the system, which includes the ability to start at a given point. I still have not seen a proposal for how to achieve that without someone having to actually store different cursors for the distributed datastore partitions—unless the store is not partitioned and therefore not scalable.
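
For concreteness, here is a sketch (hypothetical shape) of the cursor a consumer of a partitioned store would have to persist in order to resume such a Query deterministically: one offset per partition rather than a single global number:

```scala
// Sketch: the state a consumer must persist to resume a query over a
// partitioned store deterministically. Each partition is totally
// ordered on its own, so one offset per partition suffices; a single
// global offset would require a store that is not partitioned.
final case class Offsets(perPartition: Map[Int, Long]) {
  def advance(partition: Int, offset: Long): Offsets =
    Offsets(perPartition + (partition -> offset))
  def resumeFrom(partition: Int): Long =
    perPartition.getOrElse(partition, 0L)
}
```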

Regards,

Roland

Gary Malouf

unread,
Aug 19, 2014, 8:57:15 AM8/19/14
to akka...@googlegroups.com

For CQRS specifically, a lot of what people call scalability is in its ability to easily model multiple read views to make queries very fast off the same event data.

In the cases where a true global ordering is truly necessary, one often does not need to handle hundreds of thousands of writes per second.  I think the ideal is to have the global ordering property for events by default, and have to disable that if you feel a need to do more writes per second than a single writer can handle.

Once the global ordering property is enforced, solving many of the publisher ordering issues (and supporting sagas) becomes significantly easier to achieve. 
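
The single-writer idea can be sketched as follows (illustrative only): one sequencer stamps every event with a monotonically increasing number, which makes total ordering trivial at the cost of capping write throughput at that one component:

```scala
import java.util.concurrent.atomic.AtomicLong

// Sketch: a single writer stamps every event, from whatever stream,
// with a monotonically increasing global sequence number. Total
// ordering becomes trivial; write throughput is capped by this one
// component, which is exactly the trade-off under discussion.
final case class Stamped[A](globalSeqNr: Long, event: A)

class GlobalSequencer {
  private val counter = new AtomicLong(0L)
  def stamp[A](event: A): Stamped[A] = Stamped(counter.incrementAndGet(), event)
}
```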


Roland Kuhn

unread,
Aug 19, 2014, 9:14:17 AM8/19/14
to akka-user
18 aug 2014 kl. 18:01 skrev Ashley Aitken <amai...@gmail.com>:

Hi Roland (and everyone),


Welcome back Roland - I hope you had a great vacation.


Thank you for your post.  



Here’s my response summary:


I believe Akka needs to allow actors to:


(i) persist events with as much information as efficiently possible on the write side to allow the store to facilitate the read side extracting them according to what criteria is needed,



This is a convoluted way of saying that Events must be self-contained, right? In that case: check!


(ii) persist events that don’t relate to a change in state of the actor per se, which I assume is already achievable since an actor can just ignore them on replay, 



Yes, the actor chooses which effect an Event has on its state. Check!


(iii) read from (and replay) streams of events on the read and write side according to a range of criteria supported and defined within the store or via the store API (e.g. using a DSL), and



This is the unclear point: who defines the query and when? What are the consistency guarantees for the generated event stream?


(iv) reliably (at least once) deliver information to other read side store(s) and systems above and beyond the store used for persisting the events.



This is PersistentView, so “check!” (As argued previously “reliably” translates to “persistent”.)


I believe each of these is readily achievable with Akka but:


(i) doesn’t mean explicitly persisting the events to specific topics as you suggest in your (1) (although this may be how some stores implement the required functionality on the read side). Instead it means transparently including information like the actorId, event type, actor type, probably the time and possibly information to help with causal ordering (see my next post).



No, again we need to strictly keep Topics and Queries separate, they are very different features. Topics are defined up-front and explicitly written to, Queries are constructed later based on the existing event log contents. Marking events within the store with timestamps of some kind might help achieving a pseudo-deterministic behavior, but it is by no means a guarantee. Causal ordering is out of scope, and it also does not help in achieving the desired ability to replay Queries from some given point in the past.


(iii) with (i) would enable the read side (if the store supports it) to read all events from a particular actor(s), of particular event types, to read events from a particular type(s) of actors, and to read all events.  It would also need to allow the read side to read from where it last finished reading, from now, and from the start again.  (iv) is necessary for projections.  



If you are interested, here’s my detailed explanation:


I think some of the confusion surrounding these issues is caused by the fact that we seem to be discussing and, if I may suggest, Akka appears to be trying to implement three quite different (but also somewhat related) pieces of functionality within this domain.


Just anecdotally: the goal of Akka Persistence is to achieve at-least-once processing semantics for persistent actors. We’ll see how far a stretch it is to incorporate all that is needed for effective CQRS/ES.

 These are:


A. Actor Persistence


The ability to persist actor state changes incrementally (or wholly) and reconstruct that state at a later time, which we know as event sourcing.  I think Akka provides a great distributed and scalable mechanism for doing this with the current akka.persistence.


B. Publish/Subscribe to Persistent Queues/Topics


This functionality would allow actors to write data/events/messages to one (or more) topics and to subscribe to receive similar from one or more topics.  These differ from normal publish/subscribe queues in that they are persistent and the consumer can reread from the topic.


This is what I think of as the LogProducer and LogConsumer, of which PersistentActor and PersistentView can be thought of as specialisations, i.e. a single topic for each actor.  The current and popular example of a store for this sort of functionality, as you know, is Kafka. 



Agreed; this moved into focus thanks to your initiating this discussion!


C. CQRS with Event Sourcing


And finally, there is CQRS with Event Sourcing, which I believe is much more than (A) and (B), and in particular doesn’t necessarily require (B) for all event stores.  So if Akka were to implement (B), which I think would be very useful for other reasons, it would not specifically be for CQRS.


Please consider this diagram overviewing CQRS with Event Sourcing:


<https://www.dropbox.com/s/z2iu0xi4ki42sl7/annotated_cqrs_architecture.jpg>


adapted from 


<http://www.gridshore.nl/wp-content/uploads/cqrs_architecture.jpg>


As I understand it, CQRS separates the write model and store from one or *more* read models and stores, with each model and store being optimised for their particular role.  CQRS says nothing specific about the types of store (e.g. SQL or NOSQL, event sourced or not) and how consistency is achieved.


As you know, when using event sourcing the changes to the write model entities (e.g. Aggregate Roots) are stored as events and the write model is reconstructed by replaying those events.  This is (A) above and what akka.persistence has achieved very well in a distributed and scalable way.  


This is the dashed area labelled [1] in the diagram.


Further, CQRS uses commands to initiate changes to the write model and signals these changes with events (whether the events are used for event sourcing or not).  These events are what allow sagas and other systems to track and respond to changes in the write model.  


This is the dashed area labelled [2] in the diagram.


For example, a saga could be waiting for an event indicating funds had been withdrawn from a bank account after it had issued a command requesting that be done.  The saga could subscribe to events from the bank account before issuing the command and watch for a specific event from that time on.



Why would the Saga subscribe to events instead of talking with the domain entities directly? My understanding is that the Saga sits logically on the top left side of the first diagram, acting on behalf of and just like the user (and like the user it should have a memory and be persistent).


This event notification system is conceptually independent of how the read store(s) will eventually become consistent (i.e. there are other means of achieving this consistency without events).  However, it just so happens that these events are also a great way to notify the read store(s) of changes.


So conceptually, the read side receives all the events from the write side independent of the write side event store. I believe we should logically think of this as one big stream of events since the write side doesn’t (and cannot) know what the read store(s) needs from these events in advance.  



Potentially, yes. This is also why I think that this connection is a back-end concern beneath the Akka APIs, it is the Journal’s backing store talking to some other stores.


This is the dashed area labelled [3] in the diagram.


Akka seems to distribute the event store used for persistence of ARs [1] on the write side to the read side, which is an interesting idea.  But I don’t believe this is enough for CQRS.  One event store cannot provide all the required read models.



This is probably where all (our) misunderstanding originates: PersistentView is a very particular thing and it turns out that it does not actually match up with the Q in CQRS. Perhaps we should indeed just remove it and add a facility which lets Actors query the Journal instead (in case you want to roll your own read model adapter).


Because the read side *must* support a number of different stores (not just the one used for actor persistence and to distribute the events to the read side) it is also necessary for there to be some reliable way to get event information into the other read store(s).



This is reliable by way of being persistent in the first place. Sending things around the Journal is therefore breaking the model.


For example, there are very good reasons for some read sides having the read model stored in an SQL database, or a graph database, or a NOSQL database, or some or all of these at the same time.  As you mentioned, this needs to be idempotent to handle at least once delivery. 



I hope that Akka can really fully support CQRS with Event Sourcing and become the premier platform for its use in highly scalable, reliable and efficient systems.  I also hope this explanation can help that happen sooner rather than later, since I think it is a great opportunity for Akka.  



I fully agree, and this is indeed a very important discussion.

Roland Kuhn

unread,
Aug 19, 2014, 9:17:03 AM8/19/14
to akka-user
19 aug 2014 kl. 07:18 skrev Greg Young <gregor...@gmail.com>:

everything you list here is available today via akka.persistence + event store adapter & a durable subscription (akka event store client) on the read model side.

This sounds like the best candidate for a way forward at this point. “Durable subscription” is a tough nut to crack, though, for a distributed storage system, especially if the underlying Query is supposed to be created on the live system instead of up-front.

Regards,

Roland

Roland Kuhn

unread,
Aug 19, 2014, 9:20:19 AM8/19/14
to akka-user
19 aug 2014 kl. 14:57 skrev Gary Malouf <malou...@gmail.com>:

For CQRS specifically, a lot of what people call scalability is in its ability to easily model multiple read views to make queries very fast off the same event data.

In the cases where a true global ordering is truly necessary, one often does not need to handle hundreds of thousands of writes per second.  I think the ideal is to have the global ordering property for events by default, and have to disable that if you feel a need to do more writes per second than a single writer can handle.


Unfortunately it is not only the number of writes per second; the sheer data volume can drive the need for a distributed, partitioned storage mechanism. There is only so much you can fit within a single machine, and once you go beyond that you quickly run into CAP (if you want your guarantees to hold 100% at all times). The way forward then necessitates a compromise on something, either Availability or Determinism (in this case).

Regards,

Roland

√iktor Ҡlang

unread,
Aug 19, 2014, 9:24:10 AM8/19/14
to Akka User List
The decision whether scale is needed cannot be implicit: otherwise you lure people into the non-scalable world, and by the time they find out it is too late.
Cheers,

Gary Malouf

unread,
Aug 19, 2014, 9:43:47 AM8/19/14
to akka...@googlegroups.com

So how does one handle combining events from different streams- a global sequence number is the most straightforward.

Also, not everything needs to scale on the write side to that degree.

Greg Young

unread,
Aug 19, 2014, 10:27:33 AM8/19/14
to akka...@googlegroups.com
I am not responding to this one post just a reply towards the end and will discuss a few posts from earlier.

To start, I have to agree with some of the posters that premature scaling can cause many issues. This actually reminds me of the CQRS Journey, which people mentioned earlier. One of the main criticisms of the CQRS Journey is that it prematurely took on scaling constraints, which caused the code to be much more complex than it needed to be. This was partially due to it being a sample app of something larger and partially due to the p&p team also showing Azure at the same time. Because they wanted to distribute and show Azure at the same time, the team took cloud constraints as a given. This meant, for instance, that every handler in the system needed to be idempotent. While seemingly a small constraint, this actually adds a significant amount of complexity to the system.

The same problem exists in what is being discussed today. For 95+% of systems it is totally reasonable that when I write a projection I expect my events to have assured ordering. As Vaughn mentioned, a few hundred events/second covers the vast majority of systems. Systems like these can be completely linearized, and ordering assurances are not an issue. This removes a LOT of complexity in projections code, as you don't have to handle hundreds to thousands of edge cases in your read models where you get events out of order. Saying that ordering assurances are not needed and everyone should use casual consistency is really saying "we don't care about the bottom 95% of users".




RKuhn had mentioned doing joins. You are correct that this is how we do it now. We offer historically perfect joins, but there is no way to do a perfect live join via queries. We do however support another mechanism for this that will ensure your live join always matches your historical one: we allow you to precalculate and save the results of the join. This produces a stream full of stream links which can then be replayed (perfectly) as many times as you want.
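
The precalculated join can be pictured like this (sketch with hypothetical types, not the Event Store projections API): the join is computed once, stored as a stream of links pointing into the source streams, and every replay resolves the same links in the same order:

```scala
// Sketch: a precalculated join stored as a stream of links. Each
// link points at (streamId, eventNumber) in a source stream; replay
// resolves the links in the recorded order, so every replay of the
// join is identical to the first.
object JoinReplay {
  final case class Link(streamId: String, eventNumber: Int)

  def replay(links: Vector[Link], store: Map[String, Vector[String]]): Vector[String] =
    links.map(l => store(l.streamId)(l.eventNumber))
}
```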


There was some discussion above about using precalculated topics to handle projections; I believe the terminology was "tags". The general idea, if I can repeat it, is to write an event FooOccurred and to include on it some tags (foo, bar, baz) which map it to topics that can then be replayed as a whole. At the outset this seems like a good idea, but it will not work well in production. The place where it runs into a problem is that I cannot know, when writing the events, all the mappings that any future projections may wish to have. Tomorrow my business could ask me for a report that looks at a completely new way of partitioning the events, and I will be unable to do it.


As I mentioned previously in a quick comment, what is being asked for today is actually already supported with akka.persistence, provided you are using Event Store as your backend (for those interested, today is the release of the final RC of 3.0, which has all of the support for the akka.persistence client; binaries are for win/linux/mac). Basically what you would do is run akka.persistence on your write side but *not* use it for supporting your read models. Instead, when dealing with your read models, you would use a catch-up subscription for what you are interested in. I do not see anything inherently wrong with this way of doing things, and it raises the question of whether this is actually a more appropriate way to deal with event-sourced systems using akka.persistence, e.g. use native storage directly if it supports it.
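
A catch-up subscription boils down to the loop sketched below (plain Scala, hypothetical names, not the Event Store JVM client API): read everything after the saved checkpoint, hand it to the read model, and remember the new checkpoint; run repeatedly, the same code path covers both history and live events:

```scala
// Sketch of a catch-up subscription: deliver everything after the
// saved checkpoint and return the new checkpoint. Calling this in a
// loop first drains history and then follows the live stream with
// the same code path.
object CatchUp {
  final case class Checkpoint(position: Long)

  def poll(log: Vector[String], from: Checkpoint)(handle: String => Unit): Checkpoint = {
    val fresh = log.drop(from.position.toInt)
    fresh.foreach(handle)
    Checkpoint(from.position + fresh.size)
  }
}
```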

Cheers,

Greg
...

Martin Krasser

unread,
Aug 19, 2014, 11:08:14 AM8/19/14
to akka...@googlegroups.com

On 19.08.14 16:27, Greg Young wrote:
I am not responding to this one post; just a reply towards the end, and I will discuss a few posts from earlier.

To start, I have to agree with some of the posters that premature scaling can cause many issues. This actually reminds me of the CQRS Journey which people mentioned earlier. One of the main criticisms of the CQRS Journey is that it prematurely took on scaling constraints, which caused the code to be much more complex than it needed to be. This was partially due to it being a sample app of something larger and partially due to the p&p team also showing Azure at the same time. Because they wanted to distribute and show Azure at the same time, the team took cloud constraints as a given. This meant, for instance, that every handler in the system needed to be idempotent. While seemingly a small constraint, this actually adds a significant amount of complexity to the system.

The same problem exists in what is being discussed today. For 95+% of systems it is totally reasonable that when I write a projection I expect my events to have assured ordering. As Vaughn mentioned a few hundred events/second is the vast majority of systems. Systems like these can be completely linearized and ordering assurances are not an issue. This removes a LOT of complexity in projections code as you don't have to handle hundreds to thousands of edge cases in your read models where you get events out of order. Saying that ordering assurances are not needed and everyone should use casual consistency is really saying "we don't care about the bottom 95% of users".



Can you please enlighten me what you mean by "casual" consistency. Past discussions were always about causal consistency. If implemented, it would add additional ordering to events in akka-persistence compared to the ordering that is given right now. Today, only the ordering of events with the same persistenceId is defined. Events with different persistenceId are currently considered concurrent by akka-persistence. Causal consistency would additionally introduce ordering of events across persistenceIds if they are causally related (i.e. have a happens-before relationship). Those events that don't have such a relationship are truely concurrent. Causal consistency is not trivial to implement but has the advantage that it doesn't prevent scalability (see also this paper, for example). It is weaker than sequential consistency, though.
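The happens-before relation Martin describes can be illustrated with a toy vector-clock comparison. This is the ordering concept only, not akka-persistence code; the clock representation is an assumption for illustration:

```python
# Toy vector clocks: each event carries a map of persistenceId -> counter.
# happens_before captures the causal (happens-before) order across
# persistenceIds; events related in neither direction are truly concurrent.

def happens_before(a, b):
    keys = set(a) | set(b)
    return (all(a.get(k, 0) <= b.get(k, 0) for k in keys)
            and any(a.get(k, 0) < b.get(k, 0) for k in keys))

def concurrent(a, b):
    return not happens_before(a, b) and not happens_before(b, a)
```

A causally consistent journal would only have to preserve the `happens_before` order during replay; truly concurrent events could still be delivered in any order, which is what keeps the approach scalable.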

Gary Malouf

unread,
Aug 19, 2014, 11:28:16 AM8/19/14
to akka...@googlegroups.com
Causal consistency is a very cool concept for systems at large scale, but as you said, Martin, there are pain points in implementing it and (I imagine) concerns that end-users need to deal with.

Greg - your thoughts on using Akka Persistence just for the write-side is exactly where my team is heading.  We were originally planning to combine all of our aggregate roots under a single processor to get Akka Persistence to behave with Cassandra (on Local Quorum), but it sounds like EventStore would keep us from having to do that.


You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/SL5vEVW7aTo/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.

Ashley Aitken

unread,
Aug 19, 2014, 12:59:57 PM8/19/14
to akka...@googlegroups.com


On Tuesday, 19 August 2014 21:14:17 UTC+8, rkuhn wrote:

18 aug 2014 kl. 18:01 skrev Ashley Aitken <amai...@gmail.com>:

I believe Akka needs to allow actors to:


(i) persist events with as much information as efficiently possible on the write side to allow the store to facilitate the read side extracting them according to what criteria is needed,

This is a convoluted way of saying that Events must be self-contained, right? In that case: check!

No, I don't think so.  As I understand it now, the only thing the event store knows about each event is the persistenceId and a chunk of opaque data. It doesn't know the type of the event, the type of the message, any time information, any causal dependency etc.  I guess what I am saying is that the events need to include as much metadata as possible so that the event store can provide the necessary synthetic streams if they are requested by the read side.  As I mentioned later, some event stores (like Kafka) may replicate the events into separate topics based on this information; others (like Event Store) may use this information later to form streams of links to the original events.

(iii) read from (and replay) streams of events on the read and write side according to a range of criteria supported and defined within the store or via the store API (e.g. using a DSL), and

This is the unclear point: who defines the query and when? What are the consistency guarantees for the generated event stream?

I suggest the developers of the read side specify the queries directly to the event store, but this may be after the events have initially been persisted. The event store produces the query stream (if it can) and a PersistentView can be set up to read from that named query. With regards to consistency guarantees, my understanding is that these streams are used to eventually guarantee that the query model will be consistent with the write model, i.e. all the events will get across. With regards to ordering, I think the event store does the best it can to provide consistent ordering, e.g. total ordering if there was no distribution and causal ordering, where possible, if there was distribution. The developer would need to understand the limitations of how the query store is configured and queried.

  

(iv) reliably (at least once) deliver information to other read side store(s) and systems above and beyond the store used for persisting the events.

This is PersistentView, so “check!” (As argued previously “reliably” translates to “persistent”.)

As I asked in another thread (I think), I am not sure how PersistentView can do this when PersistentActor is the one that can mix in AtLeastOnceDelivery?

I think we need a PersistentView that can guarantee AtLeastOnceDelivery to an actor representing a query store.  This would seem to require a PersistentViewActor ;-) that can read from a persistent query and also persist its state to provide guaranteed delivery.

My lack of knowledge of Scala and Akka may be showing here.
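A minimal sketch of the state such a hypothetical "PersistentViewActor" would keep: each delivery to the query store gets an id and is retried until confirmed. (Akka's real AtLeastOnceDelivery mixin manages equivalent state for a PersistentActor; everything below is illustrative.)

```python
# Hypothetical at-least-once delivery bookkeeping: unconfirmed deliveries are
# what would be persisted/snapshotted so retries survive a restart.

class ReliableDeliverySketch:
    def __init__(self):
        self.next_id = 0
        self.unconfirmed = {}          # delivery_id -> payload

    def deliver(self, payload):
        self.next_id += 1
        self.unconfirmed[self.next_id] = payload
        return self.next_id            # real code: send (id, payload) downstream

    def confirm(self, delivery_id):
        self.unconfirmed.pop(delivery_id, None)

    def to_redeliver(self):
        """On a redelivery tick everything unconfirmed goes out again, so the
        query store must be idempotent."""
        return list(self.unconfirmed.values())
```

The price of the guarantee is visible in `to_redeliver`: duplicates are possible, so the query store on the receiving end has to apply updates idempotently.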

I believe each of these is readily achievable with Akka but:


(i) doesn’t mean explicitly persisting the events to specific topics as you suggest in your (1) (although this may be how some stores implement the required functionality on the read side). Instead it means transparently including information like the actorId, event type, actor type, probably the time and possibly information to help with causal ordering (see my next post).

No, again we need to strictly keep Topics and Queries separate, they are very different features. Topics are defined up-front and explicitly written to, Queries are constructed later based on the existing event log contents. Marking events within the store with timestamps of some kind might help achieving a pseudo-deterministic behavior, but it is by no means a guarantee. Causal ordering is out of scope, and it also does not help in achieving the desired ability to replay Queries from some given point in the past.

I think we do agree somewhere in there but I don't think as was suggested (by you earlier?) that creating topics up-front whether a fixed set or arbitrary tags will work.  I feel in what way the store supports the queries (and how much it can) is up to the store (e.g. creating separate topics or synthetic topics), so I would argue against using topics for CQRS.  As I mention below for Pub/Sub to Persistent topics it would be great, but not for CQRS.

C. CQRS with Event Sourcing


And finally, there is CQRS with Event Sourcing, which I believe is much more than (A) and (B) and particularly doesn’t necessarily require (B) for all event stores.  So if Akka were to implement (B), which I think would be very useful for other reasons, it would not specifically be for CQRS.


Please consider this diagram overviewing CQRS with Event Sourcing:


<https://www.dropbox.com/s/z2iu0xi4ki42sl7/annotated_cqrs_architecture.jpg>


For example, a saga could be waiting for an event indicating funds had been withdrawn from a bank account after it had issued a command requesting that be done.  The saga could subscribe to events from the bank account before issuing the command and watch for a specific event from that time on.

Why would the Saga subscribe to events instead of talking with the domain entities directly?

I believe it is for loose coupling and to make the process asynchronous (and event-driven).  When an AR processes a command it cannot know all the other ARs or sagas wanting to know when the command has been performed.   Also in an actor-based system to be reliable(?) I would assume sagas need to read events from a persistent store (or receive messages with at-least-once-delivery guarantee).

Please check out the short section on Sagas from the CQRS Journey book:


Particularly Figure 2.

I believe the black filled arrows are events and the white hollow arrows are commands.  Commands I believe are well suited to actor messages but events I believe are best suited to persistent publish/subscribe-like communication (i.e. the journal).  To be honest though, I note that the text talks about event messages, although it also talks about the event messages using an event bus, so I am a little unsure.  

 
My understanding is that the Saga sits logically on the top left side of the first diagram, acting on behalf of and just like the user (and like the user it should have a memory and be persistent).

My understanding is that sagas sit where the Services are in the diagram I provided (near [2]), i.e. they issue commands (not labelled as such in the diagram unfortunately but what else could they be?) and listen for events on the write side (as labelled).


Someone please correct me if I am wrong.

This is the dashed area labelled [3] in the diagram.


Akka seems to distribute the event store used for persistence of ARs [1] on the write side to the read side, which is an interesting idea.  But I don’t believe this is enough for CQRS.  One event store cannot provide all the required read models.


This is probably where all (our) misunderstanding originates: PersistentView is a very particular thing and it turns out that it does not actually match up with the Q in CQRS. Perhaps we should indeed just remove it and add a facility which lets Actors query the Journal instead (in case you want to roll your own read model adapter).

PersistentViews tied to one PersistentActor we know are too limited.  However, I think PersistentViews tied to named query streams (specified to the journal) would be useful (if they could guarantee at-least-once delivery to an actor representing a query store).

Thank you for your patience reading through all of this text.

Cheers,
Ashley.


Greg Young

unread,
Aug 19, 2014, 3:57:18 PM8/19/14
to akka...@googlegroups.com
please forgive the typo.

It still adds a ton of complexity that is unnecessary for the vast number of systems. We support it in event store but most don't use it. 
...

Martin Krasser

unread,
Aug 20, 2014, 12:37:26 AM8/20/14
to akka...@googlegroups.com

On 19.08.14 21:57, Greg Young wrote:
please forgive the typo.

It still adds a ton of complexity that is unnecessary for the vast number of systems.

I don't see that. The complexity should only be on plugin providers, not application code (see also the research paper I linked in a previous post). It is the provider's responsibility (in collaboration with PersistentActor and PersistentView) to ensure causal ordering in event streams. Properly implemented, there's no additional complexity for applications. They can just rely on the stricter ordering guarantees.


We support it in event store but most don't use it.

Can you please share any pointers that describe how causal consistency is supported/implemented by event store?

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Roland Kuhn

unread,
Aug 20, 2014, 3:37:00 AM8/20/14
to akka-user

19 aug 2014 kl. 16:27 skrev Greg Young <gregor...@gmail.com>:

I am not responding to this one post; just a reply towards the end, and I will discuss a few posts from earlier.

To start, I have to agree with some of the posters that premature scaling can cause many issues. This actually reminds me of the CQRS Journey which people mentioned earlier. One of the main criticisms of the CQRS Journey is that it prematurely took on scaling constraints, which caused the code to be much more complex than it needed to be. This was partially due to it being a sample app of something larger and partially due to the p&p team also showing Azure at the same time. Because they wanted to distribute and show Azure at the same time, the team took cloud constraints as a given. This meant, for instance, that every handler in the system needed to be idempotent. While seemingly a small constraint, this actually adds a significant amount of complexity to the system.

The same problem exists in what is being discussed today. For 95+% of systems it is totally reasonable that when I write a projection I expect my events to have assured ordering. As Vaughn mentioned a few hundred events/second is the vast majority of systems. Systems like these can be completely linearized and ordering assurances are not an issue. This removes a LOT of complexity in projections code as you don't have to handle hundreds to thousands of edge cases in your read models where you get events out of order. Saying that ordering assurances are not needed and everyone should use casual consistency is really saying "we don't care about the bottom 95% of users".

As noted earlier we are in agreement on this: providing projections (which I also called Queries in this thread) without strict ordering would be meaningless because reliable consumption would only be possible from start to finish. The ability to remember a stream position and restart replay from there implies linearization. We also all agree that this feature cannot be supported by a back-end store that is scalable beyond a single partition (i.e. when multiple distributed nodes are concurrently written to). And we agree that this restriction is tolerable in a large number of relevant use-cases.


RKuhn had mentioned doing joins. You are correct that this is how we do it now. We offer historically perfect joins, but for live streams there is no way to do a perfect live join via queries. We do however support another mechanism for this that will assure that your live join always matches your historical one. We allow you to precalculate and save the results of the join. This produces a stream full of stream links which can then be replayed as many times (perfectly) as you want.


There was some discussion above about using precalculated topics to handle projections. I believe the terminology was called tags. The general idea, if I can repeat it, is to write an event FooOccurred and to include upon it some tags (foo, bar, baz) which would map it to topics that could then be replayed as a whole. This at the outset seems like a good idea but will not work well in production. The place where it runs into a problem is that, when writing the events, I cannot know all the mappings that any future projection may wish to have. Tomorrow my business could ask me for a report that partitions the events in a completely new way and I will be unable to produce it.

This is a crucial point which implies that Akka Persistence cannot generically provide meaningful projections (or Queries) without relying on a linearizable back-end store.

As I mentioned previously in a quick comment, what is being asked for today is actually already supported with akka-persistence, providing you are using Event Store as your backend (for those interested, today is the release of the final RC of 3.0, which has all of the support for the akka-persistence client; binaries are for win/linux/mac). Basically what you would do is run akka-persistence on your write side but *not* use it for supporting your read models. Instead, when dealing with your read models, you would use a catch-up subscription for what you are interested in. I do not see anything inherently wrong with this way of doing things, and it raises the question of whether this is actually a more appropriate way to deal with event-sourced systems using akka-persistence, e.g. use native storage directly if it supports it.

Taking together the conclusions so far I tend to agree with this assessment. Akka Persistence can provide designated event streams with proper ordering (per persistenceId or Topic) while projections or Queries depend on the underlying storage technology.

A potential compromise would be to offer generic but inefficient Queries for those Journals that can provide everything in-order; otherwise we would need to standardize on a query language and that prospect makes me shiver …

Regards,

Roland

Roland Kuhn

unread,
Aug 20, 2014, 4:16:27 AM8/20/14
to akka-user
19 aug 2014 kl. 18:59 skrev Ashley Aitken <amai...@gmail.com>:

On Tuesday, 19 August 2014 21:14:17 UTC+8, rkuhn wrote:

18 aug 2014 kl. 18:01 skrev Ashley Aitken <amai...@gmail.com>:

I believe Akka needs to allow actors to:


(i) persist events with as much information as efficiently possible on the write side to allow the store to facilitate the read side extracting them according to what criteria is needed,

This is a convoluted way of saying that Events must be self-contained, right? In that case: check!

No, I don't think so.  As I understand it now, the only thing the event store knows about each event is the persistenceId and a chunk of opaque data. It doesn't know the type of the event, the type of the message, any time information, any causal dependency etc.  I guess what I am saying is that the events need to include as much metadata as possible so that the event store can provide the necessary synthetic streams if they are requested by the read side.  As I mentioned later, some event stores (like Kafka) may replicate the events into separate topics based on this information; others (like Event Store) may use this information later to form streams of links to the original events.

The event store has the full event available, which is all the information there is: gathering or duplicating arbitrary parts of the information is likely not going to help, because you will discover later that you missed something initially, and if the mechanism is baked into the Akka Persistence Journal SPI then fixing it will take a very long time (until plugins are migrated and your OPS guys allow you to use it etc.). My recommendation is to use a serialization mechanism that fits the Journal, allowing it to understand the events and provide semantic features on top. Both (Journal and serialization) are configured in the same file, so I submit that coupling them is a valid approach.

On causal consistency: I am still unconvinced that it is worth pursuing, and I am certain that you are vastly underestimating the amount of data and effort involved. And it cannot be done without collaboration from the user since a single inter-Actor message outside of the traced system (i.e. not using a PersistenceEnvelope of sorts) would hamper or destroy it.


(iii) read from (and replay) streams of events on the read and write side according to a range of criteria supported and defined within the store or via the store API (e.g. using a DSL), and

This is the unclear point: who defines the query and when? What are the consistency guarantees for the generated event stream?

I suggest the developers of the read side specify the queries directly to the event store, but this may be after the events have initially been persisted. The event store produces the query stream (if it can) and a PersistentView can be set up to read from that named query. With regards to consistency guarantees, my understanding is that these streams are used to eventually guarantee that the query model will be consistent with the write model, i.e. all the events will get across. With regards to ordering, I think the event store does the best it can to provide consistent ordering, e.g. total ordering if there was no distribution and causal ordering, where possible, if there was distribution. The developer would need to understand the limitations of how the query store is configured and queried.

As I answered to Greg already, I think that this should not be a core concern of Akka Persistence; as you note it relies on features provided by the underlying event store, and those features are not necessary to achieve the goal of making actors persistent.


  

(iv) reliably (at least once) deliver information to other read side store(s) and systems above and beyond the store used for persisting the events.

This is PersistentView, so “check!” (As argued previously “reliably” translates to “persistent”.)

As I asked in another thread (I think), I am not sure how PersistentView can do this when PersistentActor is the one that can mix in AtLeastOnceDelivery?

I think we need a PersistentView that can guarantee AtLeastOnceDelivery to an actor representing a query store.  This would seem to require a PersistentViewActor ;-) that can read from a persistent query and also persist its state to provide guaranteed delivery.

My lack of knowledge of Scala and Akka may be showing here.

My current impression is that PersistentView needs to be re-thought: instead of tying it to a persistenceId like we do now we should just provide an API for subscribing to named topics in the Journal—be that persistenceIds of some PersistentActors or synthetic ones. One Actor should be able to subscribe to any number of them, but the onus will be on it to keep track of the positions up to which it has consumed from all of them.
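The re-thought view Roland outlines might look roughly like this: one consumer subscribed to several named topics, itself tracking the position consumed so far in each. This is a hypothetical shape, not a proposed Akka API:

```python
# Sketch: one consumer over many named topics, with per-topic positions kept
# by the consumer itself (the "onus is on it" part of Roland's suggestion).

class MultiTopicConsumer:
    def __init__(self, topics):
        self.positions = {t: 0 for t in topics}   # topic -> consumed-up-to
        self.consumed = []

    def on_event(self, topic, seq_nr):
        """Accept an event only if it advances that topic's position;
        duplicates redelivered after a restart are ignored."""
        if topic in self.positions and seq_nr > self.positions[topic]:
            self.positions[topic] = seq_nr
            self.consumed.append((topic, seq_nr))
            return True
        return False
```

Persisting the `positions` map is what lets the consumer resume each subscription from where it left off after a crash.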

This does not preclude the Journal from providing a synthetic topic with proper linearization for all events in it (or whatever you want to specifically configure on the storage back-end, outside of  the Journal SPI). And this also does not invalidate my point that normally the consumption of Queries should be done directly from the backing store, making full use of its unique feature set.


I believe each of these is readily achievable with Akka but:


(i) doesn’t mean explicitly persisting the events to specific topics as you suggest in your (1) (although this may be how some stores implement the required functionality on the read side). Instead it means transparently including information like the actorId, event type, actor type, probably the time and possibly information to help with causal ordering (see my next post).

No, again we need to strictly keep Topics and Queries separate, they are very different features. Topics are defined up-front and explicitly written to, Queries are constructed later based on the existing event log contents. Marking events within the store with timestamps of some kind might help achieving a pseudo-deterministic behavior, but it is by no means a guarantee. Causal ordering is out of scope, and it also does not help in achieving the desired ability to replay Queries from some given point in the past.

I think we do agree somewhere in there but I don't think as was suggested (by you earlier?) that creating topics up-front whether a fixed set or arbitrary tags will work.  I feel in what way the store supports the queries (and how much it can) is up to the store (e.g. creating separate topics or synthetic topics), so I would argue against using topics for CQRS.  As I mention below for Pub/Sub to Persistent topics it would be great, but not for CQRS.

Yes, indeed, topics are interesting but not strictly related to CQRS.

C. CQRS with Event Sourcing


And finally, there is CQRS with Event Sourcing, which I believe is much more than (A) and (B) and particularly doesn’t necessarily require (B) for all event stores.  So if Akka were to implement (B), which I think would be very useful for other reasons, it would not specifically be for CQRS.


Please consider this diagram overviewing CQRS with Event Sourcing:


<https://www.dropbox.com/s/z2iu0xi4ki42sl7/annotated_cqrs_architecture.jpg>


For example, a saga could be waiting for an event indicating funds had been withdrawn from a bank account after it had issued a command requesting that be done.  The saga could subscribe to events from the bank account before issuing the command and watch for a specific event from that time on.

Why would the Saga subscribe to events instead of talking with the domain entities directly?

I believe it is for loose coupling and to make the process asynchronous (and event-driven).

Actor messaging already has those qualities, no event store necessary.

 When an AR processes a command it cannot know all the other ARs or sagas wanting to know when the command has been performed.

It will be the Saga that has sent the Command, so yes, it will know whom to reply to (unless I misunderstand what a Saga is). Another consideration is that modeling the replies by the Saga subscribing to the events produced by various aggregates (in the example linked to below) involves a large overhead as compared to directly receiving just those events pertaining to its own commands—this is best done within the target aggregate by simply sending them with the tell operator after having received the confirmation from the persistent store.

  Also in an actor-based system to be reliable(?) I would assume sagas need to read events from a persistent store (or receive messages with at-least-once-delivery guarantee).

The Saga persists its own progress and retries commands that are not confirmed within a timeout (or falls back to different strategies or fails the overall request). This makes it reliable in exactly the right way without unnecessary overhead.
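The retry pattern Roland describes can be sketched as follows (illustrative model; real code would persist the pending set and use the Akka scheduler for the timeout tick):

```python
# Sketch of saga reliability: record each issued command and, on a timeout
# tick, re-send whatever has not been confirmed yet.

class SagaSketch:
    def __init__(self, send):
        self.send = send
        self.pending = []              # commands awaiting confirmation

    def issue(self, command):
        self.pending.append(command)
        self.send(command)

    def confirmed(self, command):
        self.pending.remove(command)

    def on_timeout(self):
        for command in self.pending:   # retry every unconfirmed command
            self.send(command)
```

Because commands may be sent more than once, the receiving aggregates still need to deduplicate, but the saga itself needs no event-store subscription to be reliable.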


Please check out the short section on Sagas from the CQRS Journey book:


Particularly Figure 2.

I believe the black filled arrows are events and the white hollow arrows are commands.  Commands I believe are well suited to actor messages but events I believe are best suited to persistent publish/subscribe-like communication (i.e. the journal).  To be honest though, I note that the text talks about event messages, although it also talks about the event messages using an event bus, so I am a little unsure.  

 
My understanding is that the Saga sits logically on the top left side of the first diagram, acting on behalf of and just like the user (and like the user it should have a memory and be persistent).

My understanding is that sagas sit where the Services are in the diagram I provided (near [2]), i.e. they issue commands (not labelled as such in the diagram unfortunately but what else could they be?) and listen for events on the write side (as labelled).


Someone please correct me if I am wrong.

I don’t know about being wrong, I’m just making up my mind based on what I find useful, and if my conclusions are wrong I trust people will tell me so eventually ;-)


This is the dashed area labelled [3] in the diagram.


Akka seems to distribute the event store used for persistence of ARs [1] on the write side to the read side, which is an interesting idea.  But I don’t believe this is enough for CQRS.  One event store cannot provide all the required read models.


This is probably where all (our) misunderstanding originates: PersistentView is a very particular thing and it turns out that it does not actually match up with the Q in CQRS. Perhaps we should indeed just remove it and add a facility which lets Actors query the Journal instead (in case you want to roll your own read model adapter).

PersistentViews tied to one PersistentActor we know are too limited.  However, I think PersistentViews tied to named query streams (specified to the journal) would be useful (if they could guarantee at-least-once delivery to an actor representing a query store).

If we change the View feature in the way I suggested above then we have all the pieces already, AtLeastOnceDelivery or persist() could be used to reliably write projections into another store where needed.


Thank you for your patience reading through all of this text.

Thank you for this discussion, it is very useful and enjoyable!

Regards,

Roland


Cheers,
Ashley.




Martin Krasser

unread,
Aug 20, 2014, 4:43:34 AM8/20/14
to akka...@googlegroups.com

On 20.08.14 10:16, Roland Kuhn wrote:

19 aug 2014 kl. 18:59 skrev Ashley Aitken <amai...@gmail.com>:

On Tuesday, 19 August 2014 21:14:17 UTC+8, rkuhn wrote:

18 aug 2014 kl. 18:01 skrev Ashley Aitken <amai...@gmail.com>:

I believe Akka needs to allow actors to:


(i) persist events with as much information as efficiently possible on the write side to allow the store to facilitate the read side extracting them according to what criteria is needed,

This is a convoluted way of saying that Events must be self-contained, right? In that case: check!

No, I don't think so.  As I understand it now, the only thing the event store knows about each event is the persistenceId and a chunk of opaque data. It doesn't know the type of the event, the type of the message, any time information, any causal dependency etc.  I guess what I am saying is that the events need to include as much metadata as possible so that the event store can provide the necessary synthetic streams if they are requested by the read side.  As I mentioned later, some event stores (like Kafka) may replicate the events into separate topics based on this information; others (like Event Store) may use this information later to form streams of links to the original events.

The event store has the full event available, which is all the information there is: gathering or duplicating arbitrary parts of the information is likely not going to help, because you will discover later that you missed something initially, and if the mechanism is baked into the Akka Persistence Journal SPI then fixing it will take a very long time (until plugins are migrated and your OPS guys allow you to use it etc.). My recommendation is to use a serialization mechanism that fits the Journal, allowing it to understand the events and provide semantic features on top. Both (Journal and serialization) are configured in the same file, so I submit that coupling them is a valid approach.

On causal consistency: I am still unconvinced that it is worth pursuing, and I am certain that you are vastly underestimating the amount of data and effort involved. And it cannot be done without collaboration from the user since a single inter-Actor message outside of the traced system (i.e. not using a PersistenceEnvelope of sorts) would hamper or destroy it.

akka.dispatch.Envelope could carry additional information, so that user-collaboration is not needed

Roland Kuhn

unread,
Aug 20, 2014, 7:33:33 AM8/20/14
to akka-user
On 20 Aug 2014, at 10:43, Martin Krasser <kras...@googlemail.com> wrote:

On 20.08.14 10:16, Roland Kuhn wrote:

On 19 Aug 2014, at 18:59, Ashley Aitken <amai...@gmail.com> wrote:

On Tuesday, 19 August 2014 21:14:17 UTC+8, rkuhn wrote:

On 18 Aug 2014, at 18:01, Ashley Aitken <amai...@gmail.com> wrote:

I believe Akka needs to allow actors to:


(i) persist events with as much information as efficiently possible on the write side to allow the store to facilitate the read side extracting them according to what criteria is needed,

This is a convoluted way of saying that Events must be self-contained, right? In that case: check!

No, I don't think so.  As I understand it now, the only thing the event store knows about each event is the persistenceId and a chunk of opaque data. It doesn't know the type of the event, the type of the message, any time information, any causal dependency, etc.  I guess what I am saying is that the events need to include as much metadata as possible so that the event store can provide the necessary synthetic streams if they are requested by the read side.  As I mentioned later, some event stores (like Kafka) may replicate the events into separate topics based on this information; others (like Event Store) may use it later to form streams of links to the original events.

The event store has the full event available, which is all the information there is: gathering or duplicating arbitrary parts of the information is likely not going to help, because you will discover later that you missed something initially, and if the mechanism is baked into the Akka Persistence Journal SPI then fixing it will take a very long time (until plugins are migrated and your OPS guys allow you to use it etc.). My recommendation is to use a serialization mechanism that fits the Journal, allowing it to understand the events and provide semantic features on top. Both (Journal and serialization) are configured in the same file, so I submit that coupling them is a valid approach.

On causal consistency: I am still unconvinced that it is worth pursuing, and I am certain that you are vastly underestimating the amount of data and effort involved. And it cannot be done without collaboration from the user since a single inter-Actor message outside of the traced system (i.e. not using a PersistenceEnvelope of sorts) would hamper or destroy it.

akka.dispatch.Envelope could carry additional information, so that user-collaboration is not needed

Well, actually Envelope is going to go away in Akka Gålbma (a.k.a. Akka 3).

Martin Krasser

unread,
Aug 20, 2014, 7:44:05 AM8/20/14
to akka...@googlegroups.com


On 20 Aug 2014, at 13:33, "Roland Kuhn" <goo...@rkuhn.info> wrote:
>
>
> On 20 Aug 2014, at 10:43, Martin Krasser <kras...@googlemail.com> wrote:
>
>> On 20.08.14 10:16, Roland Kuhn wrote:
>>>
>>>
>>> On 19 Aug 2014, at 18:59, Ashley Aitken <amai...@gmail.com> wrote:
>>>
>>>> On Tuesday, 19 August 2014 21:14:17 UTC+8, rkuhn wrote:
>>>>>
>>>>>
>>>>> On 18 Aug 2014, at 18:01, Ashley Aitken <amai...@gmail.com> wrote:
>>>>>
>>>>>> I believe Akka needs to allow actors to:
>>>>>>
>>>>>>
>>>>>> (i) persist events with as much information as efficiently possible on the write side to allow the store to facilitate the read side extracting them according to what criteria is needed,
>>>>>
>>>>> This is a convoluted way of saying that Events must be self-contained, right? In that case: check!
>>>>
>>>>
>>>> No, I don't think so.  As I understand it now, the only thing the event store knows about each event is the persistenceId and a chunk of opaque data. It doesn't know the type of the event, the type of the message, any time information, any causal dependency, etc.  I guess what I am saying is that the events need to include as much metadata as possible so that the event store can provide the necessary synthetic streams if they are requested by the read side.  As I mentioned later, some event stores (like Kafka) may replicate the events into separate topics based on this information; others (like Event Store) may use it later to form streams of links to the original events.
>>>
>>>
>>> The event store has the full event available, which is all the information there is: gathering or duplicating arbitrary parts of the information is likely not going to help, because you will discover later that you missed something initially, and if the mechanism is baked into the Akka Persistence Journal SPI then fixing it will take a very long time (until plugins are migrated and your OPS guys allow you to use it etc.). My recommendation is to use a serialization mechanism that fits the Journal, allowing it to understand the events and provide semantic features on top. Both (Journal and serialization) are configured in the same file, so I submit that coupling them is a valid approach.
>>>
>>> On causal consistency: I am still unconvinced that it is worth pursuing, and I am certain that you are vastly underestimating the amount of data and effort involved. And it cannot be done without collaboration from the user since a single inter-Actor message outside of the traced system (i.e. not using a PersistenceEnvelope of sorts) would hamper or destroy it.
>>
>>
>> akka.dispatch.Envelope could carry additional information, so that user-collaboration is not needed
>
>
> Well, actually Envelope is going to go away in Akka Gålbma (a.k.a. Akka 3).

My point was that causal consistency can be achieved without forcing users to do extra work, as tracing dependencies can be done completely within Akka, and there are several possible ways to implement that.

Greg Young

unread,
Aug 20, 2014, 10:16:22 AM8/20/14
to akka...@googlegroups.com
Please stop using the terminology of "saga" and replace it with "process manager". What people (largely influenced by NServiceBus) call a saga is actually a process manager; a saga is a different pattern. It's bad enough that the .NET community does this; the last thing we need is for the Akka community to start doing the same :)


You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/SL5vEVW7aTo/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.

To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Studying for the Turing test

Roland Kuhn

unread,
Aug 20, 2014, 10:31:45 AM8/20/14
to akka-user
On 20 Aug 2014, at 16:16, Greg Young <gregor...@gmail.com> wrote:

Please stop using the terminology of "saga" and replace it with "process manager". What people (largely influenced by NServiceBus) call a saga is actually a process manager; a saga is a different pattern. It's bad enough that the .NET community does this; the last thing we need is for the Akka community to start doing the same :)

Sure, but please do educate us as to the right use of these two words so we persist the correct definitions in the list archives. My main question is: what is that other pattern that shall be called a Saga?

Regards,

Roland

Greg Young

unread,
Aug 20, 2014, 10:39:55 AM8/20/14
to akka...@googlegroups.com
I held the same issue with ms pnp

Clarifying the terminology

The term saga is commonly used in discussions of CQRS to refer to a piece of code that coordinates and routes messages between bounded contexts and aggregates. However, for the purposes of this guidance we prefer to use the term process manager to refer to this type of code artifact. There are two reasons for this:

There is a well-known, pre-existing definition of the term saga that has a different meaning from the one generally understood in relation to CQRS. The term process manager is a better description of the role performed by this type of code artifact.

Although the term saga is often used in the context of the CQRS pattern, it has a pre-existing definition. We have chosen to use the term process manager in this guidance to avoid confusion with this pre-existing definition.

The term saga, in relation to distributed systems, was originally defined in the paper "Sagas" by Hector Garcia-Molina and Kenneth Salem. This paper proposes a mechanism that it calls a saga as an alternative to using a distributed transaction for managing a long-running business process. The paper recognizes that business processes are often comprised of multiple steps, each of which involves a transaction, and that overall consistency can be achieved by grouping these individual transactions into a distributed transaction. However, in long-running business processes, using distributed transactions can impact on the performance and concurrency of the system because of the locks that must be held for the duration of the distributed transaction.
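The paper's alternative to a distributed transaction can be sketched in a few lines: a chain of local transactions, each paired with a compensating action that is run in reverse order if a later step fails. This is an illustrative Scala sketch; the names are mine, not from the paper.

```scala
// Saga in the Garcia-Molina/Salem sense: local transactions T1..Tn with
// compensations C1..Cn. If a step fails, already-committed steps are
// compensated in reverse order instead of holding a distributed lock.
case class Step(name: String, run: () => Boolean, compensate: () => Unit)

def runSaga(steps: List[Step]): Boolean = {
  def loop(remaining: List[Step], done: List[Step]): Boolean =
    remaining match {
      case Nil => true // every local transaction committed
      case step :: rest =>
        if (step.run()) loop(rest, step :: done)
        else { done.foreach(_.compensate()); false } // undo most recent first
    }
  loop(steps, Nil)
}
```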

Greg Young

unread,
Aug 20, 2014, 11:01:41 AM8/20/14
to akka...@googlegroups.com

Ashley Aitken

unread,
Aug 20, 2014, 2:10:34 PM8/20/14
to akka...@googlegroups.com

Whilst we are talking about s... process managers I would like to include this simple way of understanding them that I found on the web: "Process Managers produce commands and consume events, whereas Aggregate Roots consume commands and produce events."  The truth, I believe, is a bit more complicated, in that Process Managers can also consume commands (e.g. to stop the process).
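That rule of thumb can be sketched as a minimal actor skeleton (classic Akka actors; all message and actor names here are hypothetical, not from any library):

```scala
import akka.actor.{Actor, ActorRef}

// Hypothetical domain messages
case class FundsWithdrawn(accountId: String, amount: BigDecimal) // event (consumed)
case class Deposit(accountId: String, amount: BigDecimal)        // command (produced)
case object StopProcess                                          // a command the PM itself consumes

// "Process Managers produce commands and consume events": this one reacts
// to an event from one aggregate by sending a command to another.
class TransferProcessManager(targetAccount: ActorRef) extends Actor {
  def receive: Receive = {
    case FundsWithdrawn(_, amount) => targetAccount ! Deposit("target", amount)
    case StopProcess               => context.stop(self) // PMs also consume commands
  }
}
```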

Further, whilst I would like to accept Roland's view that both commands and events can be communicated by sending messages (since, as he suggests, it would make things a lot simpler and lighter on the write side), I am concerned that there are use-cases for process managers that involve them listening for events from ARs they have not sent a command message to.  Can anyone confirm/deny?

Thanks,
Ashley.


...

Gary Malouf

unread,
Aug 20, 2014, 5:46:53 PM8/20/14
to akka...@googlegroups.com
Greg - if one uses the current Akka Persistence with Event Store as the backend, is it possible (and what are the challenges) to get safe 'process managers' to work as one would expect?  I would think you'd want Event Store feeding a different Akka Persistence processor.

Greg Young

unread,
Aug 20, 2014, 7:10:29 PM8/20/14
to akka...@googlegroups.com
You can relatively easily support process managers on top (Event Store assures idempotency)

Roland Kuhn

unread,
Aug 21, 2014, 3:56:24 AM8/21/14
to akka-user
Thanks for the pointer; indeed we do not wish to be associated with distributed transactions in any form, so we'll say Process Manager henceforth :-) (even though the goal for the user is the same [albeit on a different substrate], and Saga is such a nice little word—a shame)

Actually, is it really excluded that we can reappropriate the term? Process Manager is quite a mouthful. Or can we find a better one?

Regards,

Roland

Roland Kuhn

unread,
Aug 21, 2014, 4:01:26 AM8/21/14
to akka-user
On 20 Aug 2014, at 20:10, Ashley Aitken <amai...@gmail.com> wrote:


Whilst we are talking about s... process managers I would like to include this simple way of understanding them that I found on the web: "Process Managers produce commands and consume events, whereas Aggregate Roots consume commands and produce events."  The truth, I believe, is a bit more complicated, in that Process Managers can also consume commands (e.g. to stop the process).

Further, whilst I would like to accept Roland's view that both commands and events can be communicated by sending messages (since, as he suggests, it would make things a lot simpler and lighter on the write side), I am concerned that there are use-cases for process managers that involve them listening for events from ARs they have not sent a command message to.  Can anyone confirm/deny?

My initial impulse would be to postulate an isomorphism between Actor and AR and then say that the Actor Model works just fine with only direct message sends, so the same must hold for ARs and their commands/events. In theory if Client sends a command to A which in order to fulfill it will need to send a command to B then A’s response to the Client will contain B’s reply in some capacity. Whether there are cases where this structure becomes impractical is something I cannot tell with my current knowledge. Inputs anyone?
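A minimal sketch of that chain, assuming classic Akka actors and made-up message types: A defers its answer to the Client until B has replied, so the Client's response carries B's reply.

```scala
import akka.actor.{Actor, ActorRef}

// Hypothetical messages: Client sends CommandA to A; A needs B to fulfil it.
case class CommandA(payload: String)
case class CommandB(payload: String)
case class ReplyB(result: String)
case class ReplyA(result: String)

class A(b: ActorRef) extends Actor {
  def receive: Receive = {
    case CommandA(p) =>
      b ! CommandB(p)
      context.become(waitingFor(sender())) // remember the client until B answers
  }

  private def waitingFor(client: ActorRef): Receive = {
    case ReplyB(r) =>
      client ! ReplyA(r)      // A's response to the Client contains B's reply
      context.become(receive)
  }
}
```

A real implementation would also have to stash or reject further commands arriving while A is waiting; the sketch ignores them.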

Regards,

Roland

delasoul

unread,
Aug 21, 2014, 4:38:19 AM8/21/14
to akka...@googlegroups.com
Hello Roland,

we call them not ProcessManager or Saga, but just Process, e.g.: DeviceRegistrationProcess.
Typically they are implemented with FSM or Actor.become, so direct message sends (and replying to the sender) were everything we needed
until now, but maybe there are more complicated use cases than ours...

Regards,

michael



Roland Kuhn

unread,
Aug 21, 2014, 5:06:42 AM8/21/14
to akka-user
Hi Michael,

this is a good suggestion, although googling for DDD and Process yields mostly irrelevant hits ;-) Anyway, at least as a naming convention it is a very good proposal, and it also has the right connotations.

Regards,

Roland

Vaughn Vernon

unread,
Aug 21, 2014, 2:04:00 PM8/21/14
to akka...@googlegroups.com
FWIW, Chronicle has almost the same meaning as Saga, and I am pretty sure that Chronicle is not used in any way that is associated with processes. That said, I am actually not recommending that the name Process Manager be substituted by Chronicle. I am just throwing the name out there.

I am glad to see these discussions leading somewhere. Is there any place where all this has been or could be summarized? You know, chronicled? (Oops ;)
...

Ashley Aitken

unread,
Aug 22, 2014, 5:41:02 AM8/22/14
to akka...@googlegroups.com

Hi Roland (and everyone),

Welcome back Roland - I hope you had a great vacation.

Thank you for your post.  


Here’s my response summary:

I believe Akka needs to allow actors to:

(i) persist events with as much information as efficiently possible on the write side to allow the store to facilitate the read side extracting them according to what criteria is needed,

(ii) persist events that don’t relate to a change in state of the actor per se, which I assume is already achievable since an actor can just ignore them on replay, 

(iii) read from (and replay) streams of events on the read and write side according to a range of criteria supported and defined within the store or via the store API (e.g. using a DSL), and

(iv) reliably (at least once) deliver information to other read side store(s) and systems above and beyond the store used for persisting the events.

I believe each of these is readily achievable with Akka but:

(i) doesn’t mean explicitly persisting the events to specific topics as you suggest in your (1) (although this may be how some stores implement the required functionality on the read side). Instead it means transparently including information like the actorId, event type, actor type, probably the time and possibly information to help with causal ordering (see my next post).

(iii) with (i) would enable the read side (if the store supports it) to read all events from particular actor(s), events of particular types, events from particular type(s) of actors, and all events.  It would also need to allow the read side to read from where it last finished reading, from now, or from the start again.  (iv) is necessary for projections.
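One way to picture (i) is an envelope the store could understand. The shape below is purely hypothetical (the real journal only sees a persistenceId and opaque bytes), but it shows the metadata that would make the listed queries possible.

```scala
// Hypothetical envelope carrying the metadata discussed above.
case class EventEnvelope(
  persistenceId: String,    // e.g. "customer-42"
  actorType:     String,    // e.g. "customer": enables per-type synthetic streams
  eventType:     String,    // e.g. "AddressChanged"
  timestamp:     Long,      // store-assigned time, for "from now on" reads
  payload:       Array[Byte])

// With such metadata a store could serve "all events of actor type X":
def eventsOfActorType(log: Seq[EventEnvelope], tpe: String): Seq[EventEnvelope] =
  log.filter(_.actorType == tpe)
```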


If you are interested, here’s my detailed explanation:

I think some of the confusion surrounding these issues is caused by the fact that we seem to be discussing and, if I may suggest, Akka appears to be trying to implement three quite different (but also somewhat related) pieces of functionality within this domain.  These are:

A. Actor Persistence

The ability to persist actor state changes incrementally (or wholly) and reconstruct that state at a later time, which we know as event sourcing.  I think Akka provides a great distributed and scalable mechanism for doing this with the current akka.persistence.

B. Publish/Subscribe to Persistent Queues/Topics

This functionality would allow actors to write data/events/messages to one (or more) topics and to subscribe to receive similar from one or more topics.  These differ from normal publish/subscribe queues in that they are persistent and the consumer can reread from the topic.

This is what I think of as the LogProducer and LogConsumer, of which PersistentActor and PersistentView can be thought of as specialisations, i.e. a single topic for each actor.  The current and popular example of a store for this sort of functionality, as you know, is Kafka. 

C. CQRS with Event Sourcing

And finally, there is CQRS with Event Sourcing, which I believe is much more than (A) and (B) and, in particular, doesn't necessarily require (B) for all event stores.  So if Akka were to implement (B), which I think would be very useful for other reasons, it would not specifically be for CQRS.

Please consider this diagram overviewing CQRS with Event Sourcing:


[diagram not preserved in this archive; "adapted from" attribution missing]


As I understand it, CQRS separates the write model and store from one or *more* read models and stores, with each model and store being optimised for their particular role.  CQRS says nothing specific about the types of store (e.g. SQL or NOSQL, event sourced or not) and how consistency is achieved.

As you know, when using event sourcing the changes to the write model entities (e.g. Aggregate Roots) are stored as events and the write model is reconstructed by replaying those events.  This is (A) above and what akka.persistence has achieved very well in a distributed and scalable way.  

This is the dashed area labelled [1] in the diagram.

Further, CQRS uses commands to initiate changes to the write model and signals these changes with events (whether the events are used for event sourcing or not).  These events are what allow sagas and other systems to track and respond to changes in the write model.

This is the dashed area labelled [2] in the diagram.

For example, a saga could be waiting for an event indicating funds had been withdrawn from a bank account after it had issued a command requesting that be done.  The saga could subscribe to events from the bank account before issuing the command and watch for a specific event from that time on.

This event notification system is conceptually independent of how the read store(s) will eventually become consistent (i.e. there are other means of achieving this consistency without events).  However, it just so happens that these events are also a great way to notify the read store(s) of changes.

So conceptually, the read side receives all the events from the write side independent of the write side event store. I believe we should logically think of this as one big stream of events since the write side doesn’t (and cannot) know what the read store(s) needs from these events in advance.  

This is the dashed area labelled [3] in the diagram.

Akka seems to distribute the event store used for persistence of ARs [1] on the write side to the read side, which is an interesting idea.  But I don’t believe this is enough for CQRS.  One event store cannot provide all the required read models.

Because the read side *must* support a number of different stores (not just the one used for actor persistence and to distribute the events to the read side) it is also necessary for there to be some reliable way to get event information into the other read store(s).

For example, there are very good reasons for some read sides having the read model stored in an SQL database, or a graph database, or a NOSQL database, or some or all of these at the same time.  As you mentioned, this needs to be idempotent to handle at least once delivery. 


I hope that Akka can really fully support CQRS with Event Sourcing to become the premiere platform for its use in highly scalable, reliable and efficient systems.  I also hope this explanation can help that happen sooner rather than later since I think it is a great opportunity for Akka.  

Please anyone, correct me if I am misunderstanding things or have left anything out. 

Cheers,
Ashley.


 
On 16 Aug 2014, at 1:39 am, Roland Kuhn <goo...@rkuhn.info> wrote:

Dear hakkers,

unfortunately it took me a long time to catch up with akka-user to this point after the vacation, but on the other hand this made for a very interesting and stimulating read, thanks for this thread!

If I may, here’s what I have understood so far:
  1. In order to support not only actor persistence but also full CQRS we need to adjust our terminology: events are published to topics, where each persistenceId is one such topic but others are also allowed.
  2. Common use-cases of building projections or denormalized views require the ability to query the union of a possibly large number of topics in such a fashion that no events are lost. This union can be viewed as a synthetic or logical topic, but issues arise in that true topics provide total ordering while these synthetic ones have difficulties doing so.
  3. Constructing Sagas is hard.

AFAICS 3. is not related to the other two, the mentions in this thread have only alluded to the problems so I assume that the difficulty is primarily to design a process that has the right eventual consistency properties (i.e. rollbacks, retries, …). This is an interesting topic but let’s concentrate on the original question first.

The first point is a rather simple one, we just need to expose the necessary API for writing to a given topic instead of the local Actor’s persistenceId; I’d opt for adding variants of the persist() methods that take an additional String argument. Using the resulting event log is then done as for the others (i.e. Views and potentially queries should just work). The only concern is that the Journal needs to be prepared to receive events concurrently from multiple sources instead of just the same Actor, but since each topic needs to be totally ordered this will not be an additional hassle beyond just routing to the same replica, just like for persistenceIds.
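A sketch of what such an overload might look like from user code; note that the `topic` parameter below does not exist in Akka Persistence, it only illustrates the proposal.

```scala
import akka.persistence.PersistentActor

case class AddressChanged(street: String)

class Customer extends PersistentActor {
  override def persistenceId: String = "customer-42"

  override def receiveCommand: Receive = {
    case street: String =>
      // Today: the event goes only to this actor's own persistenceId stream.
      persist(AddressChanged(street)) { _ => /* update state, reply */ }
      // Proposed (hypothetical): additionally write to a shared topic, e.g.
      //   persist(AddressChanged(street), topic = "customers") { _ => ... }
  }

  override def receiveRecover: Receive = {
    case _: AddressChanged => // rebuild state on replay
  }
}
```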

The second point is the contentious one, since a feature request (consistent iteration over a query) clashes with a design choice (scalability). First it is important to note that this clash is genuine: scalability means that we do not want to limit the size of a topic to always fit one unit of consistency, our default assumption is that everything should be prepared for distribution. We all know that in a distributed system linearizability is not generally achievable, meaning that a distributed (synthetic) topic that receives events from concurrent sources will not be able to provide a global ordering. A non-distributed Journal, OTOH, is a single point of failure which is not desirable for many applications (i.e. your business will go down while the Journal has issues—true replication requires the ability to fail independently and hence is distributed in the CAP sense).

As I see it, a query (like “all events of this type” etc.) should be configured for the given Journal and should then be available as a (synthetic) topic for normal consumption—but not for being written to. The Journal is then free to implement this in any way it sees fit, but barring fundamental advances in CS or errors on my part this will always require that the synthetic topic is not scalable in the way we usually define that (i.e. distributable). As Vaughn points out this may not be an issue at all, actual benchmarks would help settle this point. Journal backends that already implement a global order can make use of that, for others the synthetic topic would work just like any other non-PersistentActor topic with manual duplication of those events that match the query (akin to (a) in the first post of this thread); this duplication does not necessarily need to double the memory consumption, it could also only persist the events by reference (depending on the storage engine).

When it comes to providing queries in a way that does not have a global ordering, my current opinion is that we should not do this because it would be quite pointless (a.k.a. unusable). A compromise would be to provide eventually linearizable queries based on the premise that the application of events should be idempotent in any case and overlapping replay (i.e. where necessary from the last known-linear point instead of the requested one) must be tolerated. AFAIK this is the topic of ongoing research, though, so I’d place that lower on the priority list.

Does this sound like a fair summary? Please let me know in case I misrepresent or misunderstand something, once we reach consensus on what we need we’ll ticket and solve it, as usual ;-)

Regards,

Roland
On 12 Aug 2014, at 18:10, Ashley Aitken <amai...@gmail.com> wrote:


Thanks for your post Vaughn.

On Monday, 11 August 2014 05:57:05 UTC+8, Vaughn Vernon wrote:
None of this stuff is easy to do, and even harder to do right.

I am the first to agree with that.
 
Your post gives away the main problem with getting this to work correctly, because the Actor Model and akka-persistence currently support the first half of A, but not the second half. In other words, to make the interface rich we not only need a new set of abstractions, we also need to overcome the direct messaging nature of actors because it can be limiting in some use cases.
With the messaging library I am building, currently named Number 9, which includes both persistence and a process manager, this problem is handled as follows. Any actor that sends a message may:

1. send a persistent message to another actor
2. send a persistent message to a topic
3. send a persistent message primarily to another actor, but also to a topic

That is very interesting.  

It seems to me that CQRS commands should be sent as messages (persistent or not) - your (1.) - and changes of state (AR or application) should be published as events (to topics or more generally) - your (2.) - but I can't see a need for (3.)?

Further, a process manager for a bank account transfer could be implemented with a command to the source account (withdrawForTransfer) that would be acknowledged by a published persistent event (WithdrawnForTransfer).  Similar for deposit into target account.

Pawel Kaczor in his DDD-Leaven-Akka series (Lesson 3) includes projections from aggregated streams of events and a process manager / saga using Akka Persistence, by having the ARs persist their events and also publish them.



The only shortcomings (not his fault or a criticism) seem to be: 1) the use of two event infrastructures (one for persistence and one for pub/sub), 2) the limited ability for complex projections (like Greg mentioned and available in Event Store), and 3) lack of persistence for pub/sub events.

The latter makes reconstruction of a read model or construction of a new read model after the events have been published more difficult. 
 
If you have watched any of my presentations on this subject you have heard this before. I am presenting most of this to the DDD Denver meetup this Monday night. The title of the talk is "Building a Reactive Process Manager, Twice". The twice part is because I will demonstrate this working both in Scala with Akka and also in C# with Dotsero:

Thank you, I will look out for that (please share the video link if it is recorded and put on the Web).  I have seen (but not watched) some of your videos because I am unsure as to who is leading here, and the videos I saw seemed to be from a few years ago.

I've just got your book so I will get on with reading that (for DDD and CQRS enlightenment).

-- 
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn





Olger Warnier

unread,
Aug 22, 2014, 5:42:37 AM8/22/14
to akka...@googlegroups.com
Quite a read on a phone during my holidays. Very insightful and adds a lot of considerations around the different topics.

I'd like to mention that the current construct of the persistent view is very helpful in decoupling. I can create new listeners (e.g. PersistentView) based on the events persisted in the store.

Tags were proposed and I would be using those to actually attach the aggregate root type to the event so my persistent views would use that to 'read' the events and create the model. That model is used for the queries in the application (or for other purposes)

At this moment, I persist all changes on an aggregate root (persistent actor for a specific instance) by writing a changed event with a specific persistenceId in the payload (of the aggregate root) to a generic persistenceId, and use that to create a persistent view that uses the specific aggregate root Id as persistenceId (discussed in another group topic).

When tags are not the way forward, there is a need for some kind of query/resolver mechanism that allows one to define what the PersistentView is listening to. In my use case, the persistent actor is a cluster-sharded actor and the persistenceId is actually the unique identifier of an instance of an aggregate root. My view would want to listen to all events persisted by this type of aggregate root. (So that metadata needs to be available too.)

Kind regards,

Olger

Btw: repost, sloooow internet in the mountains.

Greg Young

unread,
Aug 22, 2014, 8:52:07 AM8/22/14
to akka...@googlegroups.com
For example, there are very good reasons for some read sides having the read model stored in an SQL database, or a graph database, or a NOSQL database, or some or all of these at the same time.  As you mentioned, this needs to be idempotent to handle at least once delivery. 


If a projection can store its checkpoint atomically with its read model, it simulates only-once messaging, provided there is an ordering assurance on the stream it is listening to.
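Greg's observation can be sketched with an in-memory stand-in: the read model and the checkpoint advance in one atomic step, so redelivered (at-least-once) events are dropped by sequence number. In a real store both writes would share one transaction; everything here is illustrative.

```scala
// Projection whose checkpoint is updated "atomically" with its read model
// (here: one synchronized block stands in for a DB transaction).
class AccountProjection {
  private var checkpoint = 0L
  private var balance = BigDecimal(0)

  def handle(seqNr: Long, delta: BigDecimal): Unit = synchronized {
    if (seqNr > checkpoint) { // duplicate or stale delivery: ignore
      balance += delta        // read-model update...
      checkpoint = seqNr      // ...and checkpoint move happen together
    }
  }

  def state: (Long, BigDecimal) = synchronized((checkpoint, balance))
}
```

Note this only simulates exactly-once because a single high-water mark is compared against an ordered stream; without that ordering assurance the checkpoint test would wrongly drop reordered events.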

Ashley Aitken

unread,
Aug 26, 2014, 12:49:02 PM8/26/14
to akka...@googlegroups.com
On Thursday, 21 August 2014 16:01:26 UTC+8, rkuhn wrote:

My initial impulse would be to postulate an isomorphism between Actor and AR and then say that the Actor Model works just fine with only direct message sends, so the same must hold for ARs and their commands/events.

Unfortunately, I am coming to believe this may not be correct.  Akka message passing and related functionality is fantastic for processing within an aggregate, and Akka Persistence is great for aggregate persistence.  However, it seems that most modern CQRS/ES implementations use and require a reliable EventBus and possibly a reliable CommandBus for the *write* side (with the EventBus and projections for the read side). 

I suggest Vaughn's creation of Number 9, AFAIK an implementation of reliable messaging to multiple actors (e.g. for events), is a symptom of this lack of a reliable EventBus, but not the full solution that an EventBus (and CommandBus) would provide.  I believe a full CQRS/ES implementation may require reliable decoupled command delivery and reliable publish-subscribe for events.  

In theory if Client sends a command to A which in order to fulfill it will need to send a command to B then A’s response to the Client will contain B’s reply in some capacity. Whether there are cases where this structure becomes impractical is something I cannot tell with my current knowledge. 

No, I believe there are use-cases where a process manager listens for an event without having first sent a command to the aggregate root producing the event.  An example could be a process manager listening for any ItemPicked events and starting a process to check the stock levels and possibly reorder, separate from the sales process.  Again this seems to require a reliable publish-subscribe facility for events.
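The "standing" process manager described above can be sketched as a mapping from subscribed events to commands; the event and command types here are hypothetical:

```scala
sealed trait DomainEvent
final case class ItemPicked(sku: String, remainingStock: Int) extends DomainEvent

sealed trait Command
final case class ReorderStock(sku: String, quantity: Int) extends Command

// A standing PM: it never sent a command to the AR emitting ItemPicked;
// it only subscribes to the event stream and reacts.
class ReorderProcessManager(threshold: Int, batchSize: Int) {
  def onEvent(event: DomainEvent): Option[Command] = event match {
    case ItemPicked(sku, remaining) if remaining < threshold =>
      Some(ReorderStock(sku, batchSize))
    case _ => None
  }
}
```

The reliable pub-sub requirement is exactly that `onEvent` must eventually see every `ItemPicked`, or reorders are silently missed.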

Now I am really impressed with the functionality provided by Akka, including its routers and distributed pub-sub functionality.  And, as we have seen, Akka Persistence and its distributed journal / store can enable AtLeastOnceDelivery for messages. I thus suggest that the distributed store could more generally enable other services to move up to a more reliable level of local and distributed functionality (IF NEEDED).  

Currently:  Akka Persistence provides PersistentActor and AtLeastOnceDelivery

Future?: Akka Persistence provides PersistentActor, AtLeastOnceDelivery, ReliableRouter, ReliableDistributedPubSub, and Reliable...?

Of course, if reliable routing and publish-subscribe can be implemented in another way in Akka that would be fine as well. Or perhaps the successful "let it crash" approach extends to messaging, i.e. "let it be lost."  If so I am keen to find out how to make a CQRS/ES implementation work effectively if/when commands are lost and if/when events are not delivered to those needing to be notified of them.

The required reliable functionality can obviously be added on top of Akka by using other services.  However, I think use of the distributed store that comes with Akka Persistence to implement these could remove the need for an extra infrastructure component.  And, if this is done with a store that may be central to enterprise infrastructure, e.g. Kafka, then that would also make integration easier.  

I see something like this was discussed early in 2013: https://groups.google.com/d/topic/akka-user/cmDna0_Mo58/discussion

Cheers,
Ashley.


Roland Kuhn

unread,
Aug 27, 2014, 11:08:27 AM8/27/14
to akka-user
26 aug 2014 kl. 18:49 skrev Ashley Aitken <amai...@gmail.com>:


On Thursday, 21 August 2014 16:01:26 UTC+8, rkuhn wrote:

My initial impulse would be to postulate an isomorphism between Actor and AR and then say that the Actor Model works just fine with only direct message sends, so the same must hold for ARs and their commands/events.

Unfortunately, I am coming to believe this may not be correct.  Akka message passing and related functionality is fantastic for processing within an aggregate, and Akka Persistence is great for aggregate persistence.  However, it seems that most modern CQRS/ES implementations use and require a reliable EventBus and possibly a reliable CommandBus for the *write* side (with the EventBus and projections for the read side). 

I suggest Vaughn's creation of Number 9, AFAIK an implementation of reliable messaging to multiple actors (e.g. for events), is a symptom of this lack of a reliable EventBus, but not the full solution that an EventBus (and CommandBus) would provide.  I believe a full CQRS/ES implementation may require reliable decoupled command delivery and reliable publish-subscribe for events.  

Yes, we have come to the same conclusion, see this thread.


In theory if Client sends a command to A which in order to fulfill it will need to send a command to B then A’s response to the Client will contain B’s reply in some capacity. Whether there are cases where this structure becomes impractical is something I cannot tell with my current knowledge. 

No, I believe there are use-cases where a process manager listens for an event without having first sent a command to the aggregate root producing the event.  An example could be a process manager listening for any ItemPicked events and starting a process to check the stock levels and possibly reorder, separate from the sales process.  Again this seems to require a reliable publish-subscribe facility for events.

Yes, this is a good point: I was thinking of ephemeral (per-request) process managers, for which I still believe that the message flows should be organized such that the PM does not need to subscribe to events—that would impose too much overhead. Your example of a “standing” process is exactly one of those read-side consumers that we have in mind for the proposed PersistentView replacement.

Now I am really impressed with the functionality provided by Akka, including its routers and distributed pub-sub functionality.  And, as we have seen, Akka Persistence and its distributed journal / store can enable AtLeastOnceDelivery for messages. I thus suggest that the distributed store could more generally enable other services to move up to a more reliable level of local and distributed functionality (IF NEEDED).  

Currently:  Akka Persistence provides PersistentActor and AtLeastOnceDelivery

Future?: Akka Persistence provides PersistentActor, AtLeastOnceDelivery, ReliableRouter, ReliableDistributedPubSub, and Reliable...?

Of course, if reliable routing and publish-subscribe can be implemented in another way in Akka that would be fine as well. Or perhaps the successful "let it crash" approach extends to messaging, i.e. "let it be lost."  If so I am keen to find out how to make a CQRS/ES implementation work effectively if/when commands are lost and if/when events are not delivered to those needing to be notified of them.

There is a fundamental difference between commands and events: if a command is lost then the contained intent evaporates and nothing happens, the sender will eventually recognize this and retry or abort the transaction. An event OTOH represents a part of history and therefore must not be lost for the purpose of creating consistent views upon that same history. For these use-cases we propose deterministically replayable event streams, you can also call them persistent cursors or iterators if you will.
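A deterministically replayable event stream of the kind Roland describes can be sketched with an in-memory log and offsets; `EventLog` here is an illustrative stand-in, not the proposed API:

```scala
final case class Stored(offset: Long, event: String)

// An append-only log. A consumer keeps only its last offset (the "persistent
// cursor") and can always resume: replaying from the same offset is
// guaranteed to yield the same events in the same order.
final class EventLog {
  private var entries = Vector.empty[Stored]
  def append(event: String): Unit =
    entries = entries :+ Stored(entries.size.toLong, event)
  def replayFrom(offset: Long): Vector[Stored] =
    entries.drop(offset.toInt)
}
```

Since history is never mutated, the consumer needs no reliable delivery channel: losing its position costs only a re-read, never an event.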

The required reliable functionality can obviously be added on top of Akka by using other services.  However, I think use of the distributed store that comes with Akka Persistence to implement these could remove the need for an extra infrastructure component.  And, if this is done with a store that may be central to enterprise infrastructure, e.g. Kafka, then that would also make integration easier.  

I see something like this was discussed early in 2013: https://groups.google.com/d/topic/akka-user/cmDna0_Mo58/discussion

Yes, and I believe we are finally in a position to propose a real solution :-) (which of course also matches Jonas’ advice from back then).

Regards,

Roland


Cheers,
Ashley.



--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Roland Kuhn

unread,
Aug 27, 2014, 11:10:05 AM8/27/14
to akka-user
Thanks for contributing this use-case, we think that we incorporated it in our proposal but it would be good to cross check and remove any possible misunderstandings.

Regards,

Roland




Ashley Aitken

unread,
Aug 27, 2014, 1:27:29 PM8/27/14
to akka...@googlegroups.com


On Wednesday, 27 August 2014 23:08:27 UTC+8, rkuhn wrote:

In theory if Client sends a command to A which in order to fulfill it will need to send a command to B then A’s response to the Client will contain B’s reply in some capacity. Whether there are cases where this structure becomes impractical is something I cannot tell with my current knowledge. 

No, I believe there are use-cases where a process manager listens for an event without having first sent a command to the aggregate root producing the event.  An example could be a process manager listening for any ItemPicked events and starting a process to check the stock levels and possibly reorder, separate from the sales process.  Again this seems to require a reliable publish-subscribe facility for events.

Yes, this is a good point: I was thinking of ephemeral (per-request) process managers, for which I still believe that the message flows should be organized such that the PM does not need to subscribe to events—that would impose too much overhead.

I understand the concerns about overhead on the read-side but wonder how other CQRS/ES implementations handle this.  I asked a question related to this on Stack Overflow and received an interesting answer (explained fully in the comments):


Your example of a “standing” process is exactly one of those read-side consumers that we have in mind for the proposed PersistentView replacement.

I apologise if I am misunderstanding what you mean by "read-side" but this PM issues commands so I had assumed it must be on the write-side.  If you mean using PersistentView(v2) makes it read-side (I assume one could use PersistentViews on the write-side) then that's ok.

Please consider also this diagram, similar to the last one I shared:


IMO:  Write Side = [1] + [2],  Read Side = [3], but I see the similarity of [2] to [3] in a round-about way if the client is included.

There is a fundamental difference between commands and events: if a command is lost then the contained intent evaporates and nothing happens, the sender will eventually recognize this and retry or abort the transaction.

Yes, I had thought this as well based on what I had read but again from the comments to the Quora answer:  "If your commandbus is not reliable, you can use a timeout mechanism (i.e., check for expected result of command after some time), but this requires reliable timeouts."  

Does Akka have a reliable timeout mechanism (by which I mean one that works across crashes)?

Thanks,
Ashley.


Roland Kuhn

unread,
Aug 28, 2014, 5:33:43 AM8/28/14
to akka-user
27 aug 2014 kl. 19:27 skrev Ashley Aitken <amai...@gmail.com>:

On Wednesday, 27 August 2014 23:08:27 UTC+8, rkuhn wrote:

In theory if Client sends a command to A which in order to fulfill it will need to send a command to B then A’s response to the Client will contain B’s reply in some capacity. Whether there are cases where this structure becomes impractical is something I cannot tell with my current knowledge. 

No, I believe there are use-cases where a process manager listens for an event without having first sent a command to the aggregate root producing the event.  An example could be a process manager listening for any ItemPicked events and starting a process to check the stock levels and possibly reorder, separate from the sales process.  Again this seems to require a reliable publish-subscribe facility for events.

Yes, this is a good point: I was thinking of ephemeral (per-request) process managers, for which I still believe that the message flows should be organized such that the PM does not need to subscribe to events—that would impose too much overhead.

I understand the concerns about overhead on the read-side but wonder how other CQRS/ES implementations handle this.  I asked a question related to this on Stack Overflow and received an interesting answer (explained fully in the comments):


Thanks for the link, this is indeed interesting. I think with Actors we have more freedom to optimize, but I agree that in principle the PM acts based on Events.


Your example of a “standing” process is exactly one of those read-side consumers that we have in mind for the proposed PersistentView replacement.

I apologise if I am misunderstanding what you mean by "read-side" but this PM issues commands so I had assumed it must be on the write-side.  If you mean using PersistentView(v2) makes it read-side (I assume one could use PersistentViews on the write-side) then that's ok.

Well, obviously a PM wears multiple hats in this game since it is involved both with commands and events.


Please consider also this diagram, similar to the last one I shared:


IMO:  Write Side = [1] + [2],  Read Side = [3], but I see the similarity of [2] to [3] in a round-about way if the client is included.

There is a fundamental difference between commands and events: if a command is lost then the contained intent evaporates and nothing happens, the sender will eventually recognize this and retry or abort the transaction.

Yes, I had thought this as well based on what I had read but again from the comments to the Quora answer:  "If your commandbus is not reliable, you can use a timeout mechanism (i.e., check for expected result of command after some time), but this requires reliable timeouts."  

If the PM is really the final entity driving things, then yes, you need reliable timeouts. If the ultimate driver is some external client, then that client will restart the process (if desired) when the PM does not reply within a certain amount of time, obviating the need for the PM to be 100% reliable.


Does Akka have a reliable timeout mechanism (by which I mean one that works across crashes)?

We recently merged a change that restarts the entities in ClusterSharding based on which ones were active before a crash; I think this would solve the issue (assuming that each entity properly persists the fact that it was waiting for something).
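The pattern hinted at here can be sketched under the assumption that the entity persists an absolute deadline before it starts waiting; on recovery (e.g. after ClusterSharding restarts it) it re-arms the timer from the persisted deadline rather than from "now". The types are illustrative, not Akka APIs:

```scala
// Persisted before the entity starts waiting for a reply.
final case class AwaitingReply(deadlineMillis: Long)

object TimeoutRecovery {
  // How long to keep waiting after a restart; zero means fire immediately.
  def remainingDelay(state: AwaitingReply, nowMillis: Long): Long =
    math.max(0L, state.deadlineMillis - nowMillis)

  // True if the timeout elapsed while the entity was down.
  def expired(state: AwaitingReply, nowMillis: Long): Boolean =
    nowMillis >= state.deadlineMillis
}
```

The timer itself is ordinary and lost on crash; reliability comes from the persisted deadline plus the guarantee that the entity is restarted.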

Regards,

Roland


Thanks,
Ashley.




Tasio Guevara

unread,
Nov 27, 2014, 10:37:30 AM11/27/14
to akka...@googlegroups.com
I think that the PMs are actually behaving just like ARs => they consume commands and generate events. I'm doing the mapping from events to commands in stateless domain services. That way it feels more natural to use event sourcing also for the PMs. Does this make sense?
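A sketch of that arrangement, with hypothetical types: a stateless service maps the AR's event to a command, and the PM consumes commands and emits its own events, so it can itself be event-sourced like an AR:

```scala
final case class StockLow(sku: String)       // event emitted by an AR
final case class StartReorder(sku: String)   // command consumed by the PM
final case class ReorderStarted(sku: String) // event emitted by the PM

// Stateless domain service: pure event -> command mapping.
def route(event: StockLow): StartReorder = StartReorder(event.sku)

// The PM handles commands and produces events, exactly like an AR; its
// state is recoverable by replaying its own ReorderStarted events.
final case class PMState(active: Set[String]) {
  def handle(cmd: StartReorder): (PMState, Option[ReorderStarted]) =
    if (active.contains(cmd.sku)) (this, None) // already reordering: no-op
    else (PMState(active + cmd.sku), Some(ReorderStarted(cmd.sku)))
}
```

Keeping the event-to-command mapping outside the PM is what lets the PM stay a plain command handler, which makes event sourcing it feel natural.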
...