Alerting / state machine use case with Hazelcast Jet


pdu...@gmail.com

Jan 18, 2019, 3:33:46 AM
to hazelcast-jet
Hello Hazelcast Jet users,

I was trying to implement an alerting use case with Hazelcast Jet. I have a set of sensors that periodically emit a few measurements (temperature, humidity, noise, ...).
For each sensor, the job should emit a notification whenever the value of a measurement goes above its threshold. It should also emit a notification-off event when the value goes back to normal. So the events must be processed in order per sensor ID. Is this feasible with Hazelcast Jet?


Technically, I am using a custom RabbitMQ source. Sensor IDs are partitioned into groups, and each group is processed by one instance of the job. Events are guaranteed to be in order within each group at the source.

The problem occurs when the job is restarted. Events that were buffered while the job was down arrive at the job in a burst. When they are handed to the cooperative thread pool, the order is lost despite the groupBy clause.

Any guidance on how to tackle this issue would be greatly appreciated :)


c...@hazelcast.com

Jan 18, 2019, 6:21:29 AM
to hazelcast-jet
Hi Patrick,

Any map or flatMap stage after a source can reorder the items in the stream. I can see that you tried to restore the order using timestamps, watermarks and windowing.

Watermarks are generated at the point where you add timestamps; looking at your repo, that is after the flatMap step. A processor's watermark advances only once it has advanced in all of its upstream processors, meaning it is only as far ahead as the upstream processor that is furthest behind. Each watermark is computed as the highest event timestamp seen so far minus the allowed lag.
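To make the two rules above concrete, here is a small plain-Java illustration (it is not Jet's internal code, and the class and method names are made up for the example): a source's watermark is its maximum timestamp minus the lag, and a downstream processor takes the minimum over its upstream watermarks.

```java
import java.util.Arrays;

// Plain-Java illustration of the two watermark rules described above; not Jet's internal code.
public class WatermarkMath {

    // A source's watermark: the highest timestamp it has seen so far minus the allowed lag.
    // Before any event is seen there is no watermark, modelled here as Long.MIN_VALUE.
    public static long sourceWatermark(long[] timestampsSeen, long lag) {
        if (timestampsSeen.length == 0) {
            return Long.MIN_VALUE;
        }
        return Arrays.stream(timestampsSeen).max().getAsLong() - lag;
    }

    // A downstream processor only advances to the minimum of its upstream watermarks,
    // so it is only as far ahead as the upstream processor that is furthest behind.
    public static long coalesce(long... upstreamWatermarks) {
        return Arrays.stream(upstreamWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long wmA = sourceWatermark(new long[] {1000, 1500, 1200}, 200); // 1500 - 200 = 1300
        long wmB = sourceWatermark(new long[] {900, 950}, 200);         // 950 - 200 = 750
        System.out.println(coalesce(wmA, wmB));                         // prints 750
    }
}
```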

For example, if one of your source processors has groups 1 and 2 assigned, this is what might happen during a burst: the watermark advances too eagerly because you receive several seconds' worth of events from group 1 and then several seconds' worth of events from group 2. If all the events from group 1 are emitted first, the watermark advances before the items from group 2 are processed, so those items are considered late.
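The burst scenario can be replayed in a few lines of plain Java. This is a hypothetical simulation, not Jet code: it assumes one source processor owning groups 1 and 2, one event per second per group, a 2-second lag, and it counts an event as late when its timestamp is below the watermark at the moment it arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the scenario above: one source processor owns groups 1 and 2,
// and its watermark is "highest timestamp seen so far minus lag". Plain Java, not Jet code.
public class BurstLateness {

    public static final class Event {
        final int group;
        final long timestamp;

        public Event(int group, long timestamp) {
            this.group = group;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "g" + group + "@" + timestamp;
        }
    }

    // Returns the events whose timestamp is behind the watermark at the moment they arrive.
    public static List<Event> lateEvents(List<Event> arrivalOrder, long lag) {
        List<Event> late = new ArrayList<>();
        boolean seenAny = false;
        long maxSeen = 0;
        for (Event e : arrivalOrder) {
            if (seenAny && e.timestamp < maxSeen - lag) {
                late.add(e);
            }
            maxSeen = seenAny ? Math.max(maxSeen, e.timestamp) : e.timestamp;
            seenAny = true;
        }
        return late;
    }

    // Backlog replayed group by group: all of group 1, then all of group 2.
    public static List<Event> burst(long lastTimestamp, long step) {
        List<Event> events = new ArrayList<>();
        for (int group = 1; group <= 2; group++) {
            for (long t = 0; t <= lastTimestamp; t += step) {
                events.add(new Event(group, t));
            }
        }
        return events;
    }

    // Live traffic: the two groups interleaved in timestamp order.
    public static List<Event> interleaved(long lastTimestamp, long step) {
        List<Event> events = new ArrayList<>();
        for (long t = 0; t <= lastTimestamp; t += step) {
            events.add(new Event(1, t));
            events.add(new Event(2, t));
        }
        return events;
    }

    public static void main(String[] args) {
        // 5 seconds of buffered events per group, one per second, 2 s lag
        System.out.println(lateEvents(burst(5000, 1000), 2000));       // [g2@0, g2@1000, g2@2000]
        System.out.println(lateEvents(interleaved(5000, 1000), 2000)); // []
    }
}
```

The same events are fine when they arrive interleaved, as in live traffic; only the group-by-group replay pushes the watermark past group 2's oldest events.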

I think the core of your problem is that you don't really need watermarks, because your items are already ordered. If you did use watermarks, you would need to track the current watermark for each RabbitMQ queue separately, which is rather complex. We currently do this in our Kafka source (StreamKafkaP), where each Kafka partition is tracked separately for watermarks. You can also work around this by having one queue per processor.
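A rough idea of what per-queue tracking means, as a hypothetical plain-Java sketch (the class, the queue names and the "silent queue holds the watermark back" policy are assumptions for illustration; this is not how StreamKafkaP is actually written): the processor keeps the highest timestamp per queue and emits the minimum over queues, minus the lag.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of per-queue watermark tracking inside one source processor, similar in
// spirit to the per-partition tracking described for StreamKafkaP. Not Jet's actual code.
public class PerQueueWatermark {

    private final long lag;
    // Highest timestamp seen per queue; null until the queue has delivered its first event.
    private final Map<String, Long> maxTimestampPerQueue = new LinkedHashMap<>();

    public PerQueueWatermark(long lag, List<String> queues) {
        this.lag = lag;
        for (String queue : queues) {
            maxTimestampPerQueue.put(queue, null);
        }
    }

    // Records an event from the given queue and returns the processor's watermark afterwards.
    public long onEvent(String queue, long timestamp) {
        maxTimestampPerQueue.merge(queue, timestamp, Long::max); // null counts as "no value yet"
        return currentWatermark();
    }

    // The watermark is held back by the slowest queue: min over queues of (max timestamp - lag).
    // A queue that has not delivered anything yet keeps the watermark at Long.MIN_VALUE.
    public long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (Long max : maxTimestampPerQueue.values()) {
            if (max == null) {
                return Long.MIN_VALUE;
            }
            min = Math.min(min, max);
        }
        return min == Long.MAX_VALUE ? Long.MIN_VALUE : min - lag;
    }

    public static void main(String[] args) {
        PerQueueWatermark wm = new PerQueueWatermark(2000, List.of("group-1", "group-2"));
        for (long t = 0; t <= 5000; t += 1000) {
            wm.onEvent("group-1", t); // group 2 is still silent, so the watermark does not move
        }
        System.out.println(wm.currentWatermark() == Long.MIN_VALUE); // true
        System.out.println(wm.onEvent("group-2", 0));                // -2000, so g2@0 is not late
    }
}
```

One design consequence worth noting: with this policy a queue that never delivers anything stalls the watermark indefinitely, so a real implementation would also need some notion of idle queues.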

However, since your items are already ordered, I suggest avoiding the shuffling in the map and flatMap stages instead. We're thinking about adding more routing options to the Pipeline API, but for now you can try something like this:

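      // l is the Pipeline; ReadRabbitMQP, SimpleMeasurement, State and Notification are your own classes
      StreamStage<Notification> ss = l
              .drawFrom(Sources.<SimpleEntry<Integer, SimpleMeasurement>>streamFromProcessor("rabbitmq",
                      ReadRabbitMQP.readRabbitMQ(mGroupNames, mLocalParallelism)))
              // group right after the source (assumes SimpleMeasurement has getSensorId())
              .groupingKey(e -> e.getValue().getSensorId())
              .rollingAggregate(
                      AggregateOperation.withCreate(State::new)   // one State per sensor ID
                              .andAccumulate((State state, SimpleEntry<Integer, SimpleMeasurement> e) -> state.accept(e.getValue()))
                              .andExportFinish(State::toNotification), // placeholder: your own export logic
                      (sensorId, notification) -> notification);    // returning null emits nothing
      ss.drainTo(Sinks.logger());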


What's happening here is that I do the grouping right after the source and then apply the rolling aggregate operation to these groups. This way, all events for the same sensor ID should arrive in the same order in which they were emitted from the source. You should be able to express your flatMap stage inside the rolling aggregate operation by doing the looping inside the accumulate function. (If that is not possible, you can also use the `groupingKey().flatMapWithContext()` operation, which also keeps the groups ordered.)
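Why grouping right after the source keeps per-sensor order can be seen in a toy model of a partitioned edge. This is a hypothetical plain-Java sketch, not Jet's real partitioner: every reading with the same key is routed to the same downstream processor, and each processor's inbox is first-in-first-out, so readings of one sensor cannot overtake each other.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a partitioned ("grouped") edge: each key is always routed to the same
// downstream processor, whose inbox is FIFO. Hypothetical; not Jet's actual partitioner.
public class KeyPartitioning {

    public static final class Reading {
        final String sensorId;
        final int seq; // position of the reading in the source's per-sensor order

        public Reading(String sensorId, int seq) {
            this.sensorId = sensorId;
            this.seq = seq;
        }
    }

    // Same key -> same processor index, for every item.
    static int partitionFor(String key, int processors) {
        return Math.floorMod(key.hashCode(), processors);
    }

    // Routes readings (in source order) to per-processor FIFO inboxes.
    public static List<List<Reading>> route(List<Reading> sourceOrder, int processors) {
        List<List<Reading>> inboxes = new ArrayList<>();
        for (int i = 0; i < processors; i++) {
            inboxes.add(new ArrayList<>());
        }
        for (Reading r : sourceOrder) {
            inboxes.get(partitionFor(r.sensorId, processors)).add(r);
        }
        return inboxes;
    }

    // The sequence numbers that one sensor's processor sees, in arrival order.
    public static List<Integer> seenBy(List<List<Reading>> inboxes, String sensorId) {
        List<Integer> seqs = new ArrayList<>();
        for (Reading r : inboxes.get(partitionFor(sensorId, inboxes.size()))) {
            if (r.sensorId.equals(sensorId)) {
                seqs.add(r.seq);
            }
        }
        return seqs;
    }

    public static void main(String[] args) {
        List<Reading> source = List.of(
                new Reading("s1", 1), new Reading("s2", 1), new Reading("s1", 2),
                new Reading("s3", 1), new Reading("s2", 2), new Reading("s1", 3));
        List<List<Reading>> inboxes = route(source, 2);
        System.out.println(seenBy(inboxes, "s1")); // [1, 2, 3]
        System.out.println(seenBy(inboxes, "s2")); // [1, 2]
    }
}
```

The model relies on each sensor's events coming out of a single ordered source processor, which matches the setup described in the question; if one sensor's events came from several upstream processors, their interleaving would not be guaranteed.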

The rolling aggregate works as follows:

- For each key (in this case, the sensor ID) an accumulator is created; this is your "state" for that key.
- For each input item, you accumulate it into the state, and then exportFn produces an output value from the state.
- In the second parameter, mapToOutputFn, you combine the output of exportFn with the grouping key to produce the value to emit. If you return null here, no value is emitted.
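The three steps in the list can be modelled in plain Java to see how they fit together. This is a hypothetical sketch of the semantics only (the class name and the use of `java.util.function` types are illustrative, not Jet's implementation): one state per key, accumulate then export on every item, and a null from mapToOutputFn means nothing is emitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java model of the three rolling-aggregate steps listed above; not Jet's implementation.
public class RollingAggregateModel<K, T, A, R, OUT> {

    private final Supplier<A> createFn;                 // creates the per-key state
    private final BiConsumer<A, T> accumulateFn;        // folds one item into the state
    private final Function<A, R> exportFn;              // reads a result out of the state
    private final BiFunction<K, R, OUT> mapToOutputFn;  // combines key and result; null = no output
    private final Map<K, A> stateByKey = new HashMap<>();

    public RollingAggregateModel(Supplier<A> createFn, BiConsumer<A, T> accumulateFn,
                                 Function<A, R> exportFn, BiFunction<K, R, OUT> mapToOutputFn) {
        this.createFn = createFn;
        this.accumulateFn = accumulateFn;
        this.exportFn = exportFn;
        this.mapToOutputFn = mapToOutputFn;
    }

    // Processes one item and returns what would be emitted downstream, or null for nothing.
    public OUT onItem(K key, T item) {
        A state = stateByKey.computeIfAbsent(key, k -> createFn.get());
        accumulateFn.accept(state, item);
        return mapToOutputFn.apply(key, exportFn.apply(state));
    }

    public static void main(String[] args) {
        // Running sum per key; emit "key=sum" only once the sum exceeds 10.
        RollingAggregateModel<String, Integer, long[], Long, String> agg = new RollingAggregateModel<>(
                () -> new long[1],
                (sum, value) -> sum[0] += value,
                sum -> sum[0],
                (key, sum) -> sum > 10 ? key + "=" + sum : null);
        System.out.println(agg.onItem("a", 5)); // null
        System.out.println(agg.onItem("b", 7)); // null
        System.out.println(agg.onItem("a", 6)); // a=11
    }
}
```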

Inside the state of the rolling aggregate operation you should be able to track whether the sensor is currently above its threshold, and detect when the value goes above it or drops back below it. Does that make sense?
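For the alerting case, the per-sensor state could be a small on/off state machine like the one below. It is a hypothetical sketch: the class name, the single fixed threshold and the rule that "back to normal" means value <= threshold are all assumptions. An object like this could serve as the accumulator in the rolling aggregate, with the returned transition exported and mapped to a notification (null meaning nothing to emit).

```java
// Hypothetical per-sensor state for the alerting use case: one instance per sensor ID.
// Assumes a single fixed threshold, and that "back to normal" means value <= threshold.
public class AlertState {

    public enum Transition { ALERT_ON, ALERT_OFF }

    private final double threshold;
    private boolean alerting; // true while the sensor is above its threshold

    public AlertState(double threshold) {
        this.threshold = threshold;
    }

    // Feeds the next measurement (must arrive in order). Returns the transition this
    // measurement causes, or null when the alert state does not change.
    public Transition accept(double value) {
        if (!alerting && value > threshold) {
            alerting = true;
            return Transition.ALERT_ON;
        }
        if (alerting && value <= threshold) {
            alerting = false;
            return Transition.ALERT_OFF;
        }
        return null;
    }

    public static void main(String[] args) {
        AlertState temperature = new AlertState(30.0);
        for (double v : new double[] {25, 31, 35, 29, 20}) {
            System.out.println(v + " -> " + temperature.accept(v));
        }
        // 25.0 -> null, 31.0 -> ALERT_ON, 35.0 -> null, 29.0 -> ALERT_OFF, 20.0 -> null
    }
}
```

This also shows why the per-sensor order matters: fed as 25, 31, 29, 35, 20 instead, the same readings produce ON, OFF, ON, OFF, i.e. a spurious off/on pair.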