Reactor gets stuck after Ring Buffer fills

Paul Houle

Feb 15, 2016, 12:24:29 PM
to reactor-framework
I have been working on an application where data gets pushed around with Reactor 2.0.7, but I am running into problems. Here is a test case that shows the trouble.

    @Test
    public void firstPipeline() throws Throwable {
        long milliStart = System.currentTimeMillis();
        Environment.initialize();

        // Source: an Iterable over Jena Models, capped so the test stays short.
        Stream<Model> stream = Streams
                .from(new CLEIIterable(createDefaultModel())).take(10000);

        // Fresh TDB dataset; its default model is the write target.
        FileUtils.deleteDirectory(DIRECTORY);
        Dataset ds = TDBFactory.createDataset(DIRECTORY.toString());
        Model lei = ds.getDefaultModel();

        // RingBufferProcessor so only one thread ever touches the model.
        Processor<Model, Model> p = RingBufferProcessor.create(false);
        Stream<Model> dsStream = Streams.wrap(p);
        stream.subscribe(p);
        dsStream.consume(m -> lei.add(m));

        Streams.await(dsStream);
        System.out.println("waiting on topic processor");
        System.out.println("Found " + lei.size() + " triples");
        Thread.sleep(1000); // see below: without this, close() races the consumer
        ds.close();

        long milliClose = System.currentTimeMillis();
        System.out.println("Run time is " + (milliClose - milliStart) / 1000.0);
    }

The CLEIIterable would return about 400,000 results, but we take only 10,000 of them so the test doesn't run too long.

I want a RingBuffer ahead of the consumer that puts data into the lei model, because the dataset associated with that model is not thread-safe. Longer term there will be more data sources sending messages there, and that is how I want to manage the threading issues. That is, ultimately I want that RingBuffer packaged together with the consumer in the source code, and then to wire up multiple producers to it as well as some other consumers, roughly as sketched below.
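The shape I am aiming for is something like this (untested sketch; sourceA and sourceB are placeholder publishers standing in for the extra data sources, and I am assuming a varargs Streams.merge is available in 2.0.7):

    // Untested sketch of the target wiring; sourceA/sourceB are hypothetical.
    Processor<Model, Model> writer = RingBufferProcessor.create(false);
    Streams.merge(sourceA, sourceB)    // assuming Streams.merge exists here
           .subscribe(writer);         // one subscription into the ring buffer
    Streams.wrap(writer)
           .consume(m -> lei.add(m));  // the only thread that touches TDB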

After messing around with the code over the weekend, I have the sample above, which doesn't quite work. When I run it, the system pulls 32 items from the CLEIIterable and then hangs for 30 seconds in the await. The Thread.sleep(1000) is there because, in the various variations I have tried, without it the ds.close() operation commonly runs concurrently with the consumer, which makes ds.close() throw an exception (and probably corrupts the database).
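What I really want is to close the dataset only after the pipeline's terminal signal, rather than sleeping. Something along these lines is what I have in mind (a sketch using plain Reactive Streams interfaces plus java.util.concurrent.CountDownLatch, not code I have verified):

    // Sketch: gate ds.close() on the terminal signal instead of Thread.sleep().
    CountDownLatch done = new CountDownLatch(1);
    p.subscribe(new Subscriber<Model>() {
        public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }
        public void onNext(Model m)             { lei.add(m); }
        public void onError(Throwable t)        { done.countDown(); }
        public void onComplete()                { done.countDown(); }
    });
    done.await(30, TimeUnit.SECONDS); // give up eventually rather than hang forever
    ds.close();                       // now nothing else is writing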

I have tried some variations that managed to send more events, but at this point I am having a tricky time making this work. I am even more concerned about what happens when this system gets generalized, because I want to generate pipelines programmatically. From the documentation and from experimentation, it is not clear to me how to run a data pipeline to completion and also close all the resources that have to be closed at the end. (If I don't close the TDB, I might as well not put anything in it, because it will be corrupted.)
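To make the programmatic case concrete, the kind of reusable piece I imagine is something like this (drainThenClose is my own hypothetical name, not a Reactor API; sketch only, built on org.reactivestreams and java.util.function):

    // Hypothetical helper: drive any Publisher to completion with a
    // single-threaded consumer, then close the resource exactly once.
    static <T> void drainThenClose(Publisher<T> source,
                                   Consumer<T> sink,
                                   AutoCloseable resource) throws Exception {
        CountDownLatch done = new CountDownLatch(1);
        source.subscribe(new Subscriber<T>() {
            public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(T t)                 { sink.accept(t); }
            public void onError(Throwable e)        { done.countDown(); }
            public void onComplete()                { done.countDown(); }
        });
        done.await();
        resource.close(); // safe: the terminal signal has already arrived
    }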

What should I do?

