snapshots in hazelcast jet

Fatih Arslan

Mar 5, 2020, 6:40:13 AM
to hazelcast-jet
Hi,
I am reading about snapshots in Hazelcast Jet for fault tolerance. I found the topic a bit hard to understand, so I read the source code. My source is Kafka and the stream sinks elements to a Hazelcast map. I see that offsets are never committed to the Kafka cluster if snapshots are enabled: saveToSnapshot saves the offsets to memory, and restoreFromSnapshot restores them and seeks to the restored offset.

What confuses me is the persistence of the offsets in this picture. Since the Kafka cluster is never aware of the offsets (commitSync or commitAsync is never called), only the snapshots know them, and jobs resume by seeking to the last snapshotted offset. Let's say I have only one Jet node streaming from Kafka and sinking to a Hazelcast map with an at-least-once guarantee. If this node goes down, the snapshots (which should be only in memory) are lost, and when the node restarts, the stream starts reading from the earliest offset (or the latest, depending on the Kafka config). Is my understanding correct?

How can I make sure the stream commits the offsets only after every record is put into the map?
I am looking for something as simple as this:

[Screenshot at 2020-03-05 12-38-47.png]

Fatih

Can Gencer

Mar 5, 2020, 6:48:38 AM
to hazelcast-jet
Hi Fatih,

Yes, Jet doesn't commit any offsets to Kafka because it tracks them internally and needs to save them in a consistent state with the rest of the computational state. They're not committed on purpose because it could result in missing records when re-processing after a restart.

If you have a cluster of 3 Jet nodes, and one of them goes down then you will not lose the Jet snapshot state since it's replicated to the other nodes. If the whole cluster goes down, then you will lose the Kafka offsets, but you will also lose the IMap contents in this case. You should be able to replay the Kafka topic back from the beginning to populate the IMap again. 

In the Jet Enterprise version, there's a "lossless restart" feature: you can shut down the whole cluster, and the whole state (plus any IMaps configured to do so) is persisted to disk, from which you can restore.
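
To make the restart behaviour described in this reply concrete, here is a minimal sketch in plain Java with no Jet or Kafka dependency. The class `SnapshotOffsetDemo` and its methods are hypothetical names used only for illustration; this is a toy model, not how Jet stores or replicates snapshots. It assumes that a surviving snapshot lets the source resume from the saved offset, and that a lost snapshot forces a fallback to the earliest offset.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SnapshotOffsetDemo {

    /** Copies the current offsets (partition -> next offset to read) into an in-memory snapshot. */
    static Map<Integer, Long> takeSnapshot(Map<Integer, Long> currentOffsets) {
        return new HashMap<>(currentOffsets);
    }

    /**
     * Returns the offset to resume from. If a snapshot survived, its saved offset is used;
     * otherwise the source falls back to the earliest offset.
     */
    static long resumeOffset(Optional<Map<Integer, Long>> snapshot, int partition, long earliestOffset) {
        return snapshot
                .map(s -> s.getOrDefault(partition, earliestOffset))
                .orElse(earliestOffset);
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsets = new HashMap<>();
        offsets.put(0, 42L);
        Map<Integer, Long> snapshot = takeSnapshot(offsets);

        // One node of several fails: a replica of the snapshot survives.
        System.out.println(resumeOffset(Optional.of(snapshot), 0, 0L));   // prints 42

        // The only node (or the whole cluster) fails: the snapshot is gone.
        System.out.println(resumeOffset(Optional.empty(), 0, 0L));        // prints 0
    }
}
```

The second call corresponds to the single-node case from the question: with nothing persisted outside the cluster, the job can only replay the topic from the start.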

Fatih Arslan

Mar 5, 2020, 8:09:42 AM
to hazelcast-jet
Thanks Can,

In our use case, we run a single Jet instance, and we lose our state if that instance is stopped. After the instance is restarted following a failure, we start reading from the beginning. I see that in StreamKafkaP.class, the complete() method has

if (!this.snapshottingEnabled) {
    this.consumer.commitSync();
}

In that case, if I disable snapshotting, will at-least-once processing be guaranteed?

What I would like is to send the commit to Kafka only after the polled messages have been written to the Hazelcast map. It does not matter if some messages are processed twice or more.


Viliam Durina

Mar 5, 2020, 11:25:01 AM
to hazelcast-jet
Hi Fatih,

If you disable snapshotting, there won't be any guarantee. Items can be missed and duplicated. In Jet 4.0 we also removed the offset commits entirely because of this.
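
One way the "missed items" case can arise is when an offset is committed before the sink has stored everything up to it. The plain-Java sketch below models only that ordering problem. `EarlyCommitDemo` and `lostItems` are hypothetical names for illustration, not Jet or Kafka APIs, and the model does not claim to reproduce Jet's actual behaviour.

```java
import java.util.ArrayList;
import java.util.List;

public class EarlyCommitDemo {

    /**
     * Simulates a crash. The source has emitted items 0..emitted-1 and committed
     * offset `emitted`, but the sink has stored only items 0..stored-1.
     * Returns the items that are never stored after restarting from the committed offset.
     */
    static List<Integer> lostItems(int emitted, int stored) {
        int committedOffset = emitted;          // committed before the sink caught up
        List<Integer> lost = new ArrayList<>();
        for (int item = stored; item < committedOffset; item++) {
            lost.add(item);                     // in flight at crash time, skipped after restart
        }
        return lost;
    }

    public static void main(String[] args) {
        // Five items emitted and committed, only three stored by the sink.
        System.out.println(lostItems(5, 3));    // prints [3, 4]
    }
}
```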

It seems that you have no other stateful processing in your job, you're just moving data from kafka to a map. You could commit offsets in the 2nd snapshot phase in Jet 4.0 (in the `Processor.snapshotCommitFinish()` method), but you would have to write your own processor. If you process no data in the source between snapshot phases (that is between calls to `snapshotCommitPrepare()` and `snapshotCommitFinish()`), then in `snapshotCommitFinish()` it's guaranteed that the sink already processed all the items that the source emitted before `snapshotCommitPrepare()`. See the javadoc in the `Processor` class, it's quite detailed.

If you try, post your code here and we can review it.

Viliam

Fatih Arslan

Mar 5, 2020, 12:14:25 PM
to hazelcast-jet
Hi Viliam,

I read the API docs, and it is nice to see two-phase commit support in Hazelcast Jet 4.0. I can use Jet 4.0 and write my own version of StreamKafkaP that implements the 4.0 Processor interface. My question is: is any performance impact expected because of two-phase commit? My goal is very simple: commit offsets after the elements are written to the map. I do not need anything close to two-phase commit for that.
Do I still need version 4.0?

Viliam Durina

Mar 5, 2020, 1:05:09 PM
to hazelcast-jet
The performance impact of the two snapshot phases is very small. Most of the performance hit will come from not doing anything between the phases. But since you have no other stateful stages in your job, it should be very quick.

You're not doing two-phase commit. You'll just do `commitAsync` on the KafkaConsumer in the 2nd phase of the snapshot, that is in the `snapshotCommitFinish()` method. It should be easy to add to the existing `StreamKafkaP`; we are thinking of supporting it in a future Jet version.
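
The ordering described in this thread (emit nothing between the two snapshot phases, then commit in the second phase) can be sketched in plain Java as follows. This is a simplified model under stated assumptions: `OffsetCommitter` is a hypothetical stand-in for the commit call on a Kafka consumer, `Source` is not Jet's `StreamKafkaP`, and the method names only mirror `snapshotCommitPrepare()` and `snapshotCommitFinish()` with simplified signatures. See the `Processor` javadoc for the real contract.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitOnSnapshotFinishDemo {

    /** Hypothetical stand-in for the part of a Kafka consumer that commits offsets. */
    interface OffsetCommitter {
        void commit(long offset);
    }

    /** Simplified model of a source processor. */
    static class Source {
        private final OffsetCommitter committer;
        private long nextOffset;        // next offset to read
        private boolean betweenPhases;  // true after phase 1, until phase 2 runs

        Source(OffsetCommitter committer) {
            this.committer = committer;
        }

        /** Emits one item unless a snapshot is between its phases. Returns whether an item was emitted. */
        boolean tryEmitOne() {
            if (betweenPhases) {
                return false;
            }
            nextOffset++;
            return true;
        }

        /** Phase 1: stop emitting so the read position stays fixed until phase 2. */
        void snapshotCommitPrepare() {
            betweenPhases = true;
        }

        /** Phase 2: commit the read position if the snapshot succeeded, then resume emitting. */
        void snapshotCommitFinish(boolean success) {
            if (success) {
                committer.commit(nextOffset);
            }
            betweenPhases = false;
        }
    }

    public static void main(String[] args) {
        List<Long> committed = new ArrayList<>();
        Source source = new Source(offset -> committed.add(offset));

        source.tryEmitOne();
        source.tryEmitOne();
        source.snapshotCommitPrepare();
        System.out.println(source.tryEmitOne());  // prints false: paused between phases
        source.snapshotCommitFinish(true);

        System.out.println(committed);            // prints [2]
    }
}
```

Pausing between the phases is what keeps the committed offset equal to the position the snapshot covers; it is also where the small performance cost mentioned above comes from.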

Viliam

Fatih Arslan

Mar 27, 2020, 4:49:55 AM
to hazelcast-jet
Hi Viliam,

I am trying to upgrade to Jet 4.0. Can I use Jet 4.0 with Hazelcast 3.x?

Can Gencer

Mar 27, 2020, 4:55:27 AM
to Fatih Arslan, hazelcast-jet
Hi Fatih,

In what way would you like to use it? To read/write to another Hazelcast cluster?



--
You received this message because you are subscribed to the Google Groups "hazelcast-jet" group.
To unsubscribe from this group and stop receiving emails from it, send an email to hazelcast-je...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/hazelcast-jet/4a533c08-7fc4-4570-b6d0-716572df0782%40googlegroups.com.

Viliam Durina

Mar 27, 2020, 4:55:57 AM
to hazelcast-jet
Nope. Jet 4 can't even connect to a remote Hazelcast 3 cluster (for example, for Sources.remoteMap()).
Viliam

--
Viliam Durina
Jet Developer
hazelcast®
2 W 5th Ave, Ste 300 | San Mateo, CA 94402 | USA
+1 (650) 521-5453 | hazelcast.com

Fatih Arslan

Mar 27, 2020, 5:30:37 AM
to hazelcast-jet
Yes, I want to use Jet to write to the cluster. So it means we need to upgrade everything to 4.x to be on the safe side.


Fatih Arslan

Mar 30, 2020, 7:54:51 AM
to hazelcast-jet
The cluster also needs to be upgraded to 4.0.
According to https://docs.hazelcast.org/docs/jet/4.0/manual/#jet-version-compatibility, does this mean that if I upgrade the cluster to 4.0, I cannot use pipelines written for 3.x? In other words, do I have to upgrade the cluster to 4.0 first and then deploy new pipelines written for 4.x?

Can Gencer

Mar 30, 2020, 7:56:39 AM
to Fatih Arslan, hazelcast-jet
Hi Fatih,

You can see version compatibility guidelines here: https://jet-start.sh/docs/operations/version-compatibility

There'll be a blog with Jet 4.0 migration guide, however the changes are mostly trivial - you should be able to make your 3.x pipelines work in 4.0 with minimal changes.

Fatih Arslan

Mar 30, 2020, 8:16:43 AM
to hazelcast-jet
I agree the changes are trivial.

But as far as I can see from the compatibility guidelines, no components are guaranteed to work together across major releases. We are trying to work out the order of upgrades. Once the Hazelcast IMDG cluster is upgraded to 4.0, it can no longer be used by services on 3.x, so every other service that uses 3.x has to be upgraded to 4.x and redeployed. Once the cluster is upgraded, we cannot use the services that depend on 3.x anymore, right?


Can Gencer

Mar 30, 2020, 8:20:34 AM
to Fatih Arslan, hazelcast-jet
Hi Fatih,

Yes that is right. Hazelcast 3.0 was released in 2013 so it has been 7 years since the last major version. With Jet, we also updated the Hazelcast version used by Jet to 4.0 and hence most of the breaking changes are related to changes in Hazelcast. 

Fatih Arslan

May 25, 2020, 6:54:12 AM
to hazelcast-jet
Hi,
We upgraded our Hazelcast server to 4.0 and also hazelcast-jet to 4.0. Everything works smoothly.

We also have some Python clients that read from the Hazelcast server, but we saw that the Python client has not been updated to 4.0, and it looks like the 4.0 release is scheduled for October 2020. In the meantime, what can we do about the Python clients? Is there a way to use them?


Can Gencer

May 26, 2020, 8:13:58 AM
to hazelcast-jet
Hi Fatih,

Unfortunately, there won't be a way to use the Python client with 4.0 until this PR is merged: https://github.com/hazelcast/hazelcast-python-client/pull/200. After that, you can use a version built from source.

Fatih Arslan

May 26, 2020, 8:28:43 AM
to hazelcast-jet
Hi Can,

When do you expect it to be merged?

Can Gencer

May 28, 2020, 4:39:32 AM
to Fatih Arslan, hazelcast-jet
Hi Fatih,

Unfortunately I'm not able to say when it will be done.

Metin Dumandag

Aug 12, 2020, 9:49:25 AM
to hazelcast-jet
Fatih,
The official release date is still October 2020, but in a few weeks (2 or 3) you should be able to use the master branch of the Python client to connect to 4.x clusters.

fatih.ar...@gmail.com

Sep 30, 2020, 4:41:47 AM
to hazelcast-jet
Hi Metin,
We expect the Python client to be available as of late September, as indicated on the web page. Can we continue with the master branch until it is officially released?

Metin Dumandag

Oct 5, 2020, 3:20:15 AM
to hazelcast-jet
Hi Fatih,
The end of September was an optimistic release date for all clients. We started working on the Python client pretty late (at the beginning of August), and the end of September was not feasible considering what goes into the 4.0 release. We have updated the expected release date to the end of October. You can use the master branch until then.

I see that the .org website has not been updated with this information. Let me contact the people in charge to update it.

fatih.ar...@gmail.com

Mar 4, 2021, 6:06:56 PM
to hazelcast-jet
This is a very old topic, let's bring it back to life :)

I want to commit offsets only after I am done processing all the messages I polled, so it is effectively an at-least-once guarantee. I could use processing guarantees, but the problem is that we use embedded Jet inside a microservice and rolling upgrades are not always possible. Sometimes we have to kill all the instances and deploy the new ones. I need durable offsets rather than an in-memory solution.
I need StreamKafkaP, but I also need to override the snapshotCommitFinish() method so that I can commit offsets as part of it.

Since I do not want any code duplication, I would extend StreamKafkaP and override only snapshotCommitFinish(). This is not possible because StreamKafkaP is a final class.

I could also use composition: a StreamKafkaPDecorator class that wraps StreamKafkaP and overrides snapshotCommitFinish(). The problem is that StreamKafkaP exposes none of its fields, so it is not possible to access the offsets or the KafkaConsumer. If I am not mistaken, this is not possible with the current design. I am working on Jet 4.3.

I think all I can do is copy the whole StreamKafkaP, name it StreamKafkaPOffsetCommitted, and add a snapshotCommitFinish() method.

Do you see any other way?

Can Gencer

Mar 5, 2021, 3:23:00 AM
to fatih.ar...@gmail.com, hazelcast-jet
Hi Fatih,

The Enterprise version of Jet supports exporting job snapshots to a map, which you can back up to disk as well. Then you can restore your job from this snapshot, and it should continue where it left off. I believe you're currently in the process of acquiring the Enterprise version of Hazelcast?

fatih.ar...@gmail.com

Mar 5, 2021, 4:46:29 AM
to hazelcast-jet
Hi Can,
This is true, but we are still in the process of enterprise integration and are not there yet. The pipeline I am working on now reads from Kafka and writes to Kafka. To be honest, writing the offsets to a map and reading them back is not the direction I want to take, because it creates another dependency on the Hazelcast server. I prefer to keep it simple, without any Hazelcast server dependency.
As far as I can see, there is no way of doing it without copying the StreamKafkaP class and adding one more method, snapshotCommitFinish().

Viliam Durina

Mar 8, 2021, 5:23:10 AM
to fatih.ar...@gmail.com, hazelcast-jet
There's also the Hot Restart feature in Enterprise: with it, your snapshots will be preserved when you restart the cluster. If you set your cluster to Passive, jobs will stop and write a snapshot. After the cluster restarts, they will continue from that snapshot.

However Jet doesn't support rolling upgrades currently. It's likely that it will work in some cases (because we don't change binary format that much), but we don't guarantee it.

Viliam



Can Gencer

Mar 8, 2021, 6:02:46 AM
to fatih.ar...@gmail.com, hazelcast-jet
Hi Fatih,

The job restart feature works completely transparently. You shut down a cluster, restart it, and it will continue reading from where you left off; you don't have to deal with any separate Hazelcast server or individual maps.

fatih.ar...@gmail.com

Mar 8, 2021, 6:29:14 AM
to hazelcast-jet
Thanks Can and Viliam.

I suppose we have to mount a persistent volume to our containers then. We do not use a dedicated Jet cluster; we package Jet in a fat jar and run it like a microservice. It seems clear what to do now.

Thanks