@Test
|
public void pipeData() throws IOException, InterruptedException {
|
final CountDownLatch latch = new CountDownLatch(10);
|
NioChannelHandler.PipeReader pipePing = new NioChannelHandler.PipeReader() {
|
int count = 1;
|
private ByteBuffer buffer = ByteBuffer.allocate(4);
|
|
@Override
|
protected void onData(Pipe.SourceChannel source) {
|
try {
|
source.read(buffer);
|
if (buffer.position() == 4) {
|
buffer.flip();
|
assertEquals(count, buffer.getInt());
|
count++;
|
buffer.clear();
|
latch.countDown();
|
}
|
} catch (IOException e) {
|
throw new RuntimeException(e);
|
}
|
}
|
};
|
NioFiber fiber = new NioFiberImpl();
|
fiber.addHandler(pipePing);
|
fiber.start();
|
|
final Pipe.SinkChannel sink = pipePing.getSink();
|
for (int i = 1; i < 11; i++) {
|
ByteBuffer b = ByteBuffer.allocateDirect(4);
|
b.putInt(i);
|
b.flip();
|
while (b.hasRemaining()) {
|
sink.write(b);
|
}
|
}
|
|
final boolean await = latch.await(10, TimeUnit.SECONDS);
|
assertTrue(await);
|
fiber.dispose();
|
} |