Akka Persistence - Views with multiple processors


Chanan Braunstein

Apr 11, 2014, 6:51:52 AM
to akka...@googlegroups.com
Hello,

We are evaluating Akka Persistence for use in our new platform going forward. It seems to us that views could be a very powerful way to do some of the things we need to do, but a view can only act on one stream of events. An interesting feature, I think, would be for a view to return a list of processors whose events it is interested in. For example, the classic bank account example:

Account A -> Processor 
View A -> Daily Balance (Note: Daily balance is not the same as current state as events aren't applied the same way)

Account B -> Processor
View B -> Daily Balance

View A & B -> A sum of the daily balance of all accounts

True, in this simple example, View A & B can easily be done without the combined view, but of course, more complex (and powerful) ideas are out there. Also, it doesn't have to be limited to the same kind of processor.

Chanan

Patrik Nordwall

Apr 14, 2014, 7:15:58 AM
to akka...@googlegroups.com
Hi Chanan,

Can't you have the two views A and B forward the messages to a third aggregating actor?
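A rough, untested sketch of that forwarding idea, using the View API from Akka 2.3 (all class, message and id names below are made up for illustration):

import akka.actor._
import akka.persistence._

case class MoneyDeposited(amount: BigDecimal)
case class DailyBalance(accountId: String, balance: BigDecimal)

// One view per account processor; it derives a running "daily balance" from the
// replayed events and forwards it to a shared aggregating actor.
class DailyBalanceView(accountProcessorId: String, aggregator: ActorRef) extends View {
  override def processorId: String = accountProcessorId
  private var balance = BigDecimal(0)

  def receive = {
    case Persistent(MoneyDeposited(amount), _) =>
      balance += amount // toy balance logic, just for the sketch
      aggregator ! DailyBalance(accountProcessorId, balance)
  }
}

// Plain actor that keeps the latest daily balance per account and sums them.
class TotalDailyBalance extends Actor {
  private var balances = Map.empty[String, BigDecimal]

  def receive = {
    case DailyBalance(id, b) => balances += id -> b
    case "total"             => sender() ! balances.values.sum
  }
}

The aggregating actor is a plain actor; only the per-account views are tied to a journal.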

Regards,
Patrik





--

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

Chanan Braunstein

Apr 14, 2014, 8:48:18 AM
to akka...@googlegroups.com
Hi Patrik,

Sure, and if that is how akka-persistence stays, that is what we will end up doing. But since persistence was marked as experimental, and it was stated that it will change based on community feedback, I thought I would mention this feature, which seems useful and powerful to me, and hope others will think so as well.

The difference between having an actor that queries a bunch of views and an actor that encapsulates all the logic needed to get a true view of what the business needs to show is exactly that. Think of it as a viewmodel, like you would have in an MVVM app. But, as you pointed out, just like many things in development, there are workarounds.


Patrik Nordwall

Apr 14, 2014, 9:22:17 AM
to akka...@googlegroups.com
On Mon, Apr 14, 2014 at 2:48 PM, Chanan Braunstein <chanan.b...@pearson.com> wrote:
Hi Patrik,

Sure and if that is how akka-persistence stays, that is what we will end up doing. But since persistence was marked as experimental, and it was stated that it will changed based on community feedback, I thought I would mention this feature that seems useful and powerful to me and hope others will think so as well. 

Thanks for the feedback. When the processor ids are known up front I don't see a major advantage of reading from several processors in one view. An interesting variation of this feature is to have a view reading from a set of processors only known by a tag (or something), i.e. the view doesn't know the exact processor ids up front.

/Patrik
 

The difference between having an actor that will then query a bunch of views and an actor that encapsulates all the the logic needed to get a true view of what the business needs to show is exactly that. Think of it as a viewmodel, like you would have in an MVVM app. But, as you pointed out, just like many things in development, there are workarounds. 



Chanan Braunstein

Apr 14, 2014, 9:25:17 AM
to akka...@googlegroups.com
Hi Patrik,

That would be perfect for the needs I was thinking of. I hope that makes it into a future version.

Patrik Nordwall

Apr 14, 2014, 10:21:31 AM
to akka...@googlegroups.com
On Mon, Apr 14, 2014 at 3:25 PM, Chanan Braunstein <chanan.b...@pearson.com> wrote:
Hi Patrik,

That would be perfect for the needs I was thinking of. I hope that makes it in to a future version. 

Please create a ticket (we now use GitHub issues).

Chanan Braunstein

Apr 14, 2014, 10:42:08 AM
to akka...@googlegroups.com

delasoul

Apr 15, 2014, 3:44:51 PM
to akka...@googlegroups.com
Hello,

problems using a view for multiple processors have been discussed before, e.g.:
It would still be a great feature to have, though (when you need an aggregated view on the events of multiple processors).
The solution of sending the events from views to an aggregator still leaves me with the question of why to use a view at all and not just send the events from
the processor to the aggregator directly? When using a view in the "middle", one has to deal again with not sending duplicate messages to the aggregator when views
replay, persistence (if needed) of the aggregator, losing messages etc. Or am I missing something?

michael








Olger Warnier

Apr 18, 2014, 3:25:49 AM
to akka...@googlegroups.com
Hi Patrik, 

In a situation with a lot of (Eventsourced)Processors and a View that stores certain (view) state in a database (or other storage), you need to take care of the way connections to that storage are managed, especially when it is used together with cluster sharding.

A single view that is able to process the events per node (or for multiple nodes) allows for easy construction of a CQRS-style view model and allows me to use that same actor to ask for the view data. (This is how it's done with the Axon framework.)

Let's assume that I need this functionality; in what way would you change the persistence code to support this?

It seems that my other option is to create a distributed publish-subscribe mechanism using channels in order to aggregate the data, but I prefer to construct it in a way that's in line with the way persistence works / will work.

Kind regards, 

Olger

Martin Krasser

Apr 18, 2014, 7:54:33 AM
to akka...@googlegroups.com
Hi Olger,


On 18.04.14 09:25, Olger Warnier wrote:
Hi Patrick, 

In a situation with a lot of (Eventsourced)Processors and having a View to store certain (view) state into a database (or other storage), You need to take care of the way connections to these storage are managed, especially when it is used together with cluster sharding.

A single view that is able to proces the events per node (or for multiple nodes) allows for an easy construction of a CQRS style view model and allows me to use that same actor to ask for the view data. (this is how it's done with the Axon framework)  

Let's assume that I need this functionality, in what way would you change the persistence code to support this ?

I'm currently working on view composition using the brand new akka-stream module. The basic idea is to make views stream producers and to use the akka-stream DSL to merge message/event streams from several producers into whatever you need. See also https://twitter.com/mrt1nz/status/457120534111981569 for a first running example.

WDYT?
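(For reference, the API in that example is the experimental 2014 akka-stream/akka-persistence integration; with the later akka-persistence-query module the same merge idea can be sketched as below. This is an illustration only, not the code in the linked example, and the persistence ids are made up.)

import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink

object MergedAccountEvents extends App {
  implicit val system = ActorSystem("views")
  implicit val mat = ActorMaterializer()

  val journal = PersistenceQuery(system)
    .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

  // One live event stream per persistent actor, merged into a single stream
  // that an aggregating view (or database writer) can consume.
  val accountA = journal.eventsByPersistenceId("account-a", 0L, Long.MaxValue)
  val accountB = journal.eventsByPersistenceId("account-b", 0L, Long.MaxValue)

  accountA.merge(accountB).runWith(Sink.foreach(env => println(env.event)))
}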



Olger Warnier

Apr 18, 2014, 10:10:08 AM
to akka...@googlegroups.com, kras...@googlemail.com
Hi Martin, 


I'm currently working on view composition using the brand new akka-stream module. Basic idea is to make views stream producers and to use the akka-stream DSL to merge message/event streams from several producers into whatever you need. See also https://twitter.com/mrt1nz/status/457120534111981569 for a first running example.

WDYT?

First of all, nice stuff! I think this is useful for the system at my hands (real-time patient monitoring based on medical data).
I've seen the streams announcements but did not dive into them yet. Looking at your StreamExample.scala code, it more or less 'clicks' in concept (and hopefully in the right way).

From a 'View' perspective as currently available in akka-persistence, every producing actor needs a view attached to it in order to push the events to the stream producer, right? (When I look at the ViewProducer.scala code, this is what is done.)

PersistentFlow.fromProcessor("p1").toProducer(materializer)
Now, I have a sharding cluster with EventsourcedProcessors (expect tens of thousands of these EventsourcedProcessor actor instances), so I'll need to create a line like this for every EventsourcedProcessor in order to get the stream of events together. Thereafter, I need to merge them to get a single stream of events (at least that is one of the features of using the streams).

My goal is to have 'Listeners' (that is my interpretation of a 'View', due to historic reasons...) that will for instance update a data store; this will probably happen on just a few nodes (maybe 1 and some failover). These 'Listeners' need to attach to the sharded Eventsourced system and ask to get all event sourced events forwarded (publish-subscribe, more or less). 

I wonder if the current View (or ViewProducer) fits this situation, due to the fact that you need to create as many views as there are EventsourcedProcessors. 
With the merged streams thereafter, it seems a possibility to have just one thing per node (I assume an actor) that will do the writing to a data store (not being the event store). 
What would be the way to get these Views 'automagically' attached to the proper processors? 

And, do you have a pointer to how this issue is solved with its own event store? In a sharding cluster, you more or less have the same issue. (Would streams change your approach there?)






 

Martin Krasser

Apr 19, 2014, 1:11:23 AM
to akka...@googlegroups.com
Hi Olger,

installing 10k views/producers won't scale, at least not with the current implementation. Here are some alternatives:

- Maybe a custom journal plugin is what you need: a plugin that delegates all write/read requests to the actual journal actor and additionally updates a database with the events to be written. This essentially installs a single "listener" per ActorSystem (this is to some extent comparable to a database trigger that executes additional commands; if the backend datastore supports that directly, I recommend implementing the trigger there, if possible).

- Instead of having thousands of processors, what speaks against combining them into a single processor (or only a few) per node?

Further comments inline ...


On 18.04.14 16:10, Olger Warnier wrote:
Hi Martin, 


I'm currently working on view composition using the brand new akka-stream module. Basic idea is to make views stream producers and to use the akka-stream DSL to merge message/event streams from several producers into whatever you need. See also https://twitter.com/mrt1nz/status/457120534111981569 for a first running example.

WDYT?

First of all Nice stuff !, I think this is useful for the system at my hands (real-time patient monitoring based on medical data)
I've seen the streams announcements but did not dive into that yet. Looking at your code StreamExample.scala it more or less 'clicks' in concept. (and hopefully in the right way)

From a 'View' perspective as currently is available in akka-persistence, every producing actor needs a view attached to it in order to push the events to the streams producer, right ? (when I look at the ViewProducer.scala code, this is what is done.)

PersistentFlow.fromProcessor("p1").toProducer(materializer)
Now, I have a sharding cluster with an EventsourcedProcessor  (expect 10.000ths of these EventsourcedProcessor actor instances) , so I'll need to create a line like this for every EventsourcedProcessor in order to get the stream of events together. Thereafter, I need to merge them together to get a single stream of events. (at least that is one of the features of using the streams)

Every processor instance itself could create such a producer during start and send it to another actor that merges received producers.



My goal is to have 'Listeners' (that is my interpretation of a 'View' due to historic reasons...) that will for instance update a data store, this will probably happen on on just a few nodes (maybe 1 and some failover stuff). These 'Listeners' need to attach to the sharded Eventsourced system and ask to get all event sourced events forwarded. (publish subscribe more or less). 

I wonder if the current View (or ViewProducer) fits this situation due to the fact you need to create as many views as eventsourcedprocessors are created. 
With the merged streams thereafter, it seems a possibility to have just one thing per node (I assume actor) that will do the writing to a data store (not being the eventstore). 
What would be the way to get these Views 'automagically' attached to the proper procesors ?

See above.



And, do you have a pointer how this issue is solved with it's own eventstore ? In a sharding cluster, you more or less have the same issue. (would streams change your approach there ?)

There's one journal actor per ActorSystem where n journal actors in a cluster update a replicated journal.

Hope that helps.

Cheers,
Martin

Olger Warnier

Apr 19, 2014, 2:46:47 PM
to akka...@googlegroups.com, kras...@googlemail.com

Hi Martin, 

Had to think about it a little; hereby my follow-up. (Hope you don't mind the continued discussion, it helps me a lot in defining the right approach, thanks for that.)

On Saturday, April 19, 2014 7:11:23 AM UTC+2, Martin Krasser wrote:
Hi Olger,

installing 10k views/producers won't scale, at least not with the current implementation. Here are some alternatives:
Interesting, what would need to change to have it scaling?
(The idea is to have the EventsourcedProcessors reflect DDD-style Aggregate Root instances and have those distributed using cluster sharding.) 
 

- Maybe a custom journal plugin is what you need: a plugin that delegates all write/read requests to the actual journal actor and that additionally updates a database with the events to be written. This essentially installs a single "listener" per ActorSystem (this is to some extend comparable to a database trigger that executes additonal commands. If the backend datastore supports that directly, I recommend implementing the trigger there, if possible).

I am not sure I understand it. The basic idea is to have the 'events' stored via the EventsourcedProcessor published to 'n' views. The actual number of views that need to listen to these events is not known up front (people can add their own views... at system startup it will be clear).
As every eventsourced actor is actually an AggregateRoot (in DDD terms) and thereby something of an instance with its own state, the changes in these states need to be aggregated (that can be done with the streaming, as you mention) and published to the views that are interested (subscribed). 
Doing this by hand in the aggregate root actor is not a problem: thereafter write your own listener actor that will populate a view data store. Still, I have the feeling that the actual 'View' (or ViewProducer) could be implemented in such a way that it's done by the view.
 

- Instead of having thousands of processors, what speaks against combining them into a single processor (or only a few) per node?
This would mean that I'll have all my aggregate root instances running in one processor, meaning that I need to reconstruct state per aggregate root instance in some way. Using EventsourcedProcessor, I'd expect that I need to replay everything for all instances and pick the one that I need for processing at that moment (this can of course be optimized with snapshots and something like memcached). This feels like a performance hit to me. 
 

Further comments inline ...

On 18.04.14 16:10, Olger Warnier wrote:
Hi Martin, 


I'm currently working on view composition using the brand new akka-stream module. Basic idea is to make views stream producers and to use the akka-stream DSL to merge message/event streams from several producers into whatever you need. See also https://twitter.com/mrt1nz/status/457120534111981569 for a first running example.

WDYT?

First of all Nice stuff !, I think this is useful for the system at my hands (real-time patient monitoring based on medical data)
I've seen the streams announcements but did not dive into that yet. Looking at your code StreamExample.scala it more or less 'clicks' in concept. (and hopefully in the right way)

From a 'View' perspective as currently is available in akka-persistence, every producing actor needs a view attached to it in order to push the events to the streams producer, right ? (when I look at the ViewProducer.scala code, this is what is done.)

PersistentFlow.fromProcessor("p1").toProducer(materializer)
Now, I have a sharding cluster with an EventsourcedProcessor  (expect 10.000ths of these EventsourcedProcessor actor instances) , so I'll need to create a line like this for every EventsourcedProcessor in order to get the stream of events together. Thereafter, I need to merge them together to get a single stream of events. (at least that is one of the features of using the streams)

Every processor instance itself could create such a producer during start and send it to another actor that merges received producers.
That would not allow me to implement a 'View' (as known in the persistence package) in order to listen to events within my cluster of aggregate root instances; I'll need to build something additional for that (as the View is more used for the collection of those events and will thereafter push them through). 

At this moment, I use an akka extension (gives more or less a singleton) that is used directly in the EventSourcedProcessor after storage (persist) of the event.  
Thereafter I have listeners that get these events and transform them into data that needs storage for a certain type of view (CQRS style)  (this is where I expected the 'View' to be used) 


My goal is to have 'Listeners' (that is my interpretation of a 'View' due to historic reasons...) that will for instance update a data store, this will probably happen on on just a few nodes (maybe 1 and some failover stuff). These 'Listeners' need to attach to the sharded Eventsourced system and ask to get all event sourced events forwarded. (publish subscribe more or less). 

I wonder if the current View (or ViewProducer) fits this situation due to the fact you need to create as many views as eventsourcedprocessors are created. 
With the merged streams thereafter, it seems a possibility to have just one thing per node (I assume actor) that will do the writing to a data store (not being the eventstore). 
What would be the way to get these Views 'automagically' attached to the proper procesors ?

See above.


And, do you have a pointer how this issue is solved with it's own eventstore ? In a sharding cluster, you more or less have the same issue. (would streams change your approach there ?)

There's one journal actor per ActorSystem where n journal actors in a cluster update a replicated journal.
Thanks, that's not done as an Akka extension, right? (Why not?) 

Hope that helps.
Certainly, really appreciate your patience. 

Cheers,
Martin

Kind regards, 

Olger 
 

Patrik Nordwall

Apr 20, 2014, 8:32:07 AM
to akka...@googlegroups.com
Hi Olger,

What if you keep the sharded event sourced actors (10k+), but let them also send the events to one or a few processors? Then you can connect the views/streams to these processors.

If you don't like storing the events twice you can instead store some meta-data (processor-id, seq-no, timestamp) and have a view that creates sub-views on demand from the replayed meta-data. The sub-views would forward to the parent aggregated view.

/Patrik
--

Olger Warnier

Apr 20, 2014, 8:47:14 AM
to akka...@googlegroups.com
Hi Patrik, 

Sounds like an interesting approach; storing some meta-data at the view may help to check / show the reliability of the system. 

At this moment the events are sent to a processor per node that publishes the event (distributed pub sub).
When you talk about a view, that's the akka-persistence view? 
So more or less, the sub processors could send messages to the View, and when there is a Persist() around it, it will be stored. 

Is that a correct understanding?

Kind regards, 

Olger

Patrik Nordwall

Apr 20, 2014, 10:59:22 AM
to akka...@googlegroups.com
On Sun, Apr 20, 2014 at 2:47 PM, Olger Warnier <ol...@spectare.nl> wrote:
Hi Patrick, 

Sounds like an interesting approach, storing some meta-data at the view may help to check / show the reliability of the system. 

At this moment the events are sent to a processor per node that publishes the event (distributed pub sub) 

That sounds good, as well.
 
When you talk about view, that's the akka-persistence view ? 

Yes, persistence.View and persistence.Processor
 
So more or less, the sub processors could send messages to the View and when there is a Persist() around it, it will be stored. 

I'm not sure I understand what you mean here. Let me clarify my proposal with an example. Let's say we have a User aggregate root with some profile information that can be updated. The user is represented by a User EventsourcedProcessor actor, which is sharded. On the query side we want to be able to search users by first and last name, i.e. we want to store all users in a relational database table on the query side.

The User actor persists FirstNameChanged, and inside the persist block it sends a Persistent(FirstNameChanged) message to the AllUsers Processor. On the query side we have an AllUsersView connected to that processor. When AllUsersView receives FirstNameChanged it updates the db table.

To handle lost messages between User and AllUsers you might want to send an acknowledgement from AllUsers to User, and have a retry mechanism in User. I would implement that myself in User, but PersistentChannel might be an alternative.
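A condensed, untested sketch of that straightforward solution with the Akka 2.3 API (class and id names are made up; the ack/retry part is left out):

import akka.actor._
import akka.persistence._

case class ChangeFirstName(name: String)
case class FirstNameChanged(userId: String, name: String)

// Write side: one sharded EventsourcedProcessor per user.
class User(userId: String, allUsers: ActorRef) extends EventsourcedProcessor {
  override def processorId = s"user-$userId"
  private var firstName = ""

  def receiveRecover = {
    case FirstNameChanged(_, name) => firstName = name
  }

  def receiveCommand = {
    case ChangeFirstName(name) =>
      persist(FirstNameChanged(userId, name)) { evt =>
        firstName = evt.name
        allUsers ! Persistent(evt) // forward the event to the query-side processor
      }
  }
}

// Query side: a single Processor; the Persistent messages it receives are journaled.
class AllUsers extends Processor {
  override def processorId = "allUsers"
  def receive = {
    case _: Persistent => // nothing else to do here; the journal entry is the point
  }
}

// A View attached to the AllUsers journal; it updates the relational table.
class AllUsersView extends View {
  override def processorId = "allUsers"
  def receive = {
    case Persistent(FirstNameChanged(userId, name), _) =>
      // update the users table: set first_name = name where id = userId
  }
}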

That is the most straightforward solution. The drawback is that FirstNameChanged is stored twice. Therefore I suggested the meta-data alternative. User sends Persistent(UserChangedNotification(processorId)) to the AllUsers Processor. When AllUsersView receives UserChangedNotification it creates a child actor, a View for the processorId in the UserChangedNotification, if it doesn't already have such a child. That view would replay all events of the User and can update the database table. It must keep track of how far it has replayed/stored in the db, i.e. the seqNr must be stored in the db. The child View can be stopped when it becomes inactive.
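And a corresponding sketch of the meta-data alternative (again untested, made-up names), where the parent view spawns a child View per processorId it is notified about:

import akka.actor._
import akka.persistence._

case class UserChangedNotification(userProcessorId: String)

// Parent view over the AllUsers meta-data journal; spawns one child view per
// user processor it is notified about.
class AllUsersMetaView extends View {
  override def processorId = "allUsers"

  def receive = {
    case Persistent(UserChangedNotification(pid), _) =>
      val name = s"sub-$pid"
      context.child(name).getOrElse(context.actorOf(Props(new UserSubView(pid)), name))
  }
}

// Child view replaying a single user's events; it must remember in the db how far it got.
class UserSubView(userProcessorId: String) extends View {
  override def processorId = userProcessorId

  def receive = {
    case Persistent(event, seqNr) =>
      // write the event to the db only if seqNr is greater than the seqNr stored there,
      // and store seqNr together with the row in the same transaction
  }
}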

That alternative is more complicated, and I'm not sure it is worth it.

Cheers,
Patrik

Olger Warnier

Apr 20, 2014, 12:05:57 PM
to akka...@googlegroups.com


On Sunday, April 20, 2014 4:59:22 PM UTC+2, Patrik Nordwall wrote:

On Sun, Apr 20, 2014 at 2:47 PM, Olger Warnier <ol...@spectare.nl> wrote:
Hi Patrick, 

Sounds like an interesting approach, storing some meta-data at the view may help to check / show the reliability of the system. 

At this moment the events are sent to a processor per node that publishes the event (distributed pub sub) 

That sounds good, as well.
 
When you talk about view, that's the akka-persistence view ? 

Yes, persistence.View and persistence.Processor
 
So more or less, the sub processors could send messages to the View and when there is a Persist() around it, it will be stored. 

I'm not sure I understand what you mean here. Let me clarify my proposal with an example. Let's say we have a User aggregate root with some profile information that can be updated. The user is represented by a User EventsourcedProcessor actor, which is sharded. On the query side we want to be able to search users by first and last name, i.e. we want to store all users in a relational database table on the query side.
Yup, great sample.  

The User actor persist FirstNameChanged, and inside the persist block it sends a Persistent(FirstNameChanged) message to the AllUsers Processor. On the query side we have a AllUsersView connected to that processor. When AllUsersView receives FirstNameChanged it updates the db table.
Indeed. In what way is the AllUsersView connected to that Processor? (In a distributed pub-sub situation.) (Although I still have to understand how 'inside the persist block' is to be interpreted.)
 

To handle lost messages between User and AllUsers you might want to send an acknowledgement from AllUsers to User, and have a retry mechanism in User. I would implement that myself in User, but PersistentChannel might be an alternative.
Is it possible to do persistent channels with the distributed pub sub stuff that's available in akka? 

That is the most straight forward solution. The drawback is that FirstNameChanged is stored twice. Therefore I suggested the meta-data alternative. User sends Persistent(UserChangedNotification(processorId))) to the AllUsers Processor. When AllUsersView receives UserChangedNotification it creates a child actor, a View for the processorId in the UserChangedNotification, if it doesn't already have such a child. That view would replay all events of the User and can update the database table. It must keep track of how far it has replayed/stored in db, i.e. seqNr must be stored in the db. The child View can be stopped when it becomes inactive.
Will that work with a sharded cluster ? (and a 'View' may be running on another node)
 

That alternative is more complicated, and I'm not sure it is worth it.
From a solution perspective, using the distributed pub sub, maybe with persistent channels, is what will do. 

Most of my questions have to do with using akka-persistence as a full-fledged DDD framework. Not too hard without the sharding (although a view for every aggregate root instance seems not to fit when you want to use that for database connectivity that contains a view model); with the sharding it is more complicated, and a good structure to actually build a view that sits on 'some' node listening for events, doing its thing, is a handy part. 

Cheers, 
Olger

Patrik Nordwall

Apr 21, 2014, 3:07:03 AM
to akka...@googlegroups.com
On Sun, Apr 20, 2014 at 6:05 PM, Olger Warnier <ol...@spectare.nl> wrote:


On Sunday, April 20, 2014 4:59:22 PM UTC+2, Patrik Nordwall wrote:

On Sun, Apr 20, 2014 at 2:47 PM, Olger Warnier <ol...@spectare.nl> wrote:
Hi Patrick, 

Sounds like an interesting approach, storing some meta-data at the view may help to check / show the reliability of the system. 

At this moment the events are sent to a processor per node that publishes the event (distributed pub sub) 

That sounds good, as well.
 
When you talk about view, that's the akka-persistence view ? 

Yes, persistence.View and persistence.Processor
 
So more or less, the sub processors could send messages to the View and when there is a Persist() around it, it will be stored. 

I'm not sure I understand what you mean here. Let me clarify my proposal with an example. Let's say we have a User aggregate root with some profile information that can be updated. The user is represented by a User EventsourcedProcessor actor, which is sharded. On the query side we want to be able to search users by first and last name, i.e. we want to store all users in a relational database table on the query side.
Yup, great sample.  

The User actor persist FirstNameChanged, and inside the persist block it sends a Persistent(FirstNameChanged) message to the AllUsers Processor. On the query side we have a AllUsersView connected to that processor. When AllUsersView receives FirstNameChanged it updates the db table.
Indeed. In what way is the AllUsersView connected to that Processor ? (in a distributed pub sub situation)

In a persistent View you define the processorId that it will read the persistent messages from. It reads (replays) from the journal periodically, or when you send it an Update message. You can have many views connected to the same processor. The processor doesn't have to know anything about the views. In a distributed setup you will use a distributed/replicated journal and thereby the view can be located on another machine than the processor.
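(For completeness: the polling interval is configurable and a view can also be told to catch up on demand. The snippet below assumes the Akka 2.3 setting and message names.)

application.conf, how often a view polls the journal for new messages:
  akka.persistence.view.auto-update-interval = 5s

or trigger a replay explicitly from code (with await = true, messages that follow are processed after the replay completes):
  allUsersView ! akka.persistence.Update(await = true)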
 
(although, I have to understand in what way 'inside the persist block' is to be interpreted.  

 
 

To handle lost messages between User and AllUsers you might want to send an acknowledgement from AllUsers to User, and have a retry mechanism in User. I would implement that myself in User, but PersistentChannel might be an alternative.
Is it possible todo persistent channels with the distributed pub sub stuff that's available in akka ? 

Yes, PersistentChannel requires a confirmation from the destination, so if you wrap/forward the ConfirmablePersistent and send it via pub-sub it should be fine. It will not work if you publish to multiple subscribers.
 

That is the most straight forward solution. The drawback is that FirstNameChanged is stored twice. Therefore I suggested the meta-data alternative. User sends Persistent(UserChangedNotification(processorId))) to the AllUsers Processor. When AllUsersView receives UserChangedNotification it creates a child actor, a View for the processorId in the UserChangedNotification, if it doesn't already have such a child. That view would replay all events of the User and can update the database table. It must keep track of how far it has replayed/stored in db, i.e. seqNr must be stored in the db. The child View can be stopped when it becomes inactive.
Will that work with a sharded cluster ? (and a 'View' may be running on another node)

yes
 
 

That alternative is more complicated, and I'm not sure it is worth it.
From a solution perspective, using the distributed pub sub, maybe with persistent channels is what will do. 

Most of my questions have todo with using akka-persistence as a full fledged DDD framework, not too hard without the sharding (although a view for every aggregate root instance seems not to fit when you want to use that for database connectivity that contains a view model). with the sharding it is more complicated and a good structure to actually build a view that is on 'some' node listening for events, doing' it's thing is a handy part.

Thanks for your thoughts. I'm sure patterns and tools around this will evolve from the experience of using akka persistence in real applications.

Cheers,
Patrik

Olger Warnier

Apr 22, 2014, 5:32:36 AM
to akka...@googlegroups.com

On Monday, April 21, 2014 9:07:03 AM UTC+2, Patrik Nordwall wrote:
On Sun, Apr 20, 2014 at 6:05 PM, Olger Warnier <ol...@spectare.nl> wrote:
On Sunday, April 20, 2014 4:59:22 PM UTC+2, Patrik Nordwall wrote:
On Sun, Apr 20, 2014 at 2:47 PM, Olger Warnier <ol...@spectare.nl> wrote:
Hi Patrick, 

Sounds like an interesting approach, storing some meta-data at the view may help to check / show the reliability of the system. 

At this moment the events are sent to a processor per node that publishes the event (distributed pub sub) 

That sounds good, as well.
 
When you talk about view, that's the akka-persistence view ? 

Yes, persistence.View and persistence.Processor
 
So more or less, the sub processors could send messages to the View and when there is a Persist() around it, it will be stored. 

I'm not sure I understand what you mean here. Let me clarify my proposal with an example. Let's say we have a User aggregate root with some profile information that can be updated. The user is represented by a User EventsourcedProcessor actor, which is sharded. On the query side we want to be able to search users by first and last name, i.e. we want to store all users in a relational database table on the query side.
Yup, great sample.  

The User actor persist FirstNameChanged, and inside the persist block it sends a Persistent(FirstNameChanged) message to the AllUsers Processor. On the query side we have a AllUsersView connected to that processor. When AllUsersView receives FirstNameChanged it updates the db table.
Indeed. In what way is the AllUsersView connected to that Processor ? (in a distributed pub sub situation)

In a persistent View you define the processorId that it will read the persistent messages from. It reads (replays) from the journal periodically, or when you send it a Update message. You can have many views connected to the same processor. The processor doesn't have to know anything about the views. In a distributed setup you will use a distributed/replicated journal and thereby the view can be located on another machine than the processor.
 
(although, I have to understand in what way 'inside the persist block' is to be interpreted.  

Ah, I thought you were familiar with EventsourcedProcessor. Read up on it in the docs:
 
Clear, that's being followed. I understand your sentence now.  
 

To handle lost messages between User and AllUsers you might want to send an acknowledgement from AllUsers to User, and have a retry mechanism in User. I would implement that myself in User, but PersistentChannel might be an alternative.
Is it possible todo persistent channels with the distributed pub sub stuff that's available in akka ? 

Yes, PersistentChannel requires a confirmation from the destination, so if you wrap/forward the ConfirmablePersistent and send it via pub-sub it should be fine. It will not work if you publish to multiple subscribers.
 
I need it with the publish. Is it possible to combine the DistributedPubSubMediator and the ReliableProxy in some kind of way to get guaranteed delivery (more or less) over pub sub ? (I've read the akka-dev thread on reliable messaging and the use of a MQ kind of thing for that, but I prefer to keep it clustered without a specific additional component in between)  


That is the most straight forward solution. The drawback is that FirstNameChanged is stored twice. Therefore I suggested the meta-data alternative. User sends Persistent(UserChangedNotification(processorId))) to the AllUsers Processor. When AllUsersView receives UserChangedNotification it creates a child actor, a View for the processorId in the UserChangedNotification, if it doesn't already have such a child. That view would replay all events of the User and can update the database table. It must keep track of how far it has replayed/stored in db, i.e. seqNr must be stored in the db. The child View can be stopped when it becomes inactive.
Will that work with a sharded cluster ? (and a 'View' may be running on another node)

yes
 
 

That alternative is more complicated, and I'm not sure it is worth it.
From a solution perspective, using the distributed pub sub, maybe with persistent channels is what will do. 

Most of my questions have todo with using akka-persistence as a full fledged DDD framework, not too hard without the sharding (although a view for every aggregate root instance seems not to fit when you want to use that for database connectivity that contains a view model). with the sharding it is more complicated and a good structure to actually build a view that is on 'some' node listening for events, doing' it's thing is a handy part.

Thanks for your thoughts. I'm sure patterns and tools around this will evolve from the experience of using akka persistence in real applications.


Cheers,

Olger 

Patrik Nordwall

Apr 22, 2014, 5:56:43 AM
to akka...@googlegroups.com
No, with multiple consumers you have to use something else. That is why persistent Views are an attractive way of publishing the events.
 
(I've read the akka-dev thread on reliable messaging and the use of a MQ kind of thing for that, but I prefer to keep it clustered without a specific additional component in between)  

Yes, using an external message broker that has support for this is an alternative.

Cheers,
Patrik
 



Olger Warnier

Apr 22, 2014, 3:43:46 PM
to akka...@googlegroups.com
Based on all your remarks, I found a way to get this organised: using the Views for scanning changes and thereafter starting an actor with the specific actor path of the sharded eventsourced instance in order to process all changes (the meta-data option, as you called it). As said, that solves the issue of storing data twice and keeps the whole application reasonably simple (no application-level acknowledgements back and forth).

Thanks for your support, really helped me to get this nailed. (A raw version of this is working on my machine now)
Olger

Patrik Nordwall

Apr 23, 2014, 5:02:36 AM
to akka...@googlegroups.com
On Tue, Apr 22, 2014 at 9:43 PM, Olger Warnier <ol...@spectare.nl> wrote:
Based on all your remarks, I found the way to get this organised using the Views for scanning changes and thereafter starting an actor with the specific actorpath of the sharded eventsourced instance in order to process all changes (the meta-data option as you called it). As said, that solves the issues of storing data twice and keeps the whole application reasonably simple (no application level acknowledgements back and forth)

Thanks for your support, really helped me to get this nailed. (A raw version of this is working on my machine now)

You're welcome. Thanks for the update.

Paweł Kaczor

May 9, 2014, 10:29:32 AM
to akka...@googlegroups.com


On Sunday, April 20, 2014 4:59:22 PM UTC+2, Patrik Nordwall wrote:


The User actor persist FirstNameChanged, and inside the persist block it sends a Persistent(FirstNameChanged) message to the AllUsers Processor. On the query side we have a AllUsersView connected to that processor. When AllUsersView receives FirstNameChanged it updates the db table.

To handle lost messages between User and AllUsers you might want to send an acknowledgement from AllUsers to User, and have a retry mechanism in User. I would implement that myself in User, but PersistentChannel might be an alternative.

That is the most straight forward solution. The drawback is that FirstNameChanged is stored twice.


What's wrong with the following (push-only) solution (much simpler): 
User (eventsourced actor) ---- reliable event delivery (Channel) ---> AllUsers (normal actor, updates projection (i.e sql table))
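A rough, untested sketch of that push-only wiring, assuming a PersistentChannel for the reliable delivery (the command/event classes have the same shape as in Patrik's example; all names are illustrative):

import akka.actor._
import akka.persistence._

case class ChangeFirstName(name: String)
case class FirstNameChanged(userId: String, name: String)

// Write side: persist the event, then hand it to a persistent channel for delivery.
class User(userId: String, projection: ActorPath) extends EventsourcedProcessor {
  override def processorId = s"user-$userId"
  private val channel = context.actorOf(PersistentChannel.props(), "channel")

  def receiveRecover = { case _: FirstNameChanged => }

  def receiveCommand = {
    case ChangeFirstName(name) =>
      persist(FirstNameChanged(userId, name)) { evt =>
        channel ! Deliver(Persistent(evt), projection) // redelivered until confirmed
      }
  }
}

// Query side: a plain actor that updates the projection and confirms the delivery.
class AllUsersProjection extends Actor {
  def receive = {
    case p @ ConfirmablePersistent(FirstNameChanged(userId, name), _, _) =>
      // update the sql table synchronously, then acknowledge
      p.confirm()
  }
}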


Pawel

Olger Warnier

May 9, 2014, 10:37:16 AM
to akka...@googlegroups.com
Hi Pawel, 

In that way, you need to write your own mechanism to support replay towards the views. 

Kind regards, 

Olger


Paweł Kaczor

May 9, 2014, 11:58:42 AM
to akka...@googlegroups.com
Hi Olger,
assuming the update of the projection is performed synchronously, the confirmation will be sent back to the User actor only after the projection has been updated successfully. Events can be redelivered by the User actor until they are confirmed (http://doc.akka.io/docs/akka/snapshot/scala/persistence.html#Message_re-delivery). So the "replay towards the view" mechanism is already available, it just needs to be configured (max redeliveries, restarts of the User actor)?

Best regards,
Pawel

Olger Warnier

May 9, 2014, 12:08:54 PM
to akka...@googlegroups.com
Hi Pawel, 

The flow is as follows:

Command -> User (event sourced actor) -> Persistent(Event) -> stored in the event store -> View (reads from event store)
So the moment your event is persisted in the event store, (eventually) the View will be able to process that event. (It's a pull mechanism.)

So it depends a bit on your use case what you wish to do here (for instance, you can use the view to build a structure that keeps users with passwords and verify passwords based on that data; your 'sync' call will only use the view and its data).
Just a sample as it all depends on your use case ;)

Kind regards, 

Olger

Yann Simon

Jan 22, 2015, 3:01:14 PM
to akka...@googlegroups.com
Hi Patrik,

On Sunday, April 20, 2014 at 4:59:22 PM UTC+2, Patrik Nordwall wrote:



On Sun, Apr 20, 2014 at 2:47 PM, Olger Warnier <ol...@spectare.nl> wrote:
Hi Patrick, 

Sounds like an interesting approach, storing some meta-data at the view may help to check / show the reliability of the system. 

At this moment the events are sent to a processor per node that publishes the event (distributed pub sub) 

That sounds good, as well.
 
When you talk about view, that's the akka-persistence view ? 

Yes, persistence.View and persistence.Processor
 
So more or less, the sub processors could send messages to the View and when there is a Persist() around it, it will be stored. 

I'm not sure I understand what you mean here. Let me clarify my proposal with an example. Let's say we have a User aggregate root with some profile information that can be updated. The user is represented by a User EventsourcedProcessor actor, which is sharded. On the query side we want to be able to search users by first and last name, i.e. we want to store all users in a relational database table on the query side.

The User actor persist FirstNameChanged, and inside the persist block it sends a Persistent(FirstNameChanged) message to the AllUsers Processor. On the query side we have a AllUsersView connected to that processor. When AllUsersView receives FirstNameChanged it updates the db table.

To handle lost messages between User and AllUsers you might want to send an acknowledgement from AllUsers to User, and have a retry mechanism in User. I would implement that myself in User, but PersistentChannel might be an alternative.

Let's say the current version in production only has User actors.
Now we want to deliver a new version that includes the new query side with the AllUsers Processor.
How can we be sure that AllUsers receives all the events needed to construct its state?
 
Thanks in advance,
Yann

Patrik Nordwall

Jan 29, 2015, 3:30:09 AM
to akka...@googlegroups.com
On Thu, Jan 22, 2015 at 9:01 PM, Yann Simon <yann.s...@gmail.com> wrote:
Hi Patrick,


Let's say the current version in production only have User actors.
Now we want to deliver an new version that include the new Query with the AllUsers Processor.
How can we be sure that AllUsers receive all the events to be able to construct its state?

I'm afraid there is no API to retrieve all ids, see feature request https://github.com/akka/akka/issues/13892

For the moment I guess you have to try to retrieve them from the journal data store.

Regards,
Patrik

Andrew Easter

Apr 8, 2015, 11:24:38 AM
to akka...@googlegroups.com
Hi Patrik.

Sorry to drag this up so long after it was posted, but I have a question about it...
 
Let's say we have a User aggregate root with some profile information that can be updated. The user is represented by a User EventsourcedProcessor actor, which is sharded. On the query side we want to be able to search users by first and last name, i.e. we want to store all users in a relational database table on the query side. 
 
The User actor persist FirstNameChanged, and inside the persist block it sends a Persistent(FirstNameChanged) message to the AllUsers Processor. On the query side we have a AllUsersView connected to that processor. When AllUsersView receives FirstNameChanged it updates the db table. 
 
To handle lost messages between User and AllUsers you might want to send an acknowledgement from AllUsers to User, and have a retry mechanism in User. I would implement that myself in User, but PersistentChannel might be an alternative.

I'm a little confused about how this solution avoids writing every event from AllUsers to the db table on every recovery of AllUsersView. In your original post, you contrasted this approach with another, in which you said:

It must keep track of how far it has replayed/stored in db, i.e. seqNr must be stored in the db.

The implication is that the first approach (mentioned above) would _not_ need to keep track of the seq number in the db - i.e. that would only be required in the second approach. However, I can't see how this would avoid, during recovery of the AllUsersView, re-writing every event to the database unless referring to a seq number stored in the db.

Am I missing something?

Thanks,
Andrew

Andrew Easter

Apr 8, 2015, 12:34:42 PM
to akka...@googlegroups.com
Okay, I've been reading more about PersistentView. 

I'm thinking I could achieve what I'm referring to here through the use of view snapshots?

i.e.

1) Periodically save snapshots every N hours/minutes/seconds
2) Save snapshot on shutdown of the PersistentView so that when it's recreated, it only starts consuming from last processed seq num

If the view writes to the db and, for whatever reason, crashes before persisting a snapshot that incorporates some handled seq nums, it's simply the case that updates to the db need to be idempotent, or some method of de-duping needs to be in place?

On a related note, if the last seq number is relied upon, what happens in the case a write to the db fails? Of course, the write could be retried a few times, but, if it's still not successful following a max number of retries, I guess one would have to give up on it and log the error somewhere such that the problem is at least identified?

Patrik Nordwall

Apr 14, 2015, 4:20:09 AM
to akka...@googlegroups.com
Hi Andrew,

I think your reasoning is correct. A way to implement the de-duping (if db operations are not idempotent by themselves) is to save the sequence number in the external db together with the writes. When starting up, before replaying, you load the latest seq num from the db and then you know that you can ignore all replay events with lower seq num.
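A small, untested sketch of that de-duplication idea with PersistentView (the db helpers are placeholders for your own data access, and the persistence id is made up):

import akka.persistence.PersistentView

class AllUsersDbView extends PersistentView {
  override def persistenceId = "allUsers"
  override def viewId = "allUsers-db-view"

  // placeholders for your own db access
  def loadLastSeqNrFromDb(): Long = 0L
  def writeRow(evt: Any, seqNr: Long): Unit = ()

  private var lastWritten = loadLastSeqNrFromDb()

  def receive = {
    case evt if isPersistent =>
      if (lastSequenceNr > lastWritten) {
        writeRow(evt, lastSequenceNr) // row and seqNr go into the same transaction
        lastWritten = lastSequenceNr
      }
      // else: this event was already applied before the crash/restart, so skip it
  }
}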

/Patrik

Andrew Easter

Apr 20, 2015, 11:43:58 AM
to akka...@googlegroups.com
Cool - thanks for validation and tips.

Andrew