Hello there! Ah, it's much better to be able to communicate in full sentences, without the 140-character limit! ;-)
So, now that it’s spelled out as full sentences, I’ll gladly dig into your points:
1) This has already been proposed and accepted in https://github.com/akka/akka/issues/15004, including your +1, so I guess you're aware that it's in our backlog.
The module is experimental and published "early" exactly in order to gather and implement these features before stabilising the APIs.
So it's coming; we simply have not yet gotten to implementing it. It's holiday season, which isn't helping development speed :-)
2) For the benefit of the discussion, here's an example in EventStore: http://geteventstore.com/blog/20130707/catch-up-subscriptions-with-the-event-store/
One thing to keep in mind here is that some Journals would have no problem implementing this, such as Kafka or EventStore, because "subscribing to something" is a built-in mechanism in them. See Martin's Kafka journal and how one can subscribe to an event stream there: https://github.com/krasserm/akka-persistence-kafka#journal-topics On the other hand, implementing this in other Journals (Cassandra, HBase, …) would be pretty painful / inefficient.
We were playing around with some ideas to expose optional, DB-specific journal functionality. This would be a good candidate for that.
This request seems to depend on these things, by the way:
* changes in the journal plugins (we need some changes there anyway: https://github.com/krasserm/akka-persistence-kafka#journal-topics),
* views over "tags" (as this would essentially be a view over "all"),
* and lastly reactive-streams (views exposed as streams of events).
Thanks for your feedback, and keep in mind that no-one said that this module is "done". It's still experimental, and these kinds of feature requests are exactly what we need, and will have to provide for, to make it stable and usable in all kinds of apps.
Lastly, would you mind creating a ticket for feature 2)? Thanks in advance, and have a nice weekend :-)
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--
Martin Krasser
blog: http://krasserm.blogspot.com
code: http://github.com/krasserm
twitter: http://twitter.com/mrt1nz
Rephrasing my ordering question, actually (it started out as something else and ended up weirdly worded): I was wondering whether the guarantees should be "time in system", or happens-before as known from the sequence numbers of concrete persistenceIds (A-1 before A-2, and B-1 before B-2, but I don't care about the relation between A and B).
In other words, I'm curious about your real-world use cases. Caring less about ordering makes way for faster replays, of course, so that's what I'm after here (perhaps I'm thinking too far ahead, though).
-- k
On Monday, 28 July 2014 at 22:49:00 UTC+2, Konrad Malawski wrote:

Hi everyone, thanks for your feedback and ideas.
So the stream / view on multiple persistenceIds (or "tags", which would solve Greg's example case) is coming; we just have not yet had the time to work on it. One thing that ties into this is reactive streams: we would like to expose these event streams as Akka Streams, especially since they provide things like merge / filter / tee, which I believe would help a lot with these kinds of event streams :-)
From the streams point of view, whether events arrive by polling or are DB-side initiated is abstracted away, so the APIs won't have to change. I do agree with / like Martin's suggestion that in "normal DBs" (which emit no events when someone does an insert) we should be able to implement this with some housekeeping done by the plugins.
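The poll-vs-push abstraction mentioned above could be sketched roughly as follows, in plain Scala without Akka. Every name here (EventSource, PushSource, PollingSource) is invented for illustration: the point is only that a push-capable store and a polling adapter can sit behind the same consumer-facing interface.

```scala
import scala.collection.mutable

// Rough sketch (not Akka code): both a push-capable store and a poll-based
// adapter expose the same EventSource interface, so the consumer-facing API
// does not depend on how the journal detects new events.
trait EventSource { def poll(): Seq[String] } // returns events since last call

// A store with built-in subscriptions (Kafka/EventStore style): events are
// pushed into a buffer by the store and drained on demand.
final class PushSource extends EventSource {
  private val buf = mutable.Queue.empty[String]
  def onEventPushed(e: String): Unit = buf.enqueue(e) // called by the store
  def poll(): Seq[String] = { val out = buf.toList; buf.clear(); out }
}

// A "normal DB" adapter: the plugin does the housekeeping by re-querying
// from the last seen offset.
final class PollingSource(query: Long => Seq[String]) extends EventSource {
  private var offset = 0L
  def poll(): Seq[String] = { val out = query(offset); offset += out.size; out }
}

val push = new PushSource
push.onEventPushed("e1"); push.onEventPushed("e2")
val db = Vector("e1", "e2")
val polled = new PollingSource(off => db.drop(off.toInt))
// both now yield the same events through the same interface
```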
One question about EventStore: in the case of reading from multiple replication groups, is the ordering based simply on non-descending write-timestamp order? The timestamps are obviously skewed a bit (multiple servers/clocks do writes), but in the apps you work with, would this be OK as the source of ordering for the "all events" stream?
PS: Most of the team is on holiday this week; it's reasonable to expect they'll chime in some time next week.
- a total order per persistenceId based on sequence numbers (= partial ordering in the "all events" stream) is a must-have IMO.
- ordering based on timestamps should be an application-level concern (= timestamps in application-defined events and (re-)ordering done by the application)
Agreed, seqNrs are at our core and we'll stick to them (partial ordering will be in for sure; I was wondering whether app implementors needed more). Bringing in timestamps "in some cases" would be inconsistent with the rest of persistence anyway, so pushing it into user land sounds good.
Actually, since we want to expose this as reactive streams, the timestamp ordering could be expressed as `merge(streams, userLandProvidedOrdering)` (we don't have this yet)… A tempting idea; I'm looking forward to trying out different things there.
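What `merge(streams, userLandProvidedOrdering)` might behave like can be sketched without any streaming library at all, as a k-way merge of already-ordered per-persistenceId iterators. `Event` and `mergeStreams` are made-up names for illustration; this is not the proposed Akka API.

```scala
// Sketch of a k-way merge of per-persistenceId event streams, using a
// user-supplied Ordering (here: an application-level timestamp). Each input
// iterator is assumed already ordered by seqNr, so per-id order is preserved.
case class Event(persistenceId: String, seqNr: Long, timestamp: Long)

def mergeStreams(streams: Seq[Iterator[Event]])(implicit ord: Ordering[Event]): Iterator[Event] = {
  // Buffer the head of each stream; repeatedly emit the smallest head.
  val buffered = streams.map(_.buffered)
  Iterator.continually {
    val nonEmpty = buffered.filter(_.hasNext)
    if (nonEmpty.isEmpty) None
    else Some(nonEmpty.minBy(_.head)(ord).next())
  }.takeWhile(_.isDefined).map(_.get)
}

val a = Iterator(Event("A", 1, 10L), Event("A", 2, 30L))
val b = Iterator(Event("B", 1, 20L))
val merged = mergeStreams(Seq(a, b))(Ordering.by[Event, Long](_.timestamp)).toList
// merged interleaves by timestamp while keeping A-1 before A-2:
// A-1 (t=10), B-1 (t=20), A-2 (t=30)
```

Since the Ordering is user-provided, timestamp skew and tie-breaking stay an application-level concern, matching Martin's point above.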
- mid/long-term goal: causal ordering (allows moving from eventual consistency to causal consistency). See also "Don't Settle for Eventual Consistency".
— k
I think it is enough to use the same "tag" for all events of a particular type of persistent actor, instead of allowing different tags per event. What is important is that it is possible to track the logical position (offset) for all events with the same "tag".
So, you do not think it is possible for a journal to maintain a logical position per topic / tag?
"Time stamps, logical clocks, or such, have to be added to your own event data."
That requirement is exactly what makes akka-persistence painful, almost useless, in a DDD / ES / CQRS setup, because the read side / catch-up subscriptions currently require maintaining a seqNr per persistenceId.
"The events will be in order per persistenceId (and there is a sequence number per persistenceId)."
That I am aware of, and that is not enough in a DDD/ES/CQRS setup.
I see no value in tags without the ability to track an offset per topic / tag.
I do not think timestamp precision is important for this; I would imagine a logical position / offset like EventStore/Kafka have. I imagine those are based on integers / longs.
I think it depends on the journal: something simple like LevelDB would need help from the journal, whereas something like Kafka / EventStore probably already has something one could adapt, making the implementation easier.
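The "help from the journal" could amount to something as small as assigning a monotonically increasing Long offset per tag at write time, Kafka/EventStore style, so a reader can resume from a tracked logical position. A minimal in-memory sketch, with all names (TaggedJournal, write, read) hypothetical:

```scala
import scala.collection.mutable

// Minimal sketch (plain Scala, not an Akka API) of a journal that assigns a
// monotonically increasing Long offset per tag, so readers can resume from
// a tracked logical position instead of a per-persistenceId seqNr.
final class TaggedJournal {
  // tag -> append-only log of (offset, event payload)
  private val logs = mutable.Map.empty[String, mutable.ArrayBuffer[(Long, Any)]]

  def write(tag: String, event: Any): Long = {
    val log = logs.getOrElseUpdate(tag, mutable.ArrayBuffer.empty)
    val offset = log.size.toLong // next logical position for this tag
    log += ((offset, event))
    offset
  }

  // Replay everything at or after `fromOffset`, in offset order.
  def read(tag: String, fromOffset: Long): Seq[(Long, Any)] =
    logs.getOrElse(tag, mutable.ArrayBuffer.empty).toSeq.drop(fromOffset.toInt)
}

val j = new TaggedJournal
j.write("orders", "created"); j.write("orders", "paid"); j.write("users", "signedUp")
val resumed = j.read("orders", fromOffset = 1L) // resumes after the first event
```

In a distributed store the hard part is making that counter monotonic across writers, which is exactly the scalability tension discussed later in this thread.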
As already mentioned in a previous post, causal ordering could be a later extension to akka-persistence that goes beyond the limits of a single writer or co-located storage *and* allows for better scalability.
I wish I had more time for hacking on a prototype that tracks causalities :)
I vote that you need to have a single sequence across all events in an event store. This is going to cover probably 99% of all actor persistence needs and it is going to make using akka-persistence way easier.
I am sure you have already thought of this, Patrik, but if you leave full ordering to the store implementation, it could still have unnecessary limitations if the implementor chooses to support sequence only for persistenceId.
I don't see it mentioned on this particular thread, but I feel creating reliable sagas across processors (Aggregates) is a real challenge right now as well. Having a clearly documented way to do this is critical, IMO, to creating more complex and reliable CQRS-based apps.
On Fri, Aug 8, 2014 at 12:21 AM, Vaughn Vernon <vve...@shiftmethod.com> wrote:

> I am sure you have already thought of this, Patrik, but if you leave full ordering to the store implementation, it could still have unnecessary limitations if the implementor chooses to support sequence only for persistenceId.

As a user you would have to pick a journal that supports your needs in this regard.
There is a big overhead in storing these events, and also the state of the system, in durable DBs for read or other purposes. Also, don't you think event-sourcing apps will consume huge disk space, and consume it faster, compared to a highly OLTP application?
I have a requirement in my app that changes to one aggregate root affect many more aggregate roots, and all have to be in sync. I keep seeing sagas referred to in discussions. Will sagas really help to resolve this situation? Can I find any articles in this regard?
Are there any other design approaches?
None of this stuff is easy to do, and even harder to do right.
Your post gives away the main problem with getting this to work correctly, because the Actor Model and akka-persistence currently support the first half of A, but not the second half. In other words, to make the interface rich we not only need a new set of abstractions, we also need to overcome the direct messaging nature of actors, because it can be limiting in some use cases.
With the messaging library I am building, currently named Number 9, which includes both persistence and a process manager, this problem is handled as follows. Any actor that sends a message may:
1. send a persistent message to another actor
2. send a persistent message to a topic
3. send a persistent message primarily to another actor, but also to a topic
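Since Number 9 is unpublished, the following is only a guess at the shape of these three delivery modes, in plain Scala with every name invented for illustration (this is not the library's actual API):

```scala
// Toy model of the three persistent-delivery modes: direct to an actor,
// to a topic, or both at once. Recipients are modelled as simple sinks.
final case class Topic(name: String, var log: List[Any] = Nil)
final case class Recipient(name: String, var inbox: List[Any] = Nil)

def sendPersistent(msg: Any, to: Option[Recipient], topic: Option[Topic]): Unit = {
  to.foreach(r => r.inbox ::= msg)  // 1) direct, persistent delivery
  topic.foreach(t => t.log ::= msg) // 2) publish to a topic
  // 3) is simply both arguments set at once
}

val next = Recipient("nextStep")
val interest = Topic("viewInterest")
sendPersistent("SomethingHappened", Some(next), Some(interest)) // mode 3
```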
If you have watched any of my presentations on this subject you have heard this before. I am presenting most of this to the DDD Denver meetup this Monday night. The title of the talk is "Building a Reactive Process Manager, Twice". The twice part is because I will demonstrate this working both in Scala with Akka and also in C# with Dotsero:
Dear hakkers,

Unfortunately it took me a long time to catch up with akka-user to this point after the vacation, but on the other hand this made for a very interesting and stimulating read; thanks for this thread! If I may, here's what I have understood so far:
1. In order to support not only actor persistence but also full CQRS we need to adjust our terminology: events are published to topics, where each persistenceId is one such topic but others are also allowed.
2. Common use-cases of building projections or denormalized views require the ability to query the union of a possibly large number of topics in such a fashion that no events are lost. This union can be viewed as a synthetic or logical topic, but issues arise in that true topics provide total ordering while these synthetic ones have difficulties doing so.
3. Constructing Sagas is hard.
AFAICS 3. is not related to the other two; the mentions in this thread have only alluded to the problems, so I assume that the difficulty is primarily to design a process that has the right eventual consistency properties (i.e. rollbacks, retries, …). This is an interesting topic, but let's concentrate on the original question first.

The first point is a rather simple one: we just need to expose the necessary API for writing to a given topic instead of the local Actor's persistenceId; I'd opt for adding variants of the persist() methods that take an additional String argument. Using the resulting event log is then done as for the others (i.e. Views and potentially queries should just work). The only concern is that the Journal needs to be prepared to receive events concurrently from multiple sources instead of just the same Actor, but since each topic needs to be totally ordered this will not be an additional hassle beyond just routing to the same replica, just like for persistenceIds.
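One possible shape for such a persist() variant, sketched in plain Scala with an in-memory journal; `ToyPersistence`, `persistToTopic` and everything else here are illustrative, not an actual Akka API:

```scala
import scala.collection.mutable

// Hedged sketch of "variants of the persist() methods that take an
// additional String argument". The default persist() writes to the actor's
// own persistenceId topic; the variant writes to an explicitly named topic.
final class ToyPersistence(selfPersistenceId: String) {
  val journal = mutable.Map.empty[String, Vector[Any]].withDefaultValue(Vector.empty)

  def persist[A](event: A)(handler: A => Unit): Unit =
    persistToTopic(selfPersistenceId, event)(handler)

  // proposed variant: append to an explicitly named topic instead
  def persistToTopic[A](topic: String, event: A)(handler: A => Unit): Unit = {
    journal(topic) = journal(topic) :+ event
    handler(event) // invoked after the event is appended
  }
}

val p = new ToyPersistence("actor-A")
p.persist("Event1")(_ => ())
p.persistToTopic("myTopic", "Event2")(_ => ())
// journal now: actor-A -> Vector(Event1), myTopic -> Vector(Event2)
```

Note how recovery of actor-A would only replay its own topic; "myTopic" is consumed by whoever subscribes to it, matching the mock code Roland posts later in the thread.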
The second point is the contentious one, since a feature request (consistent iteration over a query) clashes with a design choice (scalability). First it is important to note that this clash is genuine: scalability means that we do not want to limit the size of a topic to always fit one unit of consistency, our default assumption is that everything should be prepared for distribution. We all know that in a distributed system linearizability is not generally achievable, meaning that a distributed (synthetic) topic that receives events from concurrent sources will not be able to provide a global ordering. A non-distributed Journal, OTOH, is a single point of failure which is not desirable for many applications (i.e. your business will go down while the Journal has issues—true replication requires the ability to fail independently and hence is distributed in the CAP sense).
As I see it, a query (like “all events of this type” etc.) should be configured for the given Journal and should then be available as a (synthetic) topic for normal consumption—but not for being written to. The Journal is then free to implement this in any way it sees fit, but barring fundamental advances in CS or errors on my part this will always require that the synthetic topic is not scalable in the way we usually define that (i.e. distributable). As Vaughn points out this may not be an issue at all, actual benchmarks would help settle this point. Journal backends that already implement a global order can make use of that, for others the synthetic topic would work just like any other non-PersistentActor topic with manual duplication of those events that match the query (akin to (a) in the first post of this thread); this duplication does not necessarily need to double the memory consumption, it could also only persist the events by reference (depending on the storage engine).
When it comes to providing queries in a way that does not have a global ordering, my current opinion is that we should not do this because it would be quite pointless (a.k.a. unusable). A compromise would be to provide eventually linearizable queries based on the premise that the application of events should be idempotent in any case and overlapping replay (i.e. where necessary from the last known-linear point instead of the requested one) must be tolerated. AFAIK this is the topic of ongoing research, though, so I’d place that lower on the priority list.
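The "idempotent application + overlapping replay" premise can be illustrated with a toy consumer that deduplicates by (persistenceId, seqNr), so a replay restarted from an earlier known-linear point causes no double-processing. Plain Scala, invented names:

```scala
import scala.collection.mutable

// Toy view that tolerates overlapping replay: events are applied at most
// once per (persistenceId, seqNr), so replaying from an earlier point in
// the stream is harmless.
final case class Evt(persistenceId: String, seqNr: Long, payload: String)

final class IdempotentView {
  private val applied = mutable.Map.empty[String, Long] // highest seqNr seen per id
  val state = mutable.ListBuffer.empty[String]

  def onEvent(e: Evt): Unit = {
    if (applied.getOrElse(e.persistenceId, 0L) < e.seqNr) {
      applied(e.persistenceId) = e.seqNr
      state += e.payload // the actual effect; runs at most once per event
    } // else: duplicate from an overlapping replay, ignore
  }
}

val view = new IdempotentView
val firstRun = Seq(Evt("A", 1, "a1"), Evt("A", 2, "a2"))
val overlappingReplay = Seq(Evt("A", 2, "a2"), Evt("A", 3, "a3")) // restarts before the tip
(firstRun ++ overlappingReplay).foreach(view.onEvent)
// state contains each payload exactly once: a1, a2, a3
```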
The only concern is that the Journal needs to be prepared to receive events concurrently from multiple sources instead of just the same Actor, but since each topic needs to be totally ordered this will not be an additional hassle beyond just routing to the same replica, just like for persistenceIds.
Is point one for providing a sequence number from a single ordering source?
Or do you mean topic in the sense that I cover above with EntitiesRef? In other words, what is the String argument and how does it work? If you would show a few sample persist() APIs that might help clarify. And if you are referring to a global ordering sequence, who must maintain that? Is it the store implementation or the developer?

The second point is the contentious one, since a feature request (consistent iteration over a query) clashes with a design choice (scalability). First it is important to note that this clash is genuine: scalability means that we do not want to limit the size of a topic to always fit one unit of consistency, our default assumption is that everything should be prepared for distribution. We all know that in a distributed system linearizability is not generally achievable, meaning that a distributed (synthetic) topic that receives events from concurrent sources will not be able to provide a global ordering. A non-distributed Journal, OTOH, is a single point of failure which is not desirable for many applications (i.e. your business will go down while the Journal has issues—true replication requires the ability to fail independently and hence is distributed in the CAP sense).

I think I understand this to mean that if you decide to implement a store using MySQL/Postgres/Oracle/LevelDB or whatever, then you live with what you get and what you don't get from those stores. If so, that's okay with me because we already live with those trade-offs all the time anyway. I think this is far better than trying to make the whole world step up to Availability and Partition tolerance when all they want to do is write a business app using akka-persistence. This allows teams to decide for themselves which of the two CAP attributes they want, and note that even Amazon would choose C over A or P in some cases.
As I see it, a query (like "all events of this type" etc.) should be configured for the given Journal and should then be available as a (synthetic) topic for normal consumption—but not for being written to. The Journal is then free to implement this in any way it sees fit, but barring fundamental advances in CS or errors on my part this will always require that the synthetic topic is not scalable in the way we usually define that (i.e. distributable). As Vaughn points out this may not be an issue at all, actual benchmarks would help settle this point. Journal backends that already implement a global order can make use of that, for others the synthetic topic would work just like any other non-PersistentActor topic with manual duplication of those events that match the query (akin to (a) in the first post of this thread); this duplication does not necessarily need to double the memory consumption, it could also only persist the events by reference (depending on the storage engine).

I think these are the typical kinds of queries:

- All newly persisted events that I have not yet processed since the last time I asked for them (because I always process all new events in some specific way)
- All persisted events that constitute the state of my actor
- All persisted events from the beginning of time because I just redesigned 20 user interface views and I have to delete and rebuild all my view states from day-1, and the events must be delivered in the same order that they originally happened, or my generated views' state will be wrong
- All persisted events from the beginning of time because a new system is coming on line and needs to be seeded with what happened from the time I was deployed until now, and the events must be delivered in the same order that they originally happened, or the state of my newly deployed system will be wrong
Hi Roland,

A few more questions for clarification...

On Sat, Aug 16, 2014 at 10:11 PM, Vaughn Vernon <vve...@shiftmethod.com> wrote:
Hi Roland,

Welcome back to the discussions :) I think you stated most of this very thoroughly. There are probably a few points lost in translation between your more general actor-based or CS terminology and what others would use to define ES/CQRS. Would it be possible for me to ask for just a few clarifications? I will comment inline with yours below.

One other thing, however, from my points 1-3 above regarding sending messages to both single actors and to topics. In actuality the simplest way to think about this is that you have the ability to tell a vector/collection of actors something. This ability makes it easier to think about than the requirements stated in my 1-3:

    val processCollaborator: ActorRef = nextProcessingStep
    ...
    val interestTopic: ActorRef = viewInterest
    ...
    val actors: VectorRef = processCollaborator alongWith interestTopic
    actors ! SomethingHappened(...)

Here the vector could tell any number of actors, not just two.

I have actually solved this a bit differently, but perhaps you feel that this aligns a bit better with your way of thinking. (I don't know, but it is based on our conversation from a few weeks ago.) What I have done is add another abstraction named EntityRef, which does not mean DDD entity and in no way forces thinking about DDD Aggregates. It is just an Entity in the sense that Gul Agha would probably use. With that you could also have an EntitiesRef that supports safely telling any number of interested entities about what happened. I think this is very important, because the syntax for communicating retains the same awesome explicit readability of a simple ActorRef receiving a tell message:

    val processCollaborator: EntityRef = nextProcessingStep
    ...
    val interestTopic: EntityRef = viewInterest
    ...
    val entities: EntitiesRef = processCollaborator alongWith interestTopic
    entities ! SomethingHappened(...)

The main reason for adding EntityRef and EntitiesRef is to tag the underlying actors as reliably receiving a message. It also gives library implementors a specific extension point to do things within the EntityRef/EntitiesRef that you will not permit them to do with ActorRef. (As you and the team have made clear, ActorRef is reserved exclusively for sending with at-most-once delivery semantics.)
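EntityRef / EntitiesRef and the alongWith combinator come from Vaughn's in-progress library, so the following is only an illustrative reconstruction of how they might compose, not a published API:

```scala
// Toy reconstruction of EntityRef / EntitiesRef: alongWith builds a
// collection of refs, and one tell fans out to all of them.
final case class EntityRef(name: String, var received: List[Any] = Nil) {
  def !(msg: Any): Unit = received ::= msg
  def alongWith(other: EntityRef): EntitiesRef = EntitiesRef(List(this, other))
}

final case class EntitiesRef(targets: List[EntityRef]) {
  def alongWith(other: EntityRef): EntitiesRef = EntitiesRef(targets :+ other)
  def !(msg: Any): Unit = targets.foreach(_ ! msg) // fan out one tell to all
}

val processCollaborator = EntityRef("nextProcessingStep")
val interestTopic = EntityRef("viewInterest")
val entities = processCollaborator alongWith interestTopic
entities ! "SomethingHappened"
// both refs received the message exactly once
```

The real types would additionally carry the reliable-delivery semantics described above, which this sketch does not model.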
My inline comments are below...

Vaughn
On Friday, August 15, 2014 11:39:45 AM UTC-6, rkuhn wrote:

Dear hakkers, unfortunately it took me a long time to catch up with akka-user to this point after the vacation, but on the other hand this made for a very interesting and stimulating read, thanks for this thread! If I may, here's what I have understood so far:
- In order to support not only actor persistence but also full CQRS we need to adjust our terminology: events are published to topics, where each persistenceId is one such topic but others are also allowed.
- Common use-cases of building projections or denormalized views require the ability to query the union of a possibly large number of topics in such a fashion that no events are lost. This union can be viewed as a synthetic or logical topic, but issues arise in that true topics provide total ordering while these synthetic ones have difficulties doing so.
- Constructing Sagas is hard.
AFAICS 3. is not related to the other two, the mentions in this thread have only alluded to the problems so I assume that the difficulty is primarily to design a process that has the right eventual consistency properties (i.e. rollbacks, retries, …). This is an interesting topic but let's concentrate on the original question first. The first point is a rather simple one, we just need to expose the necessary API for writing to a given topic instead of the local Actor's persistenceId; I'd opt for adding variants of the persist() methods that take an additional String argument. Using the resulting event log is then done as for the others (i.e. Views and potentially queries should just work).

Does that mean that a PersistentActor can emit events targeted to its persistenceId and/or targeted to an external topic, and it is only the events targeted to the persistenceId that will be replayed during recovery of that PersistentActor?
Both these two types of events can be replayed by a PersistentView.
The only concern is that the Journal needs to be prepared to receive events concurrently from multiple sources instead of just the same Actor, but since each topic needs to be totally ordered this will not be an additional hassle beyond just routing to the same replica, just like for persistenceIds.

Replica as in data store replica, or as in journal actor?
Is point one for providing a sequence number from a single ordering source?

Yes, that is also what I was wondering. Do we need such a sequence number? A PersistentView should be able to define a replay starting point. (Right now I think that is missing; it is only supported by saving snapshots.)

Or do you mean topic in the sense that I cover above with EntitiesRef? In other words, what is the String argument and how does it work? If you would show a few sample persist() APIs that might help clarify. And if you are referring to a global ordering sequence, who must maintain that? Is it the store implementation or the developer?
The second point is the contentious one, since a feature request (consistent iteration over a query) clashes with a design choice (scalability). First it is important to note that this clash is genuine: scalability means that we do not want to limit the size of a topic to always fit one unit of consistency, our default assumption is that everything should be prepared for distribution. We all know that in a distributed system linearizability is not generally achievable, meaning that a distributed (synthetic) topic that receives events from concurrent sources will not be able to provide a global ordering. A non-distributed Journal, OTOH, is a single point of failure which is not desirable for many applications (i.e. your business will go down while the Journal has issues—true replication requires the ability to fail independently and hence is distributed in the CAP sense).

I think I understand this to mean that if you decide to implement a store using MySQL/Postgres/Oracle/LevelDB or whatever, then you live with what you get and what you don't get from those stores. If so, that's okay with me because we already live with those trade-offs all the time anyway. I think this is far better than trying to make the whole world step up to Availability and Partition tolerance when all they want to do is write a business app using akka-persistence. This allows teams to decide for themselves which of the two CAP attributes they want, and note that even Amazon would choose C over A or P in some cases.

I agree, I think the ordering quality of service should be provided by the journal implementation and not enforced by akka persistence. If you use MySQL/Postgres/Oracle/LevelDB the total ordering is a no-brainer, but if you use Cassandra or Kafka it is not.
As I see it, a query (like "all events of this type" etc.) should be configured for the given Journal and should then be available as a (synthetic) topic for normal consumption—but not for being written to. The Journal is then free to implement this in any way it sees fit, but barring fundamental advances in CS or errors on my part this will always require that the synthetic topic is not scalable in the way we usually define that (i.e. distributable). As Vaughn points out this may not be an issue at all, actual benchmarks would help settle this point. Journal backends that already implement a global order can make use of that, for others the synthetic topic would work just like any other non-PersistentActor topic with manual duplication of those events that match the query (akin to (a) in the first post of this thread); this duplication does not necessarily need to double the memory consumption, it could also only persist the events by reference (depending on the storage engine).

I think these are the typical kinds of queries:

- All newly persisted events that I have not yet processed since the last time I asked for them (because I always process all new events in some specific way)
- All persisted events that constitute the state of my actor
- All persisted events from the beginning of time because I just redesigned 20 user interface views and I have to delete and rebuild all my view states from day-1, and the events must be delivered in the same order that they originally happened, or my generated views' state will be wrong
- All persisted events from the beginning of time because a new system is coming on line and needs to be seeded with what happened from the time I was deployed until now, and the events must be delivered in the same order that they originally happened, or the state of my newly deployed system will be wrong
We have still not really defined what this *order* is. "In the same order that they originally happened" sounds like a wall-clock timestamp in the PersistentActor; is that what we mean? And then we all know that it is not perfect in a distributed system, and events may have exactly the same timestamp.
Or do we mean the insert order in the data store? There is often no such thing in a distributed store.

Or do we mean that the replay of these events should be deterministic, i.e. always replayed in the same order?
I tried to understand what is supported by EventStore and found this page: https://github.com/EventStore/EventStore/wiki/Projections-fromStreams

It is clear that the total order of a projection from multiple streams is not perfect, but probably good enough for practical purposes.
Is point one for providing a sequence number from a single ordering source?

Yes, that is also what I was wondering. Do we need such a sequence number? A PersistentView should be able to define a replay starting point. (Right now I think that is missing; it is only supported by saving snapshots.)

Or do you mean topic in the sense that I cover above with EntitiesRef? In other words, what is the String argument and how does it work? If you would show a few sample persist() APIs that might help clarify. And if you are referring to a global ordering sequence, who must maintain that? Is it the store implementation or the developer?

#1 is not about sequence numbers per se (although it has consequences of that kind): it is only about allowing persistenceIds that are not bound to a single PersistentActor and that all PersistentActors can publish to. Mock code:

    def apply(evt: Event) = state = evt(state)

    def receiveCommand = {
      case c: Cmd =>
        if (isValid(c)) {
          persist(Event1(c))(apply)
          persistToTopic("myTopic", Event2(c)) { evt =>
            apply(evt)
            sender() ! Done
          }
        }
    }
Everyone who listens to "myTopic" will then (eventually) get Event2.

The second point is the contentious one, since a feature request (consistent iteration over a query) clashes with a design choice (scalability). First it is important to note that this clash is genuine: scalability means that we do not want to limit the size of a topic to always fit one unit of consistency, our default assumption is that everything should be prepared for distribution. We all know that in a distributed system linearizability is not generally achievable, meaning that a distributed (synthetic) topic that receives events from concurrent sources will not be able to provide a global ordering. A non-distributed Journal, OTOH, is a single point of failure which is not desirable for many applications (i.e. your business will go down while the Journal has issues—true replication requires the ability to fail independently and hence is distributed in the CAP sense).

I think I understand this to mean that if you decide to implement a store using MySQL/Postgres/Oracle/LevelDB or whatever, then you live with what you get and what you don't get from those stores. If so, that's okay with me because we already live with those trade-offs all the time anyway. I think this is far better than trying to make the whole world step up to Availability and Partition tolerance when all they want to do is write a business app using akka-persistence. This allows teams to decide for themselves which of the two CAP attributes they want, and note that even Amazon would choose C over A or P in some cases.

I agree, I think the ordering quality of service should be provided by the journal implementation and not enforced by akka persistence.
If you use MySQL/Postgres/Oracle/LevelDB the total ordering is a no-brainer, but if you use Cassandra or Kafka it is not.My point (perhaps not well articulated) was that in order to offer the feature of listening to arbitrary Queries the Journal MUST provide the resulting event stream in a consistent order (see more below). Providing them in random order for each replay is worse than useless as far as I can see.As I see it, a query (like “all events of this type” etc.) should be configured for the given Journal and should then be available as a (synthetic) topic for normal consumption—but not for being written to. The Journal is then free to implement this in any way it sees fit, but barring fundamental advances in CS or errors on my part this will always require that the synthetic topic is not scalable in the way we usually define that (i.e. distributable). As Vaughn points out this may not be an issue at all, actual benchmarks would help settle this point. Journal backends that already implement a global order can make use of that, for others the synthetic topic would work just like any other non-PersistentActor topic with manual duplication of those events that match the query (akin to (a) in the first post of this thread); this duplication does not necessarily need to double the memory consumption, it could also only persist the events by reference (depending on the storage engine).I think these are very typical kinds of queries are:- All newly persisted events that I have not yet processed since the last time I asked for them (because I always process all new events in some specific way)This needs a total order, and it must be consistent across runs (i.e. 
eventual consistency is not good enough, otherwise you will lose events or double-process them).- All persisted events that constitute the state of my actorThis is defined to be the Actor’s own topic, hence it is not a Query (and therefore not a Problem ;-) ).- All persisted events from the beginning of time because I just redesigned 20 user interface views and I have to delete and rebuild all my view states from day-1, and the events must be delivered in the same order that they originally happened, or my generated views' state will be wrong
The order that they originally happened in is ill-defined unless your Journal globally serializes events—which is not a given.- All persisted events from the beginning of time because a new system is coming on line and needs to be seeded with what happened from the time I was deployed until now, and the events must be delivered in the same order that they originally happened, or the state of my newly deployed system will be wrongSame here. But it is difficult to imagine things going wrong between unrelated (concurrent & distributed) entities, their order did not matter the first time around as well, right? An exception here is causal ordering (BTW: casual ordering is not known to me).We have still not really defined what this *order* is."in the same order that they originally happened" sounds like a wall clock timestamp in the PersistentActor, is that what we mean? -- and then we all know that it is not perfect in a distributed system, and events may have exactly the same timestamp.
Or do we mean the insert order in the data store? There is often no such thing in a distributed store.Or do we mean that the replay or these events should be deterministic, i.e. always replayed in the same order?This is the only one we can aim for AFAICS.
As usual, I’d love to be proven wrong.I tried to understand what is supported by EventStore. Found this page: https://github.com/EventStore/EventStore/wiki/Projections-fromStreamsIt is clear that the total order of a projection from multiple streams is not perfect, but probably good enough for practical purposes.What Greg describes there is exactly what I mean with Eventually Linearizable: while things are happening it takes a while for the system to sync up and agree on a replay order. Once that is set, everything is deterministic.
The alternative would be to strictly obey causal order by tracking who sent what in response to which message. Causality only defines a partial order, but that should by definition be enough because those event pairs which are unordered also do not care about each other (i.e. the global ordering might be different during each replay but that is irrelevant since nobody can observe it anyway).
Hi Roland (and everyone),
Welcome back Roland - I hope you had a great vacation.
Thank you for your post.
Here’s my response summary:
I believe Akka needs to allow actors to:
(i) persist events with as much information as can efficiently be included on the write side, so that the store can let the read side extract them according to whatever criteria are needed,
(ii) persist events that don’t relate to a change in state of the actor per se, which I assume is already achievable since an actor can just ignore them on replay,
(iii) read from (and replay) streams of events on the read and write side according to a range of criteria supported and defined within the store or via the store API (e.g. using a DSL), and
(iv) reliably (at least once) deliver information to other read side store(s) and systems above and beyond the store used for persisting the events.
I believe each of these is readily achievable with Akka but:
(i) doesn’t mean explicitly persisting the events to specific topics as you suggest in your (1) (although this may be how some stores implement the required functionality on the read side). Instead it means transparently including information like the actorId, event type, actor type, probably the time and possibly information to help with causal ordering (see my next post).
(iii) together with (i) would enable the read side (if the store supports it) to read all events from particular actors, of particular event types, from particular types of actors, or simply all events. It would also need to allow the read side to read from where it last finished reading, from now, or from the start again. (iv) is necessary for projections.
If you are interested, here’s my detailed explanation:
I think some of the confusion surrounding these issues is caused by the fact that we seem to be discussing (and, if I may suggest, Akka appears to be trying to implement) three quite different, but also somewhat related, pieces of functionality within this domain. These are:
A. Actor Persistence
The ability to persist actor state changes incrementally (or wholly) and reconstruct that state at a later time, which we know as event sourcing. I think Akka provides a great distributed and scalable mechanism for doing this with the current akka.persistence.
B. Publish/Subscribe to Persistent Queues/Topics
This functionality would allow actors to write data/events/messages to one (or more) topics and to subscribe to receive similar from one or more topics. These differ from normal publish/subscribe queues in that they are persistent and the consumer can reread from the topic.
This is what I think of as the LogProducer and LogConsumer, of which PersistentActor and PersistentView can be thought of as specialisations, i.e. a single topic for each actor. The current and popular example of a store for this sort of functionality, as you know, is Kafka.
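As a concrete (if toy) illustration of the LogProducer/LogConsumer idea, here is a minimal in-memory sketch in Python of a persistent topic with per-consumer offsets. `PersistentTopic` and its methods are invented for illustration only; they are not an Akka or Kafka API:

```python
class PersistentTopic:
    """Toy append-only log: producers append, consumers track their own offsets."""

    def __init__(self):
        self._log = []       # the persistent, replayable event log
        self._offsets = {}   # consumer id -> next offset to read

    def append(self, event):
        self._log.append(event)
        return len(self._log) - 1  # position of the appended event

    def poll(self, consumer_id, max_events=10):
        """Deliver events past this consumer's offset; offsets advance independently."""
        start = self._offsets.get(consumer_id, 0)
        events = self._log[start:start + max_events]
        self._offsets[consumer_id] = start + len(events)
        return events

    def rewind(self, consumer_id, offset=0):
        """Re-read from an earlier point -- the log is persistent, unlike a plain queue."""
        self._offsets[consumer_id] = offset


topic = PersistentTopic()
for e in ["created", "renamed", "archived"]:
    topic.append(e)

assert topic.poll("view-a") == ["created", "renamed", "archived"]
assert topic.poll("view-a") == []  # view-a is caught up
assert topic.poll("view-b") == ["created", "renamed", "archived"]  # independent offset
topic.rewind("view-a")
assert topic.poll("view-a", max_events=2) == ["created", "renamed"]
```

The point of the sketch is the difference from a normal queue: consumption does not destroy the data, so any consumer can rewind and reread, which is exactly what PersistentView-style replay needs.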
C. CQRS with Event Sourcing
And finally, there is CQRS with Event Sourcing, which I believe is much more than (A) and (B), and in particular doesn’t necessarily require (B) for all event stores. So if Akka were to implement (B), which I think would be very useful for other reasons, it would not specifically be for CQRS.
Please consider this diagram overviewing CQRS with Event Sourcing:
<https://www.dropbox.com/s/z2iu0xi4ki42sl7/annotated_cqrs_architecture.jpg>
adapted from
<http://www.gridshore.nl/wp-content/uploads/cqrs_architecture.jpg>
As I understand it, CQRS separates the write model and store from one or *more* read models and stores, with each model and store being optimised for their particular role. CQRS says nothing specific about the types of store (e.g. SQL or NOSQL, event sourced or not) and how consistency is achieved.
As you know, when using event sourcing the changes to the write model entities (e.g. Aggregate Roots) are stored as events and the write model is reconstructed by replaying those events. This is (A) above and what akka.persistence has achieved very well in a distributed and scalable way.
This is the dashed area labelled [1] in the diagram.
Further, CQRS uses commands to initiate changes to the write model and signals these changes with events (whether the events are used for event sourcing or not). These events are what allow sagas and other systems to track and respond to changes in the write model.
This is the dashed area labelled [2] in the diagram.
For example, a saga could be waiting for an event indicating funds had been withdrawn from a bank account after it had issued a command requesting that be done. The saga could subscribe to events from the bank account before issuing the command and watch for a specific event from that time on.
This event notification system is conceptually independent of how the read store(s) will eventually become consistent (i.e. there are other means of achieving this consistency without events). However, it just so happens that these events are also a great way to notify the read store(s) of changes.
So conceptually, the read side receives all the events from the write side independent of the write side event store. I believe we should logically think of this as one big stream of events since the write side doesn’t (and cannot) know what the read store(s) needs from these events in advance.
This is the dashed area labelled [3] in the diagram.
Akka seems to distribute the event store used for persistence of ARs [1] on the write side to the read side, which is an interesting idea. But I don’t believe this is enough for CQRS. One event store cannot provide all the required read models.
Because the read side *must* support a number of different stores (not just the one used for actor persistence and to distribute the events to the read side) it is also necessary for there to be some reliable way to get event information into the other read store(s).
For example, there are very good reasons for some read sides having the read model stored in an SQL database, or a graph database, or a NOSQL database, or some or all of these at the same time. As you mentioned, this needs to be idempotent to handle at least once delivery.
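A minimal sketch of the idempotency requirement mentioned above (Python, invented names): the read-side handler remembers the highest sequence number it has applied per persistenceId, so at-least-once redelivery is harmless:

```python
class IdempotentProjection:
    """Read-side updater that tolerates at-least-once delivery by
    remembering the highest sequence number applied per persistenceId."""

    def __init__(self):
        self.state = []      # the read model (here: just a list of applied events)
        self._applied = {}   # persistence_id -> last applied seq_nr

    def handle(self, persistence_id, seq_nr, event):
        # Duplicate or stale delivery: ignore it, making redelivery a no-op.
        if seq_nr <= self._applied.get(persistence_id, 0):
            return False
        self.state.append(event)
        self._applied[persistence_id] = seq_nr
        return True


proj = IdempotentProjection()
assert proj.handle("cart-1", 1, "ItemAdded") is True
assert proj.handle("cart-1", 1, "ItemAdded") is False  # redelivery, deduplicated
assert proj.handle("cart-1", 2, "CheckedOut") is True
assert proj.state == ["ItemAdded", "CheckedOut"]
```

In a real system the `_applied` map would live in the same store (and transaction) as the read model itself, so that the projection and its progress marker cannot diverge after a crash.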
I hope that Akka can really fully support CQRS with Event Sourcing to become the premiere platform for its use in highly scalable, reliable and efficient systems. I also hope this explanation can help that happen sooner rather than later since I think it is a great opportunity for Akka.
Please anyone, correct me if I am misunderstanding things or have left anything out.
Cheers,
Ashley.
On Mon, Aug 18, 2014 at 3:38 PM, Roland Kuhn <goo...@rkuhn.info> wrote:

On 18 Aug 2014 at 10:27, Patrik Nordwall <patrik....@gmail.com> wrote:

Hi Roland,

A few more questions for clarification...

On Friday, August 15, 2014 11:39:45 AM UTC-6, rkuhn wrote:

Dear hakkers,

unfortunately it took me a long time to catch up with akka-user to this point after the vacation, but on the other hand this made for a very interesting and stimulating read, thanks for this thread!

If I may, here’s what I have understood so far:
- In order to support not only actor persistence but also full CQRS we need to adjust our terminology: events are published to topics, where each persistenceId is one such topic but others are also allowed.
- Common use-cases of building projections or denormalized views require the ability to query the union of a possibly large number of topics in such a fashion that no events are lost. This union can be viewed as a synthetic or logical topic, but issues arise in that true topics provide total ordering while these synthetic ones have difficulties doing so.
- Constructing Sagas is hard.
AFAICS 3. is not related to the other two; the mentions in this thread have only alluded to the problems, so I assume that the difficulty is primarily to design a process that has the right eventual consistency properties (i.e. rollbacks, retries, …). This is an interesting topic, but let’s concentrate on the original question first.

The first point is a rather simple one, we just need to expose the necessary API for writing to a given topic instead of the local Actor’s persistenceId; I’d opt for adding variants of the persist() methods that take an additional String argument. Using the resulting event log is then done as for the others (i.e. Views and potentially queries should just work).

Does that mean that a PersistentActor can emit events targeted to its persistenceId and/or targeted to an external topic, and it is only the events targeted to the persistenceId that will be replayed during recovery of that PersistentActor?

Yes.

Both these two types of events can be replayed by a PersistentView.

Yes; they are not different types of events, just how they get to the Journal is slightly different. The only concern is that the Journal needs to be prepared to receive events concurrently from multiple sources instead of just the same Actor, but since each topic needs to be totally ordered this will not be an additional hassle beyond just routing to the same replica, just like for persistenceIds.

Replica as in data store replica, or as in journal actor?

The Journal must implement this in whatever way is suitable for the back-end. A generic solution would be to shard the topics as Actors across the cluster (internal to the Journal), or the Journal could talk to the replicated back-end store such that a topic always is written to one specific node (if that helps).

What has been requested is "all events for an Aggregate type", e.g. all shopping carts, and this will not scale.
It can still be useful, and with some careful design you could partition things when scalability is needed. I'm just saying that it is a big gun that can be pointed in the wrong direction.
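The "shard the topics across the cluster" idea mentioned above can be sketched as deterministic routing of each topic to a single writer node, so that every topic has exactly one serialization point (and hence a per-topic total order) while distinct topics still spread across the cluster. This is a toy Python sketch with invented names; a real implementation would use Akka cluster sharding:

```python
import hashlib

NODES = ["node-a", "node-b", "node-c"]


def writer_for(topic, nodes=NODES):
    """Route every write for a topic to one node. Using a stable hash
    (sha256, not Python's salted hash()) keeps the mapping identical
    across processes and restarts."""
    digest = hashlib.sha256(topic.encode()).hexdigest()
    return nodes[int(digest, 16) % len(nodes)]


# The same topic always maps to the same node, so concurrent writers to
# one topic funnel through one serialization point:
assert writer_for("shopping-carts") == writer_for("shopping-carts")
assert writer_for("shopping-carts") in NODES
```

Note what this does not give you: any query spanning several topics still crosses shard boundaries, which is exactly where the global-ordering problem discussed in this thread reappears.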
Is point one for providing a sequence number from a single ordering source?

Yes, that is also what I was wondering. Do we need such a sequence number? A PersistentView should be able to define a replay starting point. (Right now I think that is missing; it is only supported by saving snapshots.) Or do you mean topic in the sense that I cover above with EntitiesRef? In other words, what is the String argument and how does it work? If you would show a few sample persist() APIs that might help clarify. And if you are referring to a global ordering sequence, who must maintain that? Is it the store implementation or the developer?

#1 is not about sequence numbers per se (although it has consequences of that kind): it is only about allowing persistenceIds that are not bound to a single PersistentActor and that all PersistentActors can publish to. Mock code:

  def apply(evt: Event) = state = evt(state)

  def receiveCommand = {
    case c: Cmd =>
      if (isValid(c)) {
        persist(Event1(c))(apply)
        persistToTopic("myTopic", Event2(c)) { evt =>
          apply(evt)
          sender() ! Done
        }
      }
  }

Looks good, but to make it clear: there is no transaction that spans these two persist calls.
Everyone who listens to "myTopic" will then (eventually) get Event2.

The second point is the contentious one, since a feature request (consistent iteration over a query) clashes with a design choice (scalability). First it is important to note that this clash is genuine: scalability means that we do not want to limit the size of a topic to always fit one unit of consistency; our default assumption is that everything should be prepared for distribution. We all know that in a distributed system linearizability is not generally achievable, meaning that a distributed (synthetic) topic that receives events from concurrent sources will not be able to provide a global ordering. A non-distributed Journal, OTOH, is a single point of failure, which is not desirable for many applications (i.e. your business will go down while the Journal has issues—true replication requires the ability to fail independently and hence is distributed in the CAP sense).

I think I understand this to mean that if you decide to implement a store using MySQL/Postgres/Oracle/LevelDB or whatever, then you live with what you get and what you don't get from those stores. If so, that's okay with me because we already live with those trade-offs all the time anyway. I think this is far better than trying to make the whole world step up to Availability and Partition tolerance when all they want to do is write a business app using akka-persistence. This allows teams to decide for themselves which of the CAP attributes they want, and note that even Amazon would choose C over A or P in some cases.

I agree, I think the ordering quality of service should be provided by the journal implementation and not enforced by akka-persistence.
If you use MySQL/Postgres/Oracle/LevelDB the total ordering is a no-brainer, but if you use Cassandra or Kafka it is not.

My point (perhaps not well articulated) was that in order to offer the feature of listening to arbitrary Queries, the Journal MUST provide the resulting event stream in a consistent order (see more below). Providing them in random order for each replay is worse than useless as far as I can see.

As I see it, a query (like “all events of this type” etc.) should be configured for the given Journal and should then be available as a (synthetic) topic for normal consumption—but not for being written to. The Journal is then free to implement this in any way it sees fit, but barring fundamental advances in CS or errors on my part this will always require that the synthetic topic is not scalable in the way we usually define that (i.e. distributable). As Vaughn points out this may not be an issue at all; actual benchmarks would help settle this point. Journal backends that already implement a global order can make use of that; for others the synthetic topic would work just like any other non-PersistentActor topic, with manual duplication of those events that match the query (akin to (a) in the first post of this thread). This duplication does not necessarily need to double the memory consumption; it could also persist the events only by reference (depending on the storage engine).

I think these are very typical kinds of queries:

- All newly persisted events that I have not yet processed since the last time I asked for them (because I always process all new events in some specific way)

This needs a total order, and it must be consistent across runs (i.e. eventual consistency is not good enough, otherwise you will lose events or double-process them).

- All persisted events that constitute the state of my actor

This is defined to be the Actor’s own topic, hence it is not a Query (and therefore not a Problem ;-) ).

- All persisted events from the beginning of time because I just redesigned 20 user interface views and I have to delete and rebuild all my view states from day 1, and the events must be delivered in the same order that they originally happened, or my generated views' state will be wrong

The order that they originally happened in is ill-defined unless your Journal globally serializes events—which is not a given.

- All persisted events from the beginning of time because a new system is coming on line and needs to be seeded with what happened from the time I was deployed until now, and the events must be delivered in the same order that they originally happened, or the state of my newly deployed system will be wrong

Same here. But it is difficult to imagine things going wrong between unrelated (concurrent & distributed) entities; their order did not matter the first time around as well, right? An exception here is causal ordering (BTW: “casual ordering” is not known to me).

We have still not really defined what this *order* is. "In the same order that they originally happened" sounds like a wall-clock timestamp in the PersistentActor; is that what we mean? Then we all know that it is not perfect in a distributed system, and events may have exactly the same timestamp.
Or do we mean the insert order in the data store? There is often no such thing in a distributed store.

Or do we mean that the replay of these events should be deterministic, i.e. always replayed in the same order? This is the only one we can aim for AFAICS.

I agree, but that is impossible to achieve with a fully available system (AFAIK).
As usual, I’d love to be proven wrong.

I tried to understand what is supported by EventStore. I found this page: https://github.com/EventStore/EventStore/wiki/Projections-fromStreams It is clear that the total order of a projection from multiple streams is not perfect, but probably good enough for practical purposes.

What Greg describes there is exactly what I mean by Eventually Linearizable: while things are happening it takes a while for the system to sync up and agree on a replay order. Once that is set, everything is deterministic.

Yes, the problem is that a PersistentView is querying live data, and if that "replay" order is supposed to be the same as a later historical query, the data must be stored in total order, or some kind of best-effort buffering/sorting must be used.
The alternative would be to strictly obey causal order by tracking who sent what in response to which message. Causality only defines a partial order, but that should by definition be enough, because those event pairs which are unordered also do not care about each other (i.e. the global ordering might be different during each replay, but that is irrelevant since nobody can observe it anyway).

I think this ties in to what is confusing me most about all this. The only consistency boundary in my world is the DDD Aggregate instance, i.e. one PersistentActor instance. What has been requested is something that requires consistency (ordering) across Aggregate instances.

Why not model the topic as a separate PersistentActor? Problems with that have been raised, such as how to reliably deliver the events from the Aggregate PersistentActor to the topic PersistentActor. Is that impossible to solve?
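Causal ordering as a partial order can be made concrete with vector clocks. A small Python sketch (illustrative only, not an Akka mechanism): two events are ordered only if one's clock is componentwise less than or equal to the other's; otherwise they are concurrent and any replay order between them is observably equivalent.

```python
def happened_before(a, b):
    """Event with clock a causally precedes event with clock b
    iff a <= b componentwise and a != b."""
    keys = set(a) | set(b)
    return all(a.get(k, 0) <= b.get(k, 0) for k in keys) and a != b


def concurrent(a, b):
    """Neither precedes the other: no observer can tell their order apart."""
    return a != b and not happened_before(a, b) and not happened_before(b, a)


# Actor p persists an event; actor q reacts to it after seeing it:
e1 = {"p": 1}
e2 = {"p": 1, "q": 1}
assert happened_before(e1, e2)   # e2 causally depends on e1

# Two unrelated events from different actors are concurrent:
e3 = {"p": 2}
e4 = {"q": 2}
assert concurrent(e3, e4)        # any replay order of e3/e4 is acceptable
```

This is exactly why a causal (partial) order can suffice for correct replay: only the ordered pairs constrain the journal, while concurrent pairs may legitimately come back in different orders on different replays.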
For CQRS specifically, a lot of what people call scalability is in its ability to easily model multiple read views to make queries very fast off the same event data.
In the cases where a true global ordering is truly necessary, one often does not need to handle hundreds of thousands of writes per second. I think the ideal is to have the global ordering property for events by default, and have to disable that if you feel a need to do more writes per second than a single writer can handle.
Once the global ordering property is enforced, solving many of the publisher ordering issues (and supporting sagas) becomes significantly easier to achieve.
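A toy sketch of the single-writer global ordering described above: one writer assigns a global sequence number to every event regardless of persistenceId, which makes "replay everything after offset N" deterministic across runs (Python, invented names; a real journal would persist durably, not in memory):

```python
import itertools


class GloballyOrderedJournal:
    """Toy journal with a single writer assigning one global sequence
    number to all events, whatever persistenceId they belong to."""

    def __init__(self):
        self._seq = itertools.count(1)
        self._events = []   # (global_seq, persistence_id, event)

    def persist(self, persistence_id, event):
        entry = (next(self._seq), persistence_id, event)
        self._events.append(entry)
        return entry[0]     # the global offset of this event

    def replay_from(self, offset):
        """All events with global sequence > offset, always in the same order."""
        return [e for e in self._events if e[0] > offset]


j = GloballyOrderedJournal()
j.persist("cart-1", "ItemAdded")
j.persist("cart-2", "Created")
j.persist("cart-1", "CheckedOut")

assert [s for s, _, _ in j.replay_from(0)] == [1, 2, 3]
assert j.replay_from(2) == [(3, "cart-1", "CheckedOut")]
```

The single `itertools.count` is the whole trade-off in miniature: it is what makes cross-stream replay deterministic, and it is also the serialization point that caps write throughput at what one writer can handle.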
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/SL5vEVW7aTo/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
Everything you list here is available today via akka-persistence + an event store adapter & a durable subscription (akka event store client) on the read model side.
So how does one handle combining events from different streams? A global sequence number is the most straightforward.
Also, not everything needs to scale on the write side to that degree.
...
I am not responding to just this one post; this is a reply towards the end of the thread that also discusses a few earlier posts.
To start, I have to agree with some of the posters that premature scaling can cause many issues. This actually reminds me of the CQRS Journey, which people mentioned earlier. One of the main criticisms of the CQRS Journey is that it prematurely took on scaling constraints, which caused the code to be much, much more complex than it needed to be. This was partially due to it being a sample app of something larger and partially due to the p&p team also showing Azure at the same time. Because they wanted to distribute and show Azure at the same time, the team took cloud constraints as a given. This caused, for instance, every handler in the system to need to be idempotent. While seemingly a small constraint, this actually adds a significant amount of complexity to the system.
The same problem exists in what is being discussed today. For 95+% of systems it is totally reasonable that when I write a projection I expect my events to have assured ordering. As Vaughn mentioned, a few hundred events/second covers the vast majority of systems. Systems like these can be completely linearized, and ordering assurances are not an issue. This removes a LOT of complexity in projections code, as you don't have to handle hundreds to thousands of edge cases in your read models where you get events out of order. Saying that ordering assurances are not needed and everyone should use causal consistency is really saying "we don't care about the bottom 95% of users".
-- Martin Krasser blog: http://krasserm.blogspot.com code: http://github.com/krasserm twitter: http://twitter.com/mrt1nz
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/SL5vEVW7aTo/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
On 18 Aug 2014 at 18:01, Ashley Aitken <amai...@gmail.com> wrote:

This is a convoluted way of saying that Events must be self-contained, right? In that case: check!

I believe Akka needs to allow actors to:
(i) persist events with as much information as efficiently possible on the write side to allow the store to facilitate the read side extracting them according to what criteria is needed,
(iii) read from (and replay) streams of events on the read and write side according to a range of criteria supported and defined within the store or via the store API (e.g. using a DSL), and
This is the unclear point: who defines the query and when? What are the consistency guarantees for the generated event stream?
(iv) reliably (at least once) deliver information to other read side store(s) and systems above and beyond the store used for persisting the events.
This is PersistentView, so “check!” (As argued previously “reliably” translates to “persistent”.)
I believe each of these is readily achievable with Akka but:
(i) doesn’t mean explicitly persisting the events to specific topics as you suggest in your (1) (although this may be how some stores implement the required functionality on the read side). Instead it means transparently including information like the actorId, event type, actor type, probably the time and possibly information to help with causal ordering (see my next post).
No, again we need to strictly keep Topics and Queries separate, they are very different features. Topics are defined up-front and explicitly written to, Queries are constructed later based on the existing event log contents. Marking events within the store with timestamps of some kind might help achieving a pseudo-deterministic behavior, but it is by no means a guarantee. Causal ordering is out of scope, and it also does not help in achieving the desired ability to replay Queries from some given point in the past.
C. CQRS with Event Sourcing
And finally, there is CQRS with Event Sourcing, which I believe is much more than (A) and (B) and particularly doesn’t necessarily require (B) for all event stores. So if Akka were to implement (B), which I think would be very useful for other reasons, it would not specifically be for CQRS.
Please consider this diagram overviewing CQRS with Event Sourcing:
<https://www.dropbox.com/s/z2iu0xi4ki42sl7/annotated_cqrs_architecture.jpg>
For example, a saga could be waiting for an event indicating funds had been withdrawn from a bank account after it had issued a command requesting that be done. The saga could subscribe to events from the bank account before issuing the command and watch for a specific event from that time on.
Why would the Saga subscribe to events instead of talking with the domain entities directly?
My understanding is that the Saga sits logically on the top left side of the first diagram, acting on behalf of and just like the user (and like the user it should have a memory and be persistent).
This is the dashed area labelled [3] in the diagram.
Akka seems to distribute the event store used for persistence of ARs [1] on the write side to the read side, which is an interesting idea. But I don’t believe this is enough for CQRS. One event store cannot provide all the required read models.
This is probably where all (our) misunderstanding originates: PersistentView is a very particular thing and it turns out that it does not actually match up with the Q in CQRS. Perhaps we should indeed just remove it and add a facility which lets Actors query the Journal instead (in case you want to roll your own read model adapter).
...
please forgive the typo.
It still adds a ton of complexity that is unnecessary for the vast number of systems.
We support it in event store but most don't use it.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
RKuhn had mentioned doing joins. You are correct in that this is how we do it now. We offer historically perfect joins, but in live mode there is no way to do a perfect live join via queries. We do however support another mechanism that will assure your live join always matches your historical one: we allow you to precalculate and save the results of the join. This produces a stream full of stream links which can then be replayed as many times (perfectly) as you want.

There was some discussion above about using precalculated topics to handle projections; I believe the terminology was "tags". The general idea, if I can repeat it, is to write an event FooOccurred and to include upon it some tags (foo, bar, baz) which would map it to topics that could then be replayed as a whole. This at the outset seems like a good idea but will not work well in production. The place where it runs into a problem is that I cannot know, when writing the events, all the mappings that any future projections may wish to have. Tomorrow my business could ask me for a report that looks at a completely new way of partitioning the events, and I will be unable to do it.
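Greg's objection to tags can be made concrete with a small plain-Scala sketch (illustrative names, not any existing API): tags are fixed at write time, so a partitioning invented tomorrow simply has no tag to replay.

```scala
// Illustrative sketch of write-time tags: each event is mapped to topics
// by the tags chosen when it was written. A report that needs a
// partitioning nobody anticipated finds an empty topic.
final case class Tagged(seqNr: Long, event: String, tags: Set[String])

final class TaggedLog {
  private val buf = scala.collection.mutable.ArrayBuffer.empty[Tagged]

  def write(event: String, tags: Set[String]): Unit =
    buf += Tagged(buf.size + 1L, event, tags)

  // A "topic" is just the sub-stream of events carrying the tag.
  def topic(tag: String): Seq[Tagged] = buf.filter(_.tags.contains(tag)).toSeq
}
```

Replaying an existing tag works perfectly; replaying a tag nobody thought to write yields nothing, which is the production problem described above.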
As I mentioned previously in a quick comment, what is being asked for today is actually already supported with akka-persistence, providing you are using Event Store as your backend (for those interested, today is the release of the final RC of 3.0, which has all of the support for the akka-persistence client; binaries are for win/linux/mac). Basically what you would do is run akka-persistence on your write side but *not* use it for supporting your read models. Instead, when dealing with your read models, you would use a catch-up subscription for what you are interested in. I do not see anything inherently wrong with this way of doing things, and it raises the question of whether this is actually a more appropriate way to deal with event-sourced systems using akka-persistence: use native storage directly if it supports it.
On Tuesday, 19 August 2014 21:14:17 UTC+8, rkuhn wrote:

On 18 Aug 2014 at 18:01, Ashley Aitken <amai...@gmail.com> wrote:

This is a convoluted way of saying that Events must be self-contained, right? In that case: check!

I believe Akka needs to allow actors to:
(i) persist events with as much information as efficiently possible on the write side to allow the store to facilitate the read side extracting them according to what criteria is needed,
No, I don't think so. As I understand it now, the only thing the event store knows about each event is the persistenceId and a chunk of opaque data. It doesn't know the type of the event, the type of the message, any time information, any causal dependency etc. I guess what I am saying is that the events need to include as much metadata as possible so that the event store can provide the necessary synthetic streams if they are requested by the read side. As I mentioned later, some event stores (like Kafka) may replicate the events into separate topics based on this information; others (like Event Store) may use this information later to form streams of links to the original events.
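The kind of metadata meant here might look like the following sketch (an assumed envelope shape, not the current akka-persistence Journal SPI):

```scala
// Assumed metadata envelope, not the current akka-persistence SPI: the
// write side records enough about each event that the store can later
// build synthetic streams (by event type, actor type, time window, ...).
final case class EventMetadata(
  persistenceId: String,
  eventType: String,   // e.g. the fully qualified event class name
  actorType: String,
  timestampMillis: Long)

final case class StoredEvent(meta: EventMetadata, payload: Array[Byte])

// A synthetic stream is then just a filter the store can compute itself.
def byEventType(log: Seq[StoredEvent], tpe: String): Seq[StoredEvent] =
  log.filter(_.meta.eventType == tpe)
```

Whether the store materializes such filters eagerly (Kafka-style topics) or lazily (Event Store-style link streams) is then a backend implementation detail.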
(iii) read from (and replay) streams of events on the read and write side according to a range of criteria supported and defined within the store or via the store API (e.g. using a DSL), and

This is the unclear point: who defines the query and when? What are the consistency guarantees for the generated event stream?

I suggest the developers of the read side specify the queries directly to the event store, but this may be after the events have initially been persisted. The event store produces the query stream (if it can) and a PersistentView can be set up to read from that named query. With regards to consistency guarantees, my understanding is that these streams are used to eventually guarantee that the query model will be consistent with the write model, i.e. all the events will get across. With regards to ordering, I think the event store does the best it can to provide consistent ordering, e.g. total ordering if there was no distribution and, where possible, causal ordering if there was. The developer would need to understand the limitations of how the query store is configured and queried.
(iv) reliably (at least once) deliver information to other read side store(s) and systems above and beyond the store used for persisting the events.
This is PersistentView, so “check!” (As argued previously “reliably” translates to “persistent”.)

As I asked in another thread (I think), I am not sure how PersistentView can do this when PersistentActor is the one that can mix in AtLeastOnceDelivery? I think we need a PersistentView that can guarantee AtLeastOnceDelivery to an actor representing a query store. This would seem to require a PersistentViewActor ;-) that can read from a persistent query and also persist its state to provide guaranteed delivery. My lack of knowledge of Scala and Akka may be showing here.
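A plain-Scala model of the bookkeeping such a "view with AtLeastOnceDelivery" would have to keep (illustrative only, not the real Akka mixin):

```scala
// Plain-Scala model of at-least-once bookkeeping: every event sent to
// the query-store actor stays "unconfirmed" until acknowledged, and
// unconfirmed events are candidates for redelivery.
final class AtLeastOnceForwarder {
  private var nextId = 0L
  private val unconfirmed = scala.collection.mutable.LinkedHashMap.empty[Long, String]

  // Returns the deliveryId the receiver must eventually confirm.
  def deliver(event: String): Long = {
    nextId += 1
    unconfirmed += nextId -> event
    nextId
  }
  def confirm(deliveryId: Long): Unit = unconfirmed -= deliveryId
  def pendingRedelivery: Seq[String] = unconfirmed.values.toSeq
}
```

Akka's real `AtLeastOnceDelivery` trait is mixed into a `PersistentActor` precisely so that this bookkeeping can itself be persisted and survive restarts, which is roughly why the combination Ashley asks for needs persistence on the view side.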
I believe each of these is readily achievable with Akka but:
(i) doesn’t mean explicitly persisting the events to specific topics as you suggest in your (1) (although this may be how some stores implement the required functionality on the read side). Instead it means transparently including information like the actorId, event type, actor type, probably the time and possibly information to help with causal ordering (see my next post).
No, again we need to strictly keep Topics and Queries separate, they are very different features. Topics are defined up-front and explicitly written to, Queries are constructed later based on the existing event log contents. Marking events within the store with timestamps of some kind might help achieving a pseudo-deterministic behavior, but it is by no means a guarantee. Causal ordering is out of scope, and it also does not help in achieving the desired ability to replay Queries from some given point in the past.

I think we do agree somewhere in there, but I don't think, as was suggested (by you earlier?), that creating topics up-front, whether a fixed set or arbitrary tags, will work. I feel that in what way the store supports the queries (and how much it can) is up to the store (e.g. creating separate topics or synthetic topics), so I would argue against using topics for CQRS. As I mention below, for Pub/Sub persistent topics would be great, but not for CQRS.
C. CQRS with Event Sourcing
And finally, there is CQRS with Event Sourcing, which I believe is much more than (A) and (B) and particularly doesn’t necessarily require (B) for all event stores. So if Akka were to implement (B), which I think would be very useful for other reasons, it would not specifically be for CQRS.
Please consider this diagram overviewing CQRS with Event Sourcing:
<https://www.dropbox.com/s/z2iu0xi4ki42sl7/annotated_cqrs_architecture.jpg>
For example, a saga could be waiting for an event indicating funds had been withdrawn from a bank account after it had issued a command requesting that be done. The saga could subscribe to events from the bank account before issuing the command and watch for a specific event from that time on.
Why would the Saga subscribe to events instead of talking with the domain entities directly?

I believe it is for loose coupling and to make the process asynchronous (and event-driven).
When an AR processes a command it cannot know all the other ARs or sagas wanting to know when the command has been performed.
Also in an actor-based system to be reliable(?) I would assume sagas need to read events from a persistent store (or receive messages with at-least-once-delivery guarantee).
Please check out the short section on Sagas from the CQRS Journey book, particularly Figure 2. I believe the black filled arrows are events and the white hollow arrows are commands. Commands I believe are well suited to actor messages, but events I believe are best suited to persistent publish/subscribe-like communication (i.e. the journal). To be honest though, I note that the text talks about event messages, although it also talks about the event messages using an event bus, so I am a little unsure.

My understanding is that the Saga sits logically on the top left side of the first diagram, acting on behalf of and just like the user (and like the user it should have a memory and be persistent).

My understanding is that sagas sit where the Services are in the diagram I provided (near [2]), i.e. they issue commands (not labelled as such in the diagram unfortunately, but what else could they be?) and listen for events on the write side (as labelled). Someone please correct me if I am wrong.
This is the dashed area labelled [3] in the diagram.
Akka seems to distribute the event store used for persistence of ARs [1] on the write side to the read side, which is an interesting idea. But I don’t believe this is enough for CQRS. One event store cannot provide all the required read models.
This is probably where all (our) misunderstanding originates: PersistentView is a very particular thing and it turns out that it does not actually match up with the Q in CQRS. Perhaps we should indeed just remove it and add a facility which lets Actors query the Journal instead (in case you want to roll your own read model adapter).

PersistentViews tied to one PersistentActor we know are too limited. However, I think PersistentViews tied to named query streams (specified to the journal) would be useful, if they could guarantee at-least-once delivery to an actor representing a query store.
Thank you for your patience reading through all of this text.
Cheers,
Ashley.
On 19 Aug 2014 at 18:59, Ashley Aitken <amai...@gmail.com> wrote:
On Tuesday, 19 August 2014 21:14:17 UTC+8, rkuhn wrote:
On 18 Aug 2014 at 18:01, Ashley Aitken <amai...@gmail.com> wrote:
This is a convoluted way of saying that Events must be self-contained, right? In that case: check!

I believe Akka needs to allow actors to:
(i) persist events with as much information as efficiently possible on the write side to allow the store to facilitate the read side extracting them according to what criteria is needed,
No, I don't think so. As I understand it now, the only thing the event store knows about each event is the persistenceId and a chunk of opaque data. It doesn't know the type of the event, the type of the message, any time information, any causal dependency etc. I guess what I am saying is that the events need to include as much metadata as possible so that the event store can provide the necessary synthetic streams if they are requested by the read side. As I mentioned later, some event stores (like Kafka) may replicate the events into separate topics based on this information; others (like Event Store) may use this information later to form streams of links to the original events.
The event store has the full event available, which is all the information there is: gathering or duplicating arbitrary parts of the information is likely not going to help, because you will discover later that you missed something initially, and if the mechanism is baked into the Akka Persistence Journal SPI then fixing it will take a very long time (until plugins are migrated and your OPS guys allow you to use it etc.). My recommendation is to use a serialization mechanism that fits the Journal, allowing it to understand the events and provide semantic features on top. Both (Journal and serialization) are configured in the same file, so I submit that coupling them is a valid approach.
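The coupling Roland suggests could look roughly like this sketch (trait and object names are my assumptions, not an actual Akka interface): because Journal and serializer are configured together, the serializer can expose semantic details the store may index.

```scala
// Sketch of a Journal-aware serializer: it understands the events well
// enough to expose semantic features (here just the event type) without
// baking metadata into the Journal SPI itself. Names are assumed.
trait SemanticSerializer {
  def toBinary(event: AnyRef): Array[Byte]
  def eventType(event: AnyRef): String = event.getClass.getName
}

object StringSerializer extends SemanticSerializer {
  def toBinary(event: AnyRef): Array[Byte] = event.toString.getBytes("UTF-8")
}
```

The store can then build synthetic streams from `eventType` while the SPI continues to see only opaque bytes.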
On causal consistency: I am still unconvinced that it is worth pursuing, and I am certain that you are vastly underestimating the amount of data and effort involved. And it cannot be done without collaboration from the user since a single inter-Actor message outside of the traced system (i.e. not using a PersistenceEnvelope of sorts) would hamper or destroy it.
On 20.08.14 10:16, Roland Kuhn wrote:
akka.dispatch.Envelope could carry additional information, so that user-collaboration is not needed
On 20 Aug 2014 at 13:33, "Roland Kuhn" <goo...@rkuhn.info> wrote:
> Well, actually Envelope is going to go away in Akka Gålbma (a.k.a. Akka 3).
My point was that causal consistency can be achieved without forcing users to do extra work, as tracing dependencies can be done completely within Akka, and there are several possible ways to implement that.
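One way to read Martin's point (my sketch of the idea, using a simple Lamport clock rather than anything Akka actually implements): the envelope carries the causal timestamp, and user code never touches it.

```scala
// Sketch of causal metadata carried by the message envelope, invisible
// to user code. A Lamport clock stands in for whatever dependency
// tracing Akka could do internally; this is NOT an existing mechanism.
final case class CausalEnvelope(lamport: Long, message: Any)

final class ActorClock {
  private var time = 0L

  // Wrapping happens at send time, inside the framework.
  def send(message: Any): CausalEnvelope = {
    time += 1
    CausalEnvelope(time, message)
  }

  // Receiving merges the sender's clock, preserving causal order.
  def receive(env: CausalEnvelope): Unit = time = math.max(time, env.lamport) + 1

  def now: Long = time
}
```

Since both wrapping and merging live in the dispatch layer, no user collaboration is needed, which is the crux of the disagreement with Roland's earlier objection.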
Please stop using the terminology of "saga" and replace its usage with "process manager": what people (largely influenced by NServiceBus) call a saga is actually a process manager, and a saga is a different pattern. It's bad enough that the .NET community does this; the last thing we need is for the Akka community to start doing the same :)
Clarifying the terminology
The term saga is commonly used in discussions of CQRS to refer to a piece of code that coordinates and routes messages between bounded contexts and aggregates. However, for the purposes of this guidance we prefer to use the term process manager to refer to this type of code artifact. There are two reasons for this:
There is a well-known, pre-existing definition of the term saga that has a different meaning from the one generally understood in relation to CQRS. The term process manager is a better description of the role performed by this type of code artifact.
Although the term saga is often used in the context of the CQRS pattern, it has a pre-existing definition. We have chosen to use the term process manager in this guidance to avoid confusion with this pre-existing definition.
The term saga, in relation to distributed systems, was originally defined in the paper "Sagas" by Hector Garcia-Molina and Kenneth Salem. This paper proposes a mechanism that it calls a saga as an alternative to using a distributed transaction for managing a long-running business process. The paper recognizes that business processes are often composed of multiple steps, each of which involves a transaction, and that overall consistency can be achieved by grouping these individual transactions into a distributed transaction. However, in long-running business processes, using distributed transactions can impact the performance and concurrency of the system because of the locks that must be held for the duration of the distributed transaction.
...
Whilst we are talking about s... process managers, I would like to include this simple way of understanding them I found on the web: "Process Managers produce commands and consume events, whereas Aggregate Roots consume commands and produce events." The truth is a bit more complicated, I believe, in that Process Managers can also consume commands (e.g. to stop the process).

Further, whilst I would like to accept Roland's view that both commands and events can be communicated by sending messages (since, as he suggests, it would make things a lot simpler and lighter on the write side), I am concerned that there are use-cases for process managers that involve them listening for events from ARs they have not sent a command message to. Can anyone confirm/deny?
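That one-liner can be sketched directly (illustrative types with no library dependency): a minimal transfer process that consumes events and produces the next command.

```scala
// "Process Managers produce commands and consume events": a minimal
// bank-transfer process reacting to each event with the next command.
// All types here are illustrative, not from any library.
sealed trait TransferEvent
final case class Withdrawn(txId: String) extends TransferEvent
final case class Deposited(txId: String) extends TransferEvent

final case class Deposit(txId: String) // the command the process produces

final class TransferProcess {
  var completed = false

  def onEvent(e: TransferEvent): Option[Deposit] = e match {
    case Withdrawn(tx) => Some(Deposit(tx)) // produce the next command
    case Deposited(_)  => completed = true; None // process finished
  }
}
```

The aggregate roots would be the mirror image: they consume `Deposit`-style commands and emit `Withdrawn`/`Deposited`-style events.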
Thanks,
Ashley.
further explanation http://soa.dzone.com/news/are-sagas-and-workflows-same-t
...
Dear hakkers,

unfortunately it took me a long time to catch up with akka-user to this point after the vacation, but on the other hand this made for a very interesting and stimulating read, thanks for this thread!

If I may, here’s what I have understood so far:
1. In order to support not only actor persistence but also full CQRS we need to adjust our terminology: events are published to topics, where each persistenceId is one such topic but others are also allowed.
2. Common use-cases of building projections or denormalized views require the ability to query the union of a possibly large number of topics in such a fashion that no events are lost. This union can be viewed as a synthetic or logical topic, but issues arise in that true topics provide total ordering while these synthetic ones have difficulties doing so.
3. Constructing Sagas is hard.
AFAICS 3. is not related to the other two; the mentions in this thread have only alluded to the problems, so I assume that the difficulty is primarily to design a process that has the right eventual consistency properties (i.e. rollbacks, retries, …). This is an interesting topic, but let’s concentrate on the original question first.
The first point is a rather simple one, we just need to expose the necessary API for writing to a given topic instead of the local Actor’s persistenceId; I’d opt for adding variants of the persist() methods that take an additional String argument. Using the resulting event log is then done as for the others (i.e. Views and potentially queries should just work). The only concern is that the Journal needs to be prepared to receive events concurrently from multiple sources instead of just the same Actor, but since each topic needs to be totally ordered this will not be an additional hassle beyond just routing to the same replica, just like for persistenceIds.
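The API variant proposed here might look like the following sketch (the `persistTo` name and shape are my assumption, not a committed Akka design): the journal keeps one totally ordered event sequence per topic, whether that topic is a persistenceId or an explicitly named one.

```scala
// Sketch of "persist() with an additional topic String": one totally
// ordered event sequence per topic, with appends from possibly many
// writers serialized per topic. Names and shapes are assumed.
final class TopicJournal {
  private val topics = scala.collection.mutable.Map.empty[String, Vector[String]]

  // Many actors may write to the same topic; the journal serializes
  // appends per topic (here crudely, with a single lock).
  def persistTo(topic: String, event: String): Unit = synchronized {
    topics(topic) = topics.getOrElse(topic, Vector.empty) :+ event
  }

  def replay(topic: String): Vector[String] = topics.getOrElse(topic, Vector.empty)
}
```

A PersistentActor's own `persist()` would then be the special case `persistTo(persistenceId, event)`, which is why Views and queries "just work" on named topics too.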
The second point is the contentious one, since a feature request (consistent iteration over a query) clashes with a design choice (scalability). First it is important to note that this clash is genuine: scalability means that we do not want to limit the size of a topic to always fit one unit of consistency, our default assumption is that everything should be prepared for distribution. We all know that in a distributed system linearizability is not generally achievable, meaning that a distributed (synthetic) topic that receives events from concurrent sources will not be able to provide a global ordering. A non-distributed Journal, OTOH, is a single point of failure which is not desirable for many applications (i.e. your business will go down while the Journal has issues—true replication requires the ability to fail independently and hence is distributed in the CAP sense).
As I see it, a query (like “all events of this type” etc.) should be configured for the given Journal and should then be available as a (synthetic) topic for normal consumption—but not for being written to. The Journal is then free to implement this in any way it sees fit, but barring fundamental advances in CS or errors on my part this will always require that the synthetic topic is not scalable in the way we usually define that (i.e. distributable). As Vaughn points out this may not be an issue at all, actual benchmarks would help settle this point. Journal backends that already implement a global order can make use of that, for others the synthetic topic would work just like any other non-PersistentActor topic with manual duplication of those events that match the query (akin to (a) in the first post of this thread); this duplication does not necessarily need to double the memory consumption, it could also only persist the events by reference (depending on the storage engine).
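The "persist by reference" variant mentioned at the end can be sketched as a configured query materialized into a stream of links, similar in spirit to Event Store's link events (all names here are illustrative):

```scala
// Sketch of a configured query as a synthetic topic: matching events
// are recorded by sequence number only (links), so replaying the query
// just dereferences the links against the main log. Illustrative only.
final case class Stored(seqNr: Long, eventType: String, payload: String)

final class QueryTopic(matches: Stored => Boolean) {
  private val links = scala.collection.mutable.ArrayBuffer.empty[Long]

  // Called by the journal on every write; only a reference is kept.
  def onWrite(e: Stored): Unit = if (matches(e)) links += e.seqNr

  // Replay dereferences links via a lookup into the primary log.
  def replay(lookup: Long => Stored): Seq[Stored] = links.map(lookup).toSeq
}
```

The memory cost is one sequence number per matching event rather than a duplicated payload, which is the "persist by reference" point made above.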
When it comes to providing queries in a way that does not have a global ordering, my current opinion is that we should not do this because it would be quite pointless (a.k.a. unusable). A compromise would be to provide eventually linearizable queries based on the premise that the application of events should be idempotent in any case and overlapping replay (i.e. where necessary from the last known-linear point instead of the requested one) must be tolerated. AFAIK this is the topic of ongoing research, though, so I’d place that lower on the priority list.
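The idempotence premise is simple to state in code (a sketch of the requirement on application code, not an Akka feature): if the projection deduplicates by sequence number, overlapping replay from an earlier known-linear point is harmless.

```scala
// Sketch of the compromise: an idempotent projection that deduplicates
// by sequence number, so replaying from an earlier (last known-linear)
// point than requested cannot corrupt the read model.
final class IdempotentProjection {
  private val applied = scala.collection.mutable.Set.empty[Long]
  var eventsApplied = 0

  // Applying the same sequence number twice is a no-op.
  def apply(seqNr: Long): Unit = if (applied.add(seqNr)) eventsApplied += 1
}
```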
Does this sound like a fair summary? Please let me know in case I misrepresent or misunderstand something; once we reach consensus on what we need we’ll ticket and solve it, as usual ;-)

Regards,
Roland
12 aug 2014 kl. 18:10 skrev Ashley Aitken <amai...@gmail.com>:
Thanks for your post Vaughn.
On Monday, 11 August 2014 05:57:05 UTC+8, Vaughn Vernon wrote:

> None of this stuff is easy to do, and even harder to do right.

I am the first to agree with that.

> Your post gives away the main problem with getting this to work correctly, because the Actor Model and akka-persistence currently support the first half of A, but not the second half. In other words, to make the interface rich we not only need a new set of abstractions, we also need to overcome the direct messaging nature of actors because it can be limiting in some use cases. With the messaging library I am building, currently named Number 9, which includes both persistence and a process manager, this problem is handled as follows. Any actor that sends a message may:
>
> 1. send a persistent message to another actor
> 2. send a persistent message to a topic
> 3. send a persistent message primarily to another actor, but also to a topic

That is very interesting. It seems to me that CQRS commands should be sent as messages (persistent or not), your (1.), and changes of state (AR or application) should be published as events (to topics or more generally), your (2.), but I can't see a need for (3.)?

Further, a process manager for a bank account transfer could be implemented with a command to the source account (withdrawForTransfer) that would be acknowledged by a published persistent event (WithdrawnForTransfer). Similarly for the deposit into the target account.

Pawel Kaczor in his DDD-Leaven-Akka series (Lesson 3) includes projections from aggregated streams of events and a process manager / saga using Akka Persistence, by having the ARs persist their events and also publish their events. The only shortcomings (not his fault or a criticism) seem to be: 1) the use of two event infrastructures (one for persistence and one for pub/sub), 2) the limited ability for complex projections (like Greg mentioned and available in Event Store), and 3) lack of persistence for pub/sub events. The latter makes reconstruction of a read model, or construction of a new read model after the events have been published, more difficult.

> If you have watched any of my presentations on this subject you have heard this before. I am presenting most of this to the DDD Denver meetup this Monday night. The title of the talk is "Building a Reactive Process Manager, Twice". The twice part is because I will demonstrate this working both in Scala with Akka and also in C# with Dotsero:

Thank you, I will look out for that (please share the video link if it is recorded and put on the Web). I have seen (but not watched) some of your videos because I am unsure as to who is leading here and the videos I saw seemed to be from a few years ago. I've just got your book, so I will get on with reading that (for DDD and CQRS enlightenment).
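Number 9's actual API is not shown in this thread, so the following is only a toy illustration of the three send modes Vaughn lists; every name here is invented. Mode (3) is simply the composition of (1) and (2): deliver to one recipient and also append a copy to a persistent topic.

```scala
final case class Msg(payload: String)

// A persistent topic: consumers can read the log later, e.g. to build a read model.
final class Topic {
  private var log: Vector[Msg] = Vector.empty
  def publish(m: Msg): Unit = log = log :+ m
  def events: Vector[Msg] = log
}

// Stand-in for a directly addressed actor.
final class Recipient {
  var received: Vector[Msg] = Vector.empty
  def send(m: Msg): Unit = received = received :+ m
}

object Modes {
  // Mode (3): primary delivery to one recipient plus a copy on a topic.
  def sendWithTopic(m: Msg, to: Recipient, topic: Topic): Unit = {
    to.send(m)
    topic.publish(m)
  }
}
```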
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
I'd like to mention that the current construct of the persistent view is very helpful in decoupling. I can create new listeners (e.g. PersistentView) based on the events persisted in the store.
Tags were proposed, and I would be using those to attach the aggregate root type to the event, so my persistent views would use that to 'read' the events and create the model. That model is used for the queries in the application (or for other purposes).
At this moment, I persist all changes on an aggregate root (a persistent actor for a specific instance) by writing a changed event, with the specific persistenceId of the aggregate root in the payload, to a generic persistenceId, and I use that to create a persistent view that uses the specific aggregate root id as persistenceId (discussed in another group topic).
If tags are not the way forward, there is a need for some kind of query/resolver mechanism that allows one to define what the PersistentView is listening to. In my use case, the persistent actor is a cluster-sharded actor and the persistenceId is actually the unique identifier of an instance of an aggregate root. My view would want to listen to all events persisted by this type of aggregate root. (So that metadata needs to be available too.)
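The tag-based selection described above could look roughly like the following in-memory sketch. Tags were only a proposal at this point, so none of these names are real Akka API: each persisted event carries its aggregate-root type as a tag, and a view replays by tag instead of by individual persistenceId.

```scala
// Hypothetical tagging sketch: the journal stores events together with an
// aggregate-root-type tag, so a view can select "all events of this type"
// without knowing every instance's persistenceId.
final case class Tagged(persistenceId: String, tag: String, payload: String)

final class Journal {
  private var log: Vector[Tagged] = Vector.empty

  def persist(e: Tagged): Unit = log = log :+ e

  // What a per-type view would replay: all events sharing the aggregate tag.
  def byTag(tag: String): Vector[Tagged] = log.filter(_.tag == tag)
}
```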
Kind regards,
Olger
Btw: repost, sloooow internet in the mountains.
My initial impulse would be to postulate an isomorphism between Actor and AR and then say that the Actor Model works just fine with only direct message sends, so the same must hold for ARs and their commands/events.
In theory if Client sends a command to A which in order to fulfill it will need to send a command to B then A’s response to the Client will contain B’s reply in some capacity. Whether there are cases where this structure becomes impractical is something I cannot tell with my current knowledge.
On Thursday, 21 August 2014 16:01:26 UTC+8, rkuhn wrote:

> My initial impulse would be to postulate an isomorphism between Actor and AR and then say that the Actor Model works just fine with only direct message sends, so the same must hold for ARs and their commands/events.

Unfortunately, I am coming to believe this may not be correct. Akka message passing and related functionality is fantastic for processing within an aggregate, and Akka Persistence is great for aggregate persistence. However, it seems that most modern CQRS/ES implementations use and require a reliable EventBus and possibly a reliable CommandBus for the *write* side (with the EventBus and projections for the read side).

I suggest Vaughn's creation of Number 9, AFAIK an implementation of reliable messaging to multiple actors (e.g. for events), is a symptom of this lack of a reliable EventBus but not the full solution that an EventBus (and CommandBus) would provide. I believe a full CQRS/ES implementation may require reliable decoupled command delivery and reliable publish-subscribe for events.
> In theory if Client sends a command to A which in order to fulfill it will need to send a command to B then A’s response to the Client will contain B’s reply in some capacity. Whether there are cases where this structure becomes impractical is something I cannot tell with my current knowledge.

No, I believe there are use-cases where a process manager listens for an event without having first sent a command to the aggregate root producing the event. An example could be a process manager listening for any ItemPicked events and starting a process to check the stock levels and possibly reorder, separate from the sales process. Again this seems to require a reliable publish-subscribe facility for events.
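The ItemPicked example can be made concrete with a small sketch. The event and command names are assumptions and the pub-sub transport is elided: a standing process manager consumes picking events and issues a reorder command once stock drops below a threshold, without ever having sent a command to the aggregate that produced the event.

```scala
// Illustrative sketch only; not an Akka or CQRS-framework API.
sealed trait DomainEvent
final case class ItemPicked(sku: String, qty: Int) extends DomainEvent

final case class Reorder(sku: String, qty: Int)

final class ReorderProcessManager(initialStock: Map[String, Int], threshold: Int) {
  private var levels = initialStock
  var issued: Vector[Reorder] = Vector.empty

  // Reacts to events it merely observes; it never commanded the picking.
  def onEvent(e: DomainEvent): Unit = e match {
    case ItemPicked(sku, qty) =>
      val remaining = levels.getOrElse(sku, 0) - qty
      levels = levels.updated(sku, remaining)
      if (remaining < threshold) issued = issued :+ Reorder(sku, threshold * 2)
  }
}
```

For this to work in practice, the ItemPicked events must reach the process manager reliably, which is exactly the publish-subscribe requirement argued for above.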
Now I am really impressed with the functionality provided by Akka, including its routers and distributed pub-sub functionality. And we have seen how Akka Persistence and its distributed journal / store can enable AtLeastOnceDelivery for messages. I thus suggest that the distributed store could more generally enable other services to move up to a more reliable level of local and distributed functionality (IF NEEDED).

Currently: Akka Persistence provides PersistentActor and AtLeastOnceDelivery.

Future?: Akka Persistence provides PersistentActor, AtLeastOnceDelivery, ReliableRouter, ReliableDistributedPubSub, and Reliable...?

Of course, if reliable routing and publish-subscribe can be implemented in another way in Akka that would be fine as well. Or perhaps the successful "let it crash" approach extends to messaging, i.e. "let it be lost." If so, I am keen to find out how to make a CQRS/ES implementation work effectively if/when commands are lost and if/when events are not delivered to those needing to be notified of them.
The required reliable functionality can obviously be added on top of Akka by using other services. However, I think use of the distributed store that comes with Akka Persistence to implement these could remove the need for an extra infrastructure component. And if this is done with a store that may be central to enterprise infrastructure, e.g. Kafka, then that would also make integration easier.

I see something like this was discussed early in 2013: https://groups.google.com/d/topic/akka-user/cmDna0_Mo58/discussion
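For reference, the idea behind the AtLeastOnceDelivery mentioned above is roughly the following. This is a deliberately simplified sketch with invented names; the real Akka trait additionally persists delivery state so that unconfirmed messages survive a crash, and drives redelivery with timers.

```scala
// Concept sketch of at-least-once delivery: every outgoing message gets a
// delivery id and stays buffered until the receiver confirms it; anything
// unconfirmed is redelivered (after recovery or on a redelivery tick).
final class OutboundBuffer {
  private var nextId = 0L
  private var unconfirmed = Vector.empty[(Long, String)]

  def deliver(msg: String): Long = {
    nextId += 1
    unconfirmed = unconfirmed :+ (nextId -> msg)
    nextId
  }

  def confirm(id: Long): Unit =
    unconfirmed = unconfirmed.filterNot(_._1 == id)

  // Everything still unconfirmed goes out again.
  def pendingRedelivery: Vector[String] = unconfirmed.map(_._2)
}
```

Note that redelivery implies possible duplicates, which is why consumers in such a system need to be idempotent.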
Cheers,
Ashley.
>> In theory if Client sends a command to A which in order to fulfill it will need to send a command to B then A’s response to the Client will contain B’s reply in some capacity. Whether there are cases where this structure becomes impractical is something I cannot tell with my current knowledge.
>
> No, I believe there are use-cases where a process manager listens for an event without having first sent a command to the aggregate root producing the event. An example could be a process manager listening for any ItemPicked events and starting a process to check the stock levels and possibly reorder, separate from the sales process. Again this seems to require a reliable publish-subscribe facility for events.

Yes, this is a good point: I was thinking of ephemeral (per-request) process managers, for which I still believe that the message flows should be organized such that the PM does not need to subscribe to events; that would impose too much overhead.
Your example of a “standing” process is exactly one of those read-side consumers that we have in mind for the proposed PersistentView replacement.
There is a fundamental difference between commands and events: if a command is lost then the contained intent evaporates and nothing happens; the sender will eventually recognize this and retry or abort the transaction.
On Wednesday, 27 August 2014 23:08:27 UTC+8, rkuhn wrote:

> Yes, this is a good point: I was thinking of ephemeral (per-request) process managers, for which I still believe that the message flows should be organized such that the PM does not need to subscribe to events; that would impose too much overhead.

I understand the concerns about overhead on the read-side, but I wonder how other CQRS/ES implementations handle this. I asked a question related to this on Stack Overflow and received an interesting answer (explained fully in the comments):
> Your example of a “standing” process is exactly one of those read-side consumers that we have in mind for the proposed PersistentView replacement.

I apologise if I am misunderstanding what you mean by "read-side" but this PM issues commands so I had assumed it must be on the write-side. If you mean using PersistentView(v2) makes it read-side (I assume one could use PersistentViews on the write-side) then that's ok.
Please consider also this diagram, similar to the last one I shared:

IMO: Write Side = [1] + [2], Read Side = [3], but I see the similarity of [2] to [3] in a round-about way if the client is included.

> There is a fundamental difference between commands and events: if a command is lost then the contained intent evaporates and nothing happens; the sender will eventually recognize this and retry or abort the transaction.

Yes, I had thought this as well based on what I had read, but again from the comments to the Quora answer: "If your commandbus is not reliable, you can use a timeout mechanism (i.e., check for expected result of command after some time), but this requires reliable timeouts."
Does Akka have a reliable timeout mechanism (which I assume means it works across crashes)?
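One way such a timeout could be made crash-safe, sketched with invented names: persist the deadline together with the pending command, so that after recovery the sender can re-check expiry and then retry or abort. This is only a sketch of the idea, not an existing Akka facility.

```scala
// A deadline stored alongside the command in the journal survives a crash;
// on recovery the sender re-evaluates it against the current clock.
final case class PendingCommand(id: Long, sentAtMillis: Long, timeoutMillis: Long)

object Timeouts {
  def expired(p: PendingCommand, nowMillis: Long): Boolean =
    nowMillis - p.sentAtMillis >= p.timeoutMillis
}
```

Because the check depends only on persisted data plus the current time, it gives the same answer before and after a restart, which is what "reliable" would have to mean here.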
Thanks,
Ashley.
...