Akka Persistence on the Query Side: The Conclusion

Roland Kuhn

Aug 27, 2014, 10:34:30 AM
to akka-user, gregor...@gmail.com, amai...@gmail.com, Vaughn Vernon, Martin Krasser
Dear hakkers,

there have been several very interesting, educational and productive threads in the past weeks (e.g. here and here). We have taken some time to distill the essential problems as well as discuss the proposed solutions and below is my attempt at a summary. In the very likely case that I missed something, by all means please raise your voice. The intention for this thread is to end with a set of github issues for making Akka Persistence as closely aligned with CQRS/ES principles as we can make it.

As Greg and others have confirmed, the write-side (PersistentActor) is already doing a very good job, so we do not see a need to change anything at this point. My earlier proposal of adding specific topics as well as the discussed labels or tags all feel a bit wrong since they benefit only the read-side and should therefore not be a concern/duty of the write-side.

On the read-side we came to the conclusion that PersistentView basically does nearly the right thing, but it focuses on the wrong aspect: it seems most suited to track a single PersistentActor with some slack, but also not with back-pressure as a first-class citizen (it is possible to achieve it, albeit not trivial). What we distilled as the core functionality for a read-side actor is the following:

  • it can ask for a certain set of events
  • it consumes the resulting event stream on its own schedule
  • it can be stateful and persistent on its own

This does not preclude populating e.g. a graph database or a SQL store directly from the journal back-end via Spark, but we do see the need to allow Akka Actors to be used to implement such a projection.

Starting from the bottom up, allowing the read-side to be a PersistentActor in itself means that receiving Events should not require a mixin trait like PersistentView. The next bullet point means that the Event stream must be properly back-pressured, and we have a technology under development that is predestined for such an endeavor: Akka Streams. So the proposal is that any Actor can obtain the ActorRef for a given Journal and send it a request for the event stream it wants, and in response it will get a message containing a stream (i.e. Flow) of events and some meta-information to go with it.

The question that remains at this point is what exactly it means to “ask for a certain set of events”. In order to keep the number of abstractions minimal, the first use-case for this feature is the recovery of a PersistentActor. Each Journal will probably support different kinds of queries, but it must for this use-case respond to

case class QueryByPersistenceId(id: String, fromSeqNr: Long, toSeqNr: Long)

with something like

case class EventStreamOffer(metadata: Metadata, stream: Flow[PersistentMsg])

The metadata allows the recipient to correlate this offer with the corresponding request and it contains other information as we will see in the following.
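
A minimal sketch of this request/offer exchange, under loud assumptions: a plain Scala Iterator stands in for the proposed Flow, the journal is a toy in-memory log, and the fields of Metadata and PersistentMsg are hypothetical (the proposal does not define them).

```scala
// Hypothetical sketch: Iterator stands in for the proposed Flow, and the
// shapes of Metadata and PersistentMsg are assumptions, not Akka types.
object QueryOfferSketch {
  final case class PersistentMsg(persistenceId: String, seqNr: Long, payload: Any)
  final case class QueryByPersistenceId(id: String, fromSeqNr: Long, toSeqNr: Long)
  // The metadata echoes the query so the recipient can correlate offer and request.
  final case class Metadata(query: QueryByPersistenceId, deterministic: Boolean)
  final case class EventStreamOffer(metadata: Metadata, stream: Iterator[PersistentMsg])

  // A toy in-memory journal holding one global log.
  final class InMemoryJournal(log: Vector[PersistentMsg]) {
    def handle(q: QueryByPersistenceId): EventStreamOffer = {
      val events = log.iterator.filter { m =>
        m.persistenceId == q.id && m.seqNr >= q.fromSeqNr && m.seqNr <= q.toSeqNr
      }
      EventStreamOffer(Metadata(q, deterministic = true), events)
    }
  }

  val journal = new InMemoryJournal(Vector(
    PersistentMsg("cart-1", 1L, "ItemAdded"),
    PersistentMsg("cart-2", 1L, "ItemAdded"),
    PersistentMsg("cart-1", 2L, "ItemRemoved"),
    PersistentMsg("cart-1", 3L, "CheckedOut")
  ))

  def main(args: Array[String]): Unit = {
    // Recovery of "cart-1" from sequence number 2 onwards.
    val offer = journal.handle(QueryByPersistenceId("cart-1", 2L, Long.MaxValue))
    offer.stream.foreach(println)
  }
}
```

The consumer pulls from the iterator at its own pace, which is only a crude stand-in for the back-pressure that a real stream would provide.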

Another way to ask for events was discussed as Topics or Labels or Tags in the previous threads, and the idea was that the generated stream of all events was enriched by qualifiers that allow the Journal to construct a materialized view (e.g. a separate queue that copies all events of a given type). This view then has a name that is requested from the read-side in order to e.g. have an Actor that keeps track of certain aspects of all persistent ShoppingCarts in a retail application. As I said above we think that this concern should be handled outside of the write-side because logically it does not belong there. Its closest cousin is the construction of an additional index or view within a SQL store, maintained by the RDBMS upon request from the DBA, but available to and relied upon by the read-side. We propose that this is also how this should work with Akka Persistence: the Journal is free to allow the configuration of materialized views that can be requested as event streams by name. The extraction of the indexing characteristics is performed by the Journal or its backing store, outside the scope of the Journal SPI; one example of doing it this way has been implemented by Martin already. We propose to access the auxiliary streams by something like

case class QueryKafkaTopic(name: String, fromSeqNr: Long, toSeqNr: Long)

Sequence numbers are necessary for deterministic replay/consumption. We had long discussions about the scalability implications, which is the reason why we propose to leave such queries proprietary to the Journal backend. Assuming a perfectly scalable (but then of course not real-time linearizable) Journal, the query might allow only

case class QuerySuperscalableTopic(name: String, fromTime: DateTime)

This will try to give you all events that were recorded after the given moment, but replay will not be deterministic, there will not be unique sequence numbers. These properties will be reflected in the Metadata that comes with the EventStreamOffer.
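
To illustrate how such properties could surface to the consumer, here is a small sketch. The two Metadata flags are hypothetical names chosen for this example, java.time.Instant is used in place of DateTime, and an Iterator again stands in for the stream.

```scala
import java.time.Instant

// Hypothetical sketch: the Metadata flags below are invented for illustration;
// the proposal only says these properties are reflected in the Metadata.
object TopicMetadataSketch {
  final case class Metadata(deterministicReplay: Boolean, uniqueSequenceNumbers: Boolean)
  final case class Event(recordedAt: Instant, payload: String)
  final case class QuerySuperscalableTopic(name: String, fromTime: Instant)
  final case class EventStreamOffer(metadata: Metadata, stream: Iterator[Event])

  // A time-based topic query: events strictly after fromTime, with no ordering promise.
  def handle(q: QuerySuperscalableTopic, topic: Seq[Event]): EventStreamOffer =
    EventStreamOffer(
      Metadata(deterministicReplay = false, uniqueSequenceNumbers = false),
      topic.iterator.filter(_.recordedAt.isAfter(q.fromTime))
    )

  // A consumer can inspect the metadata before relying on sequence numbers to resume.
  def canResumeBySeqNr(offer: EventStreamOffer): Boolean =
    offer.metadata.deterministicReplay && offer.metadata.uniqueSequenceNumbers
}
```

A read-side actor that needs exactly-once semantics could refuse such an offer, or fall back to de-duplicating events on its own.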

The last way to ask for events is to select them using an arbitrarily powerful query at runtime, probably with dynamic parameters so that it cannot be prepared or materialized while writing the log. Whether and how this is supported by the Journal depends on the precise back-end, and this is very much deliberate: we want to allow the Journal implementations to focus on different use-cases and offer different feature trade-offs. If an RDBMS is used, then things will naturally be linearized, but less scalable, for example. Document databases can extract a different set of features than when storing BLOBs in Oracle, etc. The user-facing API would be defined by each Journal implementation and could include

case class QueryEventStoreJS(javascriptCode: String)
case class QueryByProperty(jsonKey: String, value: String, since: DateTime)
case class QueryByType(clazz: Class[_], fromSeqNr: Long, toSeqNr: Long)
case class QueryNewStreams(fromSeqNr: Long, toSeqNr: Long)

The last one should elegantly solve the use-case of wanting to catalog which persistenceIds are valid in the Journal (which has been requested several times as well). As discussed for the SuperscalableTopic, each Journal would be free to decide whether it wants to implement deterministic replay, etc.

Properly modeling streams of events as Akka Streams feels like a consistent way forward; it also allows non-actor code to be employed for doing stream processing on the resulting event streams, including merging several of them or feeding events into Spark. The possibilities are boundless. I’m quite excited by this new perspective and look forward to your feedback on how well this helps Akka users implement the Q in CQRS.

Regards,


Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn


Prakhyat Mallikarjun

Aug 28, 2014, 2:33:39 AM
to akka...@googlegroups.com, gregor...@gmail.com, amai...@gmail.com, vve...@shiftmethod.com, kras...@googlemail.com
Hi Roland,

I went through the important discussion on querying at https://groups.google.com/forum/#!msg/akka-user/SL5vEVW7aTo/KfqAXAmzol0J, but it did not lead me to any conclusion. Thanks for this conclusion post.

The journal will always have the history of events. Why would one query for the history of events? Most queries will require the current state.

Suppose I have a domain object "Account" as an aggregate root and a corresponding persistent actor for it. The account will go through lots of changes, i.e. credits and debits. The journal will have credit and debit events.

Suppose a user needs to get the current balance of an account: how will a stream of the history of events suit this?

Also, when the user queries, he will not have any knowledge of (fromSeqNr, toSeqNr) or (Time).

The user will ask for the balance of a given account and will only have the account number.

The use case becomes even more complex when the user wants to query across multiple aggregate roots.

I am unable to understand how an event stream will work for business-specific querying.

-Prakhyat M M 

Martin Krasser

Aug 28, 2014, 3:01:43 AM
to akka...@googlegroups.com
Hi Roland,

thanks for the summary, I think that's the right direction.

In your summary, the only query command type pre-defined in akka-persistence is QueryByPersistenceId. I'd find it useful to further pre-define other query command types in akka-persistence to cover the most common use cases, such as:

- QueryByStreamDeterministic(name, from, to) (as a generalization of QueryKafkaTopic, ... and maybe also QueryByPersistenceId)
- QueryByTypeDeterministic(type, from, to)
- QueryByStream(name, fromTime)
- QueryByType(type, fromTime)

Supporting these commands would still be optional but it would give better guidance for plugin developers which queries to support and, more importantly, make it easier for applications to switch from one plugin to another. Other, more specialized queries would still remain plugin-specific such as QueryByProperty, QueryDynamic(queryString), etc ...

WDYT?

Cheers,
Martin

ahjohannessen

Aug 28, 2014, 5:12:34 AM
to akka...@googlegroups.com
Hi Martin,


On Thursday, August 28, 2014 8:01:43 AM UTC+1, Martin Krasser wrote:

In your summary, the only query command type pre-defined in akka-persistence is QueryByPersistenceId. I'd find it useful to further pre-define other query command types in akka-persistence to cover the most common use cases, such as:

- QueryByStreamDeterministic(name, from, to) (as a generalization of QueryKafkaTopic, ... and maybe also QueryByPersistenceId)
- QueryByTypeDeterministic(type, from, to)
- QueryByStream(name, fromTime)
- QueryByType(type, fromTime)

Supporting these commands would still be optional but it would give better guidance for plugin developers which queries to support and, more importantly, make it easier for applications to switch from one plugin to another. Other, more specialized queries would still remain plugin-specific such as QueryByProperty, QueryDynamic(queryString), etc ...

I think it is a great idea to standardize on common cases such as those you line up, because it gives guidance and reduces one-off fragmentation among journal APIs. It is reasonable that akka-persistence sets some sort of standard with respect to general use cases of reading streams.

Roland Kuhn

Aug 28, 2014, 6:00:17 AM
to akka-user
Hi Martin and Alex,

the point you raise is a good one, I did not include it in my first email because defining these common (and thereby de-facto required) queries is not a simple task: we should not include something that most journals will opt out of, and what we pick must be of general interest because it presents a burden to every journal implementor—at least morally.

My reasoning for keeping persistenceIds and arbitrary named streams separate is that persistenceIds must be supported in a fully linearizable fashion whereas named streams do not necessarily have this requirement; on the write-side it might make sense to specify that persistenceIds are only ever written to from one actor at a time, which potentially simplifies the Journal implementation. This also does not hold for named streams.

Concerning types I assume that there is an implicit expectation that they work like types in Java: when I persist a ShoppingCartCreated event I want to see it in the stream for all ShoppingCartEvents. This means that the Journal needs to understand subtype relationships that in turn have to be lifted from the programming language used. It might be that this is possible, but at least it is not trivial. Is it reasonable to expect most Journal implementations to support this?
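
A small sketch of the subtype expectation, assuming the events are live JVM objects in memory. The event classes and the queryByType helper are hypothetical; a journal that stores opaque bytes would have to lift this hierarchy into the store, which is the non-trivial part.

```scala
// Hypothetical sketch: subtype-aware filtering is easy on live JVM objects,
// but a journal holding serialized bytes has no such class hierarchy at hand.
object TypeQuerySketch {
  sealed trait ShoppingCartEvent
  final case class ShoppingCartCreated(cartId: String) extends ShoppingCartEvent
  final case class ItemAdded(cartId: String, sku: String) extends ShoppingCartEvent
  final case class AccountCredited(accountId: String, amount: BigDecimal)

  // Keeps every event whose runtime class is clazz or a subtype of it.
  def queryByType(log: Seq[AnyRef], clazz: Class[_]): Seq[AnyRef] =
    log.filter(e => clazz.isAssignableFrom(e.getClass))

  val log: Seq[AnyRef] = Seq(
    ShoppingCartCreated("c1"),
    AccountCredited("a1", BigDecimal(10)),
    ItemAdded("c1", "sku-9")
  )
}
```

Asking for classOf[ShoppingCartEvent] returns both cart events, while asking for classOf[ShoppingCartCreated] returns only the first one.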

One query that we might want to include generically is the ability to ask for the merged streams of multiple persistenceIds—be that deterministic or not.

Another thought: there should probably be a trait JournalQuery from which the discussed case classes inherit, and we should specify that the Journal is obliged to reply to all such requests, explicitly denying those it does not implement.
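
As a rough sketch of that contract: the reply type QueryNotSupported is a name invented here for illustration (no denial message has been named in this thread), and the stream is again an Iterator stand-in.

```scala
// Hypothetical sketch of a common query trait with an explicit denial reply.
object JournalQuerySketch {
  trait JournalQuery
  final case class QueryByPersistenceId(id: String, fromSeqNr: Long, toSeqNr: Long) extends JournalQuery
  final case class QueryByStream(name: String, fromSeqNr: Long, toSeqNr: Long) extends JournalQuery

  sealed trait JournalReply
  final case class EventStreamOffer(query: JournalQuery, stream: Iterator[String]) extends JournalReply
  // Invented name: the journal must answer every JournalQuery, even if only to decline.
  final case class QueryNotSupported(query: JournalQuery) extends JournalReply

  // A journal that implements only the mandatory query and denies everything else.
  def handle(query: JournalQuery): JournalReply = query match {
    case q: QueryByPersistenceId => EventStreamOffer(q, Iterator("evt-1", "evt-2"))
    case other                   => QueryNotSupported(other)
  }
}
```

With an explicit denial the requester never has to guess from a timeout whether a query type is unsupported.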

Regards,

Roland

Martin Krasser

Aug 28, 2014, 8:23:47 AM
to akka...@googlegroups.com

On 28.08.14 12:00, Roland Kuhn wrote:
Hi Martin and Alex,

the point you raise is a good one, I did not include it in my first email because defining these common (and thereby de-facto required) queries is not a simple task: we should not include something that most journals will opt out of, and what we pick must be of general interest because it presents a burden to every journal implementor—at least morally.

My reasoning for keeping persistenceIds and arbitrary named streams separate is that persistenceIds must be supported in a fully linearizable fashion whereas named streams do not necessarily have this requirement; on the write-side it might make sense to specify that persistenceIds are only ever written to from one actor at a time, which potentially simplifies the Journal implementation. This also does not hold for named streams.

I agree, makes sense to keep QueryByPersistenceId a separate type.



Concerning types I assume that there is an implicit expectation that they work like types in Java: when I persist a ShoppingCartCreated event I want to see it in the stream for all ShoppingCartEvents. This means that the Journal needs to understand subtype relationships that in turn have to be lifted from the programming language used. It might be that this is possible, but at least it is not trivial. Is it reasonable to expect most Journal implementations to support this?

I only included it in my proposal because it was discussed/requested very often if I remember correctly. It may not be trivial to support though. Fine for me if they're not pre-defined.



One query that we might want to include generically is the ability to ask for the merged streams of multiple persistenceIds—be that deterministic or not.

It can be pretty hard for some backend stores to support that in a scalable way (i.e. that scales to a large number of persistenceIds) as you cannot pre-compute the results because the persistenceIds are query arguments.

Having a QueryByStream(name, ...) in addition to QueryByPersistenceId is much less of a burden and serves a wide range of use cases.


Another thought: there should probably be a trait JournalQuery from which the discussed case classes inherit, and we should specify that the Journal is obliged to reply to all such requests, explicitly denying those it does not implement.

+1

Greg Young

Aug 28, 2014, 9:12:16 AM
to Prakhyat Mallikarjun, akka...@googlegroups.com, amai...@gmail.com, Vaughn Vernon, Martin Krasser
"I am unable to understand how an event stream will work for business-specific querying."

"The journal will always have the history of events. Why would one query for the history of events? Most queries will require the current state."

I linked a video in this thread that covers some such reasons
--
Studying for the Turing test

ahjohannessen

Aug 28, 2014, 3:53:37 PM
to akka...@googlegroups.com
Hi Roland,


On Thursday, August 28, 2014 11:00:17 AM UTC+1, rkuhn wrote:

Concerning types I assume that there is an implicit expectation that they work like types in Java: when I persist a ShoppingCartCreated event I want to see it in the stream for all ShoppingCartEvents. This means that the Journal needs to understand subtype relationships that in turn have to be lifted from the programming language used. It might be that this is possible, but at least it is not trivial. Is it reasonable to expect most Journal implementations to support this?

Having a stream per event type is overkill in my opinion. I think first and foremost the *primary* need concerning types is to be able to group per persistent actor type and not to differentiate individual events, that's possible to do afterwards with something like streamz by Martin.

Olger Warnier

Aug 28, 2014, 4:49:45 PM
to akka...@googlegroups.com, gregor...@gmail.com, amai...@gmail.com, vve...@shiftmethod.com, kras...@googlemail.com
Hi Roland,

great conclusion, with everything said till now in mind. I'd like to mention a few points and would like to hear if this matches your thoughts:

The QueryByTypeDeterministic(type, from, to) mentioned by Martin matches the way I've been using DDD+CQRS for the last 2 years.
Meaning that the type is actually the type of the PersistentActor (not of an event).
By streaming all events of a certain PersistentActor type, it's possible to create current state (for instance in an RDBMS or graph database) that is used by the application. The application queries these stores (RDBMS/graph) and will not use the event stream or journal directly (so the real performance requirements are somewhere else).
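
A minimal sketch of such a projection, using the Account example raised earlier in this thread. The event classes are hypothetical and an immutable Map stands in for the RDBMS or graph store that the application would actually query.

```scala
// Hypothetical sketch: folding all Account events into a current-state read model.
object BalanceProjectionSketch {
  sealed trait AccountEvent { def accountId: String }
  final case class Credited(accountId: String, amount: BigDecimal) extends AccountEvent
  final case class Debited(accountId: String, amount: BigDecimal) extends AccountEvent

  // The read model: current balance per account number.
  type Balances = Map[String, BigDecimal]

  def applyEvent(balances: Balances, event: AccountEvent): Balances = {
    val current = balances.getOrElse(event.accountId, BigDecimal(0))
    event match {
      case Credited(id, amount) => balances.updated(id, current + amount)
      case Debited(id, amount)  => balances.updated(id, current - amount)
    }
  }

  // Consuming the event stream once yields the state that queries are served from.
  def project(events: Iterator[AccountEvent]): Balances =
    events.foldLeft(Map.empty[String, BigDecimal])(applyEvent)
}
```

A balance lookup then needs only the account number; sequence numbers matter only to the projection itself, which uses them to resume where it left off.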

I believe that it is quite important to have these API case classes standardized as much as is reasonably possible. Because that allows me to use another persistent store for testing (like leveldb / inmem) while in production it makes use of kafka or some other store. 
I'd prefer to have the akka-persistence layer in between, allowing me to choose the setup according to the situation. (instead of tying directly to a kafka topic / eventstore stream)

One remark about the QuerySuperscalableTopic (or kafkatopic) 
I assume the 'name' parameter has to be created in the store in question (just like an index for an RDBMS). 
This feels like moving part of the responsibility of the way your CQRS framework works towards an external component (if this 'index' is not created, the thing does not work). When you look at the initial 'tag' proposal, you had to code a tag (write side) that could be used on the read side.
In this construction you will be able to test the code setup without being dependent on a specific store setup. 


Kind regards, 

Olger


Vaughn Vernon

Aug 28, 2014, 6:31:55 PM
to Roland Kuhn, akka-user, kras...@googlemail.com, gregor...@gmail.com, amai...@gmail.com

Hi Roland,

It's a good summary. As far as I can see it looks very complete and allows for a lot of flexibility between scalability and ease of use.

I will add that I am interested to see Martin's causal consistency with zero application-specific hints or code (unless it's just obvious previous sequence as back pointer.)

Best,
Vaughn

Greg Young

Aug 28, 2014, 9:12:08 PM
to Olger Warnier, akka...@googlegroups.com, amai...@gmail.com, vve...@shiftmethod.com, kras...@googlemail.com
A few things. 

First, while it is common that a projection is interested in only events from a type of producer (aggregate/actor/etc) it's also quite common they need to listen from many types.

"I believe that it is quite important to have these API case classes standardized as much as is reasonably possible. Because that allows me to use another persistent store for testing (like leveldb / inmem) while in production it makes use of kafka or some other store. 
I'd prefer to have the akka-persistence layer in between, allowing me to choose the setup according to the situation. (instead of tying directly to a kafka topic / eventstore stream)"

Wouldn't this only apply to integration testing? For all other testing wouldn't you just output events through whatever stream interface you were using? For integration purposes I know event store supports a memory only model and I would imagine Kafka does as well.

Cheers,

Greg

Ashley Aitken

Aug 29, 2014, 1:28:59 AM
to akka...@googlegroups.com, gregor...@gmail.com, amai...@gmail.com, vve...@shiftmethod.com, kras...@googlemail.com

Thank you Roland and team for listening and sharing your ideas and plans.

As I have said, I think there are three different but related areas of functionality being discussed here: 1) Actor Persistence, 2) Publish-Subscribe (to Topics), and 3) Full CQRS support.

Conceptually, for CQRS (3), I like to think of the write-side (primarily) creating one big event stream that flows (primarily) to the read-side.  PersistentView actors can then query from this stream to get persistent state for an actor or a stream based on some criteria (query). 

As has been mentioned by yourself and others, Actor Persistence (1) seems to work very well for persisting events. However, there are a few things I suggest you could possibly do to make it even better and leverage the new functionality of PersistentView.

1. I suggest there should be a facility (Trait?) for any actor to be able to publish events to the "event stream."  I believe it is a common "requirement" in CQRS to be able to publish events to the event stream independent of supporting Actor Persistence (1), 

2. Considering the facilities to be provided for PersistentView to read from a stream based on some query, I suggest it is worth going one extra step to allow actors (not necessarily PersistentActors) to post to a specific keyword/topic(s) and thus provide Publish-Subscribe (2.), and finally

3. I would suggest some ability to provide additional metadata with the events that could be used for causal dependency / ordering hints or other functionality like Publish/Subscribe (2).  This may be possible to do just by the user including metadata within the events but I am not sure.

I note: a) Publish/Subscribe (2) does not require that the event store implementation maintain independent topic streams (some may by default, others may construct them if queried), just that queries can possibly refer to them, and b) all three of these suggestions are somewhat related.

In summary, I think giving a little bit of extra love to the write-side (not specifically for PersistentActor) could be very useful considering the work you are planning on the read-side (PersistentView).

Thanks for considering these suggestions.

Cheers,
Ashley.


Vaughn Vernon

Aug 29, 2014, 5:25:39 AM
to Ashley Aitken, Greg Young, akka-user, kras...@googlemail.com

Hi Ashley,

Personally I'm having a little trouble understanding how your three points differ much. Couldn't you extend PersistentActor as something like TopicEnricher and make its enrich method call persist? I see your need, but I think it's enough out of the scope of ES that it is probably more application specific.

I do think that persisting metadata with events could benefit from an explicit parameter, so I think that's worth considering as part of the API.

Best,
Vaughn

Ashley Aitken

Aug 29, 2014, 6:34:27 AM
to akka...@googlegroups.com

Hi Vaughn,

Thanks for your post and sorry if mine wasn't clear.


On Friday, 29 August 2014 17:25:39 UTC+8, Vaughn Vernon wrote:

Personally I'm having a little trouble understanding how your three points differ much.


You're right, they are similar / related (and I did mention that) but I think there is a logical progression to them (IMHO).

The first one says that *any* actor should be able to persist an event, not just PAs (or subclasses) with their pId and all the code for replaying events etc.

The second says that it should be possible to associate these non-state-persisting events with a topic(s), specifically to realise reliable distributed pub/sub.

The third that when persisting events there should be a more general facility to include metadata beyond the persistentId or topic name just mentioned. 
 

Couldn't you extend PersistentActor as something like TopicEnricher and make its enrich method call persist? I see your need, but I think it's enough out of the scope of ES that it is probably more application specific.


Extending PA would mean the actor has all the machinery to save and replay events specifically for a persistenceId when the actor may just want to add events to the event stream (as a part of CQRS).  I suggest a trait would be adequate (but I am not sure).

In my understanding event sourcing is primarily about aggregate persistence (saving state incrementally and rebuilding from that), which PersistentActor does well.  CQRS usually seems to have an event stream from write to read whether or not event sourcing is used.

Cheers,
Ashley.

Markus H

Aug 31, 2014, 9:05:14 AM
to akka...@googlegroups.com, gregor...@gmail.com, amai...@gmail.com, vve...@shiftmethod.com, kras...@googlemail.com
Hi Roland,

sounds great that you are pushing for the whole CQRS story.

I'm just experimenting with Akka and CQRS and have no production experience, but I have been thinking about the concepts for some time. So please take my comments with a big grain of salt and forgive me for making it sound somewhat like a wish list. But I think if there is a time for wishes, it might be now.

I feel the need to distinguish a little more between commands, queries, reads and writes.
In a CQRS setup, I (conceptually) see three parts:
    (1) the Command side: mainly eventsourced PersistentActors that build little islands of consistency for changing the application state
    (2) the link between the Command and Query side: the possibility to make use of all or a subset of all events/messages written in (1) for building an optimized query side (3) or even for updating other islands of consistency on the Command side (other aggregates or bounded contexts) in (1)
    (3) the Query side: keeping an eventually consistent view of the application state in any form that is suitable for fast application queries

From the 10000-foot view, we write to (1) and read from (3) but each of (1),(2) and (3) has its own reads and writes:
    (1) writes: -messages produced by the application to be persisted
        reads: -consistent read of the persisted messages for actor replay
           -stream of all messages (I think, that's what you mean by SuperscalableTopic)
       
    (2) writes: -at least conceptually: all messages of the all-messages stream of (1)
        reads: -different subsets of the all-messages stream that make sense to different parts of our application
       
    (3) writes: -any query-optimized form of our data that was read of some sub-stream of (2)
        reads: -whatever query the query-side datastore allows (SQL, fulltext searches, graph walks etc.)

While (1) is the current akka persistence implementation plus a way to get all messages, (2) is more like the Event Bus (though I would name it differently) in this picture from the axonframework documentation (http://www.axonframework.org/docs/2.0/images/detailed-architecture-overview.png). (1) and (2) could be done by one product like what eventstore does with its projections or it could be different products like Cassandra for (1) and Kafka for (2). (3) could be anything that holds data.

Some more detail:

On (1):
As said before, the command side is fine as it is today to put messages into the datastore and get them out again for persistent actors. I definitely would consider replaying of messages for persistent actors part of the command side, since the command side in itself has stronger consistency requirements (consistency within an aggregate) than the query side.

Additionally, as Ashley wrote, any actor should be able to push messages to the all-messages stream. In contrast to persistent actors I don't see any need for replay here. Therefore and for other reasons (like message deduplication in (2)) I would like to propose adding an extra unique ID (UUID?) for any message handled by the command side, independent of an actor's persistenceId (which would be needed for replays nevertheless).
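
To illustrate the deduplication use of such an ID, here is a small sketch. The Envelope type and the dedup helper are hypothetical, and the set of seen IDs grows without bound, which a real implementation would have to limit.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical sketch: dropping redelivered messages by their unique ID.
object DedupSketch {
  final case class Envelope(messageId: UUID, payload: String)

  // Passes each message ID through only once; the seen-set is unbounded here.
  def dedup(in: Iterator[Envelope]): Iterator[Envelope] = {
    val seen = mutable.HashSet.empty[UUID]
    in.filter(e => seen.add(e.messageId)) // add returns false for an already-seen ID
  }
}
```

This works independently of persistenceId and sequence numbers, so it also covers messages pushed by actors that are never replayed.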

Also, I see the need to provide some guarantees for the all-messages stream. I would consider an ordering guarantee for messages from the same actor and an otherwise (at least roughly) timestamp based sorting a good compromise. This would also be comparable to the guarantees that akka provides for message sending. Ideally, the order stays the same for repeated reads of the all-messages stream. With the guarantees mentioned before, if the datastore keeps all messages of an actor on the same node, the all-messages stream could even be created per datastore node.

On (2):
As mentioned above, I see the QueryByPersistenceId as part of (1) as it requires stronger consistency guarantees. All other QueryByWhatever are all about the question, how to retrieve the right subset of messages from the all-messages stream for the application and its domain. This of course differs by application and domain. Therefore I like Martin's QueryByStream(name, ...), where a stream is any subset of messages the application cares about.

I also think it should not be up to the datastore to decide what streams to offer. I also can't really imagine how this should work in most datastores. While there might be some named index on top of JSON messages in MongoDB that can be served as a stream, I don't see how to create a stream/view/index in Key-Value stores or RDBMS where a message is probably persisted as a byte array without any knowledge of the application.

To tell the datastore what streams to offer, I would consider something like projections in eventstore or user-defined topics in Martin's kafka-persistence that are driven by the application. In the simplest form it could be a projection function like

Message => Seq[String]

that is applied to each message of the all-messages stream, which takes a message and gives back the IDs of the sub-streams it belongs to, as Strings. This could be anything from the type-name to the persistenceId of an actor to a property of the message. So it feels a little like "tagging on the read side" or more precisely "tagging on the way from the command to the query side". New messages should be added to the sub-streams as they arrive in the all-messages stream. This could probably be done in any datastore that can keep another index based on message IDs (if IDs are added on the command side) and is adjustable with application needs.
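
A minimal sketch of such a projection function and the index it would drive. The Message fields, the byKindAndActor projection, and the index helper are hypothetical, a Long stands in for the unique message ID, and an in-memory Map stands in for whatever index the datastore keeps.

```scala
// Hypothetical sketch: an application-supplied projection assigns each message
// of the all-messages stream to named sub-streams, indexed by message ID.
object SubStreamSketch {
  final case class Message(id: Long, persistenceId: String, kind: String)

  // Message => names of the sub-streams the message belongs to.
  type Projection = Message => Seq[String]

  val byKindAndActor: Projection = m =>
    Seq(s"kind-${m.kind}", s"pid-${m.persistenceId}")

  // Each sub-stream keeps the IDs of its messages in arrival order.
  def index(all: Seq[Message], projection: Projection): Map[String, Vector[Long]] =
    all.foldLeft(Map.empty[String, Vector[Long]]) { (idx, m) =>
      projection(m).foldLeft(idx) { (acc, name) =>
        acc.updated(name, acc.getOrElse(name, Vector.empty[Long]) :+ m.id)
      }
    }
}
```

Because the index holds only IDs, a new projection can be run against the all-messages stream later without rewriting the messages themselves.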

The interface for (2) could allow new projections that could be run against the all-messages stream or any sub-stream at any later time. A PersistentView could just track one of the projected sub-streams.

For datastore implementations this would bring the minimum requirement to construct sub-streams as told by the application and serve streams by name.

On (3):
On the query side of the application I see anything that holds a view of the data written on the command side and allows querying/consuming by some criteria. As you wrote in your post, the variety of datastores is huge and I'm not sure if the set of common queries is very big. I like the idea of ad-hoc querying you proposed as it is a much more messaging like way to query datastores but I see disparate sets of standardized queries per datastore type (graph dbs, rdbms, key-value etc.) that focus on the datastore concepts (tables and rows in rdbms, nodes and edges in graph dbs - something like QueryById(id, table) for RDBMS etc.) and not on messages. I would very much like to see this kind of datastore querying but I don't think it's necessary for CQRS setups with akka.

As a user of a full akka-cqrs-solution the points to provide application logic could be
    - which messages to persist in (1)
    - which projections to make for the application in (2)
    - which projected streams to consume from (2) in order to trigger anything from query-side updates to creation of new commands for other aggregates to updating a view in someone's browser

Kind regards
Markus



Ashley Aitken

Sep 1, 2014, 1:58:56 AM9/1/14
to akka...@googlegroups.com

Thank you Markus for a very useful contribution.

I too included a link to that architecture diagram in a previous post in another thread.  I think the provision of an "eventbus" for CQRS to integrate with other bounded contexts on the write-side and, of course, the read-side is very important and something that the current Akka proposal lacks.  

My challenge to the Akka team would be to consider how someone could use Akka for CQRS *without* using event sourcing to persist the actors (e.g. their state may just be saved to an SQL database).  Note that I have nothing against the current PersistentActor class, it seems great.  

As I understand it though, CQRS is completely possible without event sourcing aggregate state and I believe most such implementations still need some sort of reliable / persistent event store / bus for integration on the write-side and updating the read models on the read-side.

Prakhyat Mallikarjun

Sep 1, 2014, 5:03:09 AM9/1/14
to akka...@googlegroups.com
Hi Ashley,

My challenge to the Akka team would be to consider how someone could use Akka for CQRS *without* using event sourcing to persist the actors (e.g. their state may just be saved to an SQL database).  Note that I have nothing against the current PersistentActor class, it seems great.  
[Prakhyat] I agree with your point but only for 'Q' in querying.

I expect queries to be simple. Like 
1. JPA find queries in repositories.
2. Hibernate named queries.

In an ORM each domain object knows its table. The ORM tool will translate the named queries and JPA queries to the dialect of the connected data source. The framework will abstract the querying logic; developers just mention which domain repository to use.

Correct me if I am wrong. I want solutions from akka persistence for complex business querying that spans different aggregate roots, and for complex reporting and analytics. Data duplication should also be avoided: only state should be maintained, and replication of state to the query side should be avoided.

-Prakhyat M M

Greg Young

Sep 1, 2014, 1:32:20 PM9/1/14
to akka...@googlegroups.com
Most systems that store state (say, via an ORM) and publish events are just really broken (in a subtle way). They have the problem of two sources of truth (what if they disagree?)

Ashley Aitken

Sep 1, 2014, 7:57:41 PM9/1/14
to akka...@googlegroups.com


On Tuesday, 2 September 2014 01:32:20 UTC+8, Greg Young wrote:
Most systems that store state (say, via an ORM) and publish events are just really broken (in a subtle way). They have the problem of two sources of truth (what if they disagree?)

Thank you Greg, I agree.  My friendly challenge wasn't to discount or deprecate event sourcing in general (and especially in Akka) but just to point out that aggregate persistence is somewhat orthogonal to the rest of CQRS and the rest of CQRS is not just the read-side.  

I've focussed somewhat on persistent managers on the write-side but, as others have also pointed out (and as I am sure you know), the "event bus" is also used for systems integration and even application events.  I don't feel PersistentActor is the full solution on the write-side.


Roland Kuhn

Sep 5, 2014, 2:48:42 AM9/5/14
to akka-user, gregor...@gmail.com, amai...@gmail.com, vve...@shiftmethod.com, Martin Krasser
Hi Markus,

thanks for this very thoughtful contribution! [comments inline]

On 31 Aug 2014, at 15:05, Markus H <m...@heckelmann.de> wrote:

Hi Roland,

sounds great that you are pushing for the whole CQRS story.

I'm just experimenting with akka and CQRS and have no production experience, but I have been thinking about the concepts for some time. So please take my comments with a big grain of salt and forgive me for making it sound somewhat like a wish list. But I think if there is a time for wishes, it might be now.

I feel the need to distinguish a little more between commands, queries, reads and writes.
In a CQRS setup, I (conceptually) see three parts:
    (1) the Command side: mainly eventsourced PersistentActors that build little islands of consistency for changing the application state
    (2) the link between the Command and Query side: the possibility to make use of all or a subset of all events/messages written in (1) for building an optimized query side (3) or even for updating other islands of consistency on the Command side (other aggregates or bounded contexts) in (1)
    (3) the Query side: keeping an eventually consistent view of the application state in any form that is suitable for fast application queries

From the 10000-foot view, we write to (1) and read from (3) but each of (1),(2) and (3) has its own reads and writes:
    (1) writes: -messages produced by the application to be persisted
        reads: -consistent read of the persisted messages for actor replay
           -stream of all messages (I think, that's what you mean by SuperscalableTopic)
       
    (2) writes: -at least conceptually: all messages of the all-messages stream of (1)
        reads: -different subsets of the all-messages stream that make sense to different parts of our application
       
    (3) writes: -any query-optimized form of our data that was read from some sub-stream of (2)
        reads: -whatever query the query-side datastore allows (SQL, fulltext searches, graph walks etc.)

While (1) is the current akka persistence implementation plus a way to get all messages, (2) is more like the Event Bus (though I would name it differently) in this picture from the axonframework documentation (http://www.axonframework.org/docs/2.0/images/detailed-architecture-overview.png). (1) and (2) could be done by one product like what eventstore does with its projections or it could be different products like Cassandra for (1) and Kafka for (2). (3) could be anything that holds data.

Yes, this is a very good summary.

Some more detail:

On (1):
As said before, the command side is fine as it is today to put messages into the datastore and get them out again for persistent actors. I definitely would consider replaying of messages for persistent actors part of the command side, since the command side in itself has stronger consistency requirements (consistency within an aggregate) than the query side.

Additionally, as Ashley wrote, any actor should be able to push messages to the all-messages stream. In contrast to persistent actors I don't see any need for replay here. Therefore and for other reasons (like message deduplication in (2)) I would like to propose adding an extra unique ID (UUID?) for any message handled by the command side, independent of an actor's persistenceId (which would be needed for replays nevertheless).
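To make the deduplication idea concrete, here is a minimal sketch in plain Scala. It is not an Akka API: the `Envelope` type and the `deduplicate` helper are hypothetical names invented for this illustration, and it assumes the unique ID is a `java.util.UUID` assigned once on the command side.

```scala
import java.util.UUID

object DedupSketch {
  // Hypothetical envelope: eventId is the extra unique ID proposed above,
  // independent of the writer's persistenceId and sequence number.
  final case class Envelope(eventId: UUID, persistenceId: String, sequenceNr: Long, payload: Any)

  // Keeps the first occurrence of every eventId and preserves arrival order.
  def deduplicate(events: Seq[Envelope]): Seq[Envelope] = {
    val seen = scala.collection.mutable.HashSet.empty[UUID]
    // HashSet.add returns false when the ID was already present.
    events.filter(e => seen.add(e.eventId))
  }
}
```

A consumer in (2) that may receive the same message twice (for example after a redelivery) could run its input through `deduplicate` before updating a sub-stream.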

Also, I see the need to provide some guarantees for the all-messages stream. I would consider an ordering guarantee for messages from the same actor and an otherwise (at least roughly) timestamp based sorting a good compromise. This would also be comparable to the guarantees that akka provides for message sending. Ideally, the order stays the same for repeated reads of the all-messages stream. With the guarantees mentioned before, if the datastore keeps all messages of an actor on the same node, the all-messages stream could even be created per datastore node.
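One way to read the ordering guarantee described above is as a merge of per-actor logs. The following is only a sketch under assumptions: `Event`, `merge`, and the idea that the store can hand out one ordered log per persistenceId are all hypothetical, and ties are broken by persistenceId purely to make repeated reads deterministic.

```scala
object AllMessagesMergeSketch {
  final case class Event(persistenceId: String, sequenceNr: Long, timestamp: Long)

  // Merges per-actor logs into one stream. Only the head of each log is ever
  // eligible, so per-actor order is preserved even if timestamps are skewed.
  // Across actors the earliest head timestamp wins, with persistenceId as a
  // deterministic tie-breaker so that repeated reads give the same order.
  def merge(logs: Map[String, List[Event]]): List[Event] = {
    @annotation.tailrec
    def loop(remaining: List[List[Event]], acc: List[Event]): List[Event] = {
      val live = remaining.filter(_.nonEmpty)
      if (live.isEmpty) acc.reverse
      else {
        val next = live.map(_.head).minBy(e => (e.timestamp, e.persistenceId))
        // Pop the chosen event from the one log whose head it is.
        val rest = live.map(log => if (log.head eq next) log.tail else log)
        loop(rest, next :: acc)
      }
    }
    loop(logs.values.toList, Nil)
  }
}
```

The result is only roughly sorted by timestamp: an actor whose clock ran backwards still sees its own events in sequence order, which matches the "same actor first, timestamp otherwise" compromise.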

On (2):
As mentioned above, I see the QueryByPersistenceId as part of (1) as it requires stronger consistency guarantees. All other QueryByWhatever queries are about how to retrieve the right subset of messages from the all-messages stream for the application and its domain. This of course differs by application and domain. Therefore I like Martin's QueryByStream(name, ...), where a stream is any subset of messages the application cares about.

I also think it should not be up to the datastore to decide what streams to offer. I also can't really imagine how this should work in most datastores. While there might be some named index on top of JSON messages in MongoDB that can be served as a stream, I don't see how to create a stream/view/index in key-value stores or RDBMS where a message is probably persisted as a byte array without any knowledge of the application.

To tell the datastore what streams to offer, I would consider something like projections in eventstore or user-defined topics in Martin's kafka-persistence that are driven by the application. In the simplest form it could be a projection function like

Message => Seq[String]

that is applied to each message of the all-messages stream, which takes a message and gives back Strings of sub-stream-IDs. This could be anything from the type-name to the persistenceId of an actor to a property of the message. So it feels a little like "tagging on the read side" or more precisely "tagging on the way from the command to the query side". New messages should be added to the sub-streams as they arrive in the all-messages stream. This could probably be done in any datastore that can keep another index based on message IDs (if IDs are added on the command side) and is adjustable to application needs.
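As an illustration of such a projection function, here is a small in-memory sketch. The message types, the `byTypeAndActor` projection and the `project` helper are made up for this example; a real datastore would maintain the sub-streams incrementally as an index rather than rebuild them in memory.

```scala
object ProjectionSketch {
  // Hypothetical message types; any application-defined events would do.
  sealed trait Message extends Product { def persistenceId: String }
  final case class ItemAdded(persistenceId: String, item: String) extends Message
  final case class OrderPlaced(persistenceId: String, total: BigDecimal) extends Message

  // The projection function from the text: one message, many sub-stream IDs.
  type Projection = Message => Seq[String]

  // Example projection: one sub-stream per message type and one per actor.
  val byTypeAndActor: Projection =
    m => Seq(s"type-${m.productPrefix}", s"actor-${m.persistenceId}")

  // Applies the projection to every message of the all-messages stream and
  // appends the message to each named sub-stream, preserving arrival order.
  def project(all: Seq[Message], projection: Projection): Map[String, Vector[Message]] =
    all.foldLeft(Map.empty[String, Vector[Message]]) { (streams, m) =>
      projection(m).foldLeft(streams) { (acc, id) =>
        acc.updated(id, acc.getOrElse(id, Vector.empty) :+ m)
      }
    }
}
```

A consumer would then ask for a sub-stream by name, for example `project(all, byTypeAndActor)("type-ItemAdded")`.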

The interface for (2) could allow new projections that could be run against the all-messages stream or any sub-stream at any later time. A PersistentView could just track one of the projected sub-streams.

This is the one point that is still difficult to settle on. Classifying events can be done in three locations:
  • on their way into storage; this means that it is done by the write-side (with all implied restrictions)
  • within the storage; this implies that the storage can interpret that data
  • on their way back into the application; this can be huge overhead
The third one is the only possibility that allows after-the-fact tagging without forcing the storage back-end to be able to interpret both the data and the user-supplied classification function. Unless I am missing something we will need to settle for this unsatisfying solution—at least for now.
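The overhead of the third option can be seen in a small sketch. This is plain Scala with hypothetical names (`Event`, `query`, `classify`), not a proposed API: the store hands back every event, and the application applies its own classification function afterwards.

```scala
object ReadSideClassificationSketch {
  final case class Event(persistenceId: String, payload: String)

  // Classification happens on the way back into the application: the store
  // only replays raw events, and the user-supplied function decides which
  // ones belong to the requested tag. Returns the matching events together
  // with the number of events that had to be read to find them.
  def query(allEvents: Iterator[Event],
            classify: Event => Set[String],
            tag: String): (List[Event], Int) = {
    var scanned = 0
    val matches = allEvents.filter { e => scanned += 1; classify(e).contains(tag) }.toList
    (matches, scanned)
  }
}
```

Even when only a handful of events match, `scanned` equals the size of the whole journal, which is the cost being weighed here.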

For datastore implementations this would bring the minimum requirement to construct sub-streams as told by the application and serve streams by name.

On (3):
On the query side of the application I see anything that holds a view of the data written on the command side and allows querying/consuming by some criteria. As you wrote in your post, the variety of datastores is huge and I'm not sure if the set of common queries is very big. I like the idea of ad-hoc querying you proposed as it is a much more messaging-like way to query datastores, but I see disparate sets of standardized queries per datastore type (graph dbs, rdbms, key-value etc.) that focus on the datastore concepts (tables and rows in rdbms, nodes and edges in graph dbs - something like QueryById(id, table) for RDBMS etc.) and not on messages. I would very much like to see this kind of datastore querying but I don't think it's necessary for CQRS setups with akka.

Yes, the read-side as defined by your (3) is out of scope for Akka Persistence.

As a user of a full akka-cqrs-solution the points to provide application logic could be
    - which messages to persist in (1)
    - which projections to make for the application in (2)
    - which projected streams to consume from (2) in order to trigger anything from query-side updates to creation of new commands for other aggregates to updating a view in someone's browser

Exactly right, this fully matches my understanding of which features Akka Persistence should provide.

Roland Kuhn

Sep 5, 2014, 3:09:55 AM9/5/14
to akka-user, gregor...@gmail.com, amai...@gmail.com, Vaughn Vernon, Martin Krasser
Attempting a second round-up of what shall go into tickets, in addition to my first summary we need to:

  • predefine trait JournalQuery with minimal semantics (to make the Journal support discoverable at runtime)
  • predefine queries for named streams since that is universally useful; these are separate from PersistenceID queries due to different consistency requirements
  • add support for write-side tags (see below)
  • add a comprehensive PersistenceTestKit which supports the fabrication of arbitrary event streams for both PersistentActor and read-side verification
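To picture what "discoverable at runtime" might mean, here is a rough sketch. Nothing in it is an existing or planned Akka API: `QueryByPersistenceId` follows the shape quoted elsewhere in this thread, `QueryByStream(name, ...)` is borrowed from Martin's suggestion with an assumed `fromOffset` parameter, and the reply types and `MinimalJournal` are invented for illustration.

```scala
object JournalQuerySketch {
  // Open marker trait, so that journal plugins could add their own queries.
  trait JournalQuery
  final case class QueryByPersistenceId(id: String, fromSeqNr: Long, toSeqNr: Long) extends JournalQuery
  final case class QueryByStream(name: String, fromOffset: Long) extends JournalQuery

  // Hypothetical replies: a journal answers every query, either by accepting
  // it or by stating explicitly that it does not support that kind of query.
  sealed trait QueryReply
  final case class QueryAccepted(query: JournalQuery) extends QueryReply
  final case class QueryUnsupported(query: JournalQuery) extends QueryReply

  // A journal that only understands persistenceId queries.
  final class MinimalJournal {
    def handle(query: JournalQuery): QueryReply = query match {
      case q: QueryByPersistenceId => QueryAccepted(q)
      case other                   => QueryUnsupported(other)
    }
  }
}
```

With this shape a read-side actor can find out what a journal supports by sending the query and reacting to the reply, rather than relying on compile-time knowledge of the plugin.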

Ashley, your challenge about considering non-ES write-sides is one that I think we might not take up: the scope of Akka Persistence is to support persistent Actors and their interactions, therefore I believe we should be opinionated about how we achieve that. If you want to use CQRS without ES then Akka might just not be for you (for some values of “you”, not necessarily you ;-) ).

Now why tags? My previous conclusion was that burdening the write-side with generating them goes counter to the spirit of ES in that this tagging should well be possible after the fact. The problem is that that can be extremely costly, so spawning a particular query on the read-side should not implicitly replay all events of all time, that has the potential of bringing down the whole system due to overload. I still think that stores might want to offer this feature under the covers—i.e. not accessible via Akka Persistence standard APIs—but for those that cannot we need to provide something else. The most prominent use of tags will probably be that each kind of PersistentActor has its own tag, solving the type issue as well (as brought up by Alex and Olger). In summary, write-side tags are just an optimization.
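Why write-side tags count as an optimization can be sketched with a toy in-memory journal. All names here (`Tagged`, `InMemoryJournal`, `eventsByTag`) are hypothetical and the code only models the idea: the tag index is built while appending, so a tag query never touches unrelated events.

```scala
object WriteSideTagSketch {
  // Hypothetical event envelope carrying the tags chosen by the write-side.
  final case class Tagged(persistenceId: String, sequenceNr: Long, payload: Any, tags: Set[String])

  final class InMemoryJournal {
    private var log = Vector.empty[Tagged]
    private var byTag = Map.empty[String, Vector[Int]] // tag -> positions in log

    // The index is maintained at write time, once per event.
    def append(e: Tagged): Unit = {
      val pos = log.size
      log = log :+ e
      e.tags.foreach { t =>
        byTag = byTag.updated(t, byTag.getOrElse(t, Vector.empty) :+ pos)
      }
    }

    // Reads only the indexed positions instead of replaying the whole log.
    def eventsByTag(tag: String): Vector[Tagged] =
      byTag.getOrElse(tag, Vector.empty).map(pos => log(pos))
  }
}
```

Tagging every event of one kind of PersistentActor with that kind's name (say `"ShoppingCart"`) gives the per-type stream mentioned above without a full replay.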

Concerning the ability to publish to arbitrary topics from any Actor I am on the fence: this is a powerful feature that can be quite a burden to implement. What we are defining here is—again—Akka Persistence, meaning that all things that are journaled are intended to stay there eternally. Using this to realize a (usually ephemeral) event bus is probably going to suffer from impedance mismatches, as witnessed by previous questions concerning the efficiency of deleting log entries—something that should really not be done in the intended use-cases. So, if an Actor wants to persist an Event to make it part of the journaled event stream, then I’d argue that that Actor is at least conceptually a PersistentActor. What is wrong with requiring it to be one also in practice? The only thing that we might want to add is that recovery (i.e. the write-side reading of events) can be opted out of. Thoughts?

For everything going beyond the above I’d say we should wait and see what extensions are provided by Journal implementations and how well they work in practice.

Regards,

Roland

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Martin Krasser

Sep 5, 2014, 3:49:20 AM9/5/14
to akka...@googlegroups.com

On 05.09.14 09:09, Roland Kuhn wrote:
Concerning the ability to publish to arbitrary topics from any Actor I am on the fence: this is a powerful feature that can be quite a burden to implement. What we are defining here is—again—Akka Persistence, meaning that all things that are journaled are intended to stay there eternally. Using this to realize a (usually ephemeral) event bus is probably going to suffer from impedance mismatches, as witnessed by previous questions concerning the efficiency of deleting log entries—something that should really not be done in the intended use-cases. So, if an Actor wants to persist an Event to make it part of the journaled event stream, then I’d argue that that Actor is at least conceptually a PersistentActor. What is wrong with requiring it to be one also in practice? The only thing that we might want to add is that recovery (i.e. the write-side reading of events) can be opted out of. Thoughts?

Already doable with:

    // Skip recovery: load no snapshot and replay zero events.
    override def preStart(): Unit = {
      self ! Recover(fromSnapshot = SnapshotSelectionCriteria.None, replayMax = 0L)
    }

but maybe you were more thinking about different traits for writing and recovery (?)

Olger Warnier

Sep 5, 2014, 4:02:24 AM9/5/14
to akka...@googlegroups.com
Hi Roland, 

This will certainly simplify my code. So from my side it will be a good 'start' to experiment with these additions in practice and see what's missing. 

Kind regards, 

Olger

Roland Kuhn

Sep 5, 2014, 4:31:40 AM9/5/14
to akka-user
On 5 Sep 2014, at 09:49, Martin Krasser <kras...@googlemail.com> wrote:

Already doable with:

    // Skip recovery: load no snapshot and replay zero events.
    override def preStart(): Unit = {
      self ! Recover(fromSnapshot = SnapshotSelectionCriteria.None, replayMax = 0L)
    }

but maybe you were more thinking about different traits for writing and recovery (?)

True, this looks good enough for now. If this surfaces a lot then we might think about adding a pure writer trait later; I think PersistentActor is unlikely to need to change for its intended purpose so we should be good (considering source compatibility beyond 2.4.0).

Regards,

Roland

Vaughn Vernon

Oct 6, 2014, 2:09:56 PM10/6/14
to Roland Kuhn, akka-user, Greg Young, Ashley Aitken, Martin Krasser
Hi Roland,

It's been a month since the last update on this and I have lost track of the status.

Can you provide an update on where this stands? Is there a more recent akka-persistence build that supports the conclusions reached in this discussion? If so, what is the release number? If not, when will the proposed features be released?

Best,
Vaughn

Roland Kuhn

Oct 7, 2014, 1:32:20 AM10/7/14
to Vaughn Vernon, akka-user, Greg Young, Ashley Aitken, Martin Krasser
Hi Vaughn,

from our side nothing has happened yet: my conclusion is that this thread contains all the information we need when we start working on this. The reason why we are waiting is that this work will depend heavily upon Akka Streams and therefore we are finishing those first, which should take roughly one month. Meanwhile, if use cases come up which could be used to refine the plans, please point them out here so that we can take all the inputs into account.

Regards,

Roland

Kirill Wedens

Oct 7, 2014, 6:13:03 AM10/7/14
to akka...@googlegroups.com, gregor...@gmail.com, amai...@gmail.com, vve...@shiftmethod.com, kras...@googlemail.com
Will the updated read side include the ability to replay views from multiple streams? It's a common scenario that a view needs to handle events of some types from multiple different event sources. I think the main problem here is causal ordering of events across multiple streams. And as I understand it, such "extensions" should be provided by the journal implementation?

Vaughn Vernon

Nov 17, 2014, 12:34:16 PM11/17/14
to akka...@googlegroups.com, vve...@shiftmethod.com, gregor...@gmail.com, amai...@gmail.com, kras...@googlemail.com
Hi Roland,

I am still tracking this one. As far as I know the team hasn't gotten back to this work yet. How are Akka Streams looking in support of this effort? Anything I can do to help move this forward?

Best,
Vaughn

Sebastian Bach

Jan 7, 2015, 4:07:54 PM1/7/15
to akka...@googlegroups.com, vve...@shiftmethod.com, gregor...@gmail.com, amai...@gmail.com, kras...@googlemail.com
Hi Roland,

one thing to keep in mind in the CQRS/ES architecture is that not only does the query side depend on the command side (by following the event stream), but the command side also depends on the query side for validation of complex business rules. This has a deep impact on correctness and throughput. Validating against a potentially outdated query model in an eventually consistent architecture is a hard problem (e.g. adding a sold out item to the shopping cart). The consistency of the query model should be achieved as soon as possible and close to real-time. A PersistentView in Akka has a default auto-update interval of 5s? On the other hand the speed of validation depends on the speed of the queries. And the throughput depends on the validation speed. Thus, queries directly on the whole event stream are less useful than persistent projections.

Keep up the good work :)
Cheers
Sebastian

Greg Young

Jan 7, 2015, 4:15:42 PM1/7/15
to Sebastian Bach, akka...@googlegroups.com, Vaughn Vernon, Ashley Aitken, Martin Krasser
"The consistency of the query model should be achieved as soon as
possible and close to real-time."

It really depends on the domain. I have worked in many situations
where the data in question would be perfectly fine updated once per
month.

" (e.g. adding a sold out item to the shopping cart)."

This is a funny example because it shows not that you need to update
read models more quickly but that you need to get the whole business
on board. Remember that computer systems are normally part of a larger
system fulfilling business needs. It really is a mind shift moving to
eventual consistency.

In the example of adding a sold out item... why stop it? Does it
matter that we don't have any of this item? The real question is how
quickly we can get it and if it's worth our while to do so. To be fair
30 years ago these times were much much higher than what we talk about
today and yet businesses still managed to work their way through
things.

For many of these types allowing things to go incorrectly is actually
a good thing (overbooked seats on an airline, overdraft charges at
banks...). To really be benefiting from eventual consistency the whole
business process must recognize it. In terms of handling failures they
are normally handled in a reactive not a preventative manner (like
most business problems). Detect the failure, let a human deal with it.

At the end of the day the primary role of the computer system is to
take workload off of humans. You will hit the law of diminishing
returns. Don't try to solve every problem :)

Greg

Jonas Bonér

Jan 9, 2015, 2:53:41 AM1/9/15
to Akka User List, Sebastian Bach, Vaughn Vernon, Ashley Aitken, Martin Krasser
That is a great point Greg. 
--
Jonas Bonér
Home: jonasboner.com
Twitter: @jboner
Public Key: keybase.io/jonas

Sebastian Bach

Jan 9, 2015, 5:02:36 AM1/9/15
to akka...@googlegroups.com, sebastian....@gmail.com, vve...@shiftmethod.com, amai...@gmail.com, kras...@googlemail.com

Thank you Greg. The mind shift from a preventive to a reactive workflow is not easy for users (humans), because it requires a change of habits. For many people computer systems are kind of authoritative. There is this wrong assumption (from the early days of computation?) that a computer accepts only a valid input or returns an error. It was then only black or white, the golden era of transactions. But this was (as you pointed out) always to some degree a hypocrisy. Now we have these shades of gray and many users feel unsettled. This holds true for any kind of resource allocation application and the overbooking (or wrong booking) problem. Some of the users define taking workload off of them as avoiding planning mistakes, like commit and forget. But the actual workflow seems to shift towards an iterative process of human-computer interaction, to some kind of react-react ping-pong.

Best Regards
Sebastian

Greg Young

Jan 9, 2015, 5:56:40 AM1/9/15
to akka...@googlegroups.com

Usually it comes down to the realization that the computer is not the book of record. One of my favourites was being asked to build a fully consistent inventory system. I generally like to approach things with questions; the one I had was 'sure but how do we get people who are stealing stuff to appropriately check it out?'

You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/MNDc9cVG1To/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.

Viktor Klang

Jan 9, 2015, 6:00:49 AM1/9/15
to Akka User List
On Fri, Jan 9, 2015 at 11:56 AM, Greg Young <gregor...@gmail.com> wrote:

Usually it comes down to the realization that the computer is not the book of record. One of my favourites was being asked to build a fully consistent inventory system. I generally like to approach things with questions; the one I had was 'sure but how do we get people who are stealing stuff to appropriately check it out?'

+Long.MAX_VALUE on this. Building warehouse management systems is very eye-opening.



--
Cheers,

Sebastian Bach

Jan 9, 2015, 10:46:02 AM1/9/15
to akka...@googlegroups.com
Maybe in 'The Reactive Manifesto' v3.0:
The human user as an auxiliary part of a reactive system should be responsive, resilient, elastic and message-driven too. He may be the weakest link in the chain.

Ganta Murali Krishna

Mar 27, 2015, 7:44:25 AM3/27/15
to akka...@googlegroups.com, gregor...@gmail.com, amai...@gmail.com, vve...@shiftmethod.com, kras...@googlemail.com
Hello Roland,

Any news on this, please? Roughly when can we expect an implementation? Your response will be really appreciated.

Regards
Murali

Roland Kuhn

Mar 27, 2015, 8:33:43 AM3/27/15
to akka-user, gregor...@gmail.com, amai...@gmail.com, vve...@shiftmethod.com, Martin Krasser
Hi Murali,

the core team at Typesafe cannot work on this right now (we need to finish Streams and HTTP first and have some other obligations as well), but Akka is an open-source project and we very much welcome contributions of all kinds. In this case we should probably start by defining more closely which queries to (initially) support and how to model them in the various backends, so that we can get a feel for how we shall change the Journal SPI.

Regards,

Roland


Andre Kampert

Mar 31, 2015, 6:40:32 AM3/31/15
to akka...@googlegroups.com, gregor...@gmail.com, amai...@gmail.com, vve...@shiftmethod.com, kras...@googlemail.com

Hi Murali,

I started development of an application based on Akka Persistence to implement CQRS concepts about a year ago. A lot of ideas came from different topics in this group. Recently I started to extract a small library from this application. The approach I took is to redundantly store all events in a global persistent actor in order to recreate views in a journal agnostic way. This also allows for easy in-memory testing.

There is a minor risk, in that storing the event twice breaks consistency. The globally stored events I only use for (re)constructing views. Once there is a better solution, the global persistent actor can be deleted and the views can be reconstructed in a different manner.
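The shape of this approach, as I read the description, can be modelled in a few lines of plain Scala. This is not code from the linked library; `Store`, `persist`, `recover` and `rebuildView` are hypothetical names, and the in-memory collections merely stand in for the per-actor journal and the global persistent actor.

```scala
object GlobalEventLogSketch {
  final case class Event(persistenceId: String, payload: String)

  final class Store {
    private var perActor = Map.empty[String, Vector[Event]]
    private var global = Vector.empty[Event]

    def persist(e: Event): Unit = {
      // Write 1: the per-actor journal, the source of truth for recovery.
      perActor = perActor.updated(e.persistenceId, perActor.getOrElse(e.persistenceId, Vector.empty) :+ e)
      // Write 2: the redundant global copy used only for views. In a real
      // system these two writes are not atomic, which is the risk noted above.
      global = global :+ e
    }

    // Actor recovery reads only the actor's own journal.
    def recover(persistenceId: String): Vector[Event] =
      perActor.getOrElse(persistenceId, Vector.empty)

    // Views are (re)built by folding over the global log from the start.
    def rebuildView[S](zero: S)(update: (S, Event) => S): S =
      global.foldLeft(zero)(update)
  }
}
```

Because views depend only on the global log, it could later be replaced by a journal-provided all-events stream without touching the actors themselves.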

Even though this solution is not perfect, it might help your use case. I will add more tests and documentation over time as well, since currently, all tests remain in the application code.

https://github.com/Product-Foundry/akka-cqrs

Hope this helps,

Andre

Magnus Andersson

Apr 16, 2015, 4:32:39 AM4/16/15
to akka...@googlegroups.com, gregor...@gmail.com, amai...@gmail.com, vve...@shiftmethod.com, kras...@googlemail.com
Hi

Sorry for being late to the party. I hope you don't mind a question around the PersistentView part:

The way I read it the default interaction would be the following:
 
case class QueryByPersistenceId(id: String, fromSeqNr: Long, toSeqNr: Long)
case class EventStreamOffer(metadata: Metadata, stream: Flow[PersistentMsg])

This query would be handled by the Journal. Today we don't interact directly with the underlying Journal, instead it is done through PersistentActors and PersistentViews. 

Since PersistentView seems to be going away ("...any Actor can obtain the ActorRef for a given Journal..."), does that mean that the PersistentActor will be the one to provide an actor ref for the journal or will the journal be available separately in read mode?
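To make the interaction above concrete, here is a minimal, library-free sketch of a journal answering such a query. It is not an Akka API: Metadata, PersistentMsg and InMemoryJournal are hypothetical stand-ins, and a plain Iterator replaces the back-pressured Flow (the consumer pulls events on its own schedule).

```scala
object JournalQuerySketch {
  // Hypothetical stand-ins for the types named in the post.
  final case class PersistentMsg(persistenceId: String, sequenceNr: Long, payload: Any)
  final case class Metadata(persistenceId: String, highestSeqNr: Long)

  // Message shapes as written in the post.
  final case class QueryByPersistenceId(id: String, fromSeqNr: Long, toSeqNr: Long)
  // Assumption: an Iterator stands in for Flow[PersistentMsg]; it is pulled by the consumer.
  final case class EventStreamOffer(metadata: Metadata, stream: Iterator[PersistentMsg])

  // Hypothetical read-only journal facade that answers queries.
  final class InMemoryJournal(events: Vector[PersistentMsg]) {
    def handle(q: QueryByPersistenceId): EventStreamOffer = {
      val own = events.filter(_.persistenceId == q.id)
      val highest = if (own.isEmpty) 0L else own.map(_.sequenceNr).max
      // Lazily select the requested, inclusive sequence number range.
      val selected = own.iterator.filter { m =>
        m.sequenceNr >= q.fromSeqNr && m.sequenceNr <= q.toSeqNr
      }
      EventStreamOffer(Metadata(q.id, highest), selected)
    }
  }
}
```

Whether the journal would be reached through the PersistentActor or separately in read mode is exactly the open question; the sketch only shows the request/response shape.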

/Magnus


Patrik Nordwall

Apr 17, 2015, 10:14:21 AM4/17/15
to akka...@googlegroups.com, Greg Young, amai...@gmail.com, Vaughn Vernon, Martin Krasser
We have not designed this in detail, so there is no answer to your question at the moment.
/Patrik

--

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

Magnus Andersson

Apr 19, 2015, 3:36:56 AM4/19/15
to akka...@googlegroups.com, Greg Young, amai...@gmail.com, Vaughn Vernon, Martin Krasser
OK, then at least I know that much. :) I'll be staying tuned then for when things are picked up again. Thanks. /Magnus


--
Magnus Andersson

Olger Warnier

Apr 24, 2015, 4:42:04 AM4/24/15
to akka...@googlegroups.com, amai...@gmail.com, gregor...@gmail.com, kras...@googlemail.com, vve...@shiftmethod.com
Hi Roland / List, 

I am looking into an addition/mutation to the persistence layer that allows storage of an aggregateId (more or less the whole 'tag' idea, without being able to have multiple tags to start out with) with a replay (for the view) based on that aggregateId (a bit like the DDD AggregateRoot).

Replay is started with a message that contains a start sequence number and assumes (logically) that the sequence will go up.
With regard to the aggregateId, replay covers all persistenceIds that have registered this aggregateId.

If you wish to allow replay on aggregate level, the sequence numbering should be on aggregate level, with the side effect that the sequence numbering on persistenceId level will go up but with 'gaps'.

When you are not dependent on a gapless series of persistence events, that won't be an issue (just keep the last processed persistenceId sequence number for your snapshot, and it will still work).
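A small sketch of the numbering described above, assuming a hypothetical in-memory Journal (not part of Akka Persistence) that assigns sequence numbers per aggregateId. It shows that a single persistenceId then sees ascending numbers with gaps, while a replay by aggregateId from a start number is gapless.

```scala
object AggregateSequenceSketch {
  final case class Stored(persistenceId: String, aggregateId: String, sequenceNr: Long, payload: String)

  // Hypothetical journal that numbers events per aggregateId rather than per persistenceId.
  final class Journal {
    private var nextSeq = Map.empty[String, Long].withDefaultValue(1L)
    private var log = Vector.empty[Stored]

    def persist(persistenceId: String, aggregateId: String, payload: String): Stored = {
      val seq = nextSeq(aggregateId)
      nextSeq = nextSeq.updated(aggregateId, seq + 1)
      val stored = Stored(persistenceId, aggregateId, seq, payload)
      log = log :+ stored
      stored
    }

    // Replay for a view: all events of the aggregate from a start sequence number, ascending.
    def replayAggregate(aggregateId: String, fromSeqNr: Long): Vector[Stored] =
      log.filter(e => e.aggregateId == aggregateId && e.sequenceNr >= fromSeqNr)

    // Sequence numbers as seen by one persistenceId: ascending, possibly with gaps.
    def sequenceNrsOf(persistenceId: String): Vector[Long] =
      log.filter(_.persistenceId == persistenceId).map(_.sequenceNr)
  }
}
```

For example, after 'foo', 'bar' and 'foo' each persist one event under the same aggregateId, 'foo' holds sequence numbers 1 and 3. A snapshot that remembers the last processed number (3) can still resume correctly.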

Any opinion on this?
Does anybody have a use case that requires gapless persistenceId sequence numbers?

Kind regards, 

Olger



On Friday, March 27, 2015 at 1:33:43 PM UTC+1, rkuhn wrote:

Olger Warnier

Apr 24, 2015, 10:43:46 AM4/24/15
to akka...@googlegroups.com, kras...@googlemail.com, amai...@gmail.com, gregor...@gmail.com, vve...@shiftmethod.com
Well, 

I found that the sequence numbers are actually generated on a per persistent actor instance basis.
So that makes replaying a single aggregateId, bounded by sequence numbers, a bit of an interesting challenge.

Still interested in your opinions, as that will have an impact on the way to solve this (some kind of atomic sequence generator shared between aggregates?)
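One way to picture such a shared generator, as a sketch only: the names SequenceSource and Writer are hypothetical, and an in-process AtomicLong is assumed, which would not by itself work across cluster nodes. Each writer keeps its per-instance numbering and additionally stamps every event with the shared number.

```scala
import java.util.concurrent.atomic.AtomicLong

object SharedSequenceSketch {
  // Hypothetical in-process sequence source shared by all writers of one aggregate type.
  final class SequenceSource(start: Long = 0L) {
    private val counter = new AtomicLong(start)
    def next(): Long = counter.incrementAndGet()
  }

  final case class Tagged(persistenceId: String, localSeqNr: Long, sharedSeqNr: Long)

  // Each writer keeps its own per-instance numbering and also stamps the shared number.
  final class Writer(persistenceId: String, source: SequenceSource) {
    private var localSeqNr = 0L
    def write(): Tagged = {
      localSeqNr += 1
      Tagged(persistenceId, localSeqNr, source.next())
    }
  }
}
```

The shared number gives a total order across instances, at the cost of a single point of coordination between otherwise independent persistent actors.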

Magnus Andersson

Apr 25, 2015, 7:56:07 AM4/25/15
to akka...@googlegroups.com, vve...@shiftmethod.com, amai...@gmail.com, gregor...@gmail.com, kras...@googlemail.com
Hi

From your question it looks like you want to build up a persistent view by merging journal streams using multiple persistence ids. That is a common use case and my experience is that it is a bit cumbersome, but doable today.

However, you want strict replay ordering over multiple persistent actors. If you have a requirement of strict ordering across aggregate roots, it sounds like a design flaw in your application. Are you perhaps dividing up your domain too granularly?

My view is that your persistent actors should be your aggregate roots, period. Your persistent actor can of course have an eventual-consistency dependency on other actors, for deciding logic or validating input before persisting.

That being said, for views there is often a need to merge streams of events from multiple journals to build up an aggregated view. But if your persistent actors are aggregate roots then it does not make sense that the view would have any guarantee of the ordering. Events are things that happened in the past, so you don't need to validate them after the fact.

Other types of ordering seem more like application-specific problems. Here are some suggestions:
  1. First come, first served ordering: Set up a view aggregation actor that is fed events from multiple journal sources. Your aggregate actor is a persistent actor and will persist each message in the sequence in which it arrives. You now have strict ordering in your aggregate actor, and replays are guaranteed to be in the same order the events arrived. Of course this uses up extra storage, and you need to keep track of any implicit dependencies if you were to create multiple levels of these.
  2. Time-based ordering: If it makes sense in your application and you trust the clock on your servers, you can relax your requirements and include a persist timestamp in your message when journaling. When you replay messages from two sources (persistent views) you can merge events into an event stream buffer that sorts events based on the persist timestamp before emitting messages.
  3. Shared sequence ordering: basically your original idea combined with an event stream buffer. You include an extra field which has sequence numbers fed from your sequence source. Then replay into a journal stream buffer that makes sure events are emitted in the correct order.
If you are thinking about a shared source for sequential ids, then Twitter had something called snowflake (written in Scala). The project is deprecated now but the history and code are there.
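Suggestions 2 and 3 both come down to merging streams that are each already ordered by some key. A minimal sketch, assuming a hypothetical Event type whose orderKey is either the persist timestamp or the shared sequence number, and using plain Iterators instead of journal streams:

```scala
object OrderedMergeSketch {
  // orderKey is an assumption: a persist timestamp (option 2) or a shared sequence number (option 3).
  final case class Event(source: String, orderKey: Long, payload: String)

  // Merges two streams that are each sorted by orderKey into one sorted stream.
  // On equal keys the left stream is emitted first.
  def mergeByKey(left: Iterator[Event], right: Iterator[Event]): Iterator[Event] = {
    val l = left.buffered
    val r = right.buffered
    new Iterator[Event] {
      def hasNext: Boolean = l.hasNext || r.hasNext
      def next(): Event =
        if (!r.hasNext) l.next()
        else if (!l.hasNext) r.next()
        else if (l.head.orderKey <= r.head.orderKey) l.next()
        else r.next()
    }
  }
}
```

For a replay of stored events this is enough. For live streams the merge has to wait until it has seen the next element from every source before emitting, which is the buffering mentioned in the suggestions.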

/Magnus

Greg Young

Apr 25, 2015, 7:57:35 AM4/25/15
to Magnus Andersson, akka...@googlegroups.com, vve...@shiftmethod.com, amai...@gmail.com, kras...@googlemail.com
I as a developer want assured ordering across streams because my system does 200 events per second and linearization is a simpler model.

Magnus Andersson

Apr 25, 2015, 10:10:08 AM4/25/15
to Greg Young, Martin Krasser, akka...@googlegroups.com, amai...@gmail.com, vve...@shiftmethod.com

Greg, I agree with you there. I do not disagree with convenience. :)

But there are different kinds of convenience:

As a devops person I want to minimize shared state in order to have pieces of software that can fail and start independently. I want to sleep at night and have free weekends.

So the question I hid in my text was:
What is the motivation to solve your problem in akka-persistence instead of in your application code?

Magnus

Olger Warnier

Apr 28, 2015, 11:12:56 AM4/28/15
to akka...@googlegroups.com, vve...@shiftmethod.com, amai...@gmail.com, gregor...@gmail.com, kras...@googlemail.com
Hi Magnus, 

Thanks for your extended answer. 
Currently, I've solved it with application code, and this thread and the one before it were about finding ways to support more use cases with akka persistence. The DDD Aggregate Root is one of them, and it needs quite some plumbing to get going.
So with these discussions we've found a solution to 'tag' the stream of events and make use of that tag to replay to the views (assuming that this is the correct interpretation of the conclusions).

I expected some work on this before the end of this year, as that would be my production moment (gathering real-time data from hospitals and running an algorithm over it), and these enhancements would remove a piece of plumbing that I'd prefer not to see there as it makes things hard to understand.

In my current setup, the instances of an aggregate root are created via cluster sharding, each as a persistent actor with its own unique ID. Thereafter I need to aggregate the changes of all these IDs of the same type of aggregate root into a single view. For now I have a persistent actor that has the type as its persistenceId and just records that a specific instance has changed. That allows me to trigger a view for that instance.

So, I'd like to be able to generate a view based on the type of aggregate root, with the separate instances running as persistent actors, and the 'tag' mechanism would allow me to do that.

Now I am looking into a way to build this support in, in a non-intrusive way (e.g. that it will not break what is there and won't change too much of the current structure) 

I'd be concerned with the order of messages but not especially with the sequence number (expecting that the order will be OK) 
Writing the tag (aggregateId) along with the messages to persist is not much of an issue. (although the sequence numbering is per persistenceId)
Replaying is a bit more of an issue, as the current view and actor are both based on the Eventsourced class, which is tightly coupled to writing and reading based on the persistenceId. That is fine for the actor (reconstruct the single instance) but not for the view (give me all persistence events for this aggregate root type (tag)).
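The difference between the two replay paths can be sketched without any Akka types. The Journal below is a hypothetical in-memory stand-in: it keeps sequence numbers per persistenceId, stores the tag alongside each event, and offers one replay for the actor and one for the view.

```scala
object TaggedJournalSketch {
  final case class Entry(persistenceId: String, sequenceNr: Long, tag: String, payload: String)

  // Hypothetical in-memory journal; not the Akka Persistence journal SPI.
  final class Journal {
    private var entries = Vector.empty[Entry]
    private var lastSeq = Map.empty[String, Long].withDefaultValue(0L)

    // Write side: sequence numbers stay per persistenceId; the tag is stored alongside.
    def persist(persistenceId: String, tag: String, payload: String): Entry = {
      val seq = lastSeq(persistenceId) + 1
      lastSeq = lastSeq.updated(persistenceId, seq)
      val entry = Entry(persistenceId, seq, tag, payload)
      entries = entries :+ entry
      entry
    }

    // What the actor needs: recover one instance.
    def replayByPersistenceId(id: String): Vector[Entry] =
      entries.filter(_.persistenceId == id)

    // What the view needs: every event carrying the tag, in the order this journal stored them.
    def replayByTag(tag: String): Vector[Entry] =
      entries.filter(_.tag == tag)
  }
}
```

Note that the tag replay relies on the journal's storage order, not on the per-persistenceId sequence numbers, which repeat across instances.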

Kind regards, 

Olger

Alejandro López

May 8, 2015, 7:07:04 AM5/8/15
to akka...@googlegroups.com, amai...@gmail.com, gregor...@gmail.com, vve...@shiftmethod.com, kras...@googlemail.com
Right now, what would be the best alternative in order to create projections, allowing views to subscribe to arbitrary events as described by Roland above? (or at least a coarse approximation)

Olger Warnier

May 8, 2015, 7:17:16 AM5/8/15
to akka...@googlegroups.com, akka...@googlegroups.com, amai...@gmail.com, gregor...@gmail.com, kras...@googlemail.com, vve...@shiftmethod.com
Hi Alejandro,

You have a number of options

- when you have a single persistence Id, write a view for that
- when you want to aggregate events of several persistence ids, you can do so via the event store (approach of Martin (with Kafka) and Greg (event store) )
 Or you can aggregate the changes to a message queue and read that
 Or you can aggregate via another persistent actor that publishes the fact that something has changed for a specific other persistence id and start a view for that id (my approach)

And there may be other ways to solve this of course.

Kind regards,

Olger

Richard Rodseth

May 8, 2015, 4:58:48 PM5/8/15
to akka...@googlegroups.com
Hi Olger

Could you please elaborate a bit on your approach? I'm not sure I follow.

Thanks. 


Olger Warnier

May 9, 2015, 1:51:07 PM5/9/15
to akka...@googlegroups.com
Hi Richard, 

I use the akka cluster sharding mechanism to start instances of a PersistentActor. 
This actor stands for a single instance of an AggregateRoot (DDD concept) 
This specific instance contains the logic required to run. 

For instance this can be a UserAccount instance of a user called ‘foo’ and changing the password will be handled by UserAccount and applied to this specific instance as the user ‘foo’ wants to change its password. 
Another user called ‘bar’ will have its own instance of this PersistentActor running somewhere in the cluster. 

Let’s assume that the usernames are the unique identifiers for these persistent actors. (persistenceIds)
Replaying to the state for a specific instance (for instance ‘foo’) is done by creating a view with the persistenceId ‘foo’.

As the PersistentActors make use of the persistenceId that is the ‘key’ for that specific instance, it is hard to tell what the persistenceId for your view needs to be.
Somebody registers the user ‘noname’ -> a persistent actor is created via cluster sharding with the persistenceId ‘noname’. As that is all async in nature, you need a way to know that this persistent actor has been created (or has received a change for that matter, as actors may be put to sleep).

So, I have a singleton persistent actor with a ‘known’ persistenceId that keeps track of changes. When a change happens to ‘foo’, ‘bar’ or ‘noname’, they notify this PersistentActor, whose id is ‘user-account-change-tracker’, and it writes something like UserAccountChanged(‘noname’).
A view that listens to ‘user-account-change-tracker’ will thereafter create the views for the specific user (in this case ‘noname’) and replay / restore the state for that view as required (mostly elasticsearch indexes that are used for the query side of my application).
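The flow can be sketched in plain Scala, leaving out Akka, cluster sharding and the real journal. ChangeTracker and ViewSupervisor are hypothetical stand-ins for the singleton persistent actor and the view that listens to it; only UserAccountChanged and the tracker id come from the description above.

```scala
object ChangeTrackerSketch {
  final case class UserAccountChanged(persistenceId: String)

  // Stand-in for the singleton persistent actor with the well-known id.
  final class ChangeTracker {
    val persistenceId = "user-account-change-tracker"
    private var journal = Vector.empty[UserAccountChanged]
    // Called by an aggregate instance after it has persisted a change of its own.
    def notifyChanged(id: String): Unit = journal = journal :+ UserAccountChanged(id)
    def replay(fromIndex: Int): Vector[UserAccountChanged] = journal.drop(fromIndex)
  }

  // Stand-in for the view on the tracker: creates or refreshes one per-instance view per notification.
  final class ViewSupervisor(tracker: ChangeTracker) {
    private var processed = 0
    private var refreshes = Map.empty[String, Int].withDefaultValue(0)

    def poll(): Unit = {
      val fresh = tracker.replay(processed)
      fresh.foreach { c =>
        refreshes = refreshes.updated(c.persistenceId, refreshes(c.persistenceId) + 1)
      }
      processed += fresh.size
    }

    def knownViews: Set[String] = refreshes.keySet
    def refreshCount(id: String): Int = refreshes(id)
  }
}
```

The supervisor never needs to know the persistenceIds up front; it learns them from the tracker's events, which is the point of the extra ‘change’ events.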

Like the other approaches, this has advantages and drawbacks.
1) the use of cluster sharding is something you may not need (but if you do, persistence and cluster sharding work well together)
2) the event store is filled with ‘change’ events in order to trigger the real views to materialise 
3) it is not dependent on a specific event store backend and therefore easier to include in your unit tests using the components available with akka-persistence 

Together with the other options, there are choices enough. Apply them as you require.

Kind regards,

Olger



Richard Rodseth

May 10, 2015, 10:37:05 AM5/10/15
to akka...@googlegroups.com
Thanks. So I guess you're basically using a persistent actor as a durable queue, in place of something like a Kafka topic, and a single message on the queue can include ids of multiple aggregates.

Alejandro López

May 10, 2015, 6:01:14 PM5/10/15
to akka...@googlegroups.com, amai...@gmail.com, vve...@shiftmethod.com, gregor...@gmail.com, kras...@googlemail.com
@Olger Thanks for the detailed explanation of your approach. As Richard mentioned, it does indeed look like using a persistent actor as a durable queue.

Has any concrete effort already started in the direction of supporting this in Akka? Or are there at least some example implementations of this idea that could be generalized towards a more complete solution?

Regards,

Alejandro.
 

Vaughn Vernon

May 10, 2015, 6:24:39 PM5/10/15