How to update existing object atomically?

96 views
Skip to first unread message

Constantine Zachariadis

unread,
Nov 16, 2022, 11:30:23 PM11/16/22
to cqengine-discuss
Let's say I stored an object in cqengine and I want to perform an update atomically where a single field has changed. Similar to a Sql Update Car Set Window=up Where licenseplate="abcde" statement.

Thx 

Constantine Zachariadis

unread,
Nov 16, 2022, 11:36:19 PM11/16/22
to cqengine-discuss
To add more clarity, I get my objects from a message bus and they may arrive in parts so when I get a subsequent part, I'd l ike to just be able to ammend the object. To do this would require a read to fetch the existing object, modifying it with the new information and putting it back. But, in order to protect the object from being incompletely modified by other threads, I'd like to do it atomically or as a 2nd best approach to do it with optimistic concurrency where I read which would fetch a version of it and then the update should check that the version I am sending in and the version in cqengine are still the same and no other threads have modified it.

Niall Gallagher

unread,
Nov 17, 2022, 4:48:05 PM11/17/22
to cqengine...@googlegroups.com
Take a look at the TransactionalIndexedCollection[1].

Its JavaDoc provides guidance on atomically replacing objects.

It will require though, that you actually produce a new distinct version of your object for each mutation.

Using the TransactionalIndexedCollection will ensure that your individual changes will be made visible to reading threads atomically, and no query will see 2 versions of the same object in its results. It will also protect internal indexes from becoming inconsistent due to concurrent modification.

However be aware that for the use case you mentioned, that alone won't be enough to ensure thread safety.

For example, if 2 updates were received on your message bus around the same time, and processed in parallel by different threads, it would be possible that each thread would read the same version and attempt to race each other to store 2 different versions which lack each others updates. 

To solve that you could look at using a Striped<Lock> or similar (e.g. from Google Guava).

When you receive an update from your message bus, read some unique identifier (or a key object which implements equals and hashCode) from the update. This would uniquely identify the object you are going to modify. Use that unique key or identifier to acquire a lock from the striped lock. Then read the current version of your object from the collection, update the collection with a new version of it, then release the lock.

This should ensure that if such updates arrive at the same time, there won't be any lost updates.

Hope that helps!
Niall




--
-- You received this message because you are subscribed to the "cqengine-discuss" group.
http://groups.google.com/group/cqengine-discuss
---
You received this message because you are subscribed to the Google Groups "cqengine-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cqengine-discu...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cqengine-discuss/52465ebc-6e93-4b9c-b82e-9db5500d3100n%40googlegroups.com.

Constantine Zachariadis

unread,
Nov 17, 2022, 5:04:20 PM11/17/22
to cqengine-discuss
Hi Niall, thx. My input message has what can be construed as a primary key indicated by a Long field (Id).   I was hoping to be able to do the whole thing atomically like the java  ConcurrentHashMap does like this:

MyObject newObject = cache.compute(inputMessage.Id(), (k,v)-> createNewObject( v,inputMessage));

where cache is a ConcurrentHashMap<Long, MyObject>() and
the method createNewObject is:
MyObject createNewObject(MyObject instanceFromCache, InputMessage inputMessage);

But, the StripedLock sounds like a good solution in the interim. Thx again.
Reply all
Reply to author
Forward
0 new messages