AQL Based Registry Observable

Nov 23, 2022, 4:17:28 AM
to external developers
As mentioned yesterday on Discord, I am currently converting the registry twin to AQL-based RxJS observables, and I am not sure whether my current implementation is optimal. Maybe you can have a look and suggest a better solution?

Current solution:
  1. function observeRegistry(pond: Pond, <all infos for detecting relevant events (tags, event names, limits)>){
  2.   const subscriptionCall = (subs: Subscriber<string[]>) => {
  3.     offset, ids = getRegistryViaAQL()
  4. // supply initial values
  5.     return { query: tags, lowerBound: offset },
  6.       async (event: ActyxEvent) => {
  7.           offset, ids = getRegistryViaAQL()
  9.       })
  10.     },
  11.   return new Observable(subscriptionCall)
  12. }
where the getRegistryViaAQL() function uses subqueries and timeRange to compile the list of twin ids, for example to check whether a twin is still in the registry.
I omitted some logic, so please treat the above as pseudocode.
There are multiple types of events that add ids to and remove ids from a virtual registry.
One option would be to improve the tagging, but that would add another identifier per event to maintain in the codebase, next to the event type name and the eventType string field.

How would you solve this for more complex registries?

Another drawback of this solution shows up when testing it with more related events in the database:
If another relevant event is published between lines 6 and 7, the getter does not return the correct registry for the event that triggered the update. That is especially noticeable in unit tests, where I rely on the order of calls to check the state of the registry (or the state of the twin when nesting another observable for the twin state).

I am not sure what would be the correct design pattern to observe
  1. twin registry with more complex logic
  2. the related list of twin states generated from that registry
Thank you!

Roland Kuhn

Nov 23, 2022, 5:14:01 AM
to Jesko_IIC, external developers
Hi Jesko,

to summarize, the idea is to get the full registry state with `getRegistryViaAQL` (i.e. in a stateless fashion), relying on the subscription to trigger updates.
To make this most efficient, I’d use `subscribeChunked` and collapse updates from events coming at around the same time.
Consistency of your AQL result with the streaming events you got so far can be obtained by specifying an `upperBound` to be used with the AQL query — this guarantees that the query only sees those events that were already seen by the subscription.
`subscribeChunked` already tracks the delivered OffsetMap for you, so the callback can just be

async (chunk: EventChunk) => {
  const { ids } = getRegistryViaAQL(chunk.upperBound)
  // ... emit ids to the subscriber as before
}

I’d also pass `{ maxChunkTimeMs: 1000 }` (or some other suitable value) to reduce the number of queries to what you actually need — the default chunking time window is only 5ms.
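
To illustrate why the `upperBound` matters, here is a minimal sketch that does not use the Actyx SDK at all. It assumes a hypothetical in-memory event log and a hypothetical `registryAt` function standing in for `getRegistryViaAQL`. A real `OffsetMap` carries one offset per stream; the sketch collapses that to a single number. The point is only that a stateless query bounded by the chunk's upper bound returns the registry as the subscription has seen it, even if a newer event has already been stored.

```typescript
// Hypothetical stand-ins for illustration; these are NOT the Actyx SDK types.
// A real OffsetMap holds one offset per stream; a single number is used here.
type RegistryEvent = { offset: number; type: 'added' | 'removed'; id: string }
type Chunk = { upperBound: number; events: RegistryEvent[] }

// Stateless registry computation: replay every event up to and including upperBound.
function registryAt(log: readonly RegistryEvent[], upperBound: number): string[] {
  const ids = new Set<string>()
  for (const ev of log) {
    if (ev.offset > upperBound) continue // not yet seen by the subscription
    if (ev.type === 'added') ids.add(ev.id)
    else ids.delete(ev.id)
  }
  return Array.from(ids).sort()
}

// The store already contains offset 2, but the delivered chunk ends at offset 1.
const log: RegistryEvent[] = [
  { offset: 0, type: 'added', id: 'twin-a' },
  { offset: 1, type: 'added', id: 'twin-b' },
  { offset: 2, type: 'removed', id: 'twin-a' },
]
const chunk: Chunk = { upperBound: 1, events: log.slice(0, 2) }

// Bounded query: consistent with what the subscription has delivered so far.
const consistent = registryAt(log, chunk.upperBound)
// Unbounded query: latest state, which may already include later events.
const latest = registryAt(log, Number.POSITIVE_INFINITY)

console.log(consistent, latest)
```

In a unit test that checks the registry after each delivered chunk, the bounded variant is the one whose result depends only on the events delivered so far; the unbounded variant is what you would want in production when only the latest state matters.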

OTOH, outside of chaos testing (which is a good idea but rather challenging) you can either control the presence of relevant events (in tests; thus not needing the upperBound) or you want the latest state in any case (in prod; also not needing the upperBound).

BTW (and just in case your omitted code doesn’t already have this): RxJS resource tracking can be somewhat obscure, but I think you need to forward errors and completions to the downstream subscriber as well. And I’d feed the `observeRegistry` stream into a `Subject` to avoid that multiple subscribers multiply the number of AQL queries executed — together with a built-in retry strategy this might well be good to package into this same function to avoid suboptimal usage.
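
The sharing idea can be sketched without any library. The `Observer`, `Source`, and `shareSource` names below are hypothetical and hand-rolled for illustration; in RxJS the same role would be played by a `Subject` or the `share()` operator. The sketch subscribes to the upstream once, fans each value out to all current subscribers, forwards errors and completions, and tears the upstream down when the last subscriber leaves. Retry is deliberately left out.

```typescript
// Hypothetical minimal observer types; not the RxJS interfaces.
type Observer<T> = {
  next: (value: T) => void
  error: (err: unknown) => void
  complete: () => void
}
// Subscribing returns a teardown function.
type Source<T> = (observer: Observer<T>) => () => void

function shareSource<T>(source: Source<T>): Source<T> {
  const observers = new Set<Observer<T>>()
  let teardown: (() => void) | undefined

  // Notify all current observers of termination and forget them.
  const terminate = (notify: (o: Observer<T>) => void) => {
    const current = Array.from(observers)
    observers.clear()
    teardown = undefined
    current.forEach(notify)
  }

  return (observer) => {
    observers.add(observer)
    if (observers.size === 1) {
      // First subscriber: open the single upstream subscription.
      teardown = source({
        next: (value) => observers.forEach((o) => o.next(value)),
        error: (err) => terminate((o) => o.error(err)),
        complete: () => terminate((o) => o.complete()),
      })
    }
    return () => {
      observers.delete(observer)
      if (observers.size === 0 && teardown) {
        teardown() // last subscriber left: stop the upstream
        teardown = undefined
      }
    }
  }
}

// Usage: a fake upstream that counts how often it is subscribed to.
let upstreamSubscriptions = 0
let tornDown = 0
let emit: (ids: string[]) => void = () => {}
const upstream: Source<string[]> = (observer) => {
  upstreamSubscriptions += 1
  emit = observer.next
  return () => { tornDown += 1 }
}

const shared = shareSource(upstream)
const seenA: string[][] = []
const seenB: string[][] = []
const noop = () => {}
const stopA = shared({ next: (v) => { seenA.push(v) }, error: noop, complete: noop })
const stopB = shared({ next: (v) => { seenB.push(v) }, error: noop, complete: noop })
emit(['twin-a'])
stopA()
stopB()
```

With this shape, each registry update costs one query regardless of how many consumers are attached, which is the effect described above.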



Dr. Roland Kuhn
CTO — Actyx AG
+49 176 72 988 110
