Hi,
I am trying to use the Hazelcast continuous query cache feature.
My use case is to keep track of the total API call count in a distributed system and apply thresholds when required. Each client node periodically inserts its own data into the map and periodically receives/queries
the data of the other nodes. As far as I understand, a continuous query cache acts like a local, continuously updated materialized view of the target map and would therefore fit my case.
I have a client-server topology with Hazelcast version 3.9. I am using the coalesce feature of the continuous query cache on the client nodes. I insert elements into the map with a TTL (typically 30 seconds). Indexes exist for the queried fields.
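To make the use case concrete, here is a minimal plain-Java sketch of what each node is meant to compute. It does not use any Hazelcast API: a ConcurrentHashMap stands in for the distributed map, and the names ApiCallTracker, Report, report, totalCalls and thresholdExceeded are hypothetical, chosen only for illustration. Time is passed in explicitly so the expiry behaviour is easy to follow.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Plain-Java stand-in for the shared map; hypothetical names, not Hazelcast API. */
public class ApiCallTracker {

    /** One node's reported call count plus the time at which the report expires. */
    private static final class Report {
        final long count;
        final long expiresAtMillis;

        Report(long count, long expiresAtMillis) {
            this.count = count;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Report> reports = new ConcurrentHashMap<>();
    private final long ttlMillis;

    public ApiCallTracker(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Each node periodically overwrites its own entry; the TTL restarts on every write. */
    public void report(String nodeId, long count, long nowMillis) {
        reports.put(nodeId, new Report(count, nowMillis + ttlMillis));
    }

    /** Sum of the counts from all nodes whose report has not expired yet. */
    public long totalCalls(long nowMillis) {
        long total = 0;
        for (Report r : reports.values()) {
            if (r.expiresAtMillis > nowMillis) {
                total += r.count;
            }
        }
        return total;
    }

    /** True when the cluster-wide total is strictly above the threshold. */
    public boolean thresholdExceeded(long threshold, long nowMillis) {
        return totalCalls(nowMillis) > threshold;
    }
}
```

In the real setup the map, the expiry and the local view would all be handled by Hazelcast; the sketch only shows the intended semantics of "sum the live entries and compare against a threshold".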
Here are the problems I encountered and my questions:
1) When I use combined Hazelcast predicates (EqualPredicate, GreaterPredicate, etc., combined with and) for the continuous query cache, I frequently see the exception below on the Hazelcast nodes:
2018-07-27 14:26:38,799 : [ERROR] [hz._hzInstance_1_mycluster.partition-operation.thread-7][SetOperation] [myhost]:9000 [mycluster] [3.9] null
java.lang.NullPointerException
at com.hazelcast.query.impl.CachedQueryEntry.getTargetObject(CachedQueryEntry.java:105)
at com.hazelcast.query.impl.QueryableEntry.extractAttributeValue(QueryableEntry.java:81)
at com.hazelcast.query.impl.QueryableEntry.getAttributeValue(QueryableEntry.java:48)
at com.hazelcast.query.impl.predicates.AbstractPredicate.readAttributeValue(AbstractPredicate.java:132)
at com.hazelcast.query.impl.predicates.AbstractPredicate.apply(AbstractPredicate.java:57)
at com.hazelcast.query.impl.predicates.NotEqualPredicate.apply(NotEqualPredicate.java:41)
at com.hazelcast.query.impl.predicates.AndPredicate.apply(AndPredicate.java:129)
at com.hazelcast.map.impl.query.QueryEventFilter.eval(QueryEventFilter.java:53)
at com.hazelcast.map.impl.event.AbstractFilteringStrategy.evaluateQueryEventFilter(AbstractFilteringStrategy.java:77)
at com.hazelcast.map.impl.event.DefaultEntryEventFilteringStrategy.processQueryEventFilter(DefaultEntryEventFilteringStrategy.java:125)
at com.hazelcast.map.impl.event.DefaultEntryEventFilteringStrategy.doFilter(DefaultEntryEventFilteringStrategy.java:84)
at com.hazelcast.map.impl.event.QueryCacheEventPublisher.getCQCEventTypeOrNull(QueryCacheEventPublisher.java:164)
at com.hazelcast.map.impl.event.QueryCacheEventPublisher.convertQueryCacheEventDataOrNull(QueryCacheEventPublisher.java:129)
at com.hazelcast.map.impl.event.QueryCacheEventPublisher.addEventToQueryCache(QueryCacheEventPublisher.java:92)
at com.hazelcast.map.impl.event.MapEventPublisherImpl.postPublishEvent(MapEventPublisherImpl.java:224)
at com.hazelcast.map.impl.event.MapEventPublisherImpl.publishEvent(MapEventPublisherImpl.java:204)
at com.hazelcast.map.impl.event.MapEventPublisherImpl.publishEvent(MapEventPublisherImpl.java:160)
at com.hazelcast.map.impl.event.MapEventPublisherImpl.publishEvent(MapEventPublisherImpl.java:148)
at com.hazelcast.map.impl.operation.BasePutOperation.afterRun(BasePutOperation.java:53)
at com.hazelcast.map.impl.operation.SetOperation.afterRun(SetOperation.java:40)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.afterRun(OperationRunnerImpl.java:295)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:196)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:406)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:433)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:569)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:554)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:513)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:207)
at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:60)
at com.hazelcast.client.impl.protocol.task.AbstractPartitionMessageTask.processMessage(AbstractPartitionMessageTask.java:67)
at com.hazelcast.client.impl.protocol.task.AbstractMessageTask.initializeAndProcessMessage(AbstractMessageTask.java:123)
at com.hazelcast.client.impl.protocol.task.AbstractMessageTask.run(AbstractMessageTask.java:103)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:154)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:125)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.run(OperationThread.java:100)
This did not make any sense to me. It seems that the entry value is somehow null. Is this due to expiry? Can it be avoided?
2) After receiving the above exception, I implemented a custom predicate like the one below, where I had to add a null check before applying the predicate logic:
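import com.hazelcast.query.Predicate;
import java.io.Serializable;
import java.util.Map.Entry;

public class MyCustomPredicate implements Predicate<String, MyMapElement>, Serializable {
    @Override
    public boolean apply(Entry<String, MyMapElement> paramEntry) {
        MyMapElement myMapElement = paramEntry.getValue();
        // Guard: the value was sometimes null when the predicate was evaluated.
        if (myMapElement == null) {
            return false;
        }
        //MY CUSTOM PREDICATE LOGIC
    }
}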
As shown above, the entry values were sometimes null, which is why I had to add the null check.
But with this implementation, I think my index configuration is useless. Is that right? If so, how can I make my custom predicate use the indexes?
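The null check itself can be factored into a reusable wrapper instead of being repeated in every predicate. Below is a minimal sketch in plain Java, using java.util.function.Predicate rather than Hazelcast's own Predicate interface; NullSafePredicates, nullSafe and Element are hypothetical names, and Element is only a stand-in for MyMapElement. It illustrates the guard only and says nothing about whether indexes are used.

```java
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical helper; plain Java, not Hazelcast API. */
public class NullSafePredicates {

    /** Stand-in for MyMapElement, with two illustrative fields. */
    public static final class Element {
        public final String nodeId;
        public final long callCount;

        public Element(String nodeId, long callCount) {
            this.nodeId = nodeId;
            this.callCount = callCount;
        }
    }

    /**
     * Wraps a predicate on the value so that an entry whose value is null
     * is rejected instead of causing a NullPointerException.
     */
    public static <K, V> Predicate<Map.Entry<K, V>> nullSafe(Predicate<V> valuePredicate) {
        return entry -> {
            V value = entry.getValue();
            return value != null && valuePredicate.test(value);
        };
    }
}
```

With this shape the actual filtering logic stays free of null handling, for example `nullSafe(e -> !e.nodeId.equals("node-a") && e.callCount > 10)`.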
3) Would you recommend any other Hazelcast feature for my problem? Another option seemed to be aggregations, but I am not sure which would perform better. Any suggestions?
4) This will eventually run in production with around 70 clients (which means 70 distinct continuous queries) and a map size of around 50000. Would that be appropriate?
Best regards