Event Store: Idempotency problem


Ricardo Enes da Silva

Nov 19, 2015, 1:19:56 AM
to DDD/CQRS
Given 1 handler subscribed to 2 streams, and publishing the same event twice to each stream

I get the following
Handler Received: Id 'cc81832b-60c5-4813-add5-aacb2b7aef7c' On Stream: 'stream'
Handler Received: Id 'cc81832b-60c5-4813-add5-aacb2b7aef7c' On Stream: 'stream2'
Handler Received: Id 'cc81832b-60c5-4813-add5-aacb2b7aef7c' On Stream: 'stream2'

... but I would expect
Handler Received: Id 'cc81832b-60c5-4813-add5-aacb2b7aef7c' On Stream: 'stream'
Handler Received: Id 'cc81832b-60c5-4813-add5-aacb2b7aef7c' On Stream: 'stream2'

... Curiously, if I change the publishing order I get
Handler Received: Id 'cc81832b-60c5-4813-add5-aacb2b7aef7c' On Stream: 'stream2'
Handler Received: Id 'cc81832b-60c5-4813-add5-aacb2b7aef7c' On Stream: 'stream'
Handler Received: Id 'cc81832b-60c5-4813-add5-aacb2b7aef7c' On Stream: 'stream'

So it appears idempotency is only working on the first stream created

This is my code
    class Program
    {
        static void Main(string[] args)
        {
            var connection = EventStoreConnection.Create(new Uri("tcp://admin:chan...@127.0.0.1:1113"));
            connection.ConnectAsync().Wait();

            var settings = PersistentSubscriptionSettings
                .Create()
                .StartFromCurrent()
                .WithMessageTimeoutOf(new TimeSpan(100));

            try {
                connection.CreatePersistentSubscriptionAsync(
                    "stream", "group",
                    settings, new UserCredentials("admin", "changeit")
                ).Wait();
            } catch{ }

            try {
                connection.CreatePersistentSubscriptionAsync(
                    "stream2", "group",
                    settings, new UserCredentials("admin", "changeit")
                ).Wait();
            } catch{ }

            var event1 = new EventData(Guid.NewGuid(),"event", false,Encoding.UTF8.GetBytes("Event 1"),null);

            new Handler(connection);

            connection.AppendToStreamAsync("stream", ExpectedVersion.Any, event1);
            connection.AppendToStreamAsync("stream", ExpectedVersion.Any, event1);

            connection.AppendToStreamAsync("stream2", ExpectedVersion.Any, event1);
            connection.AppendToStreamAsync("stream2", ExpectedVersion.Any, event1);

            Console.ReadKey();
        }
    }

    class Handler
    {
        public Handler(IEventStoreConnection connection) {
            connection.ConnectToPersistentSubscription(
                "stream", "group", 
                Handle, (s, r, e) => {}, new UserCredentials("admin", "changeit"), 
                1, false
            );
            
            connection.ConnectToPersistentSubscription(
                "stream2", "group", 
                Handle, (s, r, e) => {}, new UserCredentials("admin", "changeit"), 
                1, false
            );
        }

        void Handle(EventStorePersistentSubscriptionBase subscription, ResolvedEvent @event) {
            Console.WriteLine($"Handler Received: Id '{@event.Event.EventId}' On Stream: '{@event.Event.EventStreamId}' ");
            subscription.Acknowledge(@event);
        }
    }


Thank you 


Greg Young

Nov 19, 2015, 5:14:29 AM
to ddd...@googlegroups.com
Idempotent writes are per stream.



--
Studying for the Turing test

Ricardo Enes da Silva

Nov 19, 2015, 1:07:48 PM
to DDD/CQRS
Yes, that's what I thought, and therefore I don't expect handlers to be called twice for the same message posted to different streams.

- Event id cc81832b-60c5-4813-add5-aacb2b7aef7c was published twice to 'stream', then twice to 'stream2'
- Handler receives the event once on 'stream', which is what I expect
- But twice on 'stream2'

If I publish to 'stream2' first, the opposite happens:
- Handler receives the event once on 'stream2', which is what I expect
- But twice on 'stream'

It's as though idempotency is only observed for the first stream published to.

BTW I am using the .NET client to connect to Event Store



Greg Young

Nov 20, 2015, 11:35:50 AM
to ddd...@googlegroups.com
"ExpectedVersion.Any"

Try using expected version and it will work fine.

What is happening is you are only using the "try best you can cache"
which is by eventid. Duplicate event ids between streams is an edge
condition (event id can only be in the cache once). The bigger
question is why are you writing events with the same id to multiple
streams.

Greg
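As an illustration of the suggestion above, here is a minimal sketch of the same writes made with an explicit expected version instead of ExpectedVersion.Any. It is not code from the thread: the class and method names are hypothetical, and it assumes the target stream does not exist yet, so that ExpectedVersion.NoStream is the correct expected version. It also needs a running Event Store node and an already connected IEventStoreConnection.

```csharp
using System;
using System.Text;
using EventStore.ClientAPI;

// Hypothetical helper, for illustration only.
static class ExpectedVersionSketch
{
    // Appends the same event twice to one stream, both times with an
    // explicit expected version rather than ExpectedVersion.Any.
    public static void AppendTwice(
        IEventStoreConnection connection, string stream, EventData evt)
    {
        // Assumption: the stream has never been written to, so NoStream
        // is the version we expect before the first append.
        var first = connection
            .AppendToStreamAsync(stream, ExpectedVersion.NoStream, evt)
            .Result;

        // Same event id and same expected version: with an explicit
        // expected version the server should be able to recognise this as
        // a repeat of the first write instead of storing a second copy.
        var retry = connection
            .AppendToStreamAsync(stream, ExpectedVersion.NoStream, evt)
            .Result;

        Console.WriteLine(
            $"{stream}: next expected version after first write " +
            $"{first.NextExpectedVersion}, after retry {retry.NextExpectedVersion}");
    }
}
```

With the program from the original post this could be called as `ExpectedVersionSketch.AppendTwice(connection, "stream", event1)` and then again for "stream2". If a stream already holds events, NoStream no longer applies and the append would be expected to fail with a wrong-expected-version error, so the current version of the stream would have to be used instead.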

Ricardo Enes da Silva

Nov 20, 2015, 4:09:49 PM
to DDD/CQRS
Thanks Greg!

I guess what I was trying to do is what I think you may have previously mentioned doing with projections, but I was thinking I could do it from the client, e.g. publish the event to both a TypeStream and a CorrelationIdStream. I am guessing this is not the correct approach, and that I should only publish to a single stream and rely on projections to republish it to the others. Is that correct, and what should the base stream be, TypeStream?

Thanks




