sequenceNr assignment: globally unique?

fred

Jun 13, 2013, 1:24:12 PM
to events...@googlegroups.com
Hello list,

Reading the idempotency section of the project's docs, I decided to check whether sequenceNrs are generated sequentially, i.e., newSequenceNr = lastSequenceNr + 1.

I have 4 different processors, processor[1-4], each with a unique processorId, and they all use the same (HBase) journal.

I sent one Message to each of the 4 processors. This is what I see:
Message in processor1 has sequenceNr == 1
Message in processor2 has sequenceNr == 2
Message in processor3 has sequenceNr == 3
Message in processor4 has sequenceNr == 4

It seems there's some kind of synchronization between different processors, and I see no reason for it. Is this an implementation choice, or does it happen because of a mistake I made somewhere?

In an older event table, though, messages for different processors sometimes (rarely) had the same sequenceNr.

What I was hoping for:
The sequenceNr of Messages for every processor starts at 1 and increments in steps of 1. That way we could check for duplicates (as described in the idempotency section) AND check for missing messages.

Thank you, Fred

Martin Krasser

Jun 14, 2013, 1:53:41 AM
to events...@googlegroups.com
Hi Fred,

On 13.06.13 19:24, fred wrote:
> Hello list,
>
> Reading the idempotency section of the project's doc I decided to try
> checking if sequenceNr were generated sequentially, i.e.,
> newSequenceNr = lastSequenceNr + 1.

You cannot make any assumptions about the size of gaps between the sequence
numbers.
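To illustrate the consequence for duplicate detection: a processor can still filter duplicates if it relies only on sequence numbers increasing, never on the distance between them. A minimal Python sketch under that assumption (the class `IdempotentProcessor` is hypothetical and not part of Eventsourced):

```python
class IdempotentProcessor:
    """Hypothetical sketch: drops duplicates by remembering the highest
    sequenceNr handled so far. Assumes only that sequence numbers
    increase; it never assumes they increase in steps of 1."""

    def __init__(self):
        self.last_sequence_nr = 0
        self.handled = []

    def receive(self, sequence_nr, event):
        if sequence_nr <= self.last_sequence_nr:
            return False  # already handled (e.g. replayed or redelivered)
        self.last_sequence_nr = sequence_nr
        self.handled.append(event)
        return True


p = IdempotentProcessor()
# Gaps of arbitrary size are fine; the repeated 7 is dropped.
for seq, ev in [(3, "a"), (7, "b"), (7, "b"), (12, "c")]:
    p.receive(seq, ev)
print(p.handled)  # ['a', 'b', 'c']
```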

>
> I have 4 different processors (they all have unique processorId) say
> processor[1-4] and they use the same (HBase) journal.
>
> I sent 1 Message to each of the 4 processor, what I see is:
> Message in processor1 has sequenceNr == 1
> Message in processor2 has sequenceNr == 2
> Message in processor3 has sequenceNr == 3
> Message in processor4 has sequenceNr == 4
>
> It seems there's some kind of synchronization between different
> processors and I see no reason for that, is it an implementation
> choice or does this happen because of a mistake I did somewhere?

Sequence numbers are assigned by the journal actor that is configured
for an EventsourcingExtension. All processors that have been created
with this extension share the same journal actor instance. In other
words, there's one sequence number generator per EventsourcingExtension
(and hence per ActorSystem). Future versions of Eventsourced may use
this as a basis for vector clocks in applications composed of several
distributed ActorSystems.
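A toy model of one generator shared by all processors, which reproduces the 1, 2, 3, 4 observed above and shows why each processor sees gaps in its own numbers. `Journal` and `write` are hypothetical names, not the Eventsourced journal API:

```python
import itertools


class Journal:
    """Hypothetical model: one journal per ActorSystem, shared by all its processors."""

    def __init__(self):
        self._counter = itertools.count(1)  # the single sequence number generator
        self.log = []  # entries: (processor_id, sequence_nr, message)

    def write(self, processor_id, message):
        sequence_nr = next(self._counter)
        self.log.append((processor_id, sequence_nr, message))
        return sequence_nr


journal = Journal()
processors = ["processor1", "processor2", "processor3", "processor4"]
seqs = {pid: journal.write(pid, "msg") for pid in processors}
print(seqs)  # {'processor1': 1, 'processor2': 2, 'processor3': 3, 'processor4': 4}

# A second message to processor1 gets 5, so processor1 sees 1, 5 (a gap of 4).
second = journal.write("processor1", "msg")
print(second)  # 5
```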

Having sequence numbers generated by a single actor instance doesn't
actually mean that writes to the backend store are also made
sequentially. For example, the HBase journal is able to make concurrent
writes and re-sequence the outcome of the writes before forwarding the
written messages to processors (using the previously generated sequence
numbers).
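A minimal sketch of the re-sequencing idea, assuming (hypothetically) that the journal hands out consecutive numbers internally and can therefore wait for the next expected one before forwarding. Processors should still not rely on contiguity of their own numbers. `Resequencer` is an illustrative name, not the HBase journal's actual implementation:

```python
import heapq


class Resequencer:
    """Hypothetical sketch: writes may complete out of order, but results
    are released strictly in sequence order."""

    def __init__(self, first_expected=1):
        self.next_expected = first_expected
        self._pending = []  # min-heap of (sequence_nr, message)

    def completed(self, sequence_nr, message):
        heapq.heappush(self._pending, (sequence_nr, message))
        released = []
        # Release the longest run starting at the next expected number.
        while self._pending and self._pending[0][0] == self.next_expected:
            released.append(heapq.heappop(self._pending))
            self.next_expected += 1
        return released


r = Resequencer()
out = []
# Concurrent writes finish in the order 2, 3, 1, 4.
for seq, msg in [(2, "b"), (3, "c"), (1, "a"), (4, "d")]:
    out.extend(r.completed(seq, msg))
print(out)  # [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
```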

>
> From an older event table though, I had messages for different
> processors having sometimes (rarely) the same sequenceNr.

Actors from different ActorSystems may see the same sequence number
(each ActorSystem uses its own journal instance). If you want to share
the same event table in HBase across all ActorSystems (which is a
frequent use case) you need to ensure that all processors (across all
ActorSystems) have different ids.
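A toy model of a shared event table whose rows are keyed by (processorId, sequenceNr), showing why unique processor ids matter when each ActorSystem counts independently. The key layout and `SharedEventTable` are assumptions for illustration, not the actual HBase schema used by Eventsourced:

```python
import itertools


class SharedEventTable:
    """Hypothetical model of one event table shared by several ActorSystems.
    Rows are assumed to be keyed by (processor_id, sequence_nr)."""

    def __init__(self):
        self.rows = {}

    def put(self, processor_id, sequence_nr, message):
        key = (processor_id, sequence_nr)
        if key in self.rows:
            # A real store might overwrite silently; raising makes the clash visible.
            raise KeyError(f"row {key} already exists")
        self.rows[key] = message


table = SharedEventTable()
system_a = itertools.count(1)  # journal counter of ActorSystem A
system_b = itertools.count(1)  # journal counter of ActorSystem B (independent)

# Distinct processor ids: equal sequence numbers are harmless.
table.put("processor-a1", next(system_a), "m1")  # key ('processor-a1', 1)
table.put("processor-b1", next(system_b), "m2")  # key ('processor-b1', 1)

# The same processor id used in both systems: both counters are now at 2.
clashed = False
try:
    table.put("shared-id", next(system_a), "m3")  # key ('shared-id', 2)
    table.put("shared-id", next(system_b), "m4")  # key ('shared-id', 2) again
except KeyError:
    clashed = True
print(clashed)  # True
```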

>
> What I was hoping:
> sequenceNr of Message for every processor start at 1, and increment in
> steps of 1.

This is not the case.

> That way we can check for duplicates (as written in the idempotency
> section) AND check for missing messages.

There are several other ways to check for missing messages. Can you
sketch your use case please?


--
Martin Krasser

blog: http://krasserm.blogspot.com
code: http://github.com/krasserm
twitter: http://twitter.com/mrt1nz

fred

Jun 14, 2013, 1:04:40 PM
to events...@googlegroups.com, kras...@googlemail.com
Thank you Martin.
Just to be sure I understood you: the messages are still received by the processors in sequenceNr order, right?

> There are several other ways to check for missing messages. Can you
> sketch your use case please?

Sure.
 
In the context of an akka cluster, I have different Eventsourced processors, potentially running on different systems. They all share the same event table in HBase and each has a different processorId. For reasons similar to those of Fabian Page (not enough RAM), I've chosen to use eventsourced actors as routers to different remote plain actors.

In the context of recovery, I have to take into account a scenario where the eventsourced processors are running fine but where one of my plain remote actors fails.

In that situation, I would like to be able to create a new instance of that plain actor and replay the event log. Meanwhile, since the eventsourced processor is still running fine, new events keep arriving at the replaying actor, so I need something in front of my recovering plain actor to buffer them until the replay is over. Buffering them is quite easy, but I'm wondering how I can be sure all the events have been replayed. Duplicates are easy to get rid of, but I want to be sure there is no gap between the events received during the replay and the stream of buffered events.
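The buffering front described above might look roughly like the following Python sketch (all names are hypothetical). It removes duplicates by sequence number, but because gap sizes between sequence numbers are not specified, it has no way to tell whether anything is missing between the last replayed event and the first buffered one:

```python
class ReplayBuffer:
    """Hypothetical front for a recovering plain actor: buffers live events
    while the event log is replayed, then drains them, dropping duplicates."""

    def __init__(self, deliver):
        self.deliver = deliver  # callable that hands an event to the plain actor
        self.replaying = True
        self.last_sequence_nr = 0
        self.buffer = []

    def on_replayed(self, sequence_nr, event):
        self._deliver_if_new(sequence_nr, event)

    def on_live(self, sequence_nr, event):
        if self.replaying:
            self.buffer.append((sequence_nr, event))
        else:
            self._deliver_if_new(sequence_nr, event)

    def replay_done(self):
        self.replaying = False
        for sequence_nr, event in sorted(self.buffer, key=lambda e: e[0]):
            self._deliver_if_new(sequence_nr, event)
        self.buffer.clear()

    def _deliver_if_new(self, sequence_nr, event):
        # Duplicates are detectable; a missing event between two numbers is not.
        if sequence_nr > self.last_sequence_nr:
            self.last_sequence_nr = sequence_nr
            self.deliver(event)


received = []
proxy = ReplayBuffer(received.append)
proxy.on_replayed(3, "a")
proxy.on_live(9, "c")      # arrives during replay: buffered
proxy.on_replayed(7, "b")
proxy.on_live(7, "b")      # also in the replayed log: duplicate
proxy.replay_done()
print(received)  # ['a', 'b', 'c']
```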

Martin Krasser

Jun 15, 2013, 3:27:35 AM
to events...@googlegroups.com
Yes, of course. Eventsourced also guarantees that, during a replay, a processor receives messages in the same order as it initially received them during 'normal operation'. This even works if processors are connected via channels to graphs that may also contain cycles.



> In the context of an akka cluster, I have different Eventsourced processors potentially running on different systems. They all share the same event table in HBase and all have different processorId. For reasons similar to those of Fabian Page (not enough RAM), I've chosen to use eventsourced actors as routers to different remote plain actors.
>
> In the context of recovery, I have to take into account a scenario where the eventsourced processors are running fine but where one of my plain remote actors fails.

That's the downside of using remote plain actors: you have to take care of reliable delivery yourself (see the alternative below).



> In that situation, I would like to be able to create a new instance of that plain actor and replay the event log. But meanwhile, the eventsourced processor still being fine, new events keep arriving to the replaying actor. I need something in front of my recovering plain actor to buffer those events until the replay is over. Buffering them is quite easy but I'm wondering how I can be sure all the events have been replayed. Duplicates are easy to get rid of, but I want to be sure there is no gap between the events received during the replay and the stream of buffered events.

You can implement these requirements by making the remote actors eventsourced and letting the router actor communicate with the remote eventsourced actors (processors) via a reliable channel (one channel for each router-processor pair).

- whenever a remote processor goes down and becomes unavailable, all messages targeted at this processor will be buffered by the reliable channel
- when the remote processor is recovered, a replay will resend all messages to that processor that it already received (before it went down).
- any additional messages (buffered by the channel) will now be delivered to the recovered remote processor. You just need to make sure that the remote processor is recovered before it re-joins the cluster and receives buffered messages from the channel.

The last step may require a re-initialization of the channel, as the new remote processor is now a different destination actor ref for the channel. If it is the same actor ref (for example, when the remote processor did not crash but was only separated from the router by a network failure), the channel will automatically retry delivery of buffered messages, assuming a properly configured redelivery policy. Duplicates are possible in both cases. Remote processors will see all messages in the correct order (i.e. no messages will be missing). Furthermore, this setup is also able to recover from router JVM crashes.
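The behavior described above (buffering while the destination is unreachable, in-order redelivery, duplicates filtered by the destination) can be modelled in a few lines. This is a hypothetical in-memory sketch, not the Eventsourced channel API; `ToyReliableChannel`, `RemoteProcessor` and `reinitialize` are made-up names, and the recovered processor's own replay is simulated by copying state:

```python
from collections import deque


class ToyReliableChannel:
    """Hypothetical model of a reliable channel: messages stay queued
    until the destination acknowledges them, and are delivered in order."""

    def __init__(self, destination):
        self.destination = destination  # callable: True = acked, False = unreachable
        self.queue = deque()

    def send(self, sequence_nr, message):
        self.queue.append((sequence_nr, message))
        self.redeliver()

    def redeliver(self):
        # Stop at the first unacked message so that order is preserved.
        while self.queue:
            if not self.destination(*self.queue[0]):
                return
            self.queue.popleft()

    def reinitialize(self, new_destination):
        # New destination ref, e.g. a recovered processor on another node.
        self.destination = new_destination
        self.redeliver()


class RemoteProcessor:
    """Hypothetical eventsourced destination that ignores duplicates."""

    def __init__(self):
        self.up = True
        self.last_sequence_nr = 0
        self.state = []

    def __call__(self, sequence_nr, message):
        if not self.up:
            return False
        if sequence_nr > self.last_sequence_nr:
            self.last_sequence_nr = sequence_nr
            self.state.append(message)
        return True


old = RemoteProcessor()
channel = ToyReliableChannel(old)
channel.send(1, "m1")
old.up = False            # remote node goes down
channel.send(4, "m2")     # buffered by the channel
channel.send(6, "m3")     # buffered by the channel

recovered = RemoteProcessor()
recovered.state = list(old.state)  # stands in for replaying its own journal
recovered.last_sequence_nr = old.last_sequence_nr
channel.reinitialize(recovered)
print(recovered.state)  # ['m1', 'm2', 'm3']
```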

A variant of the above example would be to use a plain actor as router (useful if the routing logic is static, for example).

Your use case would make a very nice advanced clustering example for the Eventsourced distribution. I'll create a related issue and try to find some time to implement it. Of course, I'd highly appreciate it if you could contribute such an example to Eventsourced.

Cheers,
Martin

Martin Krasser

Jun 20, 2013, 8:01:12 AM
to events...@googlegroups.com
I just added a section to the docs, https://github.com/eligosource/eventsourced#remote-destinations, which, although not a cluster example, covers most of your concerns.

Hope that helps.

Cheers,
Martin

fred

Jul 22, 2013, 5:45:14 PM
to events...@googlegroups.com, kras...@googlemail.com
Hi Martin,

I keep getting diverted from coming back to this one. Sorry for the lack of feedback on my end; I'll send some as soon as I've had time to look at the ideas you suggested.

Thanks, Fred

HP

Aug 13, 2013, 3:10:57 PM
to events...@googlegroups.com, kras...@googlemail.com
Hi Martin, I was reading through the list here and had some questions about your suggestions for a distributed application using eventsourced.

How is reliable channel re-initialization accomplished? Would I just stop the channel actor and create a new channel with the same id/name and the new target ActorRef, and it would pick up where it left off?

As for discovering the ActorRef of the recovered processor on another node, I was thinking the DistributedPubSubMediator could be used. What do you think?

Thanks,
Hiral

Martin Krasser

Aug 14, 2013, 6:43:16 AM
to events...@googlegroups.com
Hi Hiral,

I'm still on vacation. Please give me a few more days for a reply.

Thanks,
Martin

Martin Krasser

Aug 19, 2013, 4:15:41 AM
to events...@googlegroups.com
Hi Hiral,


On 13.08.13 21:10, HP wrote:
> Hi Martin, I was reading through the list here and had some questions on your suggestions on a distributed application using eventsourced.
>
> How is reliable channel re-initialization accomplished?  Would I just stop the channel actor and create a new channel with same id/name with new target ActorRef and it will just pick up where it left off ?

Yes, this will work.
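A minimal sketch of why re-creating a channel with the same id can resume where the old one stopped, assuming (hypothetically) that pending messages are persisted under the channel id rather than held by the channel actor instance. `ChannelStore` and `Channel` are made-up names for illustration, not the Eventsourced API:

```python
class ChannelStore:
    """Hypothetical durable store: pending messages are kept per channel id,
    so they outlive any particular channel instance."""

    def __init__(self):
        self.pending = {}  # channel_id -> list of (sequence_nr, message)


class Channel:
    """Hypothetical reliable channel whose state lives in the store."""

    def __init__(self, channel_id, store, destination):
        self.channel_id = channel_id
        self.store = store
        self.destination = destination  # callable: True when the message is acked
        self.stopped = False
        store.pending.setdefault(channel_id, [])  # reuse existing state if any

    def send(self, sequence_nr, message):
        self.store.pending[self.channel_id].append((sequence_nr, message))
        self.flush()

    def flush(self):
        pending = self.store.pending[self.channel_id]
        # Deliver in order; keep everything from the first unacked message on.
        while pending and not self.stopped and self.destination(*pending[0]):
            pending.pop(0)

    def stop(self):
        self.stopped = True


def unreachable(sequence_nr, message):
    return False  # the old target is gone


store = ChannelStore()
old_channel = Channel("router-to-processor-1", store, unreachable)
old_channel.send(10, "m1")
old_channel.send(11, "m2")
old_channel.stop()

received = []


def new_target(sequence_nr, message):
    received.append(message)
    return True


# Same id, new target: the new instance picks up the stored pending messages.
new_channel = Channel("router-to-processor-1", store, new_target)
new_channel.flush()
print(received)  # ['m1', 'm2']
```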



> As for discovering the ActorRef of the recovered processor on another node, I was thinking the DistributedPubSubMediator could be used.  What do you think?

Writing your own channel destination proxy (as shown in the remote destinations section of the docs), you can use any lookup/discovery mechanism you want. I don't have any experience with DistributedPubSubMediator so far, but it looks like a valid choice to me.
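A sketch of such a destination proxy: it resolves the current target on each delivery through a pluggable lookup, which could be backed by DistributedPubSubMediator or any other discovery mechanism. Here the lookup is a plain dict, and returning False (no ack) is an assumption about how the proxy signals that the channel should redeliver later. `DestinationProxy` and `registry` are hypothetical names:

```python
from typing import Callable, Dict, List, Optional

Destination = Callable[[int, str], bool]


class DestinationProxy:
    """Hypothetical channel destination that resolves the real target
    on every delivery instead of holding a fixed reference."""

    def __init__(self, lookup: Callable[[], Optional[Destination]]):
        self.lookup = lookup

    def __call__(self, sequence_nr: int, message: str) -> bool:
        target = self.lookup()
        if target is None:
            return False  # not acked: the channel is expected to redeliver later
        return target(sequence_nr, message)


registry: Dict[str, Destination] = {}  # stands in for a discovery mechanism
proxy = DestinationProxy(lambda: registry.get("processor-1"))
delivered: List[str] = []


def recovered_processor(sequence_nr: int, message: str) -> bool:
    delivered.append(message)
    return True


first = proxy(1, "m1")  # nothing registered yet
registry["processor-1"] = recovered_processor  # processor re-registers after recovery
second = proxy(1, "m1")  # redelivery now reaches it
print(first, second, delivered)  # False True ['m1']
```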

Cheers,
Martin

delasoul

Aug 20, 2013, 4:32:37 AM
to events...@googlegroups.com, kras...@googlemail.com
Hello,

We are using the DistributedPubSubMediator with eventsourced, although for a different use case:

We wanted a more dynamic way (as opposed to statically configuring channels upfront and looking up channels by id/name) to set up pub/sub between events emitted by an eventsourced actor and possible event listeners, without losing a channel's ack functionality, which avoids resending events on replay (we don't need reliable channels at the moment).
The eventsourced actor calls publish(event) and all listeners do subscribe(event).
For now, it was sufficient to write our own "DistributedPubSubChannel" that handles the possible multiple acks for an event and uses the DistributedPubSubMediator as destination.
To get this working with reliable channels, I guess we will have to add functionality to the DistributedPubSubMediator, as it holds the subscription information.
But once Martin changes processors/channels, perhaps none of this will be needed anymore?

michael

Martin Krasser

Aug 20, 2013, 7:52:06 AM
to events...@googlegroups.com

On 20.08.13 10:32, delasoul wrote:
> To get this working with reliable channels I guess we will have to add functionality to the DistributedPubSubMediator as it holds the subscription information,
> but when Martin changes processors/channels all of this is probably not needed anymore?

Channels will continue to deliver messages only to a single destination actor. Any additional routing logic (incl. collection of acks from routees) must/should be done by that destination actor. This is comparable to a channel destination that further sends messages to a message broker (which manages point-to-point channels, publish-subscribe channels, etc.).
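A sketch of such a destination: it fans a channel message out to several subscribers and acknowledges it to the channel only once every subscriber has acked. `FanOutDestination` and the callback style are hypothetical and stand in for actor messages; this is not how Eventsourced or DistributedPubSubMediator are implemented:

```python
class FanOutDestination:
    """Hypothetical single channel destination that forwards to several
    subscribers and acks to the channel only after all of them acked."""

    def __init__(self, subscribers, ack_channel):
        self.subscribers = subscribers  # name -> callable(sequence_nr, message)
        self.ack_channel = ack_channel  # callable(sequence_nr)
        self.outstanding = {}  # sequence_nr -> names that still have to ack

    def deliver(self, sequence_nr, message):
        if not self.subscribers:
            self.ack_channel(sequence_nr)  # nobody to wait for
            return
        self.outstanding[sequence_nr] = set(self.subscribers)
        for subscriber in self.subscribers.values():
            subscriber(sequence_nr, message)

    def on_subscriber_ack(self, sequence_nr, name):
        waiting = self.outstanding.get(sequence_nr)
        if waiting is None:
            return  # unknown or already fully acked
        waiting.discard(name)
        if not waiting:
            del self.outstanding[sequence_nr]
            self.ack_channel(sequence_nr)


acked = []
inbox = {"listener-a": [], "listener-b": []}
subs = {name: (lambda seq, msg, box=box: box.append((seq, msg)))
        for name, box in inbox.items()}
dest = FanOutDestination(subs, acked.append)
dest.deliver(5, "OrderPlaced")
dest.on_subscriber_ack(5, "listener-a")
print(acked)  # []
dest.on_subscriber_ack(5, "listener-b")
print(acked)  # [5]
```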