Implementing CQRS with Task Queue (Push Queue)


Naresh Pokuri

Sep 16, 2015, 3:50:14 AM
to Google App Engine
I am implementing the CQRS model on GAE. By design, each command we dispatch to an aggregate generates and publishes one or more events. Those events have to be handled asynchronously. At the same time, the event-handling sagas can generate new commands to update other aggregates.

To handle events asynchronously I am planning to use the GAE Task Queue (push queue). But the Task Queue documentation states that a task can run more than once and should therefore be idempotent. I cannot make the results of my events/commands idempotent: the same command might execute again and update the read models inconsistently.

Is there any other service for processing events/commands asynchronously from GAE? Or are there any design suggestions for making events/commands idempotent?

Wolfram Gürlich

Sep 16, 2015, 6:31:34 AM
to Google App Engine
You could first save your event/command as an entity, then delete that entity in the same cross-group transaction in which you apply the event/command.

Wolfram

Naresh Pokuri

Sep 16, 2015, 12:21:52 PM
to Google App Engine
Sorry, I didn't follow. How will this solution stop the same command/event from running multiple times?

Wolfram Gürlich

Sep 16, 2015, 12:59:46 PM
to Google App Engine
Simply write the "event" entity at the same time as (or immediately before) you enqueue the processing task. Inside the processing task, first read the entity and do the processing only if the entity is still there. Since that entity is deleted if and only if your model is updated successfully, the event is guaranteed not to be processed more than once, no matter how often the task executes.
There is an additional advantage to this pattern: event entities remain in the datastore for as long as they have not been processed successfully. Any entity still dangling is an event that has not been applied yet, so you can check that all your events were really processed.
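
The pattern described above can be sketched in a few lines. This is only an in-memory illustration, not Datastore or Task Queue code: `InMemoryStore`, `apply_once`, and `handle_task` are hypothetical names, and a lock stands in for the cross-group transaction that makes "update the model" and "delete the event entity" a single atomic step.

```python
import threading


class InMemoryStore:
    """Hypothetical stand-in for a datastore with atomic multi-entity commits."""

    def __init__(self):
        self._lock = threading.Lock()
        self.events = {}      # event_id -> payload (the "event" marker entities)
        self.read_model = {}  # account -> balance

    def save_event(self, event_id, payload):
        # Written at the same time as (or just before) the task is enqueued.
        with self._lock:
            self.events[event_id] = payload

    def apply_once(self, event_id):
        """Apply the event and delete its marker in one atomic step.

        Returns True if the event was applied, False if the marker was
        already gone (the event had been processed by an earlier run).
        """
        with self._lock:  # assumption: plays the role of the transaction
            payload = self.events.get(event_id)
            if payload is None:
                return False
            account, amount = payload
            self.read_model[account] = self.read_model.get(account, 0) + amount
            del self.events[event_id]
            return True


def handle_task(store, event_id):
    # The push-queue handler; the queue may invoke it more than once.
    return store.apply_once(event_id)


store = InMemoryStore()
store.save_event("evt-1", ("acct-42", 100))

# Simulate the queue delivering the same task three times.
results = [handle_task(store, "evt-1") for _ in range(3)]
print(results, store.read_model, store.events)
```

Only the first delivery changes the read model; later deliveries find no event entity and do nothing. An event that was never processed would still be present in `store.events`.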

Wolfram 

Naresh Pokuri

Sep 16, 2015, 1:35:00 PM
to Google App Engine
But I am using Datastore here (sorry for not mentioning it earlier), and it is eventually consistent. So a read of a newly inserted event may not find it, and a read of a deleted event may still find it.

Wolfram Gürlich

Sep 16, 2015, 1:42:51 PM
to Google App Engine
Only global (non-ancestor) queries, which read from an index, are eventually consistent. Getting an entity by its key (id or name) is strongly consistent (unless explicitly configured otherwise).

Wolfram

Naresh Pokuri

Sep 16, 2015, 1:59:18 PM
to Google App Engine
Okay. One final question, if you have worked with push task queues: is there a chance that a duplicate task runs at the same time as the original task, or do duplicates wait until the original task completes?

Thanks for your help

Wolfram Gürlich

Sep 16, 2015, 2:11:39 PM
to Google App Engine
I doubt this ever happens, but I'm not really sure. To handle that case, you should start the transaction before reading the event entity. That way the transaction will abort if the event entity has been deleted concurrently by another task.
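
To see why the read has to happen inside the transaction, here is a toy model of optimistic concurrency. It is an assumption-laden sketch, not the Datastore API: `VersionedStore`, `Transaction`, `ConflictError`, and `begin_processing` are hypothetical, and per-key version counters stand in for whatever conflict detection the real datastore performs.

```python
class ConflictError(Exception):
    """Raised when something read by the transaction changed before commit."""


class VersionedStore:
    def __init__(self):
        self.data = {}      # key -> value
        self.versions = {}  # key -> counter, bumped on every write or delete

    def put(self, key, value):
        self.data[key] = value
        self.versions[key] = self.versions.get(key, 0) + 1

    def delete(self, key):
        self.data.pop(key, None)
        self.versions[key] = self.versions.get(key, 0) + 1


class Transaction:
    def __init__(self, store):
        self.store = store
        self.seen = {}      # key -> version observed at read time
        self.writes = {}    # buffered puts
        self.deletes = set()

    def get(self, key):
        self.seen[key] = self.store.versions.get(key, 0)
        return self.store.data.get(key)

    def put(self, key, value):
        self.writes[key] = value

    def delete(self, key):
        self.deletes.add(key)

    def commit(self):
        # Abort if anything read in this transaction was modified since.
        for key, version in self.seen.items():
            if self.store.versions.get(key, 0) != version:
                raise ConflictError(key)
        for key, value in self.writes.items():
            self.store.put(key, value)
        for key in self.deletes:
            self.store.delete(key)


def begin_processing(store, event_key, model_key):
    """Read the event INSIDE the transaction, then stage the update."""
    txn = Transaction(store)
    amount = txn.get(event_key)
    if amount is None:
        return None  # event entity is gone: already processed
    balance = txn.get(model_key) or 0
    txn.put(model_key, balance + amount)
    txn.delete(event_key)
    return txn


store = VersionedStore()
store.put("event/evt-1", 100)

# Two deliveries of the same task overlap: both read before either commits.
task_a = begin_processing(store, "event/evt-1", "balance/acct-42")
task_b = begin_processing(store, "event/evt-1", "balance/acct-42")

task_a.commit()
try:
    task_b.commit()
    duplicate_applied = True
except ConflictError:
    duplicate_applied = False

# A retry of the aborted task finds no event entity and does nothing.
retry = begin_processing(store, "event/evt-1", "balance/acct-42")
print(duplicate_applied, retry, store.data)
```

Because the event entity was read within the transaction, the second commit is rejected once the first task has deleted it. Had the existence check been done outside the transaction, both tasks could have seen the entity and both could have applied the update.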

Wolfram