StreamSupport.stream(resultSet.spliterator(), true).collect(groupingBy(value -> discriminator(value)));
CQEngine ResultSets are not designed to be iterated by more than one thread in parallel (although multiple threads may of course iterate different ResultSets in parallel). So when setting up your parallel streams, ensure that the stream is parallelized only after a single-threaded stage that iterates the ResultSet. That is, a single reader feeds objects to a pool of worker threads.
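A minimal sketch of that single-reader pattern, assuming a plain `Iterable<O>` stands in for the ResultSet so the example runs without CQEngine on the classpath. The names `SingleReaderPool` and `processInParallel` are hypothetical. Only the calling thread iterates the source; each object is handed to a fixed pool of worker threads via `CompletableFuture.supplyAsync`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class SingleReaderPool {

    // Iterates `source` on the calling thread only (the single reader) and hands
    // each object to a fixed pool of worker threads. Results are collected in
    // the order in which the reader produced the objects.
    static <O, R> List<R> processInParallel(Iterable<O> source,
                                            Function<? super O, ? extends R> work,
                                            int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<CompletableFuture<R>> pending = new ArrayList<>();
            for (O object : source) { // the only place the source is iterated
                CompletableFuture<R> task =
                        CompletableFuture.supplyAsync(() -> work.apply(object), pool);
                pending.add(task);
            }
            List<R> results = new ArrayList<>(pending.size());
            for (CompletableFuture<R> task : pending) {
                results.add(task.join()); // wait for each worker to finish
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a ResultSet<String> returned by a query (assumption).
        Iterable<String> resultSet = Arrays.asList("a", "bb", "ccc");
        System.out.println(processInParallel(resultSet, String::length, 4)); // [1, 2, 3]
    }
}
```

The worker threads never touch the source's iterator; they only receive objects the reader has already pulled, which is what keeps the ResultSet single-threaded.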
Hi Vlastimil,
Good question! Yes, that’s correct.
The only supported way to create a stream is to follow the example on the CQEngine site, where you supply parallel=false:
// Wraps a ResultSet in a sequential (parallel = false) stream
public static <O> Stream<O> asStream(ResultSet<O> rs) {
    return StreamSupport.stream(rs.spliterator(), false);
}
Creating a stream where you supply parallel=true is not supported. This isn't a limitation per se; it's because a ResultSet represents a lazy computation, so it is not inherently splittable, as discussed in StreamParallelGuidance. If the application needs to process objects that match a query in parallel, it must split the stream externally, because CQEngine will not split it internally.
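One way to split externally, sketched under assumptions: a plain `Iterable` stands in for the ResultSet, and the names `ExternalSplit`, `groupInParallel` and `discriminator` are hypothetical (the last mirrors the grouping function in the question). A single thread drains the lazy result into a `List` through a sequential stream, and only then is a parallel stream created over the `List`, whose own spliterator is safe to split. The trade-off is that all matching objects are held in memory at once.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class ExternalSplit {

    // Sequential stream, as in the asStream() example; a plain Iterable
    // stands in for ResultSet here (assumption).
    static <O> Stream<O> asStream(Iterable<O> rs) {
        return StreamSupport.stream(rs.spliterator(), false);
    }

    // Stage 1 (one thread): drain the lazy result into a List.
    // Stage 2 (many threads): split the List, whose spliterator supports it.
    static <O, K> Map<K, List<O>> groupInParallel(
            Iterable<O> rs, Function<? super O, ? extends K> discriminator) {
        List<O> materialized = asStream(rs).collect(Collectors.toList());
        return materialized.parallelStream()
                .collect(Collectors.groupingBy(discriminator));
    }

    public static void main(String[] args) {
        // Stand-in for the objects matched by a query (assumption).
        Iterable<String> rs = Arrays.asList("apple", "avocado", "banana");
        Map<Character, List<String>> byInitial = groupInParallel(rs, s -> s.charAt(0));
        System.out.println(byInitial);
    }
}
```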
The situation you describe is a result of Java 6 compatibility. CQEngine 2.1 currently retains compatibility with Java 6, so when it runs on Java 8, the behaviour of CQEngine's ResultSet.spliterator() method is actually inherited from the default implementation in java.lang.Iterable.
I'm not sure what the rationale was, but AFAICS the default implementation of java.lang.Iterable.spliterator().trySplit() in Java 8 does not return null while elements remain: it splits off batches of elements, which permits (limited) parallel operation. Unfortunately, although using a parallel stream is not supported, we can't override that default behaviour without breaking compatibility with Java 6.
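The default behaviour can be observed with any Iterable that does not override spliterator(). In this sketch, `PlainIterable` is a hypothetical stand-in for a Java 6-compatible ResultSet running on Java 8, and `splitsOnce` is a hypothetical helper. On a Java 8 JDK, calling trySplit() on the inherited spliterator returns a non-null spliterator as long as elements remain (in OpenJDK it copies a batch of elements into an array-backed spliterator that another thread could process).

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;

final class DefaultSpliteratorDemo {

    // An Iterable that does NOT override spliterator(), so it inherits the
    // default from java.lang.Iterable (illustrative stand-in for a ResultSet).
    static final class PlainIterable<T> implements Iterable<T> {
        private final List<T> items;

        PlainIterable(List<T> items) { this.items = items; }

        @Override
        public Iterator<T> iterator() { return items.iterator(); }
    }

    // Returns true if the Iterable's spliterator() agrees to split at least once.
    static boolean splitsOnce(Iterable<?> iterable) {
        Spliterator<?> spliterator = iterable.spliterator();
        return spliterator.trySplit() != null;
    }

    public static void main(String[] args) {
        PlainIterable<String> rs = new PlainIterable<>(Arrays.asList("a", "b", "c"));
        System.out.println(splitsOnce(rs)); // true
    }
}
```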
The next major version, CQEngine 3.x, will probably target Java 8. At that point ResultSet could override spliterator() and also implement a well-behaved stream() method directly, replacing the default behaviour.
AFAICS you have two options in the meantime:
(1) Just don't do that! If you create the stream with parallel=false, this problem will not occur; or
(2) If you really need to pass a CQEngine ResultSet to some "untrusted code" (a user), as a workaround you could extend CQEngine's WrappedResultSet in your application and override the spliterator() method in the wrapper so that the unsupported usage becomes harmless, for example by making Spliterator.trySplit() return null so that even a parallel stream is consumed by a single thread. Then give the wrapper, rather than the raw ResultSet, to the untrusted code; the wrapper enforces correct usage.
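A minimal sketch of option (2). To avoid assuming details of CQEngine's WrappedResultSet API, it does not extend that class; instead the hypothetical class `SequentialOnlyIterable` wraps any `Iterable` and overrides spliterator() to return a delegating spliterator whose trySplit() always returns null. In a real application the same spliterator() override would go in your subclass of WrappedResultSet.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

// Hypothetical stand-in for a WrappedResultSet subclass: wraps any Iterable and
// overrides spliterator() so that trySplit() always returns null.
final class SequentialOnlyIterable<T> implements Iterable<T> {
    private final Iterable<T> wrapped;

    SequentialOnlyIterable(Iterable<T> wrapped) { this.wrapped = wrapped; }

    @Override
    public Iterator<T> iterator() { return wrapped.iterator(); }

    @Override
    public Spliterator<T> spliterator() {
        Spliterator<T> delegate = wrapped.spliterator();
        return new Spliterator<T>() {
            @Override public boolean tryAdvance(Consumer<? super T> action) {
                return delegate.tryAdvance(action);
            }
            @Override public void forEachRemaining(Consumer<? super T> action) {
                delegate.forEachRemaining(action);
            }
            // Refusing to split keeps iteration on a single thread, even if the
            // caller asks for a parallel stream.
            @Override public Spliterator<T> trySplit() { return null; }
            @Override public long estimateSize() { return delegate.estimateSize(); }
            @Override public int characteristics() { return delegate.characteristics(); }
        };
    }

    public static void main(String[] args) {
        SequentialOnlyIterable<Integer> rs =
                new SequentialOnlyIterable<>(Arrays.asList(1, 2, 3, 4));
        // parallel = true is requested, but the spliterator never splits.
        int sum = StreamSupport.stream(rs.spliterator(), true)
                .mapToInt(Integer::intValue)
                .sum();
        System.out.println(sum); // 10
    }
}
```

Returning null from trySplit() does not make parallel=true fail; the parallel stream simply degenerates to a single task, so the wrapped source is still iterated by one thread at a time.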
The current situation is not ideal, but I don't know how it can be improved without losing Java 6 compatibility :/
Ideas are welcome!
HTH,
Niall