RestEasy reactive with javax.ws.rs.core.StreamingOutput

1,370 views
Skip to first unread message

swidersk...@gmail.com

unread,
Sep 14, 2021, 4:26:48 PM9/14/21
to Quarkus Development mailing list
Hello,

While working on moving to rest easy reactive I ran into an issue with using javax.ws.rs.core.StreamingOutput to stream back a content of the response.

What is kind of surprising that the response code is 500 but there is nothing logged so no idea what might be missing. Or maybe this is not supported at all.

Here isa bit of code to illustrate how it is used


StreamingOutput entity = new ZipStreamingOutput(archived);
ResponseBuilder builder = Response.ok().entity(entity);

return builder.header("Content-Type", "application/zip").header("Content-Disposition",
        "attachment; filename=" + archived.getId() + ".zip").build();

And the ZipStreamOutput class looks like that

protected class ZipStreamingOutput implements StreamingOutput {

    private ArchivedItem archived;

    public ZipStreamingOutput(ArchivedItem archived) {
        this.archived = archived;
    }

    @Override
    public void write(OutputStream output) throws IOException, WebApplicationException {
        archived.writeAsZip(output);
    }
}

This works ok on rest easy classic and not sure I can do anything to workaround this, if you have any hints I’d appreciate.

Maciej



Paul Carter-Brown

unread,
Sep 14, 2021, 7:21:23 PM9/14/21
to swidersk...@gmail.com, Quarkus Development mailing list
Hi,

StreamingOutput is not supported currently on Resteasy Reactive. You can instead return a Multi ( I assume Multi<byte[]> should work ) and have ArchivedItem emit to the Multi as its writing as zip


--
You received this message because you are subscribed to the Google Groups "Quarkus Development mailing list" group.
To unsubscribe from this group and stop receiving emails from it, send an email to quarkus-dev...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/quarkus-dev/A1CC6569-BF84-4CA1-A35F-38ABDDDABBA9%40gmail.com.

Stephane Epardaud

unread,
Sep 15, 2021, 4:26:10 AM9/15/21
to Paul Carter-Brown, swidersk...@gmail.com, Quarkus Development mailing list
What is this ArchivedItem type? This sounds like a blocking write in any case, so you'll need to do the writing on a worker thread with `@Blocking`.



--
Stéphane Épardaud

Stephane Epardaud

unread,
Sep 15, 2021, 5:00:01 AM9/15/21
to Quarkus Development mailing list, Maciej Swiderski
That's an interesting use-case. Given that you have all the bytes in memory, what's the rationale for not zipping them in memory too and returning a byte[] in the first place?
I'd assume that the zipped bytes would take less memory than the non-zipped ones, no?
In any case, I wonder if that end-point should be `@Blocking` or not. This is probably CPU-intensive and so might be better off on the worker thread, but I guess it depends if the CPU utilisation costs more than the offload to the worker pool. I guess you'd have to measure that, and also consider that doing compression on the IO thread might impact other endpoints that are not doing this compression (if you have more than one).

On Wed, 15 Sept 2021 at 10:32, Maciej Swiderski <swidersk...@gmail.com> wrote:
The ArchiveItem is a collection of file data already loaded (byte[]) so it is about to build a zip archive to deliver to the client. the writeToZip file creates zip entries and push them via ZipOutputStream.

Maciej


--
Stéphane Épardaud

Paul Carter-Brown

unread,
Sep 15, 2021, 5:06:09 AM9/15/21
to Stephane Epardaud, Quarkus Development mailing list, Maciej Swiderski
I'll do a quick POC of this and paste the code


Maciej Swiderski

unread,
Sep 15, 2021, 5:44:18 AM9/15/21
to Stephane Epardaud, Quarkus Development mailing list
that's exactly what I rework it to. Generate the zip as byte[] and push it to the client. and yes, it is marked as blocking.

The use case is to deliver selected content as one zip archive to the client so zipping it was like most natural approach.

Maciej

Maciej Swiderski

unread,
Sep 15, 2021, 5:44:29 AM9/15/21
to Stephane Epardaud, Quarkus Development mailing list
thanks a lot Paul!

Paul Carter-Brown

unread,
Sep 15, 2021, 6:08:27 AM9/15/21
to Maciej Swiderski, Stephane Epardaud, Quarkus Development mailing list
An example of slowly replying with binary data in a stream using Resteasy reactive:

    @GET
    @Path("/multi-stream")
    @Produces(MediaType.APPLICATION_OCTET_STREAM)
    public Multi<byte[]> getSomeBinary(@QueryParam("approxBytes") int approxBytes) {
        log.error("In getMulti [{}]", approxBytes);
        Multi<byte[]> res = Multi.createFrom().emitter(emitter -> {
            Thread t = new Thread(() -> {
                log.error("Starting emitting data");
                int sent = 0;
                do {
                    byte[] data = getSomeSlowData();
                    sent += data.length;
                    emitter.emit(data);
                    log.error("Sent another [{}] bytes totalling [{}]", data.length, sent);
                } while (sent < approxBytes);
                emitter.complete();
                log.error("Finished emitting");
            });
            t.setName("data-emitter");
            t.start();
        });
        log.error("Out getMulti");
        return res;
    }

    private byte[] getSomeSlowData() {
        Random r = new Random();
        byte[] someData = new byte[10240];
        r.nextBytes(someData);
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            log.warn("Error:", e);
        }
        return someData;
    }

Your threading implementation in the emitter could use a pool or whatever makes sense. I'm just showing that the eventloop does the call but another thread that is free to block can be the one feeding the data back. Here are the logs showing what threads are involved:

2021-09-15 10:07:19,539 ERROR [gur.jin.ser.ser.res.ScenariosResource] (vert.x-eventloop-thread-19) In getMulti [100000]
2021-09-15 10:07:19,539 ERROR [gur.jin.ser.ser.res.ScenariosResource] (vert.x-eventloop-thread-19) Out getMulti
2021-09-15 10:07:19,540 ERROR [gur.jin.ser.ser.res.ScenariosResource] (data-emitter) Starting emitting data
2021-09-15 10:07:20,541 ERROR [gur.jin.ser.ser.res.ScenariosResource] (data-emitter) Sent another [10240] bytes totalling [10240]
2021-09-15 10:07:21,542 ERROR [gur.jin.ser.ser.res.ScenariosResource] (data-emitter) Sent another [10240] bytes totalling [20480]
2021-09-15 10:07:22,543 ERROR [gur.jin.ser.ser.res.ScenariosResource] (data-emitter) Sent another [10240] bytes totalling [30720]
2021-09-15 10:07:23,544 ERROR [gur.jin.ser.ser.res.ScenariosResource] (data-emitter) Sent another [10240] bytes totalling [40960]
2021-09-15 10:07:24,546 ERROR [gur.jin.ser.ser.res.ScenariosResource] (data-emitter) Sent another [10240] bytes totalling [51200]
2021-09-15 10:07:25,547 ERROR [gur.jin.ser.ser.res.ScenariosResource] (data-emitter) Sent another [10240] bytes totalling [61440]
2021-09-15 10:07:26,548 ERROR [gur.jin.ser.ser.res.ScenariosResource] (data-emitter) Sent another [10240] bytes totalling [71680]
2021-09-15 10:07:27,550 ERROR [gur.jin.ser.ser.res.ScenariosResource] (data-emitter) Sent another [10240] bytes totalling [81920]
2021-09-15 10:07:28,551 ERROR [gur.jin.ser.ser.res.ScenariosResource] (data-emitter) Sent another [10240] bytes totalling [92160]
2021-09-15 10:07:29,552 ERROR [gur.jin.ser.ser.res.ScenariosResource] (data-emitter) Sent another [10240] bytes totalling [102400]
2021-09-15 10:07:29,553 ERROR [gur.jin.ser.ser.res.ScenariosResource] (data-emitter) Finished emitting


Maciej Swiderski

unread,
Sep 15, 2021, 8:28:51 AM9/15/21
to Paul Carter-Brown, Stephane Epardaud, Quarkus Development mailing list
Thanks Paul,

it is bit different than what I need but certainly helps to better understand how to use Multi

Maciej

Stephane Epardaud

unread,
Sep 15, 2021, 9:33:01 AM9/15/21
to Maciej Swiderski, Paul Carter-Brown, Quarkus Development mailing list
Thanks Paul. However I recommend using the Quarkus worker pool if you want to do that:

    @Inject
    ManagedExecutor workerExecutor;


    @GET
    @Path("/multi-stream")
    @Produces(MediaType.APPLICATION_OCTET_STREAM)
    public Multi<byte[]> getSomeBinary(@RestQuery int approxBytes) {

        log.error("In getMulti [{}]", approxBytes);
        Multi<byte[]> res = Multi.createFrom().<byte[]>emitter(emitter -> {

            log.error("Starting emitting data");
            int sent = 0;
            do {
                byte[] data = getSomeSlowData();
                sent += data.length;
                emitter.emit(data);
                log.error("Sent another [{}] bytes totalling [{}]", data.length, sent);
            } while (sent < approxBytes);
            emitter.complete();
            log.error("Finished emitting");
        }).runSubscriptionOn(workerExecutor);

        log.error("Out getMulti");
        return res;
    }

    private byte[] getSomeSlowData() {
        Random r = new Random();
        byte[] someData = new byte[10240];
        r.nextBytes(someData);
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            log.warn("Error:", e);
        }
        return someData;
    }

Alternately, you can also just mark the endpoint as blocking, and both the endpoint and the subscription will run on the worker thread pool:

    @Blocking

    @GET
    @Path("/multi-stream")
    @Produces(MediaType.APPLICATION_OCTET_STREAM)
    public Multi<byte[]> getSomeBinary(@RestQuery int approxBytes) {

        log.error("In getMulti [{}]", approxBytes);
        Multi<byte[]> res = Multi.createFrom().<byte[]>emitter(emitter -> {

            log.error("Starting emitting data");
            int sent = 0;
            do {
                byte[] data = getSomeSlowData();
                sent += data.length;
                emitter.emit(data);
                log.error("Sent another [{}] bytes totalling [{}]", data.length, sent);
            } while (sent < approxBytes);
            emitter.complete();
            log.error("Finished emitting");
        });
--
Stéphane Épardaud
Reply all
Reply to author
Forward
0 new messages