Re: Has anyone created a Inputstream wrapper for a Readstream<Buffer>? I'm having issues with byte order

1,279 views
Skip to first unread message
Message has been deleted

Nat

unread,
Nov 5, 2015, 11:28:11 AM11/5/15
to vert.x
read() function in InputStream is supposed to read one byte at a time.
http://docs.oracle.com/javase/7/docs/api/java/io/InputStream.html#read()

On Thursday, November 5, 2015 at 8:19:02 AM UTC-8, bvil...@northplains.com wrote:
Trying to get an inputstream implementation that get's it's stream from a ReadStream<Buffer>.

The resulting stream doesn't have the correct byte order. Any suggestions?


import io.vertx.rxjava.core.buffer.Buffer;
import io.vertx.rxjava.core.streams.ReadStream;

import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class ReadInputStream extends InputStream {
   
private AtomicBoolean readStreamFinished;
   
private AtomicBoolean readStreamPaused;
   
private int bufCounter = 0;
   
private static final int MAX_QUEUE = 32768;
    private BlockingDeque<Integer> activeQ;
   
private BlockingDeque<Integer> stoppedBuffer;
   
private ReadStream<Buffer> inputStream;

    public static ReadInputStream getInstance(ReadStream<Buffer> inputStream) {
       
return InnderReadInputStream.getInstance(inputStream);
   
}

   
private ReadInputStream(ReadStream<Buffer> inputStream) {
       
this.readStreamPaused = new AtomicBoolean(true);
       
this.readStreamFinished = new AtomicBoolean(false);
       
this.activeQ = new LinkedBlockingDeque<Integer>(MAX_QUEUE);
       
this.stoppedBuffer = new LinkedBlockingDeque<Integer>();
       
this.inputStream = inputStream;

       
/**
         * for each buffer chunk handled check if the entire chunk will fit in the queue
         * if it doesn't stop the stream and save the chunk in memory temporarily so that read()
         * can process it when it runs out of queue entries and resume the stream
         *
         * TODO - change this to check queue size on each byte
         */
        this.inputStream.handler(handleBuffer -> {
               
bufCounter = handleBuffer.length();
               
while (bufCounter > 0) {
                   
bufCounter--;
                   
if (activeQ.remainingCapacity() == 0) {
                       
stoppedBuffer.offerFirst(Byte.toUnsignedInt(handleBuffer.getByte(bufCounter)));
                        stop
();
                   
} else {
                       
activeQ.offerFirst(Byte.toUnsignedInt(handleBuffer.getByte(bufCounter)));
                   
}
               
}
       
});

       
this.inputStream.endHandler(endHandle -> {
           
readStreamFinished.set(true);
       
});
   
}

   
public ReadInputStream start() {
       
this.inputStream.resume();
       
this.readStreamPaused.set(false);
       
return this;
   
}

   
public ReadInputStream stop() {
       
this.inputStream.pause();
       
this.readStreamPaused.set(true);
       
return this;
   
}

   
private int readInt() throws IOException, InterruptedException {
       
synchronized (this) {
           
if (readStreamFinished.get()) {
               
//Stream finished check if activeQ and pausedBuffer still have bytes
                if (stoppedBuffer.isEmpty() && activeQ.isEmpty()) {
                   
//if activeQ and stopped buffer are also empty finish by return -1
                    return -1;
               
} else {
                   
if (stoppedBuffer.isEmpty()) {
                       
//activeQ still has bytes
                        return activeQ.pollLast(5000, TimeUnit.MILLISECONDS);
                   
} else {
                       
//Stopped buffer still has bytes
                        return stoppedBuffer.pollLast();
                   
}
               
}
           
} else {
               
//Stream hasn't finished yet
                if (readStreamPaused.get()) {
                   
if (activeQ.isEmpty()) {
                       
//activeQ is empty, start draining stoppedBuffer
                        if (stoppedBuffer.isEmpty()) {
                           
//stoppedBuffer is drained, restart stream
                            start();
                           
return activeQ.takeLast();
                       
} else {
                           
//stoppedBuffer contains bytes, poll it
                            return stoppedBuffer.pollLast();
                       
}
                   
} else {
                       
return activeQ.pollLast(5000, TimeUnit.MILLISECONDS);
                   
}
               
} else {
                   
if (activeQ.isEmpty()) {
                       
//activeQ is empty, start draining stoppedBuffer
                        if (stoppedBuffer.isEmpty()) {
                           
//stoppedBuffer is drained, restart stream
                            start();
                           
return activeQ.takeLast();
                       
} else {
                           
//stoppedBuffer contains bytes, poll it
                            return stoppedBuffer.pollLast();
                       
}
                   
} else {
                       
return activeQ.pollLast(5000, TimeUnit.MILLISECONDS);
                   
}
               
}
           
}
       
}
   
}

   
//    //Stream overrides to copy the data
    @Override
    public int read() throws IOException {
       
synchronized (this) {
           
int result = 0;
           
try {
                result
= readInt();
           
} catch (InterruptedException e) {
                e
.printStackTrace();
           
}
           
return result;
       
}
   
}

   
@Override
    public void close() throws IOException {
        stop
();
       
super.close();
   
}

   
@Override
    public int available() throws IOException {
       
return 0;
   
}

   
private static class InnderReadInputStream {
       
static ReadInputStream INSTANCE;

       
public static ReadInputStream getInstance(ReadStream<Buffer> inputstream) {
           
if (INSTANCE == null) {
               
INSTANCE = new ReadInputStream(inputstream);
           
}
           
return INSTANCE;
       
}
   
}
}



Message has been deleted

bvil...@northplains.com

unread,
Nov 5, 2015, 12:09:24 PM11/5/15
to vert.x
Here is a working version...

import io.vertx.rxjava.core.buffer.Buffer;
import io.vertx.rxjava.core.streams.ReadStream;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicBoolean;

public class ReadInputStream extends InputStream {
   
private AtomicBoolean readStreamFinished;
   
private AtomicBoolean readStreamPaused;

   
private static final int MAX_QUEUE = 32768;

   
private BlockingDeque<Byte> activeQ;
   
private BlockingDeque<Byte> stoppedBuffer;

   
private ReadStream<Buffer> inputStream;

   
public static ReadInputStream getInstance(ReadStream<Buffer> inputStream) {
       
return InnderReadInputStream.getInstance(inputStream);
   
}

   
private ReadInputStream(ReadStream<Buffer> inputStream) {
       
this.readStreamPaused = new AtomicBoolean(true);
       
this.readStreamFinished = new AtomicBoolean(false);

       
this.activeQ = new LinkedBlockingDeque<Byte>(MAX_QUEUE);
       
this.stoppedBuffer = new LinkedBlockingDeque<Byte>();
       
this.inputStream = inputStream;

       
this.inputStream.handler(handleBuffer -> {
               
int index = 0;
               
while (index < handleBuffer.length()) {
                   
if (activeQ.remainingCapacity() == 0) {
                       
stoppedBuffer.offerFirst(handleBuffer.getByte(index));
                        stop
();
                   
} else {
                       
activeQ.offerFirst(handleBuffer.getByte(index));
                   
}
                    index
++;

               
}
       
});

       
this.inputStream.endHandler(endHandle -> {
           
readStreamFinished.set(true);
       
});
   
}

   
public ReadInputStream start() {
       
this.inputStream.resume();
       
this.readStreamPaused.set(false);
       
return this;
   
}

   
public ReadInputStream stop() {
       
this.inputStream.pause();
       
this.readStreamPaused.set(true);
       
return this;
   
}

   
private int readInt() throws IOException, InterruptedException {

       
Byte b = null;

       
if (readStreamFinished.get()) {
           
//Stream finished check if activeQ and pausedBuffer still have bytes
            if (stoppedBuffer.isEmpty() && activeQ.isEmpty()) {
               
//if activeQ and stopped buffer are also empty finish by return -1
                b = null;

           
} else {
               
if (stoppedBuffer.isEmpty()) {
                   
//activeQ still has bytes
                    b = activeQ.pollLast(5000, TimeUnit.MILLISECONDS);

               
} else {
                   
//Stopped buffer still has bytes
                    b = stoppedBuffer.pollLast();

               
}
           
}
       
} else {
           
//Stream hasn't finished yet
            if (readStreamPaused.get()) {
               
if (activeQ.isEmpty()) {
                   
//activeQ is empty, start draining stoppedBuffer
                    if (stoppedBuffer.isEmpty()) {
                       
//stoppedBuffer is drained, restart stream
                        start();

                        b
= activeQ.takeLast();

                   
} else {
                       
//stoppedBuffer contains bytes, poll it
                        b = stoppedBuffer.pollLast();
                   
}
               
} else {
                    b
= activeQ.pollLast(5000, TimeUnit.MILLISECONDS);

               
}
           
} else {
               
if (activeQ.isEmpty()) {
                   
//activeQ is empty, start draining stoppedBuffer
                    if (stoppedBuffer.isEmpty()) {
                       
//stoppedBuffer is drained, restart stream
                        start();

                        b
= activeQ.takeLast();

                   
} else {
                       
//stoppedBuffer contains bytes, poll it
                        b = stoppedBuffer.pollLast();
                   
}
               
} else {
                    b
= activeQ.pollLast(5000, TimeUnit.MILLISECONDS);
               
}
           
}
       
}
       
return b == null ? -1 : Byte.toUnsignedInt(b);

   
}

   
//Stream overrides to copy the data
    @Override
    public int read() throws IOException {

bvil...@northplains.com

unread,
Nov 5, 2015, 12:27:26 PM11/5/15
to vert.x
Here's one also for WriterStreams and Outputstreams...

import io.vertx.rxjava.core.buffer.Buffer;
import io.vertx.rxjava.core.streams.WriteStream;

import java.io.IOException;
import java.io.OutputStream;

/**
 * Created by bruno on 19/10/15.
 */
public class OutputWriterStream extends OutputStream {
   
private final WriteStream<Buffer> response;
   
private final byte[] buffer;
   
private io.vertx.core.buffer.Buffer vertxBuffer;
   
private int counter = 0;

   
public OutputWriterStream(final WriteStream<Buffer> response) {
       
this.response = response;
       
buffer = new byte[4096];
       
vertxBuffer = io.vertx.core.buffer.Buffer.buffer();
   
}

   
@Override
    public synchronized void write(final int b) throws IOException {
       
buffer[counter++] = (byte) b;
       
if (counter >= buffer.length) {
            flush
();
       
}
   
}

   
@Override
    public synchronized void flush() throws IOException {
       
super.flush();
       
if (counter > 0) {
           
byte[] remaining = buffer;
           
if (counter < buffer.length) {
                remaining
= new byte[counter];
               
System.arraycopy(buffer, 0, remaining, 0, counter);
           
}
           
vertxBuffer.appendBytes(remaining);
           
response.write(Buffer.newInstance(vertxBuffer));
       
}
       
counter = 0;
       
vertxBuffer = io.vertx.core.buffer.Buffer.buffer();
   
}

   
@Override
    public synchronized void close() throws IOException {
        flush
();
   
}
}


Dom B

unread,
Jan 25, 2016, 4:48:46 AM1/25/16
to vert.x
Hey guys,

sorry for pulling up this old issue again, but I'm looking for help implementing the above wrapper. 
My scenario is that I want to process a huge JSON File within an HTTP server and pass the ReadStream<Buffer> to the Jackson Streaming API, which expects a JavaIO Inputstream.
I exactly adapted the code example from above, excepted that i import the Buffer and Readstream of io.vertx.core instead of the rx modul.

 Now my problem is that my code is stuck and doesn't read anything from a AsyncFile, whereas Debuging says the file could be opened.

vertx.executeBlocking(things -> {
vertx.fileSystem().open("tmp/testdump.json", new OpenOptions(), file -> {
receive_dump_stream(file.result());
things.complete();
});
}, onFinished -> System.out.println("done"));

____________________________________________________________

public void receive_dump_stream(ReadStream<Buffer> readstream){
ReadInputStream ris = ReadInputStream.getInstance(readstream);
JsonFactory fac  = new JsonFactory();
try {
JsonParser parser = fac.createParser(ris);
//work with the stream
ris.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
...

Dom B

unread,
Jan 25, 2016, 6:09:21 AM1/25/16
to vert.x
short follow up on my own question.
the issue could be resolved by executing my method "receive_dump_stream", which is operating with the ReadInputStream wrapper, blocking (meaning vertx.executeBlocking) as well.

Cheers,
Dom
...
Reply all
Reply to author
Forward
0 new messages