Continuous query cache NPE and other problems


Ceyhan Kasap

Jul 30, 2018, 2:26:24 AM
to Hazelcast
Hi,

I am trying to use the Hazelcast continuous query cache feature.

My use case is to keep track of the total API call count in a distributed system and apply thresholds when required. Each client node periodically inserts its own data into the map and periodically receives/queries the data of the other nodes. As far as I understand, a continuous query cache acts like a local, continuously updated materialized view of the target map, and would therefore fit my case.
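The per-node calculation described here (sum the other nodes' counts per API, then compare against a threshold) can be sketched in plain Java. This is a minimal sketch under stated assumptions: each map value is assumed to carry a node id, an API name and a call count (hypothetical field names, since the thread does not show MyMapElement's fields), and the threshold rule "other nodes' total plus the local count" is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;

class ApiTotals {
    // Hypothetical shape of one map value: one node's call count for one API.
    static final class Usage {
        final String nodeId;
        final String apiName;
        final long count;

        Usage(String nodeId, String apiName, long count) {
            this.nodeId = nodeId;
            this.apiName = apiName;
            this.count = count;
        }
    }

    // Sums counts per API over entries written by nodes other than selfNodeId.
    // Null values are skipped rather than dereferenced.
    static Map<String, Long> othersTotals(Iterable<Usage> snapshot, String selfNodeId) {
        Map<String, Long> totals = new HashMap<>();
        for (Usage u : snapshot) {
            if (u == null || selfNodeId.equals(u.nodeId)) {
                continue;
            }
            totals.merge(u.apiName, u.count, Long::sum);
        }
        return totals;
    }

    // Assumed threshold rule: other nodes' total plus this node's own count.
    static boolean overThreshold(Map<String, Long> othersTotals, String api,
                                 long localCount, long threshold) {
        return othersTotals.getOrDefault(api, 0L) + localCount > threshold;
    }
}
```

In a CQC setup, the snapshot would be whatever the client reads from its local query cache once per period; with a server-side aggregation it would instead be computed on the cluster.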

I have a client-server topology with the Hazelcast version set to 3.9. I am using the coalesce feature of the continuous query cache on the client nodes. I insert elements into the map with a TTL (typically 30 seconds). Indexes exist for the queried fields.
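The coalesce option mentioned here can be pictured with a small JDK-only model. This is an assumption about the behaviour, not Hazelcast code: while updates wait to be delivered, only the latest value per key is kept, so several writes to the same key within one delay window reach the subscriber as a single event. CoalescingBuffer and its methods are hypothetical names.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a coalescing event buffer; not Hazelcast's implementation.
class CoalescingBuffer<K, V> {
    // Insertion-ordered; putting an existing key replaces its value in place.
    private final Map<K, V> pending = new LinkedHashMap<>();

    // Record an update; any earlier pending update for the same key is discarded.
    void onUpdate(K key, V value) {
        pending.put(key, value);
    }

    // Drain the buffered updates, e.g. once per delay window.
    List<Map.Entry<K, V>> flush() {
        List<Map.Entry<K, V>> batch = new ArrayList<>();
        for (Map.Entry<K, V> e : pending.entrySet()) {
            batch.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        pending.clear();
        return batch;
    }
}
```

The design point is that coalescing trades intermediate values for fewer events, which suits a running counter where only the latest value per node matters.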

Here are the problems I encountered and my questions:

1) When I use combined Hazelcast predicates (EqualPredicate, GreaterPredicate, etc. combined with and) for the continuous query cache, I frequently see the exception below on the Hazelcast nodes:

2018-07-27 14:26:38,799 : [ERROR] [hz._hzInstance_1_mycluster.partition-operation.thread-7][SetOperation]  [myhost]:9000 [mycluster] [3.9] null
java.lang.NullPointerException
        at com.hazelcast.query.impl.CachedQueryEntry.getTargetObject(CachedQueryEntry.java:105)
        at com.hazelcast.query.impl.QueryableEntry.extractAttributeValue(QueryableEntry.java:81)
        at com.hazelcast.query.impl.QueryableEntry.getAttributeValue(QueryableEntry.java:48)
        at com.hazelcast.query.impl.predicates.AbstractPredicate.readAttributeValue(AbstractPredicate.java:132)
        at com.hazelcast.query.impl.predicates.AbstractPredicate.apply(AbstractPredicate.java:57)
        at com.hazelcast.query.impl.predicates.NotEqualPredicate.apply(NotEqualPredicate.java:41)
        at com.hazelcast.query.impl.predicates.AndPredicate.apply(AndPredicate.java:129)
        at com.hazelcast.map.impl.query.QueryEventFilter.eval(QueryEventFilter.java:53)
        at com.hazelcast.map.impl.event.AbstractFilteringStrategy.evaluateQueryEventFilter(AbstractFilteringStrategy.java:77)
        at com.hazelcast.map.impl.event.DefaultEntryEventFilteringStrategy.processQueryEventFilter(DefaultEntryEventFilteringStrategy.java:125)
        at com.hazelcast.map.impl.event.DefaultEntryEventFilteringStrategy.doFilter(DefaultEntryEventFilteringStrategy.java:84)
        at com.hazelcast.map.impl.event.QueryCacheEventPublisher.getCQCEventTypeOrNull(QueryCacheEventPublisher.java:164)
        at com.hazelcast.map.impl.event.QueryCacheEventPublisher.convertQueryCacheEventDataOrNull(QueryCacheEventPublisher.java:129)
        at com.hazelcast.map.impl.event.QueryCacheEventPublisher.addEventToQueryCache(QueryCacheEventPublisher.java:92)
        at com.hazelcast.map.impl.event.MapEventPublisherImpl.postPublishEvent(MapEventPublisherImpl.java:224)
        at com.hazelcast.map.impl.event.MapEventPublisherImpl.publishEvent(MapEventPublisherImpl.java:204)
        at com.hazelcast.map.impl.event.MapEventPublisherImpl.publishEvent(MapEventPublisherImpl.java:160)
        at com.hazelcast.map.impl.event.MapEventPublisherImpl.publishEvent(MapEventPublisherImpl.java:148)
        at com.hazelcast.map.impl.operation.BasePutOperation.afterRun(BasePutOperation.java:53)
        at com.hazelcast.map.impl.operation.SetOperation.afterRun(SetOperation.java:40)
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.afterRun(OperationRunnerImpl.java:295)
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:196)
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:406)
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:433)
        at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:569)
        at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:554)
        at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:513)
        at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:207)
        at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:60)
        at com.hazelcast.client.impl.protocol.task.AbstractPartitionMessageTask.processMessage(AbstractPartitionMessageTask.java:67)
        at com.hazelcast.client.impl.protocol.task.AbstractMessageTask.initializeAndProcessMessage(AbstractMessageTask.java:123)
        at com.hazelcast.client.impl.protocol.task.AbstractMessageTask.run(AbstractMessageTask.java:103)
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:154)
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:125)
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.run(OperationThread.java:100)

This does not make sense to me. It seems that the entry value is somehow null. Is this due to expiry? Can it be avoided?

2) After receiving the above exception, I implemented a custom predicate like the one below, where I had to add a null check before applying the predicate logic:

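import java.io.Serializable;
import java.util.Map.Entry;

import com.hazelcast.query.Predicate;

public class MyCustomPredicate implements Predicate<String, MyMapElement>, Serializable {

    @Override
    public boolean apply(Entry<String, MyMapElement> paramEntry) {
        MyMapElement myMapElement = paramEntry.getValue();
        if (myMapElement == null) {
            // Reject entries whose value is missing instead of dereferencing null.
            return false;
        }
        // MY CUSTOM PREDICATE LOGIC
        return true; // placeholder so the snippet compiles; return the real result here
    }
}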


As mentioned above, entry values were sometimes null, which is why I had to add the null check.

But with this implementation, I suspect my index configuration is no longer used. Is that right? If so, how can I make my custom predicate use indexes?
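The null guard in MyCustomPredicate can be written once and combined with equal, not-equal and greater-than style conditions. The JDK-only sketch below uses java.util.function.Predicate rather than Hazelcast's Predicate interface, and ApiUsage with its nodeId, apiName and count fields is a hypothetical stand-in for MyMapElement. Because it evaluates each entry in Java, it shares the limitation raised in the question: it cannot use indexes.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for MyMapElement; the field names are assumptions.
class ApiUsage {
    final String nodeId;
    final String apiName;
    final long count;

    ApiUsage(String nodeId, String apiName, long count) {
        this.nodeId = nodeId;
        this.apiName = apiName;
        this.count = count;
    }
}

class NullSafeFilters {
    // Wraps a value-level condition so that an entry with a null value is
    // rejected instead of being dereferenced.
    static Predicate<Map.Entry<String, ApiUsage>> onValue(Predicate<ApiUsage> condition) {
        return entry -> entry.getValue() != null && condition.test(entry.getValue());
    }

    // Roughly "apiName == api AND nodeId != self AND count > 0", null-safe.
    static Predicate<Map.Entry<String, ApiUsage>> otherNodesCalling(String api, String self) {
        return onValue(v -> api.equals(v.apiName)
                && !self.equals(v.nodeId)
                && v.count > 0);
    }
}
```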

3) Would you recommend any other Hazelcast feature for my problem? Another option I considered is using aggregations, but I am not sure which would perform better. Any suggestions?

4) This will eventually run in production with around 70 clients (which means 70 distinct continuous queries) and a map size of around 50,000 entries. Would that be appropriate?

Best regards  

Sertuğ Kaya

Jul 30, 2018, 6:42:50 AM
to Hazelcast
Yes, a CQC is beneficial when you need to query your IMap frequently and fast. How often do you need to check that data, and do you need to query on multiple fields?
Please keep in mind that key-based operations are expected to be faster than queries by nature, so I'd try to model the application around them as much as possible. On the other hand, 50,000 entries is not much, so you should be fine either way, given enough resources on the cluster.

We'd need to see the predicate you use (or, even better, a reproducer) to understand the reason for the NPE.
As for the custom predicate, you need to implement the IndexAwarePredicate interface so that it leverages indexes, which will definitely help.
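To picture what "leverages indexes" means, here is a conceptual, JDK-only model (not Hazelcast's index implementation): an equality index maps each attribute value to the keys holding it, so an indexed equality lookup reads one bucket, while a non-indexed predicate has to be evaluated against every entry. In Hazelcast 3.x terms, IndexAwarePredicate.isIndexed() roughly decides whether the indexed path is available and filter() returns its result, while apply() is the per-entry path. AttributeIndex and FullScan are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

// Conceptual equality index: attribute value -> keys. Supports inserts only (no updates/removals).
class AttributeIndex<K, V, A> {
    private final Map<A, Set<K>> buckets = new HashMap<>();
    private final Function<V, A> extractor;

    AttributeIndex(Function<V, A> extractor) {
        this.extractor = extractor;
    }

    // Index a newly inserted entry under its attribute value.
    void add(K key, V value) {
        buckets.computeIfAbsent(extractor.apply(value), a -> new HashSet<>()).add(key);
    }

    // Indexed path: reads only the bucket for the requested value.
    Set<K> equal(A attributeValue) {
        return Collections.unmodifiableSet(buckets.getOrDefault(attributeValue, Collections.emptySet()));
    }
}

// Non-indexed path: the predicate is applied to every entry.
class FullScan {
    static <K, V> Set<K> filter(Map<K, V> data, Predicate<V> predicate) {
        Set<K> result = new HashSet<>();
        for (Map.Entry<K, V> e : data.entrySet()) {
            if (predicate.test(e.getValue())) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

Both paths return the same keys; the difference is how many entries they touch to get there.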

Ceyhan Kasap

Jul 30, 2018, 7:35:09 AM
to Hazelcast
Hi,

The query period is 1 second (as far as I can see, QueryCacheConfig only allows the delay to be set in whole seconds, via the QueryCacheConfig.setDelaySeconds() method).

Yes, I need to query on multiple fields.

Unfortunately, I need aggregation (the sum of the values of the other nodes), so a key-based alternative does not seem possible.

I will try to create a reproducer and will have a look at the IndexAwarePredicate interface.

By the way, should I use Fast Aggregations instead of CQC? What do you think?

On Monday, July 30, 2018 at 13:42:50 UTC+3, Sertuğ Kaya wrote:

Sertuğ Kaya

Jul 30, 2018, 9:02:15 AM
to Hazelcast
Yes, if the query frequency is high, CQC looks more appropriate.
Feel free to submit an issue with a reproducer.

Best,

Ahmet Mircik

Jul 30, 2018, 12:36:34 PM
to Hazelcast
The NPE seems to be an issue that was fixed in later versions, starting with 3.9.4: https://github.com/hazelcast/hazelcast/issues/10556


Ceyhan Kasap

Jul 31, 2018, 3:09:14 AM
to Hazelcast

Hi Sertug and Ahmet,

I do not think I will have the chance to change the Hazelcast version, and I think I can live with the custom Predicate. Here are my remaining questions; I would be very grateful for any suggestions:

1) Since a client node needs the data of the other clients via the continuous query, my custom predicate checks a not-equals condition on an entry attribute. This prevents me from using indexes. As far as I can see, com.hazelcast.query.impl.predicates.NotEqualPredicate does not use indexes either. Please correct me if I am wrong:
 
    @Override
    public boolean isIndexed(QueryContext queryContext) {
        return false;
    }

    @Override
    public Set<QueryableEntry> filter(QueryContext queryContext) {
        return null;
    }

And if I cannot make use of an index, and all entries are filtered through the apply method of the custom predicate, there is no point in adding an index to the map, since it will only degrade insert performance. Is that right?
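The reasoning in this question can be checked with quick arithmetic. The sketch assumes the 50,000 entries are spread evenly over the 70 nodes (the thread does not state the distribution): a "not equal to my node" condition then matches about 98.6% of the map, so even an index-backed not-equal lookup would return nearly every entry, and a full scan with apply() costs about the same.

```java
// Back-of-the-envelope model, assuming entries are spread evenly across nodes.
class Selectivity {
    // Entries an "equal to one node" condition would match.
    static long equalMatches(long totalEntries, int nodes) {
        return totalEntries / nodes;
    }

    // Entries a "not equal to my node" condition would match: everything else.
    static long notEqualMatches(long totalEntries, int nodes) {
        return totalEntries - equalMatches(totalEntries, nodes);
    }

    public static void main(String[] args) {
        long total = 50_000;
        int nodes = 70;
        System.out.println("equal matches:     " + equalMatches(total, nodes));     // 714
        System.out.println("not-equal matches: " + notEqualMatches(total, nodes));  // 49286
    }
}
```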

2) As I tried to explain at the very beginning, my use case is to keep track of the total API call count in a distributed system and apply thresholds when required. Each client node periodically inserts its own data into the map and periodically receives/queries the data of the other nodes via the continuous query. On each client node, after receiving the other nodes' data via CQC, I calculate a total usage count for each API. That is, every second, each of the 70 clients performs an aggregation over the other clients' data.

I suspect this solution requires publishing around 50k entries from the cluster to each of the 70 client nodes, which would be inefficient because of the serialization/deserialization cost.

Fast aggregations seemed like another option. Since the data is already in the cluster, I thought that every second each of the 70 client nodes could submit an aggregation request to the underlying map via IMap.aggregate(). That way the aggregation would be performed on the cluster, and less data would flow from the cluster to each client. But aggregation performance did not look good compared to CQC. As far as I understand, CQC is based on the event framework, while fast aggregations use some other mechanism I am not familiar with.

Do you think I should stick with CQC? Is there any other recommendation?
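The trade-off in question 2 can be put into rough numbers. The sketch assumes, as a worst case, that all 50,000 entries are rewritten once per second and that each client's query excludes only its own entries; the thread gives neither the real update rate nor the distribution, so the inputs are placeholders to replace with measured values. Under those assumptions both approaches do a few million entry-level operations per second; CQC spends them as events serialized to clients, aggregation spends them as reads on the cluster that return only 70 small results.

```java
// Rough per-second load estimates; every input is an assumption.
class LoadEstimate {
    // CQC: each updated entry is pushed to every client whose query matches it.
    // The writer's own query excludes its entries, so clients - 1 caches receive each update.
    static long cqcEventsPerSecond(long updatesPerSecond, int clients) {
        return updatesPerSecond * (clients - 1);
    }

    // Aggregation: each client has the cluster evaluate the whole map once per period.
    static long aggregationEntryReadsPerSecond(long mapSize, int clients) {
        return mapSize * clients;
    }

    public static void main(String[] args) {
        // Worst case assumed here: every one of the 50,000 entries is rewritten once per second.
        System.out.println("CQC events/s:            " + cqcEventsPerSecond(50_000, 70));
        System.out.println("aggregation entry reads/s: " + aggregationEntryReadsPerSecond(50_000, 70));
    }
}
```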

3) This is not a question but rather a request: to load data quickly from a client into the cluster, methods like setAllAsync or putAllAsync would be beneficial.
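Until such a method exists, one workaround is to issue the per-key asynchronous writes concurrently and wait for all of them. The JDK-only sketch below is hypothetical: asyncSet stands for whatever per-key asynchronous write the client uses, adapted to a java.util.concurrent.CompletableFuture, and AsyncBatch is an illustrative name, not a Hazelcast API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

class AsyncBatch {
    // Starts one async write per entry and returns a future that completes
    // when all of them have completed (exceptionally if any of them failed).
    static <K, V> CompletableFuture<Void> setAllAsync(Map<K, V> entries,
            BiFunction<K, V, CompletableFuture<Void>> asyncSet) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Map.Entry<K, V> e : entries.entrySet()) {
            futures.add(asyncSet.apply(e.getKey(), e.getValue()));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }
}
```

Note that this only overlaps the round trips; unlike a real batched putAll, each entry is still sent as its own operation.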



On Monday, July 30, 2018 at 19:36:34 UTC+3, Ahmet Mircik wrote:

Ceyhan Kasap

Aug 1, 2018, 4:54:28 AM
to Hazelcast
When I increase the number of clients to 30, I start to receive too many failures.

Does this mean that CQC is not scalable?

[2018-08-01 11:49:33.160] - 25668 WARNING [hz.client_12.event-61] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=95, expectedSequence=11 but foundSequence=51, cacheSize=1894
[2018-08-01 11:49:33.160] - 25668 WARNING [hz.client_12.event-62] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=51, expectedSequence=11 but foundSequence=42, cacheSize=1894
[2018-08-01 11:49:33.160] - 25668 WARNING [hz.client_12.event-65] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=104, expectedSequence=1 but foundSequence=61, cacheSize=1894
[2018-08-01 11:49:33.160] - 25668 WARNING [hz.client_12.event-65] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=104, expectedSequence=1 but foundSequence=62, cacheSize=1894
[2018-08-01 11:49:33.160] - 25668 WARNING [hz.client_12.event-61] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=35, expectedSequence=1 but foundSequence=50, cacheSize=1894
[2018-08-01 11:49:33.160] - 25668 WARNING [hz.client_12.event-61] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=35, expectedSequence=1 but foundSequence=51, cacheSize=1894
[2018-08-01 11:49:33.160] - 25668 WARNING [hz.client_12.event-61] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=55, expectedSequence=1 but foundSequence=60, cacheSize=1894
[2018-08-01 11:49:33.160] - 25668 WARNING [hz.client_12.event-61] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=55, expectedSequence=1 but foundSequence=61, cacheSize=1894
[2018-08-01 11:49:33.160] - 25668 WARNING [hz.client_12.event-61] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=75, expectedSequence=1 but foundSequence=91, cacheSize=1894
[2018-08-01 11:49:33.160] - 25668 WARNING [hz.client_12.event-61] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=70, expectedSequence=1 but foundSequence=90, cacheSize=1894
[2018-08-01 11:49:33.160] - 25668 WARNING [hz.client_12.event-62] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=131, expectedSequence=1 but foundSequence=50, cacheSize=1894
[2018-08-01 11:49:33.160] - 25668 WARNING [hz.client_12.event-62] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=11, expectedSequence=1 but foundSequence=71, cacheSize=1894
[2018-08-01 11:49:33.160] - 25668 WARNING [hz.client_12.event-62] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=21, expectedSequence=1 but foundSequence=102, cacheSize=1894
[2018-08-01 11:49:33.165] - 25668 WARNING [hz.client_12.event-62] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=91, expectedSequence=1 but foundSequence=57, cacheSize=1894
[2018-08-01 11:49:33.165] - 25668 WARNING [hz.client_12.event-63] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=42, expectedSequence=1 but foundSequence=64, cacheSize=1894
[2018-08-01 11:49:33.165] - 25668 WARNING [hz.client_12.event-62] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=91, expectedSequence=1 but foundSequence=58, cacheSize=1894
[2018-08-01 11:49:33.165] - 25668 WARNING [hz.client_12.event-63] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=127, expectedSequence=1 but foundSequence=71, cacheSize=1894
[2018-08-01 11:49:33.165] - 25668 WARNING [hz.client_12.event-62] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=86, expectedSequence=12 but foundSequence=71, cacheSize=1894
[2018-08-01 11:49:33.165] - 25668 WARNING [hz.client_12.event-62] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=1, expectedSequence=1 but foundSequence=105, cacheSize=1894
[2018-08-01 11:49:33.166] - 25668 WARNING [hz.client_12.event-63] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=62, expectedSequence=1 but foundSequence=67, cacheSize=1894
[2018-08-01 11:49:33.166] - 25668 WARNING [hz.client_12.event-63] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=62, expectedSequence=1 but foundSequence=68, cacheSize=1894
[2018-08-01 11:49:33.166] - 25668 WARNING [hz.client_12.event-63] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=62, expectedSequence=1 but foundSequence=69, cacheSize=1894
[2018-08-01 11:49:33.166] - 25668 WARNING [hz.client_12.event-61] --- com.hazelcast.map.impl.querycache.subscriber.SubscriberAccumulator: Event lost detected for partitionId=15, expectedSequence=7 but foundSequence=26, cacheSize=1894
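Each warning above reads as: for that partition, the client expected the next event to carry expectedSequence but received foundSequence, so, assuming consecutive numbering, the events in between never arrived (the thread does not establish why). A small hypothetical helper can tally this from a log. Because the same expectedSequence repeats for a partition (for example partitionId=104 with foundSequence 61 and then 62), summing every line would overcount, so the helper keeps only the largest gap per partition.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log helper; it parses the warning text shown above.
class EventLossLine {
    private static final Pattern P = Pattern.compile(
            "partitionId=(\\d+), expectedSequence=(\\d+) but foundSequence=(\\d+)");

    final int partitionId;
    final long expected;
    final long found;

    private EventLossLine(int partitionId, long expected, long found) {
        this.partitionId = partitionId;
        this.expected = expected;
        this.found = found;
    }

    // Returns null when the line is not an "Event lost detected" warning.
    static EventLossLine parse(String line) {
        Matcher m = P.matcher(line);
        if (!m.find()) {
            return null;
        }
        return new EventLossLine(Integer.parseInt(m.group(1)),
                Long.parseLong(m.group(2)), Long.parseLong(m.group(3)));
    }

    // Sequences expected .. found-1 did not arrive, assuming consecutive numbering.
    long missingCount() {
        return found - expected;
    }

    // Largest gap seen per partition across many log lines.
    static Map<Integer, Long> maxGapPerPartition(List<String> lines) {
        Map<Integer, Long> gaps = new TreeMap<>();
        for (String line : lines) {
            EventLossLine e = parse(line);
            if (e != null) {
                gaps.merge(e.partitionId, e.missingCount(), Long::max);
            }
        }
        return gaps;
    }
}
```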

On Tuesday, July 31, 2018 at 10:09:14 UTC+3, Ceyhan Kasap wrote:

Ceyhan Kasap

Aug 2, 2018, 9:02:05 AM
to Hazelcast
No matter what I do, CQC fails when I increase the number of clients. Aggregation performs even worse.

On Wednesday, August 1, 2018 at 11:54:28 UTC+3, Ceyhan Kasap wrote:

Ahmet Mircik

Aug 2, 2018, 9:40:20 AM
to Hazelcast
Please try the latest versions, such as 3.9.4 or 3.10.4; lots of query-cache-related fixes were made in them.
