NCQRS Messaging

91 views
Skip to first unread message

Adam Greene

unread,
Aug 12, 2014, 3:27:09 PM8/12/14
to ncqr...@googlegroups.com
Good day everyone.

  I am looking at using NCQRS in a project, and I am curious about the Messaging system that allows communication between aggregates.  I was wondering about the proper way to implement messaging.  I am looking at the NCQRS.Messaging.Tests project as a starting point.  I can understand most of the stuff in there.  The biggest problem I am having is with:

public void Handle(RegisterHandlingEventMesasge message)
            {
                ApplyEvent(new HandlingEventRegistered
                              {
                                  CargoId = message.CargoId
                              });

                To().Aggregate<Cargo>(_cargoId)
                    .Ensuring(MessageProcessingRequirements.RequiresExisting)
                    .Send(new CargoWasHandledMessage());
            }

in Ncqrs.Messaging.Tests\ScenarioTest.cs.  Specifically, the To() method, I understand the intent, but the all that seems to happen is a MessageSentEvent is fired.  The only thing that seems to be handling the event is the FakeSendingStrategy, which simply seems to queue the message and then ScenarioTest pops the message and manually processes the event.  In a real world application, what is the right way of implementing this inter-aggregate communication?

Galen

unread,
Sep 10, 2014, 9:38:40 PM9/10/14
to ncqr...@googlegroups.com
Have you had any more progress here? I've been trying to catch you online to have a chat about this but the time differences for us suck.

What I'm trying to do is :
  1. User sends command to AR1

  2. In the command handling side of AR1 create a reference IMessageService from NcqrsEnvironment then call messagingService.Process and process a new message

  3. In the AR2 I have a  IMessageHandler<msg> that picks up the message and then sends another message to AR3
     To().Aggregate<AR3>(ar3Id)
                    .Ensuring(MessageProcessingRequirements.RequiresNew)
                    .Send(message2);
Things pass in my unit tests fine... however when the integration tests come in and I'm actually using a UoW command executor I'm getting this exception. : "There is already a unit of work created for this context."

Presumably since the initial command is being executed within a UoW and then the messageService tries to create it's own Unit of Work for each message it's processing and hence the error.

I suspect the solution involves using the NSB Message Strategy or something where the messages are just 'put on the bus' and not executed in the same UoW context as the initial command.

However I'm just struggling to get this working and wanted to know if you've had any progress here that might help.

Cheers
Galen

Adam Greene

unread,
Sep 10, 2014, 10:50:22 PM9/10/14
to ncqr...@googlegroups.com
The example I put in my repo at GitHub uses a Task to switch to a new thread inside the listener (as the UoW is per thread).  See https://github.com/adamcogx/ncqrs/blob/master/Extensions/src/Ncqrs.Messaging.Tests/FakeSendingStrategy.cs and https://github.com/adamcogx/ncqrs/blob/master/Extensions/src/Ncqrs.Messaging.Tests/ScenarioTest.cs

--
You received this message because you are subscribed to the Google Groups "ncqrs-dev" group.
To unsubscribe from this group and stop receiving emails from it, send an email to ncqrs-dev+...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Adam Greene, Lead Software Architect
ad...@cognitivex.com / http://www.cognitivex.com / 1.800.670.COGX (2649)

Please consider the impact on the environment before printing this email

Galen

unread,
Sep 11, 2014, 1:34:35 AM9/11/14
to ncqr...@googlegroups.com
Hi Adam

Yeah that's what I've been using actually.

Ok below is from my AR1 as it handles the incoming user command.. im trying to send IssueReferralMessage to AR2 (ReferredOpportunity)

 
        public void IssueReferral(Guid referredOpportunityId, Guid sourceId, string sourceName, Guid managerId, string managerName, string reference, string caption, string location, string description)
        {
            var referral = new IssueReferralMessage
            {
                MessageId = Guid.NewGuid(),
                SourceOpportunityId = EventSourceId,
                SourceId = sourceId,
                ManagerId = managerId,
                Reference = reference,
                Caption = caption,
                Location = location,
                Description = description
            };
            
            // THIS WORKS in unit testing -- but does not work when executed within a command handler
            //var messagingService = NcqrsEnvironment.Get<IMessageService>();
            //messagingService.Process(referral);

            // THIS DOESNT WORK -- i feel this should work but the message processor doesnt fire
            To().Aggregate<ReferredOpportunity>(referredOpportunityId) 
                .Send(referral);

        }
       
And my ncqrs configuration is set up to use the following messaging service;

 private static IMessageService InitializeMessageService()
        {
            var messagingService = new MessageService();

            //messagingService.UseReceivingStrategy(IssueReferralMessageStrategy.ReceivingStrategy());
            messagingService.UseReceivingStrategy(new ConditionalReceivingStrategy(x => true, new LocalReceivingStrategy()));

            var messageSendingEventHandler = new MessageSendingEventHandler();
            var sendingStrategy = new FakeSendingStrategy(messagingService);
            messageSendingEventHandler.UseStrategy(new ConditionalSendingStrategy(x => true, sendingStrategy));
            ((InProcessEventBus)NcqrsEnvironment.Get<IEventBus>()).RegisterHandler(messageSendingEventHandler);

            return messagingService;
        }

Any thoughts?

Adam Greene

unread,
Sep 11, 2014, 7:48:56 AM9/11/14
to ncqr...@googlegroups.com
Hi Galen,

Can you get me the stack trace for the exception?  I want to trace where it is happening.

Galen

unread,
Sep 11, 2014, 9:52:15 PM9/11/14
to ncqr...@googlegroups.com
Hi Adam

Just figured I'd continue our discussion on here....  for those reading Adam and I have been trying to get our heads around the NCQRS.Messaging extension and how to implement it into our respective systems. We've done a bit of communication in IM so apologies if this discussion seems fragmented.


FakeSendingStrategy - processing messages in a new thread .... look I keep seeing friction in this approach.... 

The Good
After stepping through my message processing and event handling I can confirm my events are correctly being stored in the event store.
All my events use primitive type properties and have had no issue with JSON Serialization of event data... they seem complete and find.

The Unsure
In addition to my regular Aggregate events there are also Messaging.MessageSentEvent and Messaging.MessageReceivedEvent...  I am not sure how I feel about these 'meta' events being in my EventStore. As I mentioned to you last night I'm pretty sure I only desire inter-aggregate messaging and the ncqrs.messaging extension to be used as an alternative to commands. The result of a user initiated command or an aggregate initiate message would usually be one or more aggregate events. Once they are raised there is no difference to processing. Polluting the eventStore with these messages smells to me.

The Bad
Ok I know we've fired off the 'processing' of the sent messages in it's own thread. In doing this I've visibility of errors and exceptions. Obviously my 'debugging or NSB host process which acts my application service' has no plumbing to monitor the new thread. So I've already lost a lot of time trying to figure out what happened to a thread that terminates due to an exception.

The Ugly
More of a symptom of the message processing currently incurring in a new thread the aggregate events are not being published to NSB... probably something to do with the message handler registrations again.


For sure it seems to me the preferable approach involves putting the message onto NSB. Where it's handled in its own transaction. I've tried playing around with this but have encountered some problems.... since my 'ApplicationService' is using the Ncqrs.NServiceBus extension and process to host the NCQRS commandService referencing the Ncqrs.Messaging.NServiceBus in that same projects causes conflicts. I do not think these projects were designed to be used in the same process.

I'm guessing the original authors intending to host the commnadService in one NSB process and the messagingService in another NSB process.....I can go down this route if necessary but I would like to avoid it as it seems unessary and just have a single 'Application Service' running NSB to handle both CommandMessages and inter-aggregate Messages is preferable.

I know you haven't done NSB stuff yet but I'm just sharing my thoughts on the matter.

Galen

Adam Greene

unread,
Sep 12, 2014, 8:38:29 AM9/12/14
to ncqr...@googlegroups.com
For those reading this conversation.  I took the NCQRS.Messaging.Tests and updated it to auto-process messages (the original required a manual step after running to process the messages).  All I did was change FakeSendingStrategy to use Task.StartNew to fire the messages off to the MessageService:

public class FakeSendingStrategy : ISendingStrategy
    {
        private IMessageService messageService;

        public FakeSendingStrategy(IMessageService messageService)
        {
            this.messageService = messageService;
        }

        public void Send(OutgoingMessage message)
        {
            var task = Task.Factory.StartNew(() => messageService.Process(message));
            task.Wait();
        }
    }

Galen, I made a slight change to this, I added the task.Wait() so that failures in message processing could be registered on the calling thread as well.  For those who are wondering why we use threading at all in the this example is around the UnitOfWork, a UoW is thread static, and only one can exist at a time, and the way that the MessagingService (and CommandService) are written, they fail if you are already in an UoW, so if we skip to a new thread (or put it on an ESB) then we are fine.

Galen, The issue I have with JSON serialization is around the MessageSent and MessageReceived events, they can't accurately deserialize the contained message during load of the AR, but in looking at how the events are handled during a reload of the aggregate root, it may not matter (unless you try to call GetRelatedMessage and then it would probably return null).  The reason that the MessageSent and MessageReceived events is manyfold:

  • There is no direct connection between the Aggregate Root and the MessagingService, so there needs to be some way to communicate with it, so messaging is the simplest way.  I guess we could instead pull it from the NcqrsEnvironment within the MessagingAggregateRoot class and avoid actually putting a message on the bus.  but that leads us to the next problem.
  • If you involve some kind of ESB (like NServiceBus) there is potential for a system outside of your control to repeatedly send messages to your aggregate root (due to some kind of failure on the ESB) and by recording the message sent / received, we can keep multiple events from being recorded on the AR (Look at Ncqrs.Messaging.MessagingContext and its use in ProcessMessage in the MessagingAggregateRoot class)
Reply all
Reply to author
Forward
0 new messages