Trying to get an inputstream implementation that get's it's stream from a ReadStream<Buffer>.The resulting stream doesn't have the correct byte order. Any suggestions?
import io.vertx.rxjava.core.buffer.Buffer;
import io.vertx.rxjava.core.streams.ReadStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class ReadInputStream extends InputStream {
private AtomicBoolean readStreamFinished;
private AtomicBoolean readStreamPaused;
private int bufCounter = 0;
private static final int MAX_QUEUE = 32768;
private BlockingDeque<Integer> activeQ;
private BlockingDeque<Integer> stoppedBuffer;
private ReadStream<Buffer> inputStream;
public static ReadInputStream getInstance(ReadStream<Buffer> inputStream) {
return InnderReadInputStream.getInstance(inputStream);
}
private ReadInputStream(ReadStream<Buffer> inputStream) {
this.readStreamPaused = new AtomicBoolean(true);
this.readStreamFinished = new AtomicBoolean(false);
this.activeQ = new LinkedBlockingDeque<Integer>(MAX_QUEUE);
this.stoppedBuffer = new LinkedBlockingDeque<Integer>();
this.inputStream = inputStream;
/**
* for each buffer chunk handled check if the entire chunk will fit in the queue
* if it doesn't stop the stream and save the chunk in memory temporarily so that read()
* can process it when it runs out of queue entries and resume the stream
*
* TODO - change this to check queue size on each byte
*/
this.inputStream.handler(handleBuffer -> {
bufCounter = handleBuffer.length();
while (bufCounter > 0) {
bufCounter--;
if (activeQ.remainingCapacity() == 0) {
stoppedBuffer.offerFirst(Byte.toUnsignedInt(handleBuffer.getByte(bufCounter)));
stop();
} else {
activeQ.offerFirst(Byte.toUnsignedInt(handleBuffer.getByte(bufCounter)));
}
}
});
this.inputStream.endHandler(endHandle -> {
readStreamFinished.set(true);
});
}
public ReadInputStream start() {
this.inputStream.resume();
this.readStreamPaused.set(false);
return this;
}
public ReadInputStream stop() {
this.inputStream.pause();
this.readStreamPaused.set(true);
return this;
}
private int readInt() throws IOException, InterruptedException {
synchronized (this) {
if (readStreamFinished.get()) {
//Stream finished check if activeQ and pausedBuffer still have bytes
if (stoppedBuffer.isEmpty() && activeQ.isEmpty()) {
//if activeQ and stopped buffer are also empty finish by return -1
return -1;
} else {
if (stoppedBuffer.isEmpty()) {
//activeQ still has bytes
return activeQ.pollLast(5000, TimeUnit.MILLISECONDS);
} else {
//Stopped buffer still has bytes
return stoppedBuffer.pollLast();
}
}
} else {
//Stream hasn't finished yet
if (readStreamPaused.get()) {
if (activeQ.isEmpty()) {
//activeQ is empty, start draining stoppedBuffer
if (stoppedBuffer.isEmpty()) {
//stoppedBuffer is drained, restart stream
start();
return activeQ.takeLast();
} else {
//stoppedBuffer contains bytes, poll it
return stoppedBuffer.pollLast();
}
} else {
return activeQ.pollLast(5000, TimeUnit.MILLISECONDS);
}
} else {
if (activeQ.isEmpty()) {
//activeQ is empty, start draining stoppedBuffer
if (stoppedBuffer.isEmpty()) {
//stoppedBuffer is drained, restart stream
start();
return activeQ.takeLast();
} else {
//stoppedBuffer contains bytes, poll it
return stoppedBuffer.pollLast();
}
} else {
return activeQ.pollLast(5000, TimeUnit.MILLISECONDS);
}
}
}
}
}
// //Stream overrides to copy the data
@Override
public int read() throws IOException {
synchronized (this) {
int result = 0;
try {
result = readInt();
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
}
@Override
public void close() throws IOException {
stop();
super.close();
}
@Override
public int available() throws IOException {
return 0;
}
private static class InnderReadInputStream {
static ReadInputStream INSTANCE;
public static ReadInputStream getInstance(ReadStream<Buffer> inputstream) {
if (INSTANCE == null) {
INSTANCE = new ReadInputStream(inputstream);
}
return INSTANCE;
}
}
}
import io.vertx.rxjava.core.buffer.Buffer;
import io.vertx.rxjava.core.streams.ReadStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class ReadInputStream extends InputStream {
private AtomicBoolean readStreamFinished;
private AtomicBoolean readStreamPaused;
private static final int MAX_QUEUE = 32768;
private BlockingDeque<Byte> activeQ;
private BlockingDeque<Byte> stoppedBuffer;
private ReadStream<Buffer> inputStream;
public static ReadInputStream getInstance(ReadStream<Buffer> inputStream) {
return InnderReadInputStream.getInstance(inputStream);
}
private ReadInputStream(ReadStream<Buffer> inputStream) {
this.readStreamPaused = new AtomicBoolean(true);
this.readStreamFinished = new AtomicBoolean(false);
this.activeQ = new LinkedBlockingDeque<Byte>(MAX_QUEUE);
this.stoppedBuffer = new LinkedBlockingDeque<Byte>();
this.inputStream = inputStream;
this.inputStream.handler(handleBuffer -> {
int index = 0;
while (index < handleBuffer.length()) {
if (activeQ.remainingCapacity() == 0) {
stoppedBuffer.offerFirst(handleBuffer.getByte(index));
stop();
} else {
activeQ.offerFirst(handleBuffer.getByte(index));
}
index++;
}
});
this.inputStream.endHandler(endHandle -> {
readStreamFinished.set(true);
});
}
public ReadInputStream start() {
this.inputStream.resume();
this.readStreamPaused.set(false);
return this;
}
public ReadInputStream stop() {
this.inputStream.pause();
this.readStreamPaused.set(true);
return this;
}
private int readInt() throws IOException, InterruptedException {
Byte b = null;
if (readStreamFinished.get()) {
//Stream finished check if activeQ and pausedBuffer still have bytes
if (stoppedBuffer.isEmpty() && activeQ.isEmpty()) {
//if activeQ and stopped buffer are also empty finish by return -1
b = null;
} else {
if (stoppedBuffer.isEmpty()) {
//activeQ still has bytes
b = activeQ.pollLast(5000, TimeUnit.MILLISECONDS);
} else {
//Stopped buffer still has bytes
b = stoppedBuffer.pollLast();
}
}
} else {
//Stream hasn't finished yet
if (readStreamPaused.get()) {
if (activeQ.isEmpty()) {
//activeQ is empty, start draining stoppedBuffer
if (stoppedBuffer.isEmpty()) {
//stoppedBuffer is drained, restart stream
start();
b = activeQ.takeLast();
} else {
//stoppedBuffer contains bytes, poll it
b = stoppedBuffer.pollLast();
}
} else {
b = activeQ.pollLast(5000, TimeUnit.MILLISECONDS);
}
} else {
if (activeQ.isEmpty()) {
//activeQ is empty, start draining stoppedBuffer
if (stoppedBuffer.isEmpty()) {
//stoppedBuffer is drained, restart stream
start();
b = activeQ.takeLast();
} else {
//stoppedBuffer contains bytes, poll it
b = stoppedBuffer.pollLast();
}
} else {
b = activeQ.pollLast(5000, TimeUnit.MILLISECONDS);
}
}
}
return b == null ? -1 : Byte.toUnsignedInt(b);
}
//Stream overrides to copy the data
@Override
public int read() throws IOException {
import io.vertx.rxjava.core.buffer.Buffer;
import io.vertx.rxjava.core.streams.WriteStream;
import java.io.IOException;
import java.io.OutputStream;
/**
* Created by bruno on 19/10/15.
*/
public class OutputWriterStream extends OutputStream {
private final WriteStream<Buffer> response;
private final byte[] buffer;
private io.vertx.core.buffer.Buffer vertxBuffer;
private int counter = 0;
public OutputWriterStream(final WriteStream<Buffer> response) {
this.response = response;
buffer = new byte[4096];
vertxBuffer = io.vertx.core.buffer.Buffer.buffer();
}
@Override
public synchronized void write(final int b) throws IOException {
buffer[counter++] = (byte) b;
if (counter >= buffer.length) {
flush();
}
}
@Override
public synchronized void flush() throws IOException {
super.flush();
if (counter > 0) {
byte[] remaining = buffer;
if (counter < buffer.length) {
remaining = new byte[counter];
System.arraycopy(buffer, 0, remaining, 0, counter);
}
vertxBuffer.appendBytes(remaining);
response.write(Buffer.newInstance(vertxBuffer));
}
counter = 0;
vertxBuffer = io.vertx.core.buffer.Buffer.buffer();
}
@Override
public synchronized void close() throws IOException {
flush();
}
}
...
...