How to implement Process Manager / Saga without ES and without Framework?


Joerg

Feb 10, 2021, 9:33:25 AM
to DDD/CQRS
Hi there,

I am taking my first steps in the world of distributed computing and I am creating a small in-house ASP.NET 5.0 web application for scheduling scientific simulations in particle physics and producing plots and reports from simulation results. The system consists of three components:

1. A simulation service doing all the configuration, scheduling, orchestration and data persistence. It also provides the API for the user front-end SPA.
2. Worker nodes that poll the simulation service for simulation batch jobs, then run said simulations and send the results back.
3. A plotting service that turns the simulation results into colourful images.

Since this is just a small internal system, I want to keep the complexity low. Hence, I would like to use DDD and CQRS for clarity, but on a small beginner's scale.
In particular, for the simulation service, I was thinking of using the same database for both the read side and the write side, where the read model mainly consists of database views and a thin DTO access layer. On the write side I would not use event sourcing, but rely on a more traditional CRUD approach with tables. I am also trying to avoid a third-party message bus / message broker, again for simplicity. All components would talk to each other via direct but often asynchronous HTTP requests.
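To make that concrete, the kind of thin read-side layer I have in mind would be little more than this sketch (the view name, the DTO shape and the use of Dapper are just illustrative assumptions):

```csharp
// Sketch only: a read-side DTO fetched straight from a database view.
// "vw_SimulationSummary" and the DTO shape are invented for illustration;
// Dapper is assumed here just to keep the sample short.
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Dapper;

public record SimulationSummaryDto(Guid Id, string Name, string Status, DateTime? CompletedAtUtc);

public class SimulationReadService
{
    private readonly IDbConnection _connection;

    public SimulationReadService(IDbConnection connection) => _connection = connection;

    public Task<IEnumerable<SimulationSummaryDto>> ListAsync() =>
        _connection.QueryAsync<SimulationSummaryDto>(
            "SELECT Id, Name, Status, CompletedAtUtc FROM vw_SimulationSummary");
}
```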

Now my question is:

What is the easiest and best way to implement some sort of persistent state machine / process manager / saga in my simulation service without using any kind of complex third-party message broker or infrastructure and without having to adopt event sourcing?

In particular, how would I solve the following questions with standard ASP.NET 5.0 tools and without big third-party frameworks:

1. As far as I understood the concept, process managers transform events into commands using internal state. How would I do that without event sourcing? Are these "process manager events" conceptually different from the "event sourcing events"?

2. Should I persist these "process manager events", which would end up being some sort of event sourcing again? Or can I just use a `SimulationProcess` and a `PlottingProcess` database table for the book-keeping of the overall process state?

3. How can the process manager decide about its state transitions when I don't use a message broker? Since the process manager only calls command handlers which have return type void, I suppose I have to inject some sort of callback into the command handler that informs the process manager of the domain-logic outcome, right?

4. How do I implement these process managers? How do they correspond to System.Threading.Task instances? Do I have one Task per process manager instance, i.e. one Task per individual root aggregate id?

5. I guess I would then have to hold a list of all those process manager instances in memory. Is that even possible when the database grows and I have thousands of records in my database?

6. How does the "process manager loop" work? Who's scheduling/triggering the process manager cycles? Sometimes there can be both external triggers such as a `StartPlotting` command sent by a user, and internal triggers such as timer events or file system polling events. Should I use ASP.NET `IHostedService` instances for this spinning loop?

7. How can a process manager schedule commands that should be executed in the future? Something like "Execute `CheckBatchJobTimeout` command in 10 minutes"?


I am a bit lost here. I keep reading the recommendation to avoid event sourcing as long as possible for small systems, but I cannot find any implementation of persistent state machines / sagas / process managers in a more traditional CQ(R)S setting.

I would be extremely grateful for any code snippets, blog articles or other resources.

Greetings,
Joerg

Boris Guéry

Feb 10, 2021, 2:46:11 PM
to ddd...@googlegroups.com
On Wed, Feb 10, 2021 at 3:33 PM Joerg <england...@gmail.com> wrote:

1. As far as I understood the concept, process managers transform events into commands using internal state. How would I do that without event sourcing? Are these "process manager events" conceptually different from the "event sourcing events"?

There's nothing wrong with having an event-sourced process manager; IMHO it may even be a good way to reuse code and stay consistent with the rest of the system.
Event sourcing is nothing more than a persistence mechanism in which state is computed from a sequence of state changes.
We have already built this kind of implementation and it worked well in our cases.


2. Should I persist these "process manager events", which would end up being some sort of event sourcing again? Or can I just use a `SimulationProcess` and a `PlottingProcess` database table for the book-keeping of the overall process state?

You _must_ not persist events (unless you event-source your process managers), but you do need to persist the state.


3. How can the process manager decide about its state transitions when I don't use a message broker? Since the process manager only calls command handlers which have return type void, I suppose I have to inject some sort of callback into the command handler that informs the process manager of the domain-logic outcome, right?

You don't need all this fancy stuff (a message broker); your process manager "handles" events, updates its state and manages transitions based on it.
 

4. How do I implement these process managers? How do they correspond to System.Threading.Task instances? Do I have one Task per process manager instance, i.e. one Task per individual root aggregate id?

5. I guess I would then have to hold a list of all those process manager instances in memory. Is that even possible when the database grows and I have thousands of records in my database?

YAGNI. You'll have plenty of time later to figure out the performance implications.

6. How does the "process manager loop" work? Who's scheduling/triggering the process manager cycles? Sometimes there can be both external triggers such as a `StartPlotting` command sent by a user, and internal triggers such as timer events or file system polling events. Should I use ASP.NET `IHostedService` instances for this spinning loop?

7. How can a process manager schedule commands that should be executed in the future? Something like "Execute `CheckBatchJobTimeout` command in 10 minutes"?

What about a `ScheduleBatchJobTimeoutCheck` command carrying a date in the future, to match against when you want to trigger the actual check?
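As a sketch (all names invented; the loop that later executes due checks is a separate concern):

```csharp
// Sketch: the command only records the intent plus its due date.
// A background poller (not shown) later picks up due rows and dispatches
// the actual CheckBatchJobTimeout. Table and type names are invented.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;

public record ScheduleBatchJobTimeoutCheck(Guid BatchJobId, DateTime DueAtUtc);

public class ScheduleBatchJobTimeoutCheckHandler
{
    private readonly IDbConnection _connection;

    public ScheduleBatchJobTimeoutCheckHandler(IDbConnection connection) => _connection = connection;

    public Task HandleAsync(ScheduleBatchJobTimeoutCheck command) =>
        _connection.ExecuteAsync(
            "INSERT INTO ScheduledCommands (BatchJobId, Kind, DueAtUtc) " +
            "VALUES (@BatchJobId, 'CheckBatchJobTimeout', @DueAtUtc)",
            new { command.BatchJobId, command.DueAtUtc });
}
```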

 



--

Boris Guéry 

Rickard Öberg

Feb 11, 2021, 2:54:35 AM
to ddd...@googlegroups.com
Hi,

We do something similar to this, so I'll just describe what we do, and you can see if you can apply it to your own situation.

On the command side, aggregates convert commands to a list of events. If event sourcing is turned on, you would persist these in an event store and then have subscribers read them and create a read model projection. However, you could shortcut this in your application and simply send the events created by aggregates straight to the projection (in our case a conversion from event objects to database update statements).

Once that is committed, we send the events to any interested listeners who may want to react to them, typically by updating external systems, but that could also simply be creating new commands to be put into the front of the command model. You could think of these listeners as process managers or sagas, depending on how complex you want them to be. In our case they just send updates to external systems, but they could just as well load state from the database and decide based on that, like a state machine.

That's about as simple as it gets, without having to use any infrastructural tools. It only relies on the command model creating events rather than state updates, which are then used in two ways: read model projection AND process manager.
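In code the shape is roughly this (a sketch; the interface names are invented and transaction handling is reduced to comments):

```csharp
// Sketch of the "shortcut": the aggregate produces events, we apply them
// directly to the projection and then hand them to listeners, all in-process.
// IEvent, IProjection and IEventListener are invented names for illustration.
using System.Collections.Generic;
using System.Threading.Tasks;

public interface IEvent { }

public interface IProjection
{
    Task ApplyAsync(IEvent @event); // e.g. turn the event into a database update statement
}

public interface IEventListener
{
    Task OnAsync(IEvent @event);    // process manager / saga / external-system notifier
}

public class CommandPipeline
{
    private readonly IProjection _projection;
    private readonly IReadOnlyList<IEventListener> _listeners;

    public CommandPipeline(IProjection projection, IReadOnlyList<IEventListener> listeners)
    {
        _projection = projection;
        _listeners = listeners;
    }

    // "events" is whatever the aggregate returned for the current command.
    public async Task CommitAsync(IReadOnlyList<IEvent> events)
    {
        foreach (var e in events)
            await _projection.ApplyAsync(e);    // runs inside the DB transaction

        // ...commit the transaction here...

        foreach (var e in events)
            foreach (var listener in _listeners)
                await listener.OnAsync(e);      // only after the commit succeeded
    }
}
```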

/Rickard

Joerg

Feb 13, 2021, 1:45:03 PM
to DDD/CQRS
Thank you Boris for your answers.

You write "What about a `ScheduleBatchJobTimeoutCheck` command carrying a date in the future, to match against when you want to trigger the actual check?".
My question is: what would this command be doing? While it can write some scheduling metadata to the database, who is going to invoke/execute the corresponding `CheckBatchJobTimeout` command later on? Is that a separate Thread/Task that periodically checks the database for scheduling metadata? Or how would I do that?

And would I just have one Thread/Task for that, or multiple? That brings me back to my question about a list of process manager instances in memory. Could you elaborate on the YAGNI, please? I am not talking about snapshotting or caching of aggregates here, but about the "scheduling intelligence" that runs behind the scenes and actively turns the gears.

Joerg

Feb 13, 2021, 1:51:01 PM
to DDD/CQRS
Thank you Rickard for your answer, too. Regarding the "event shortcut" you mention: I assume I would still have to persist the events, so that they are part of the same transaction / unit of work as the domain model state update, right?

And how do you deal with scheduling future commands through your event listeners? Do you have any kind of periodic Thread/Task that turns the gears? And how is that intelligence related to your event listeners?

Francisco Javier Estrella Rodriguez

Feb 13, 2021, 2:46:52 PM
to DDD/CQRS
Hello,

There are many ways to do what you are asking for. If you don't want to overcomplicate things, the following frameworks make the task easier:

1. Hangfire (job manager)
2. Azure Durable Functions (sagas); you will need a Storage Account to store the state.
3. MediatR (CQRS)

Rickard Öberg

Feb 13, 2021, 6:13:55 PM
to ddd...@googlegroups.com
Hi,

Nope, in that case I don’t persist the events at all. They are only created by the aggregates and consumed immediately by the projections and process manager.

For future events, I have designed our model to not need them. The only case we have is "publish this content in the future", so what we do is just set a publish date property on those entities, and then it is up to the read model to honor that. If you absolutely need future events, then yes, you would need a way to persist that inside the process manager.
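Concretely, "honoring" the publish date is nothing more than a filter on the read side, something like this sketch (names invented, Dapper assumed):

```csharp
// Sketch: the read model simply ignores content whose publish date is
// still in the future. Table and column names are invented.
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Dapper;

public record ContentDto(Guid Id, string Title);

public static class PublishedContentQuery
{
    public static Task<IEnumerable<ContentDto>> RunAsync(IDbConnection connection) =>
        connection.QueryAsync<ContentDto>(
            "SELECT Id, Title FROM Content WHERE PublishDateUtc <= @Now",
            new { Now = DateTime.UtcNow });
}
```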

Sent from my iPad

On 14 Feb 2021, at 02:51, Joerg <england...@gmail.com> wrote:

Thank you Rickard for your answer, too. Regarding the "event shortcut" you mention: I assume I would still have to persist the events, so that they are part of the same transaction / unit of work as the domain model state update, right?

Harrison Brown

Feb 14, 2021, 3:41:25 AM
to DDD/CQRS
To answer your questions one by one:

  1. No, process manager events aren't really different from any other events. In a large, mature system you *might* want to separate domain events (things the business really cares about) from technical events (things your system does that wouldn't be in your core domain model, like moving files around). If you did separate them, I'd suggest having a process manager either entirely in the domain model, focused only on business concerns, or entirely outside it, doing only technical things. It's fine to use multiple process managers in pursuit of a full end-to-end business process. One thing to note is that you can use domain events without event sourcing. We do this in a number of apps.
  2. First, a process manager emits commands, not events, so what you're actually asking is whether you should persist domain events in general. So, should you persist events: well, it's up to you. You can use the process manager pattern without persisting events, but you might have less resilience in your system, because if you have a long chain of event → command → event → etc. then you could have a very long-running single request. In general, with anything responding to events (process managers, projections, notifications, etc.) I prefer to do things async: persist the event, then read back off the event persistence later. But you need good infrastructure/tools to do that, so for a small app you might not want to go that far.
  3. I don't really understand this question. We might have different definitions of how to approach process managers, but the way I think about them is this: a process manager responds to an event and issues a command in response, then its work is done. It doesn't do anything else until it sees another event. So in that way, the 'state' of the process manager is actually the state of the events in the system. I normally have process managers that don't need to maintain any state of their own at all and can just respond to events with commands (so they're functional, in that sense; see the sketch at the end of this message).
  4. I can't answer this as I work in PHP!
  5. No, you don't *have* to store any state about the process manager or the processes it's looking after unless you need to. Re. holding process managers in memory: I'd avoid it. Make your process managers stateless, functional things which are invoked when there's an event to respond to; they emit their command(s) in response and then are finished. They don't actually need state unless they need to know something beyond the event at hand. Normally, the state you need can be encapsulated in how you design a sequence of events that happen in serial: the 'state' is just the event that's happening at the moment, which indicates where in the process you are. Think about them like they're people: if a bank manager got an email saying "Customer went into arrears", they would know what to do (IssueFineCommand) without needing to have been aware of that customer for their whole life.
  6. For scheduling I use passage-of-time events. I make sure they're events that my infrastructure creates and sends into the core of my code, so my domain model (which includes process managers) is decoupled from the infrastructure. See https://verraes.net/2019/05/patterns-for-decoupling-distsys-passage-of-time-event/
  7. For this, I would suggest you store some state in the process manager itself, so the process manager is responsible for keeping track of how long it's been since something happened and has access to state about 'in progress' processes. I'd just put this state in a standard database table. By the way, I'd probably handle this by having something else monitor the state of in-progress processes and, triggered by passage-of-time events, dispatch a BatchJobRunningLongerThanTenMinutesWasDetected event which my process manager responds to by killing and cleaning up the business process. Even better would be something like RunningWhateverProcessDetected(int $runningForMinutes), because then you have the flexibility to change your business rules about how long something can run before having to be killed.

In general, you're right to avoid event sourcing if you're new to all this. You can successfully do process managers without event sourcing: have them be stateless and just respond to events for the most part, and have them store a bit of state in a database table in the cases where you need that (such as the BatchJobTimeout example above). And I agree you don't need message brokers and other such stuff if you're working in a small, non-distributed system; that just adds unhelpful complexity.
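To sketch points 3, 5 and 6 in ASP.NET terms (I work in PHP, so treat this as a rough translation of the idea into C#; every name in it is invented):

```csharp
// Sketch: a stateless process manager is just "event in, commands out",
// and a plain BackgroundService plays the role of the infrastructure that
// creates passage-of-time events and sends them into the core.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

public record TenMinutesPassed(DateTime OccurredAtUtc);
public record CheckBatchJobTimeout(Guid BatchJobId);

public static class BatchJobTimeoutPolicy
{
    // Pure function: given the event and the in-progress state (loaded by
    // the caller), decide which commands to issue. No state of its own.
    public static IEnumerable<CheckBatchJobTimeout> Handle(
        TenMinutesPassed @event, IEnumerable<Guid> inProgressBatchJobIds)
    {
        foreach (var id in inProgressBatchJobIds)
            yield return new CheckBatchJobTimeout(id);
    }
}

// Infrastructure: a timer loop that emits the passage-of-time event.
public class PassageOfTimeEmitter : BackgroundService
{
    private readonly Func<TenMinutesPassed, Task> _dispatch; // hands the event to the core

    public PassageOfTimeEmitter(Func<TenMinutesPassed, Task> dispatch) => _dispatch = dispatch;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            await _dispatch(new TenMinutesPassed(DateTime.UtcNow));
            await Task.Delay(TimeSpan.FromMinutes(10), stoppingToken);
        }
    }
}
```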

Joerg

Feb 22, 2021, 1:28:29 PM
to DDD/CQRS
Thank you Francisco, Rickard and Harrison for all your answers. They help a lot. I have a few follow-up questions. Rickard and Harrison, you both write that events don't have to be persisted and Harrison, you also write that the process manager is a stateless, functional piece of code that just translates events into commands. Considering this, then...

1.) Where is the transaction / unit of work committed in your code? Does that happen inside the command handler routine by calling "commit" explicitly? Or does the "surrounding" process manager do that after the command handler routine has been executed and control has returned to the process manager?

2.) What do you do when the app crashes after the commit, but before publishing the new events? I guess the only solution for that is persisting the events in some sort of transactional outbox as described in https://microservices.io/patterns/data/transactional-outbox.html, right? Or how do you solve this in your applications?

3.) When I use such a transactional outbox, I am back to some of the disadvantages/complexities of event sourcing, such as versioning. Unless I can make sure that between version updates the outbox is drained completely. Any idea / suggestion how to handle versioning in a transactional outbox easily?

4.) What would a transactional outbox table look like? Would it just have a "payload" column plus some metadata columns, with the payload column containing the serialized events that are about to be sent out? So in a way such a table would look like an event sourcing table as described here: https://cqrs.wordpress.com/documents/building-event-storage/

Greg Young

Feb 22, 2021, 1:36:54 PM
to ddd...@googlegroups.com
Just as a note Harrison,

"First, a process manager emits commands, not events, so what you're actually asking is whether you should persist domain events in general."

There are cases where they can emit both. A mix is extremely common (think raising an event about reaching a point in a process or something which might be problematic). 
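As a sketch of what I mean (names invented), one handling step can yield a mix:

```csharp
// Sketch: a process manager step yielding both a command and an event.
using System;
using System.Collections.Generic;

public interface IMessage { }
public record PaymentFailed(Guid ProcessId) : IMessage;               // incoming event
public record EscalateToOperator(Guid ProcessId) : IMessage;          // outgoing command
public record ManualInterventionRequired(Guid ProcessId) : IMessage;  // outgoing event

public static class PaymentProcessManager
{
    public static IEnumerable<IMessage> Handle(PaymentFailed e) => new IMessage[]
    {
        new EscalateToOperator(e.ProcessId),          // drive the process forward
        new ManualInterventionRequired(e.ProcessId),  // announce the notable situation
    };
}
```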



--
Studying for the Turing test

Harrison Brown

Feb 22, 2021, 1:41:29 PM
to DDD/CQRS
Interesting — thanks, Greg. I hadn't come across a use case like that in our work yet, but it makes sense 👍 

To clarify, might an example of what you mean be, say, a process manager which, upon receiving a "BusinessDayHasPassed" event, checked whether an in-progress loan application was now too old/stale to proceed and, if so, raised a "StaleApplicationWasDetected" event?

Harrison

Greg Young

Feb 22, 2021, 1:46:37 PM
to ddd...@googlegroups.com
That is one example; another might be that I was unable to proceed with something ... "BadSituationEntered".


Harrison Brown

Feb 22, 2021, 1:59:53 PM
to DDD/CQRS
Joerg, my responses to your questions below:

1.) Where is the transaction / unit of work committed in your code? Does that happen inside the command handler routine by calling "commit" explicitly? Or does the "surrounding" process manager do that after the command handler routine has been executed and control has returned to the process manager?

As I think I mentioned, we put process managers into our core domain model (the innermost part, if you're using the hexagonal architecture layers of infrastructure/application/domain) because we consider the decision of which command(s) should be triggered in response to domain events to be a key business concern. So, just as aggregates in a domain model aren't concerned with persistence in any way, our process managers also don't worry about how the events are given to them nor how the commands are handled — they just return the correct commands and let the calling code handle that. This makes them very testable.

Thus, we view transaction handling as mostly an infrastructural thing. In apps where we need 'full' transactional consistency, my preferred approach is to persist domain events that come from the domain model, then have an asynchronous background process (one for each process manager, view model projection, notification sender, etc. that's registered in my app; they're all basically the same thing as far as the infrastructure is concerned) which reads the event stream one event at a time and passes those events to the process manager. The process manager returns a command (as a plain PHP object, in our case), which that same infrastructure then executes. That command likely triggers changes in the domain model which themselves raise events. The infrastructure code is then responsible for atomically committing to the database both (a) the fact that that particular event has been processed, and (b) the new events raised as a result of the command. If anything goes wrong, the whole thing is rolled back so the stream processor will re-attempt to process that event from scratch (or mark itself as failed, in which case an alert is sent to our team to investigate).

That's just one approach; there are probably other patterns, but this works nicely for us. The key thing about it is that it requires those stream processors to store their position in the event stream in the same datastore (we use MySQL) as the events themselves; otherwise you can't atomically persist both the processing of an event by a process manager and the events resulting from the command(s) it returns.

If you wanted to be even more granular with your units of work, you could treat commands as async messages and store them too (so you'd only have to atomically commit the stream position and the command(s) the process manager emits; the commands would then go onto their own stream to be processed by another stream processor). And thus we realise that events and commands are only conceptually different in how we refer to them (a distinction I do find helpful); really they're both just "messages".

So, short answer: We handle the transaction in the surrounding code that calls the process manager, not the process manager itself.
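In outline, one cycle of such a stream processor looks roughly like this (again, we're in PHP, so this is a translation into C#; the table layout, the two interfaces and the MySQL-flavoured SQL are all invented for illustration, and an open Dapper connection is assumed):

```csharp
// Sketch of one stream-processor cycle: the processor's position and the
// effects of its command(s) commit in one transaction, so a crash anywhere
// just means the event gets reprocessed from scratch.
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Dapper;

public record StoredEvent(long Id, string EventType, string Payload);

public interface IProcessManager
{
    IEnumerable<object> Handle(StoredEvent @event);        // stateless: event in, commands out
}

public interface ICommandExecutor
{
    Task ExecuteAsync(object command, IDbTransaction tx);  // also appends any resulting events
}

public class StreamProcessor
{
    private const string Name = "SimulationProcessManager"; // invented processor name
    private readonly IProcessManager _processManager;
    private readonly ICommandExecutor _commandExecutor;

    public StreamProcessor(IProcessManager pm, ICommandExecutor exec)
        => (_processManager, _commandExecutor) = (pm, exec);

    public async Task ProcessNextAsync(IDbConnection db) // db must already be open
    {
        using var tx = db.BeginTransaction();

        var position = await db.ExecuteScalarAsync<long>(
            "SELECT Position FROM StreamProcessors WHERE Name = @Name", new { Name }, tx);

        var stored = await db.QuerySingleOrDefaultAsync<StoredEvent>(
            "SELECT Id, EventType, Payload FROM Events WHERE Id > @position ORDER BY Id LIMIT 1",
            new { position }, tx);
        if (stored is null) return; // nothing new; disposing tx rolls back harmlessly

        foreach (var command in _processManager.Handle(stored))
            await _commandExecutor.ExecuteAsync(command, tx);

        await db.ExecuteAsync(
            "UPDATE StreamProcessors SET Position = @Id WHERE Name = @Name",
            new { stored.Id, Name }, tx);

        tx.Commit(); // all or nothing
    }
}
```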
 
2.) What do you do when the app crashes after the commit, but before publishing the new events? I guess the only solution for that is persisting the events in some sort of transactional outbox as described in https://microservices.io/patterns/data/transactional-outbox.html, right? Or how do you solve this in your applications?

Yes, you do need a transactional outbox. The approach I've described above is that pattern, simplified by having everything in one database. Obviously that'll hit scaling issues eventually, and it constrains how you can deploy autonomous services, so at some point you need to use the transactional outbox pattern more fully: use the approach I described only within a service, and then separately push those events from the outbox onto a central queue (e.g., Kafka). However, most people jump to complex patterns long before they need to, in my opinion. The approach I've described above is simple and is handling all of our use cases nicely.

For some parts of some apps, we take the view that, from a business perspective, the engineering isn't worth the risk reduction that fully transactional modelling gives, so we don't always do this.

3.) When I use such a transactional outbox, I am back to some of the disadvantages/complexities of event sourcing, such as versioning. Unless I can make sure that between version updates the outbox is drained completely. Any idea / suggestion how to handle versioning in a transactional outbox easily?

Yes, you are, but I don't think you can avoid that really. If you're going to persist events at all — whether as a full event stream as we do, or by doing event sourcing, or just as a transactional outbox — then you have to think about this, but I think the tradeoff is worth it. Event-based models, I find, remove so much complexity from the model itself that a bit of extra engineering is worth it. Business complexity knows no end, but there are good strategies to deal with technical complexity. A smart chap wrote a book on versioning in an event-sourced system, for example.

And yes, if you keep going along this route you will eventually realise, as I have, that you've reinvented event sourcing and should just get on and do it. For my team, though, it was helpful to work towards it step by step: first using synchronous events to decouple parts of our code (and to create projections so we can do CQRS), then persisting events for auditing purposes, then asynchronously working through event streams, then relinquishing our aggregate persistence entirely and just doing event sourcing.

4.) What would a transactional outbox table look like? Would it just have a "payload" column plus some metadata columns, with the payload column containing the serialized events that are about to be sent out? So in a way such a table would look like an event sourcing table as described here: https://cqrs.wordpress.com/documents/building-event-storage/

Yep, if you're using a relational database then the table can be as simple as an "ID" column, an "EventType" column (which maps the event to a class in your code) and a "Payload" column which is just the event data serialised as JSON. We have a few more columns in our tables that store domain events, to hold various metadata (see the "Envelopes" section of this post), but that's not necessary.
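To make that concrete, here's a sketch (C# again for the sake of the thread; the DDL lives in a comment, serialisation uses System.Text.Json, and the names are just illustrative):

```csharp
// Sketch: about the simplest possible outbox/event table, and an append
// that runs inside the same transaction as the state change.
//
//   CREATE TABLE Outbox (
//       Id        BIGINT AUTO_INCREMENT PRIMARY KEY,
//       EventType VARCHAR(255) NOT NULL,  -- maps the row back to a class
//       Payload   JSON NOT NULL           -- the event serialised as JSON
//   );
using System.Data;
using System.Text.Json;
using System.Threading.Tasks;
using Dapper;

public static class Outbox
{
    public static Task AppendAsync(IDbConnection db, IDbTransaction tx, object @event) =>
        db.ExecuteAsync(
            "INSERT INTO Outbox (EventType, Payload) VALUES (@EventType, @Payload)",
            new
            {
                EventType = @event.GetType().Name,
                Payload = JsonSerializer.Serialize(@event),
            },
            tx);
}
```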

Ben Kloosterman

Mar 29, 2021, 7:21:44 PM
to ddd...@googlegroups.com
"The key this about it is that it requires those stream processors to store their position in the event stream in the same datastore (we use MySQL) as the events themselves, otherwise you can't atomically persist the processing of an event to a process manager and the events resulting from the command(s) it returns. "

I avoid events in relational DBs these days unless the system is small; there are far more issues, especially the low insert speed. EventStore and Redis Streams are my preferred options.


"Yes, you are, but I don't think you can avoid that really. If you're going to persist events at all — whether as a full event stream as we do, or by doing event sourcing, or just as a transactional outbox — then you have to think about this, but I think the tradeoff is worth it. Event-based models, I find, remove so much complexity from the model itself that a bit of extra engineering is worth it. Business complexity knows no end, but there are good strategies to deal with technical complexity. A smart chap wrote a book on versioning in an event-sourced system, for example."

Versioning is not a big deal in practice, and the problem is similar to APIs with older clients, which people don't seem to have an issue with. You can overcomplicate it with events the same as you can with APIs.


Transactions are a bigger discussion. Pretty much everything has atomic writes; the issues are around reading that back, and many consistent-read systems scale very poorly.


radhakris...@gmail.com

Apr 20, 2021, 12:48:51 AM
to DDD/CQRS
As far as the 'outbox' pattern is concerned, there are architectural considerations. When we use a cloud provider with queues and events, it seems hard to decide on an approach. Do we use WAL tailing in PostgreSQL or other DBs? That does not fit well with queues and events. There are some facilities to stream such WAL tails to a cloud stream for auditing, but that is a different need.

Thanks.
