vert.x http server stress test problem.


Kidong Lee

Jan 21, 2019, 9:32:20 AM
to vert.x
Hi,

I have written an HTTP server with Vert.x to process multipart form requests.

For the stress test, I deployed the HTTP server on one AWS Linux instance and installed stress test clients, which post our multipart form requests to the server, on several other AWS Linux instances.
The HTTP server uses Dropwizard Metrics (https://metrics.dropwizard.io/4.0.0/) to calculate writes per second, which are printed to the console (stdout).

I ran the first client, sending requests with a body size of 1 KB, and got about 15,000 writes per second. At that point, CPU usage on the HTTP server instance was minimal, and memory was fine too.
I then ran more clients to send more requests, but the measured throughput stayed the same as with a single client, and CPU and memory usage on the HTTP server remained minimal.
I expected the number of writes per second to increase as the number of stress test clients increased.

The AWS network is fast: I measured about 9.6 Gbit/s with iperf.
I changed Vert.x settings such as sendBufferSize, receiveBufferSize, and acceptBacklog,
and also changed some Linux TCP settings on the server and client instances, but it did not help.
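
As a rough sanity check on these figures (a sketch, not a measurement): 15,000 requests per second with 1 KB bodies carry only about 0.12 Gbit/s of payload, around 1% of the 9.6 Gbit/s measured with iperf, so raw bandwidth is unlikely to be the limit. The class and method names below are made up for illustration, and the estimate assumes 1 KB = 1024 bytes and ignores HTTP and multipart overhead.

```java
// Hypothetical helper for a back-of-the-envelope bandwidth estimate.
class ThroughputEstimate {

    // Payload bandwidth in Gbit/s for a given request rate and body size in bytes.
    static double payloadGbitPerSec(long requestsPerSec, long bodyBytes) {
        return requestsPerSec * bodyBytes * 8 / 1e9;
    }

    public static void main(String[] args) {
        double used = payloadGbitPerSec(15_000, 1024); // figures from the post
        double link = 9.6;                             // iperf result from the post, in Gbit/s
        System.out.printf("payload: %.3f Gbit/s, %.1f%% of the link%n",
                used, 100 * used / link);
    }
}
```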

Does anybody know how to tune the Vert.x HTTP server or the Linux server to get the result I expected from my stress test?

thanks,

- Kidong Lee.


Sam

Jan 21, 2019, 10:23:49 AM
to vert.x
Can you please share your code? That would help in understanding the issue.

Kidong Lee

Jan 21, 2019, 8:40:50 PM
to vert.x
Sure,

The following listing is the HTTP server code:

        ...
        VertxOptions vertxOptions = new VertxOptions();
        vertxOptions.setMaxEventLoopExecuteTime(Long.MAX_VALUE);
        vertxOptions.setEventLoopPoolSize(100);
        vertxOptions.setWorkerPoolSize(30);

        Vertx vertx = Vertx.vertx(vertxOptions);

        HttpServerOptions httpServerOptions = new HttpServerOptions();
        httpServerOptions.setSendBufferSize(4096);
        httpServerOptions.setReceiveBufferSize(4096);
        httpServerOptions.setAcceptBacklog(10000);
        httpServerOptions.setReuseAddress(true);
        httpServerOptions.setIdleTimeout(300);

        Router router = Router.router(vertx);

        ...
        // MultipartBodyHandler is custom implementation of BodyHandler.
        router.post(ObjectScreenshotCreateRequestHandler.URI_PATH).handler(MultipartBodyHandler.create().setMergeFormAttributes(true));
        router.post(ObjectScreenshotCreateRequestHandler.URI_PATH).handler(this.objectScreenshotCreateRequestHandler);
        ...

        this.server = vertx.createHttpServer(httpServerOptions);
        server.requestHandler(router::accept).listen(port);

And the stress test client, written with Vert.x Web, looks like this:

       ...

       String gzipFilePath = "target/actions.gzip";

        // first, write gzip file to local.
        writeFile(gzipFilePath, actions);

        VertxOptions vertxOptions = new VertxOptions();
        vertxOptions.setMaxEventLoopExecuteTime(Long.MAX_VALUE);
        vertxOptions.setEventLoopPoolSize(100);
        vertxOptions.setWorkerPoolSize(30);
        vertxOptions.setBlockedThreadCheckInterval(200000000);

        Vertx vertx = Vertx.vertx(vertxOptions);

        WebClientOptions httpClientOptions = new WebClientOptions();
        httpClientOptions.setDefaultHost(host)
                .setDefaultPort(Integer.parseInt(port))
                .setMaxPoolSize(300);

        int MAX_THREAD = executorSize;

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREAD);

        for(int i = 0; i < MAX_THREAD; i++) {
            executor.execute(new HttpClientTask(vertx, host, port, uri, pauseInNanos, maxPoolSize, meta, gzipFilePath));
        }

        ...

       private static class HttpClientTask implements Runnable
        {
            AtomicLong count = new AtomicLong(0);

            private WebClient client;
            private String uri;
            private MultipartForm form;
            private long pauseInNanos;

            public HttpClientTask(Vertx vertx, String host, String port, String uri, long pauseInNanos, int maxPoolSize, String meta, String gzipFilePath)
            {
                this.uri = uri;
                this.pauseInNanos = pauseInNanos;

                WebClientOptions httpClientOptions = new WebClientOptions();
                httpClientOptions.setDefaultHost(host)
                        .setDefaultPort(Integer.parseInt(port))
                        .setMaxPoolSize(maxPoolSize)
                        .setReuseAddress(true);


                client = WebClient.create(vertx, httpClientOptions);

                form = MultipartForm.create()
                        .attribute("d", meta)
                        .binaryFileUpload("f", "actions",  gzipFilePath, "gzip");
            }


            @Override
            public void run() {
                log.info("[{}] ready to send events...", Thread.currentThread());

                while (true) {
                    // send request.
                    client.post(uri)
                            .putHeader("content-type", "multipart/form-data")
                            .sendMultipartForm(form, ar -> {
                                if (ar.succeeded()) {
                                    // Ok
                                }
                                else
                                {
                                    log.error("Something went wrong: " + ar.cause().getMessage());
                                }
                            });

                    if (count.incrementAndGet() % 10000 == 0) {
                        log.info("request count: [{}]", count.get());
                    }

                    // take a time for a while...
                    TimeUtils.pause(pauseInNanos);
                }
            }
        }

In addition to the Vert.x settings, the Linux TCP settings are as follows:

sudo sysctl -w net.ipv4.tcp_tw_reuse=1
sudo sysctl -w net.core.somaxconn=10000
sudo sysctl -w net.ipv4.tcp_max_syn_backlog=10000

- Kidong Lee





On Tuesday, January 22, 2019 at 12:23:49 AM UTC+9, Sam wrote:

Santosh Yadav

Jan 21, 2019, 9:30:09 PM
to ve...@googlegroups.com
My observations:

1- Don't dump metrics to stdout while stress testing (or do it only periodically).

2- In my experience, one verticle can't scale beyond one core; you need to deploy multiple verticle instances.

3- I hope there aren't any blocking calls in the request handler.

4- Try tuning further TCP parameters as well (rmem, wmem, etc.).

5- When launching the server (i.e. java -jar), give it enough memory with -Xms and -Xmx, and tune the GC parameters.

6- I haven't analyzed the client code. First try to analyze the results with a load testing tool such as h2load or ab.



--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/14ee98dc-8fc6-4a0c-977f-18f4ba600d4b%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Julien Viet

Jan 22, 2019, 3:56:46 AM
to ve...@googlegroups.com
Good recommendations :-)

I think you want to check whether the CPU core executing the single verticle is saturated.

If it is saturated, then you do need more CPU and should increase the number of verticle instances so that Vert.x load balances them across the CPU cores.
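
To make the idea concrete, here is a toy model using only java.util.concurrent (not Vert.x itself): each "event loop" is a single-threaded executor, so one loop can never use more than one core, and connections are spread round-robin over however many loops exist. The class and method names are hypothetical, and the real Vert.x scheduling is more involved than this sketch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Toy model: N single-threaded "event loops" sharing incoming connections.
class EventLoopModel {

    // Spreads `connections` over `loops` single-threaded executors in
    // round-robin order and returns how many each loop handled.
    static int[] distribute(int loops, int connections) {
        ExecutorService[] eventLoops = new ExecutorService[loops];
        AtomicIntegerArray handled = new AtomicIntegerArray(loops);
        for (int i = 0; i < loops; i++) {
            eventLoops[i] = Executors.newSingleThreadExecutor();
        }
        for (int c = 0; c < connections; c++) {
            int target = c % loops; // round-robin choice of loop
            eventLoops[target].execute(() -> handled.incrementAndGet(target));
        }
        try {
            for (ExecutorService loop : eventLoops) {
                loop.shutdown();
                loop.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        int[] result = new int[loops];
        for (int i = 0; i < loops; i++) {
            result[i] = handled.get(i);
        }
        return result;
    }

    public static void main(String[] args) {
        // With one loop, all work lands on one thread; with four, it is shared.
        System.out.println(java.util.Arrays.toString(distribute(1, 8)));
        System.out.println(java.util.Arrays.toString(distribute(4, 8)));
    }
}
```

With a single loop every connection is handled by the same thread, which is why watching that one core for saturation is the useful first check.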

Julien

Santosh

Jan 22, 2019, 4:08:53 AM
to vert.x
Thank you!! :-)

Yes indeed, we need to observe whether or not the core is saturated. In my POC application I added more verticles once the core was saturated. The scaling ability is awesome. :-)

Right now I am evaluating https://github.com/julien3/vertxbuspp (a C++ event bus client API) so that I can bind a C++ application to a Vert.x HTTP/2 instance.

I will update once done.

Kidong Lee

Jan 22, 2019, 5:02:58 AM
to ve...@googlegroups.com
Hello Santosh,

Thank you for your advice.

As you suggested, I have written verticles and changed some Linux TCP settings like this:
sudo sysctl -w net.ipv4.tcp_rmem="1024   4096   16384"
sudo sysctl -w net.ipv4.tcp_wmem="1024   4096   16384"
sudo sysctl -w net.ipv4.tcp_moderate_rcvbuf="0"

And the maximum JVM heap size is set to 4 GB.

However, I did not get the result I expected; the result is not as good as with the single HTTP server which I first implemented.

The verticles which I wrote are deployed like this:


DeploymentOptions deploymentOptions = new DeploymentOptions();
deploymentOptions.setInstances(verticleCount);

VertxOptions vertxOptions = new VertxOptions();
vertxOptions.setMaxEventLoopExecuteTime(Long.MAX_VALUE);
vertxOptions.setEventLoopPoolSize(threadSize);
vertxOptions.setWorkerPoolSize(workerPoolSize);

vertx = Vertx.vertx(vertxOptions);

// run http server verticles.
vertx.deployVerticle("io.userhabit.anejo.component.HttpServerVerticle", deploymentOptions, ar -> {
    if(ar.succeeded())
    {
        log.info("http server verticles deployed completely...");
    }
    else
    {
        log.error("something wrong: " + ar.cause().toString());
    }
});
HttpServerVerticle is the same as the original HTTP server.

- Kidong Lee.


On Tue, Jan 22, 2019 at 11:30 AM, Santosh Yadav <sann.1...@gmail.com> wrote:

Hasani

Jan 22, 2019, 9:44:16 AM
to vert.x
Hi Kidong,

What is your handler like? If it is like BodyHandler, which writes to a temp file, then it is most probably file I/O bound. The Vert.x (Netty) network stack is about as fast as it can be, so I doubt it's the networking. Try not persisting to storage, then test again to isolate the problem. My four-core Linux box can reach ~200 thousand 1 KB file uploads per second without immediate file I/O, with 8 (cores * 2) HTTP server verticles.

Hope this helps

Cheers

Kidong Lee

Jan 22, 2019, 6:37:10 PM
to ve...@googlegroups.com
Hello Hasani,

Thanks for sharing your experience with me.

My BodyHandler is mentioned in my last post.

I ran my stress test with a client written with Vert.x Web. Which HTTP client tool did you use to get your result?

- Kidong

On Tue, Jan 22, 2019 at 11:44 PM, Hasani <arief....@gmail.com> wrote:

Kidong Lee

Jan 22, 2019, 9:50:42 PM
to ve...@googlegroups.com
Hi,

After replacing my HTTP server with verticles, I got exceptions:
java.lang.IllegalStateException: Response has already been written
INFO   | jvm 1    | 2019/01/23 01:32:22 |       at io.vertx.core.http.impl.HttpServerResponseImpl.checkValid(HttpServerResponseImpl.java:561)
INFO   | jvm 1    | 2019/01/23 01:32:22 |       at io.vertx.core.http.impl.HttpServerResponseImpl.putHeader(HttpServerResponseImpl.java:158)
INFO   | jvm 1    | 2019/01/23 01:32:22 |       at io.vertx.core.http.impl.HttpServerResponseImpl.putHeader(HttpServerResponseImpl.java:55)

I never got such exceptions with my single HTTP server.

Does anybody know about this exception?

Cheers,

- Kidong.


On Wed, Jan 23, 2019 at 8:36 AM, Kidong Lee <myki...@gmail.com> wrote:

Hasani

Jan 22, 2019, 10:35:28 PM
to vert.x

I was using Apache JMeter 

Cheers

Hasani

Jan 22, 2019, 11:21:52 PM
to vert.x
Hi Kidong,

This is a classic condition. Do not end the response in your MultipartBodyHandler, and you also have to call context.next() there, since you have another handler on the same method and path. You might want to pause and resume the request too.
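
The rule can be illustrated with a small stand-alone model (hypothetical classes, not the Vert.x Web API): two handlers are registered for the same route, the first one only passes control on with next(), and only the last one ends the response. If an earlier handler also ends the response, the later end() fails with the same message as in the stack trace.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Simplified, hypothetical model of a handler chain; not the Vert.x Web classes.
class HandlerChainModel {

    // A response that may be ended only once.
    static class Response {
        private boolean ended;
        private String body = "";

        void end(String text) {
            if (ended) {
                throw new IllegalStateException("Response has already been written");
            }
            body = text;
            ended = true;
        }
    }

    // Per-request context that walks the handlers registered for one route.
    static class Context {
        final Response response = new Response();
        private final List<Consumer<Context>> handlers;
        private int index = 0;

        Context(List<Consumer<Context>> handlers) {
            this.handlers = handlers;
        }

        // Passes control to the next handler, if there is one.
        void next() {
            if (index < handlers.size()) {
                handlers.get(index++).accept(this);
            }
        }
    }

    // The body handler only calls next(); the last handler ends the response once.
    static String correctChain() {
        List<Consumer<Context>> handlers = Arrays.asList(
                ctx -> ctx.next(),
                ctx -> ctx.response.end("ok"));
        Context ctx = new Context(handlers);
        ctx.next();
        return ctx.response.body;
    }

    // The first handler ends the response and still calls next(): the second end() fails.
    static boolean doubleEndThrows() {
        List<Consumer<Context>> handlers = Arrays.asList(
                ctx -> { ctx.response.end("early"); ctx.next(); },
                ctx -> ctx.response.end("ok"));
        try {
            new Context(handlers).next();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(correctChain());
        System.out.println(doubleEndThrows());
    }
}
```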

Cheers

Kidong Lee

Jan 23, 2019, 2:12:56 AM
to vert.x
Hi Hasani,

My MultipartBodyHandler was already used in the original HTTP server without verticles, where I never got the "java.lang.IllegalStateException: Response has already been written" exception.
I wanted to avoid saving the uploaded file to the upload directory, so I wrote my own custom MultipartBodyHandler.
It works fine when my HTTP server runs without verticles, even though context.next() is not called in my MultipartBodyHandler.

This is my HTTP verticle implementation:

         Router router = Router.router(vertx);
     
        router.post(ScreenshotCreateRequestHandler.URI_PATH).handler(MultipartBodyHandler.create().setMergeFormAttributes(true));
        router.post(ScreenshotCreateRequestHandler.URI_PATH).handler(routingContext -> {

            ...
           
            request = routingContext.request();
            response = request.response();
         
            if(contentType.contains("multipart/form-data")) {

                 Set<FileUpload> fileUploadSet = routingContext.fileUploads();
                for(FileUpload fileUpload : fileUploadSet) {
                    String fieldName = fileUpload.name();
                    if(log.isDebugEnabled()) log.debug( "fieldName: [{}]", fieldName);

                    if(fieldName.equals("f")) {
                        // upload file data.
                        byte[] uploadData = ((MultipartFileUploadImpl)fileUpload).getData();

                        data = uploadData;
                        if(log.isDebugEnabled()) log.debug( "uploadedFileData size: [{}]", data.length);

                        if(data == null || data.length == 0)
                        {
                            log.warn("uploaded file data size is 0 !!!!!!!");
                        }


                        dataContentType = fileUpload.contentType();
                        if(log.isDebugEnabled()) log.debug( "dataContentType: [{}]", dataContentType);

                        break;
                    }
                }           
            }       
            else if(contentType.contains("application/json"))
            {
                  ...
            }
            // any other content type.
            else
            {
                // DO NOT HANDLE!!
                log.error("unsupported content type [{}]!!", contentType);

                return;
            }       

            ResponseData responseData = ...;


            // respond to the client.
            // if response connection ended or closed, do nothing.
            if(response.ended() || response.closed())
            {

                return;
            }

            // send response to the client.
            response.setStatusCode(responseData.getStatusCode());
            response.setStatusMessage(responseData.getStatusMessage());

            response.putHeader("Access-Control-Allow-Origin", "*");
            response.putHeader("Content-Type", responseData.getContentType());
            response.putHeader("Content-Length", "" + String.valueOf(responseData.getResponseMessage().getBytes().length));
            response.end(responseData.getResponseMessage());
       
        });

        HttpServer server = vertx.createHttpServer(httpServerOptions);
        server.requestHandler(router::accept).listen(port);


And here are the full classes of my MultipartBodyHandler implementation:

package xxx.multipart;

import io.netty.handler.codec.http.HttpHeaderValues;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.FileSystem;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerFileUpload;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.streams.Pump;
import io.vertx.core.streams.WriteStream;
import io.vertx.ext.web.FileUpload;
import io.vertx.ext.web.RoutingContext;

import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;


public class MultipartBodyHandlerImpl implements MultipartBodyHandler {

private static final Logger log = LoggerFactory.getLogger(MultipartBodyHandlerImpl.class);

private static final String BODY_HANDLED = "__body-handled";

private long bodyLimit = DEFAULT_BODY_LIMIT;
private boolean handleFileUploads;
private String uploadsDir;
private boolean mergeFormAttributes = DEFAULT_MERGE_FORM_ATTRIBUTES;
private boolean deleteUploadedFilesOnEnd = DEFAULT_DELETE_UPLOADED_FILES_ON_END;
private boolean isPreallocateBodyBuffer = DEFAULT_PREALLOCATE_BODY_BUFFER;
private static final int DEFAULT_INITIAL_BODY_BUFFER_SIZE = 1024; //bytes


public MultipartBodyHandlerImpl() {
this(true, DEFAULT_UPLOADS_DIRECTORY);
}

public MultipartBodyHandlerImpl(boolean handleFileUploads) {
this(handleFileUploads, DEFAULT_UPLOADS_DIRECTORY);
}

public MultipartBodyHandlerImpl(String uploadDirectory) {
this(true, uploadDirectory);
}

private MultipartBodyHandlerImpl(boolean handleFileUploads, String uploadDirectory) {
this.handleFileUploads = handleFileUploads;
setUploadsDirectory(uploadDirectory);
}

@Override
public void handle(RoutingContext context) {
HttpServerRequest request = context.request();
if (request.headers().contains(HttpHeaders.UPGRADE, HttpHeaders.WEBSOCKET, true)) {
context.next();
return;
}
// we need to keep state since we can be called again on reroute
Boolean handled = context.get(BODY_HANDLED);
if (handled == null || !handled) {
long contentLength = isPreallocateBodyBuffer ? parseContentLengthHeader(request) : -1;
BHandler handler = new BHandler(context, contentLength);
request.handler(handler);
request.endHandler(v -> handler.end());
context.put(BODY_HANDLED, true);
} else {
// on reroute we need to re-merge the form params if that was desired
if (mergeFormAttributes && request.isExpectMultipart()) {
request.params().addAll(request.formAttributes());
}

context.next();
}
}

@Override
public MultipartBodyHandler setHandleFileUploads(boolean handleFileUploads) {
this.handleFileUploads = handleFileUploads;
return this;
}

@Override
public MultipartBodyHandler setBodyLimit(long bodyLimit) {
this.bodyLimit = bodyLimit;
return this;
}

@Override
public MultipartBodyHandler setUploadsDirectory(String uploadsDirectory) {
this.uploadsDir = uploadsDirectory;
return this;
}

@Override
public MultipartBodyHandler setMergeFormAttributes(boolean mergeFormAttributes) {
this.mergeFormAttributes = mergeFormAttributes;
return this;
}

@Override
public MultipartBodyHandler setDeleteUploadedFilesOnEnd(boolean deleteUploadedFilesOnEnd) {
this.deleteUploadedFilesOnEnd = deleteUploadedFilesOnEnd;
return this;
}

@Override
public MultipartBodyHandler setPreallocateBodyBuffer(boolean isPreallocateBodyBuffer) {
this.isPreallocateBodyBuffer = isPreallocateBodyBuffer;
return this;
}

private long parseContentLengthHeader(HttpServerRequest request) {
String contentLength = request.getHeader(HttpHeaders.CONTENT_LENGTH);
if(contentLength == null || contentLength.isEmpty()) {
return -1;
}
try {
long parsedContentLength = Long.parseLong(contentLength);
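// return -1 rather than null: unboxing null into the long return type would throw a NullPointerException
return parsedContentLength < 0 ? -1 : parsedContentLength;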
}
catch (NumberFormatException ex) {
return -1;
}
}

private class BHandler implements Handler<Buffer> {
private static final int MAX_PREALLOCATED_BODY_BUFFER_BYTES = 65535;

RoutingContext context;
Buffer body;
boolean failed;
AtomicInteger uploadCount = new AtomicInteger();
AtomicBoolean cleanup = new AtomicBoolean(false);
boolean ended;
long uploadSize = 0L;
final boolean isMultipart;
final boolean isUrlEncoded;

public BHandler(RoutingContext context, long contentLength) {
this.context = context;
Set<FileUpload> fileUploads = context.fileUploads();

final String contentType = context.request().getHeader(HttpHeaders.CONTENT_TYPE);
if (contentType == null) {
isMultipart = false;
isUrlEncoded = false;
} else {
final String lowerCaseContentType = contentType.toLowerCase();
isMultipart = lowerCaseContentType.startsWith(HttpHeaderValues.MULTIPART_FORM_DATA.toString());
isUrlEncoded = lowerCaseContentType.startsWith(HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString());
}

initBodyBuffer(contentLength);

if (isMultipart || isUrlEncoded) {
context.request().setExpectMultipart(true);

context.request().uploadHandler(upload -> {
if (bodyLimit != -1 && upload.isSizeAvailable()) {
// we can try to abort even before the upload starts
long size = uploadSize + upload.size();
if (size > bodyLimit) {
failed = true;
context.fail(413);
return;
}
}
if (handleFileUploads) {
// we actually upload to a file with a generated filename
uploadCount.incrementAndGet();

// NOTE: upload buffer stream will not be save to upload dir.
// buffer data retrieved from upload stream will be forwarded to the next in routing context.
Pump p = Pump.pump(upload, new UploadWriteStream(fileUploads, upload));
p.start();

upload.resume();
upload.exceptionHandler(t -> {
deleteFileUploads();
context.fail(t);
});
upload.endHandler(v -> uploadEnded());
}
});
}

context.request().exceptionHandler(t -> {
deleteFileUploads();
context.fail(t);
});
}

private class UploadWriteStream implements WriteStream<Buffer>
{
private Set<FileUpload> fileUploads;
private HttpServerFileUpload upload;

public UploadWriteStream(Set<FileUpload> fileUploads, HttpServerFileUpload upload)
{
this.fileUploads = fileUploads;
this.upload = upload;
}

public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
return this;
}

public WriteStream<Buffer> write(Buffer buffer) {
byte[] data = buffer.getBytes();

// HERE!!! byte array of upload file data will be added to routing context.
MultipartFileUploadImpl fileUpload = new MultipartFileUploadImpl(null, upload, data);
fileUploads.add(fileUpload);

return this;
}

public void end() {
}

public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
return this;
}

public boolean writeQueueFull() {
return false;
}

public WriteStream<Buffer> drainHandler(Handler<Void> handler) {
return this;
}
}



private void initBodyBuffer(long contentLength) {
int initialBodyBufferSize;
if(contentLength < 0) {
initialBodyBufferSize = DEFAULT_INITIAL_BODY_BUFFER_SIZE;
}
else if(contentLength > MAX_PREALLOCATED_BODY_BUFFER_BYTES) {
initialBodyBufferSize = MAX_PREALLOCATED_BODY_BUFFER_BYTES;
}
else {
initialBodyBufferSize = (int) contentLength;
}

if(bodyLimit != -1) {
initialBodyBufferSize = (int)Math.min(initialBodyBufferSize, bodyLimit);
}

this.body = Buffer.buffer(initialBodyBufferSize);
}

@Override
public void handle(Buffer buff) {
if (failed) {
return;
}
uploadSize += buff.length();
if (bodyLimit != -1 && uploadSize > bodyLimit) {
failed = true;
context.fail(413);
// enqueue a delete for the error uploads
context.vertx().runOnContext(v -> deleteFileUploads());
} else {
// multipart requests will not end up in the request body
// url encoded should also not, however jQuery by default
// post in urlencoded even if the payload is something else
if (!isMultipart /* && !isUrlEncoded */) {
body.appendBuffer(buff);
}
}
}

void uploadEnded() {
int count = uploadCount.decrementAndGet();
// only if parsing is done and count is 0 then all files have been processed
if (ended && count == 0) {
doEnd();
}
}

void end() {
// this marks the end of body parsing, calling doEnd should
// only be possible from this moment onwards
ended = true;

// only if parsing is done and count is 0 then all files have been processed
if (uploadCount.get() == 0) {
doEnd();
}
}

void doEnd() {

if (failed) {
deleteFileUploads();
return;
}

if (deleteUploadedFilesOnEnd) {
context.addBodyEndHandler(x -> deleteFileUploads());
}

HttpServerRequest req = context.request();
if (mergeFormAttributes && req.isExpectMultipart()) {
req.params().addAll(req.formAttributes());
}
context.setBody(body);
context.next();
}

private void deleteFileUploads() {
if (cleanup.compareAndSet(false, true) && handleFileUploads) {
for (FileUpload fileUpload : context.fileUploads()) {
FileSystem fileSystem = context.vertx().fileSystem();
String uploadedFileName = fileUpload.uploadedFileName();
fileSystem.exists(uploadedFileName, existResult -> {
if (existResult.failed()) {
log.warn("Could not detect if uploaded file exists, not deleting: " + uploadedFileName, existResult.cause());
} else if (existResult.result()) {
fileSystem.delete(uploadedFileName, deleteResult -> {
if (deleteResult.failed()) {
log.warn("Delete of uploaded file failed: " + uploadedFileName, deleteResult.cause());
}
});
}
});
}
}
}
}

}

...


package xxx.multipart;


import io.vertx.core.http.HttpServerFileUpload;
import io.vertx.ext.web.FileUpload;

public class MultipartFileUploadImpl implements FileUpload {
private final String uploadedFileName;
private final HttpServerFileUpload upload;
private final byte[] data;

public MultipartFileUploadImpl(String uploadedFileName, HttpServerFileUpload upload, byte[] data) {
this.uploadedFileName = uploadedFileName;
this.upload = upload;
this.data = data;
}

public byte[] getData() {
return data;
}

public String name() {
return this.upload.name();
}

public String uploadedFileName() {
return this.uploadedFileName;
}

public String fileName() {
return this.upload.filename();
}

public long size() {
return this.upload.size();
}

public String contentType() {
return this.upload.contentType();
}

public String contentTransferEncoding() {
return this.upload.contentTransferEncoding();
}

public String charSet() {
return this.upload.charset();
}
}


...


package xxx.multipart;

import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;


public interface MultipartBodyHandler extends Handler<RoutingContext> {

long DEFAULT_BODY_LIMIT = -1;

String DEFAULT_UPLOADS_DIRECTORY = "file-uploads";

boolean DEFAULT_MERGE_FORM_ATTRIBUTES = true;

boolean DEFAULT_DELETE_UPLOADED_FILES_ON_END = false;

boolean DEFAULT_PREALLOCATE_BODY_BUFFER = false;

static MultipartBodyHandler create() {
return new MultipartBodyHandlerImpl();
}

static MultipartBodyHandler create(boolean handleFileUploads) {
return new MultipartBodyHandlerImpl(handleFileUploads);
}

static MultipartBodyHandler create(String uploadDirectory) {
return new MultipartBodyHandlerImpl(uploadDirectory);
}

MultipartBodyHandler setHandleFileUploads(boolean handleFileUploads);

MultipartBodyHandler setBodyLimit(long bodyLimit);

MultipartBodyHandler setUploadsDirectory(String uploadsDirectory);

MultipartBodyHandler setMergeFormAttributes(boolean mergeFormAttributes);

MultipartBodyHandler setDeleteUploadedFilesOnEnd(boolean deleteUploadedFilesOnEnd);

MultipartBodyHandler setPreallocateBodyBuffer(boolean isPreallocateBodyBuffer);
}



Cheers,

- Kidong.



On Wednesday, January 23, 2019 at 1:21:52 PM UTC+9, Hasani wrote:

Kidong Lee

Jan 23, 2019, 4:47:06 AM
to vert.x
Hi Hasani,

I have also tried replacing my MultipartBodyHandler with the Vert.x BodyHandler in my current HTTP server verticles.
But my verticles threw the same "java.lang.IllegalStateException: Response has already been written" exceptions.

Could you share your verticle code for handling file uploads with me?

Cheers,

- Kidong.


On Wednesday, January 23, 2019 at 4:12:56 PM UTC+9, Kidong Lee wrote:

Kidong Lee

Jan 23, 2019, 5:25:14 AM
to ve...@googlegroups.com
Thanks, Hasani.

- Kidong

On Wed, Jan 23, 2019 at 12:35 PM, Hasani <arief....@gmail.com> wrote:

I was using Apache JMeter 

Cheers


Kidong Lee

Jan 23, 2019, 6:00:33 AM
to vert.x
Hi, Hasani.

I want to correct a comment in my previous post.
context.next() is in fact called in my MultipartBodyHandler. Sorry about that.

Cheers,

- Kidong



On Wed, Jan 23, 2019 at 4:13 PM, Kidong Lee <myki...@gmail.com> wrote:

Hasani

Jan 23, 2019, 10:40:42 AM
to vert.x
Hi Kidong, 

Unfortunately I cannot reproduce your error; I can run your code (with io.vertx.ext.web.handler.BodyHandler) without error.
I am not using BodyHandler myself, but I still use the router. The code is as follows:

router.post().handler(ctx -> {
      HttpServerRequest request = ctx.request();
      request.setExpectMultipart(true);
      request.pause();
      ctx.next();
//    }).blockingHandler(HandlerPost::handleStaticBlocking);
    }).blockingHandler((ctx) -> {
      HttpServerRequest request = ctx.request();
      HttpServerResponse response = ctx.response();

      request.uploadHandler(upload -> {
//        Buffer buff = Buffer.buffer();
        upload.handler(chunk -> {
//          buff.appendBuffer(chunk);
        });
        upload.endHandler((none) -> {
         // everything is in buff
        });
      });

      request.endHandler(v -> {
        MultiMap formAttributes = request.formAttributes();
        formAttributes.iterator().forEachRemaining(rec -> {
          // get form attributes
        });
        if (!response.closed()) {
          response.end();
        }
      });
      
      request.resume();
      
    });

Cheers, hope this helps

Kidong Lee

Jan 23, 2019, 9:58:49 PM
to vert.x
Thanks, Hasani.

I have implemented a verticle based on your code.
My question is how to get a byte array from the upload. I tried the following, but its size is 0:

Buffer buff = Buffer.buffer();

upload.handler(chunk -> {
    buff.appendBuffer(chunk);
});

upload.endHandler((none) -> {
// everything in the buffer.
});


data = buff.getBytes();

Do you have any idea how to retrieve the file upload byte array directly from the upload?

Cheers,

- Kidong.





On Thursday, January 24, 2019 at 12:40:42 AM UTC+9, Hasani wrote:

Hasani

Jan 23, 2019, 10:45:17 PM
to vert.x
Hi Kidong.

Glad that it helps. It should be much simpler now.

As mentioned in https://vertx.io/docs/apidocs/io/vertx/core/buffer/Buffer.html#buffer--
Buffer has a getBytes() method.

Cheers and welcome to Vertx

Kidong Lee

Jan 23, 2019, 11:05:32 PM
to vert.x
As shown in my code, even though I called buff.getBytes(), its size is 0!

It seems that it can be retrieved from the upload with Pump, but Pump works asynchronously.
Well, I will try again.

cheers,

- Kidong.





On Thursday, January 24, 2019 at 12:45:17 PM UTC+9, Hasani wrote:

Hasani

Jan 23, 2019, 11:10:39 PM
to vert.x
Hi Kidong, my mistake, sorry. I missed that in the code. The getBytes() call should be inside the endHandler, and then you are all set.

upload.endHandler((none) -> {
    data = buff.getBytes();
});
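
Why the position of getBytes() matters can be shown with a small stand-alone model (a hypothetical FakeUpload class, not the Vert.x API): registering a handler only stores a callback, and the chunks arrive later when the event loop runs. Reading the buffer right after registration sees 0 bytes; reading it inside the end handler sees everything.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Toy model of asynchronous delivery; FakeUpload is a made-up stand-in for an upload stream.
class AsyncUploadModel {

    static class FakeUpload {
        private final Queue<Runnable> events = new ArrayDeque<>();
        private Consumer<byte[]> chunkHandler = c -> {};
        private Runnable endHandler = () -> {};

        // Chunks are only queued here; nothing is delivered yet.
        FakeUpload(byte[]... chunks) {
            for (byte[] chunk : chunks) {
                events.add(() -> chunkHandler.accept(chunk));
            }
            events.add(() -> endHandler.run());
        }

        void handler(Consumer<byte[]> h) { chunkHandler = h; }

        void endHandler(Runnable h) { endHandler = h; }

        // Delivers the queued events in order, like an event loop would later on.
        void runEventLoop() {
            while (!events.isEmpty()) {
                events.poll().run();
            }
        }
    }

    // Returns {size read right after registering, size read inside the end handler}.
    static int[] demo() {
        FakeUpload upload = new FakeUpload(
                "hello ".getBytes(StandardCharsets.UTF_8),
                "world".getBytes(StandardCharsets.UTF_8));
        ByteArrayOutputStream buff = new ByteArrayOutputStream();
        int[] sizes = new int[2];

        upload.handler(chunk -> buff.write(chunk, 0, chunk.length));
        upload.endHandler(() -> sizes[1] = buff.size()); // all chunks have arrived here

        sizes[0] = buff.size(); // too early: no chunk has been delivered yet
        upload.runEventLoop();
        return sizes;
    }

    public static void main(String[] args) {
        int[] sizes = demo();
        System.out.println("outside handler: " + sizes[0] + ", in end handler: " + sizes[1]);
    }
}
```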



Cheers mate

Hasani

Jan 23, 2019, 11:55:24 PM
to vert.x

Hi Kidong, 

Have you got it to run? Can you share your benchmark numbers with this (without saving to file)? Thanks.

Cheers

Kidong Lee

Jan 24, 2019, 3:17:02 AM
to vert.x
Hi Hasani,

Unfortunately, I have a problem. Let's look at my code:

request.uploadHandler(upload -> {

    Buffer buff = Buffer.buffer();

    upload.handler(chunk -> {
        buff.appendBuffer(chunk);
    });

    upload.endHandler((none) -> {
        // everything in the buffer.

        data = buff.getBytes();

        if(log.isDebugEnabled()) log.debug( "uploadedFileData size: [{}]", data.length);

        if(data == null || data.length == 0)
        {
            log.warn("uploaded file data size is 0 !!!!!!!");
        }

        dataContentType = upload.contentType();
        if(log.isDebugEnabled()) log.debug( "dataContentType: [{}]", dataContentType);
    });
});


request.endHandler(v -> {
    MultiMap formAttributes = request.formAttributes();

    requestContentString = formAttributes.get("d");
    if(log.isDebugEnabled()) log.debug("requestContentString: [{}]", requestContentString);

    // HERE!! data is null, because upload.endHandler() is called asynchronously.
    // if data length is 0.
    if(data.length == 0)
    {
        ...
    }


    // event log disruptor translator.
    ...
});



From request.uploadHandler(), I can get the byte array of the uploaded file,
and from request.endHandler(), I can get the other form data from formAttributes().
The problem is that in request.endHandler(), I cannot get the byte array of the uploaded file, because it is handled asynchronously in the upload handler.

After retrieving these values, I have to set them on a domain object which will be sent to the next stage of the pipeline, that is, the disruptor queue.

I have no idea how to get both pieces of data, namely the byte array of the uploaded file and the value of the form attributes, in a single handler.
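
One general way to think about this requirement is to join two asynchronous completions and build the domain object only when both have arrived. The following is a minimal plain-JDK sketch of that idea using CompletableFuture; it is not Vert.x code, and UploadJoinSketch, Event and join are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

public class UploadJoinSketch {
    /** Hypothetical domain object holding both pieces of the request. */
    static final class Event {
        final byte[] data;
        final String content;

        Event(byte[] data, String content) {
            this.data = data;
            this.content = content;
        }
    }

    /** Completes only after both the upload bytes and the form attribute are available. */
    static CompletableFuture<Event> join(CompletableFuture<byte[]> upload,
                                         CompletableFuture<String> form) {
        return upload.thenCombine(form, Event::new);
    }

    public static void main(String[] args) {
        CompletableFuture<byte[]> upload = new CompletableFuture<>();
        CompletableFuture<String> form = new CompletableFuture<>();
        CompletableFuture<Event> event = join(upload, form);

        // The two callbacks may fire in either order; each one completes its own future.
        form.complete("value of d");
        System.out.println("done after form only: " + event.isDone());

        upload.complete("file bytes".getBytes(StandardCharsets.UTF_8));
        Event e = event.join();
        System.out.println(e.data.length + " bytes, content=" + e.content);
    }
}
```

With this shape, neither callback needs to know whether the other has already run.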

Cheers,

- Kidong.






On Thursday, January 24, 2019 at 1:55:24 PM UTC+9, Hasani wrote:

Hasani

Jan 24, 2019, 3:39:25 AM
to vert.x
Hi Kidong, 

Just declare your buffer outside the handler; it's just a scoping problem. Make sure that you access the buffer inside the request endHandler, as that would be the last thing called.

Cheers

Kidong Lee

Jan 24, 2019, 3:56:44 AM
to vert.x
Hello Hasani,

I get the same result of 0 bytes, even though I have declared the buffer outside of the handler:

Buffer buff = Buffer.buffer();

request.uploadHandler(upload -> {

    upload.handler(chunk -> {
        buff.appendBuffer(chunk);
    });

    upload.endHandler((none) -> {
        // everything in the buffer.

        dataContentType = upload.contentType();
        if(log.isDebugEnabled()) log.debug( "dataContentType: [{}]", dataContentType);
    });
});


request.endHandler(v -> {
    data = buff.getBytes();
    // data length is 0 !!!
});

- Kidong.


On Thursday, January 24, 2019 at 5:39:25 PM UTC+9, Hasani wrote:

Kidong Lee

Jan 25, 2019, 10:38:09 PM
to vert.x
Hi, Hasani,

Finally, I got the results I expected using verticles.

I have used my MultipartBodyHandler as in my original http server.
In addition, I have used the routing context to store some data, let's say RES, which is the result of a blocking execution.
In the next pipeline handler, RES is retrieved from the routing context and used to send the response to the client.

Let me summarize my case.
1. With MultipartBodyHandler in a single http server, file upload works fine, but it does not scale and does not use all the cpu cores. The load test result is about 15,000 writes per second.
2. I changed my single http server to verticles, but I got exceptions such as: java.lang.IllegalStateException: Response has already been written.
3. As mentioned above, I have used blocking execution, put the result into the routing context, and added another pipeline handler to respond to the clients. I have achieved about 50,000 TPS while utilizing all the cpu cores.


The following listing is the verticle code that I changed:


        router.post(SessionScreenshotCreateRequestHandler.URI_PATH).handler(MultipartBodyHandler.create().setMergeFormAttributes(true));
        router.post(SessionScreenshotCreateRequestHandler.URI_PATH).handler(routingContext -> {


            request = routingContext.request();
            response = request.response();
            ...

           
            Set<FileUpload> fileUploadSet = routingContext.fileUploads();
            for(FileUpload fileUpload : fileUploadSet) {
                String fieldName = fileUpload.name();
                if(log.isDebugEnabled()) log.debug( "fieldName: [{}]", fieldName);

                if(fieldName.equals("f")) {
                    // upload file data.
                    byte[] uploadData = ((MultipartFileUploadImpl)fileUpload).getData();

                    data = uploadData;

                    // check for null before touching data.length.
                    if(data == null || data.length == 0)
                    {
                        log.warn("uploaded file data size is 0 !!!!!!!");
                    }
                    else if(log.isDebugEnabled()) log.debug( "uploadedFileData size: [{}]", data.length);


                    dataContentType = fileUpload.contentType();

                    if(log.isDebugEnabled()) log.debug( "dataContentType: [{}]", dataContentType);

                    break;

                }
            }

            MultiMap formAttributes = request.formAttributes();
            requestContentString = formAttributes.get("d");
           
            ...

            routingContext.vertx().<ResponseData>executeBlocking(future -> {
               
                // HERE, some blocking function will be invoked...
                future.complete(doHandle());
               
            }, false, res -> {
                if(res.succeeded())
                {
                    ResponseData responseData = res.result();
               
                    if(responseData == null)
                    {
                        if(!response.ended())
                        {
                            response.end();
                        }

                        return;
                    }

                    // response data set to the context in order to respond to the client in the next pipeline.
                    routingContext.put(CONTEXT_RESPONSE_DATA, responseData);
                    routingContext.put(CONTEXT_CLOSE_RESPONSE, closeResponse);
                    routingContext.next();
                }
                else if(res.failed())
                {
                    log.error("error: [{}]", res.cause().getMessage());
                    if(!response.ended())
                    {
                        response.end();
                    }
                }
            });
           
        }).handler(routingContext -> {
            HttpServerResponse response = routingContext.response();

            ResponseData responseData = routingContext.get(CONTEXT_RESPONSE_DATA);
            Boolean closeResponse = routingContext.get(CONTEXT_CLOSE_RESPONSE);

           
            // if response connection ended or closed, do nothing.
            if(response.ended() || response.closed())
            {
                return;
            }

            // send response to the client.
            response.setStatusCode(responseData.getStatusCode());
            response.setStatusMessage(responseData.getStatusMessage());

            response.putHeader("Access-Control-Allow-Origin", "*");
            response.putHeader("Content-Type", responseData.getContentType());
            response.putHeader("Content-Length", String.valueOf(responseData.getResponseMessage().getBytes().length));
            response.end(responseData.getResponseMessage());

            if (closeResponse) {
                response.close();
            }
        });



The key points in solving the exception java.lang.IllegalStateException: Response has already been written are:
- use blocking execution for work that blocks, for instance invoking an external api.
- call routingContext.next() in the async result handler of the blocking execution.
- handle the response in the next pipeline handler.
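
The pattern behind these points can be sketched without Vert.x: run the blocking call on a worker pool, and write the response from exactly one place once the result is available. This is only a plain-JDK illustration under that assumption. FakeResponse, handle and blockingCall are hypothetical names, and the exception message merely mimics the one above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RespondOnceSketch {
    /** Hypothetical stand-in for an HTTP response that may be written only once. */
    static final class FakeResponse {
        private String body;

        synchronized boolean ended() {
            return body != null;
        }

        synchronized void end(String b) {
            if (body != null) {
                throw new IllegalStateException("Response has already been written");
            }
            body = b;
        }

        synchronized String body() {
            return body;
        }
    }

    /** Hypothetical blocking work, e.g. a call to an external api. */
    static String blockingCall() {
        return "ok";
    }

    /** Runs the blocking call on the worker pool, then writes the response in one place. */
    static CompletableFuture<Void> handle(FakeResponse response, ExecutorService workers) {
        return CompletableFuture
            .supplyAsync(RespondOnceSketch::blockingCall, workers)
            .<Void>handle((result, error) -> {
                // Single exit point: guard against a response that was already ended.
                if (!response.ended()) {
                    response.end(error == null ? result : "error");
                }
                return null;
            });
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        FakeResponse response = new FakeResponse();
        handle(response, workers).join();
        System.out.println("body: " + response.body());
        workers.shutdown();
    }
}
```

Writing the response in a single follow-up step means no other code path can end it a second time.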

Thanks, Hasani, for your suggestions and your code, which were good tips for understanding verticles and helped solve my problem.

Cheers,

- Kidong.








On Thu, Jan 24, 2019 at 5:56 PM, Kidong Lee <myki...@gmail.com> wrote:
--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.

Hasani

Jan 26, 2019, 8:12:30 AM
to vert.x

Hi Kidong, glad that it helped. Vertx has its peculiarities with async and callbacks; it took me a while too. It is always welcome to have people embracing Vertx. I look forward to your benchmark on multiple verticles.

Cheers

Kidong Lee

Jan 26, 2019, 8:49:16 PM
to vert.x
Hi Hasani,

I am going to run the benchmark test next week, and after that I will share the results with you.

In addition, I have written some blog posts about how I solved my problems with verticles, which could help other people like me:

- HowTo: Resolve exception of ‘Response has already been written’ in Vert.x Verticle: https://medium.com/@mykidong/howto-resolve-exception-of-response-has-already-been-written-in-vert-x-verticle-d43d3c06b6e
- HowTo: Reference other spring beans in Vert.x Verticle running in Spring: https://medium.com/@mykidong/howto-reference-other-spring-beans-in-vert-x-verticle-running-in-spring-8bc461578074
- HowTo: Avoid saving multipart form upload files into the upload directory in Vert.x: https://medium.com/@mykidong/howto-avoid-saving-multipart-form-upload-files-into-the-upload-directory-in-vert-x-edde1ef58824

Cheers,

- Kidong Lee.




Kidong Lee

Jan 28, 2019, 2:57:11 AM
to vert.x
Hi Hasani,

The following is my benchmark result:

Condition:

Collector Node: AWS EC2 Instance m5.2xlarge (8 vCPU, 32 GB Memory)
Network Performance: 9.6 Gbit/s

Collector Configuration:

http.threadSize=16
http.workerPoolSize=100
http.idleTimeoutInSeconds=300
http.sendBufferSize=4096
http.receiveBufferSize=4096
http.acceptBacklog=10000
http.verticleCount=6

Collector JVM Configuration:

wrapper.java.additional.1=-Dorg.tanukisoftware.wrapper.WrapperStartStopApp.maxStartMainWait=300
wrapper.java.additional.2=-server
wrapper.java.additional.3=-XX:+UseNUMA
wrapper.java.additional.4=-XX:+UseParallelGC
wrapper.java.additional.5=-XX:+AggressiveOpts
wrapper.java.additional.6=-XX:+UseFastAccessorMethods
wrapper.java.additional.7=-Dcom.sun.management.jmxremote.port=29999
wrapper.java.additional.8=-Dcom.sun.management.jmxremote.ssl=false
wrapper.java.additional.9=-Dcom.sun.management.jmxremote.authenticate=false
wrapper.java.additional.10=-Dvertx.disableWebsockets=true
wrapper.java.additional.11=-Dvertx.flashPolicyHandler=false
wrapper.java.additional.12=-Dvertx.threadChecks=false
wrapper.java.additional.13=-Dvertx.disableContextTimings=true

# Initial Java Heap Size (in MB)
wrapper.java.initmemory=4096

# Maximum Java Heap Size (in MB)
wrapper.java.maxmemory=4096

Load Test Client: AsyncHttpClient


Result Metrics:

Case 1: Kafka Producing: false, Logging: false
Case 2: Kafka Producing: true, Logging: true

Data Size    Case 1 (TPS)    Case 2 (TPS)
1KB          52544           38717
2KB          47960           28140
5KB          39028           22143
10KB         31949           19078
20KB         23463           15050
50KB         13581           11053
100KB        7947            6977




I have achieved about 50k writes per second, which is acceptable for our file upload collector,
to which our mobile apps send a lot of requests.

I think we could get a better result if the http server verticle ran on a machine with more cpu cores.
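
As a rough cross-check, the payload bandwidth implied by these rates can be estimated from body size times TPS. The sketch below assumes 1KB = 1024 bytes, counts only the upload body (HTTP headers and other form fields are ignored), and uses the Case 1 figures read as 52544, 47960, 39028, 31949, 23463, 13581 and 7947 TPS; PayloadThroughput and gbitPerSecond are hypothetical names.

```java
public class PayloadThroughput {
    /** Payload bandwidth in Gbit/s for a body size in KB (assumed 1024 bytes) and a request rate. */
    static double gbitPerSecond(int bodyKb, long tps) {
        return bodyKb * 1024.0 * 8.0 * tps / 1e9;
    }

    public static void main(String[] args) {
        int[] sizesKb = {1, 2, 5, 10, 20, 50, 100};
        long[] case1Tps = {52544, 47960, 39028, 31949, 23463, 13581, 7947};

        // One line per data size: payload bandwidth implied by the measured rate.
        for (int i = 0; i < sizesKb.length; i++) {
            System.out.printf("%dKB: %.2f Gbit/s%n",
                sizesKb[i], gbitPerSecond(sizesKb[i], case1Tps[i]));
        }
    }
}
```

Under these assumptions the 1KB case moves roughly 0.43 Gbit/s of payload and the 100KB case roughly 6.5 Gbit/s, which can be compared with the 9.6 Gbit/s measured with iperf.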

Cheers,

- Kidong.




On Sun, Jan 27, 2019 at 10:48 AM, Kidong Lee <myki...@gmail.com> wrote:

Hasani

Jan 30, 2019, 12:26:27 AM
to vert.x
Hi Kidong, 

Great stuff, and that is quite a performance. That number is already on the verge of AWS SSD IOPS. Thanks mate.

Cheers
- Hasani

Kidong Lee

Mar 1, 2019, 8:18:19 AM
to vert.x
Hi all,

I have corrected my custom body handler implementation, MultipartBodyHandlerImpl as in the following link:

cheers,

- Kidong Lee.



On Wednesday, January 23, 2019 at 4:12:56 PM UTC+9, Kidong Lee wrote:
Hi Hasani,