Java 8 parallel stream on ResultSet


Vlastimil Dolejš

Oct 20, 2015, 12:50:08 PM10/20/15
to cqengine-discuss
Hi,
We used a parallel stream on a ResultSet returned from cqengine and got some strange data.
We tried the same query on the same data, but processed the ResultSet with a regular single-threaded stream - the data were different.

Code that we use:
StreamSupport.stream(resultSet.spliterator(), true).collect(groupingBy(value -> discriminator(value)))

I have not found anything about parallel streams in the docs / javadoc, just this quotation on the forum:

CQEngine ResultSets are not designed to be iterated by more than one thread in parallel (but of course multiple threads may be iterating different resultsets in parallel). So when setting up your parallel streams, ensure that the stream is parallelized after a single threaded stage which iterates the ResultSet. That is a single reader, feeds objects to a pool of multiple worker threads.

I assume that using a parallel stream directly on a ResultSet is prohibited?

Is there any way to force the ResultSet to use a single-threaded stream even if the user requests a parallel stream? Or to throw an exception?

Thank you

Vlastimil

Niall

Oct 20, 2015, 6:45:10 PM10/20/15
to cqengine-discuss

Hi Vlastimil,


Good question! Yes that’s correct.


The only supported way to create a stream is to follow the example on the CQEngine site, where you supply parallel=false:

public static <O> Stream<O> asStream(ResultSet<O> rs) {
    return StreamSupport.stream(rs.spliterator(), false);
}
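
For illustration, the same helper can be sketched against plain Iterable (which ResultSet implements), so the snippet compiles without CQEngine on the classpath; the class name here is made up for the example:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class AsStreamExample {
    // Same pattern as above, generalised to Iterable<O> purely so this
    // snippet is self-contained; ResultSet<O> implements Iterable<O>.
    public static <O> Stream<O> asStream(Iterable<O> source) {
        return StreamSupport.stream(source.spliterator(), false); // parallel=false
    }

    public static void main(String[] args) {
        List<String> longWords = asStream(Arrays.asList("a", "bb", "ccc"))
                .filter(s -> s.length() > 1)
                .collect(Collectors.toList());
        System.out.println(longWords); // [bb, ccc]
    }
}
```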


Creating a stream where you supply parallel=true is not supported. This isn't a limitation per se; it's because ResultSet represents a lazy computation, and so it is not inherently splittable, as discussed in StreamParallelGuidance. If the application needs to process objects which match a query in parallel, it needs to split the stream externally, as CQEngine will not split it internally.
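
Concretely, "splitting the stream externally" can be as simple as draining the iterator on a single thread into a snapshot collection, and only then going parallel over the snapshot. A minimal self-contained sketch (using a plain Iterator in place of a ResultSet's iterator, and a hypothetical groupInParallel helper):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class SingleReaderExample {
    // Single reader: drain the iterator on the calling thread into a snapshot,
    // then let the fork-join pool split the (splittable) ArrayList.
    public static <T, K> Map<K, List<T>> groupInParallel(Iterator<T> it,
                                                         Function<T, K> discriminator) {
        List<T> snapshot = new ArrayList<>();
        it.forEachRemaining(snapshot::add);   // iteration stays single-threaded
        return snapshot.parallelStream()      // parallelism starts here, on the snapshot
                .collect(Collectors.groupingBy(discriminator));
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> byLength =
                groupInParallel(Arrays.asList("a", "b", "cc").iterator(), String::length);
        System.out.println(byLength);
    }
}
```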


The current situation (as you describe) is a result of Java 6 compatibility. At the moment CQEngine 2.1 retains compatibility with Java 6, and so when run on Java 8, the behaviour of CQEngine's ResultSet.spliterator() method is actually inherited from the default implementation in java.lang.Iterable.


I'm not sure what the rationale was, but AFAICS the default implementation of java.lang.Iterable.spliterator() in Java 8 returns a Spliterator whose trySplit() does not return null, which typically facilitates parallel operation. So unfortunately, although using a parallel stream is not supported, unless we break compatibility with Java 6, we can't override that default behaviour.


The next major version of CQEngine 3.x will probably target Java 8. At that point ResultSet could redefine spliterator() and also implement a well-behaved stream() method directly, to override the default behaviour.


AFAICS you have two options in the meantime:

(1) Just don't do that! - if you create a stream using parallel=false, then this problem will not occur, or

(2) If you really need to pass a CQEngine ResultSet to some "untrusted code" (a user), as a workaround you could extend CQEngine's WrappedResultSet in your application and override the spliterator() method in the wrapper so that it prevents the unsupported usage, for example such that Spliterator.trySplit() returns null. Then give the wrapper to the untrusted code; the wrapper itself will enforce correct usage.
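
The trySplit() part of that workaround can be sketched independently of CQEngine: a delegating Spliterator that refuses to split keeps even a parallel=true stream on a single consumer. NonSplittingSpliterator is a made-up name for this sketch; in the real workaround you would return such a spliterator from your WrappedResultSet subclass's spliterator() method:

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

public class NonSplittingSpliterator<T> implements Spliterator<T> {
    private final Spliterator<T> delegate;

    public NonSplittingSpliterator(Spliterator<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        return delegate.tryAdvance(action); // forward element-by-element iteration
    }

    @Override
    public Spliterator<T> trySplit() {
        return null; // refuse to split: the pipeline cannot fan out the iteration
    }

    @Override
    public long estimateSize() {
        return delegate.estimateSize();
    }

    @Override
    public int characteristics() {
        return delegate.characteristics();
    }

    public static void main(String[] args) {
        // Even with parallel=true, the stream cannot split this spliterator,
        // so the underlying source is consumed by a single thread.
        long sum = StreamSupport.stream(
                new NonSplittingSpliterator<>(Arrays.asList(1, 2, 3, 4, 5).spliterator()),
                true)
                .mapToLong(Integer::longValue)
                .sum();
        System.out.println(sum); // 15
    }
}
```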


The current situation is not ideal, but I don't know how it can be improved without losing Java 6 compatibility :/

Ideas are welcome!


HTH,

Niall

Vlastimil Dolejš

Oct 21, 2015, 8:11:16 AM10/21/15
to cqengine-discuss
Hi Niall,
Thank you for quick and detailed response!

I chose the solution with WrappedResultSet and a custom spliterator. This solution works and seems safer to me. I can't imagine a more elegant solution without losing Java 6 compatibility.

What I don't understand is why the default implementation of spliterator() doesn't work. I tried to debug the code and, as I understand it, the default Spliterator (IteratorSpliterator) reads values from iterator() and splits them into arrays. These arrays are then processed in parallel, so reading values from the iterator should happen in a single thread.
The fork-join / stream code is hard for me to understand, so maybe I missed something.

I'm looking forward to the 3.x version. :)
Great work!

Vlastimil


On Wednesday, October 21, 2015 at 0:45:10 UTC+2, Niall wrote: