sudo sysctl -w net.ipv4.tcp_tw_reuse=1
sudo sysctl -w net.core.somaxconn=10000
sudo sysctl -w net.ipv4.tcp_max_syn_backlog=10000
- Kidong Lee
--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/14ee98dc-8fc6-4a0c-977f-18f4ba600d4b%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/CAAKpjUU3wSPzhMMJB483TQEQGN3ncMYQhz7qypCAq0kyG3NDFw%40mail.gmail.com.
// Configure how many instances of the verticle to deploy (one event loop each).
DeploymentOptions deploymentOptions = new DeploymentOptions();
deploymentOptions.setInstances(verticleCount);
// Configure the Vert.x runtime itself.
VertxOptions vertxOptions = new VertxOptions();
// Disable the blocked-thread warning by allowing event loops to run indefinitely.
// NOTE(review): Long.MAX_VALUE silences ALL blocked-event-loop diagnostics — confirm this is intentional.
vertxOptions.setMaxEventLoopExecuteTime(Long.MAX_VALUE);
vertxOptions.setEventLoopPoolSize(threadSize);
vertxOptions.setWorkerPoolSize(workerPoolSize);
vertx = Vertx.vertx(vertxOptions);
// run http server verticles.
vertx.deployVerticle("io.userhabit.anejo.component.HttpServerVerticle", deploymentOptions, ar -> {
// Deployment is asynchronous; this callback fires once all instances are up (or deployment failed).
if(ar.succeeded())
{
log.info("http server verticles deployed completely...");
}
else
{
log.error("something wrong: " + ar.cause().toString());
}
});
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/CAAKpjUU3wSPzhMMJB483TQEQGN3ncMYQhz7qypCAq0kyG3NDFw%40mail.gmail.com.
--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/e6c85e19-407d-4f1f-8d8d-417810819216%40googlegroups.com.
package xxx.multipart;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.FileSystem;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerFileUpload;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.streams.Pump;
import io.vertx.core.streams.WriteStream;
import io.vertx.ext.web.FileUpload;
import io.vertx.ext.web.RoutingContext;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Body handler variant that keeps multipart file-upload data in memory —
 * upload bytes are captured into {@link MultipartFileUploadImpl} objects
 * attached to the {@link RoutingContext} instead of being streamed to the
 * uploads directory on disk.
 *
 * <p>Non-multipart request bodies are accumulated into a {@link Buffer} and
 * exposed via {@code context.setBody(...)}. A configurable {@code bodyLimit}
 * aborts oversized requests with HTTP 413.</p>
 */
public class MultipartBodyHandlerImpl implements MultipartBodyHandler {

    private static final Logger log = LoggerFactory.getLogger(MultipartBodyHandlerImpl.class);

    // Context key used to remember that the body was already consumed,
    // so a reroute does not install a second body handler.
    private static final String BODY_HANDLED = "__body-handled";

    private long bodyLimit = DEFAULT_BODY_LIMIT;
    private boolean handleFileUploads;
    // NOTE(review): uploadsDir is configured but never read — uploads are kept
    // in memory by design (see UploadWriteStream). Kept for interface compatibility.
    private String uploadsDir;
    private boolean mergeFormAttributes = DEFAULT_MERGE_FORM_ATTRIBUTES;
    private boolean deleteUploadedFilesOnEnd = DEFAULT_DELETE_UPLOADED_FILES_ON_END;
    private boolean isPreallocateBodyBuffer = DEFAULT_PREALLOCATE_BODY_BUFFER;

    // Initial body buffer size used when Content-Length is unknown.
    private static final int DEFAULT_INITIAL_BODY_BUFFER_SIZE = 1024; //bytes

    /** Creates a handler with file uploads enabled and the default uploads directory. */
    public MultipartBodyHandlerImpl() {
        this(true, DEFAULT_UPLOADS_DIRECTORY);
    }

    /** Creates a handler with the given upload toggle and the default uploads directory. */
    public MultipartBodyHandlerImpl(boolean handleFileUploads) {
        this(handleFileUploads, DEFAULT_UPLOADS_DIRECTORY);
    }

    /** Creates a handler with uploads enabled and the given uploads directory. */
    public MultipartBodyHandlerImpl(String uploadDirectory) {
        this(true, uploadDirectory);
    }

    private MultipartBodyHandlerImpl(boolean handleFileUploads, String uploadDirectory) {
        this.handleFileUploads = handleFileUploads;
        setUploadsDirectory(uploadDirectory);
    }

    /**
     * Consumes the request body (unless this is a WebSocket upgrade or the body
     * was already handled on a previous pass), then continues the router chain.
     */
    @Override
    public void handle(RoutingContext context) {
        HttpServerRequest request = context.request();
        // WebSocket upgrades carry no conventional body — skip straight through.
        if (request.headers().contains(HttpHeaders.UPGRADE, HttpHeaders.WEBSOCKET, true)) {
            context.next();
            return;
        }
        // we need to keep state since we can be called again on reroute
        Boolean handled = context.get(BODY_HANDLED);
        if (handled == null || !handled) {
            long contentLength = isPreallocateBodyBuffer ? parseContentLengthHeader(request) : -1;
            BHandler handler = new BHandler(context, contentLength);
            request.handler(handler);
            request.endHandler(v -> handler.end());
            context.put(BODY_HANDLED, true);
        } else {
            // on reroute we need to re-merge the form params if that was desired
            if (mergeFormAttributes && request.isExpectMultipart()) {
                request.params().addAll(request.formAttributes());
            }
            context.next();
        }
    }

    @Override
    public MultipartBodyHandler setHandleFileUploads(boolean handleFileUploads) {
        this.handleFileUploads = handleFileUploads;
        return this;
    }

    @Override
    public MultipartBodyHandler setBodyLimit(long bodyLimit) {
        this.bodyLimit = bodyLimit;
        return this;
    }

    @Override
    public MultipartBodyHandler setUploadsDirectory(String uploadsDirectory) {
        this.uploadsDir = uploadsDirectory;
        return this;
    }

    @Override
    public MultipartBodyHandler setMergeFormAttributes(boolean mergeFormAttributes) {
        this.mergeFormAttributes = mergeFormAttributes;
        return this;
    }

    @Override
    public MultipartBodyHandler setDeleteUploadedFilesOnEnd(boolean deleteUploadedFilesOnEnd) {
        this.deleteUploadedFilesOnEnd = deleteUploadedFilesOnEnd;
        return this;
    }

    @Override
    public MultipartBodyHandler setPreallocateBodyBuffer(boolean isPreallocateBodyBuffer) {
        this.isPreallocateBodyBuffer = isPreallocateBodyBuffer;
        return this;
    }

    /**
     * Parses the Content-Length request header.
     *
     * @return the non-negative content length, or -1 when the header is
     *         missing, empty, non-numeric, or negative.
     */
    private long parseContentLengthHeader(HttpServerRequest request) {
        String contentLength = request.getHeader(HttpHeaders.CONTENT_LENGTH);
        if (contentLength == null || contentLength.isEmpty()) {
            return -1;
        }
        try {
            long parsedContentLength = Long.parseLong(contentLength);
            // BUGFIX: the original returned `null` for negative values; because
            // this method returns primitive long, the boxed null was auto-unboxed
            // and threw a NullPointerException at runtime. Use -1 ("unknown").
            return parsedContentLength < 0 ? -1 : parsedContentLength;
        } catch (NumberFormatException ex) {
            return -1;
        }
    }

    /** Per-request body accumulator: collects body chunks and upload data. */
    private class BHandler implements Handler<Buffer> {

        // Cap for preallocation so a huge (or hostile) Content-Length cannot
        // trigger a huge up-front allocation.
        private static final int MAX_PREALLOCATED_BODY_BUFFER_BYTES = 65535;

        RoutingContext context;
        Buffer body;
        boolean failed;
        AtomicInteger uploadCount = new AtomicInteger();
        AtomicBoolean cleanup = new AtomicBoolean(false);
        boolean ended;
        long uploadSize = 0L;
        final boolean isMultipart;
        final boolean isUrlEncoded;

        public BHandler(RoutingContext context, long contentLength) {
            this.context = context;
            Set<FileUpload> fileUploads = context.fileUploads();

            final String contentType = context.request().getHeader(HttpHeaders.CONTENT_TYPE);
            if (contentType == null) {
                isMultipart = false;
                isUrlEncoded = false;
            } else {
                final String lowerCaseContentType = contentType.toLowerCase();
                isMultipart = lowerCaseContentType.startsWith(HttpHeaderValues.MULTIPART_FORM_DATA.toString());
                isUrlEncoded = lowerCaseContentType.startsWith(HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString());
            }

            initBodyBuffer(contentLength);

            if (isMultipart || isUrlEncoded) {
                context.request().setExpectMultipart(true);
                context.request().uploadHandler(upload -> {
                    if (bodyLimit != -1 && upload.isSizeAvailable()) {
                        // we can try to abort even before the upload starts
                        long size = uploadSize + upload.size();
                        if (size > bodyLimit) {
                            failed = true;
                            context.fail(413);
                            return;
                        }
                    }
                    if (handleFileUploads) {
                        uploadCount.incrementAndGet();
                        // BUGFIX: register failure/completion callbacks BEFORE
                        // starting the pump and resuming the stream, so an error
                        // or end event arriving immediately cannot be missed.
                        upload.exceptionHandler(t -> {
                            deleteFileUploads();
                            context.fail(t);
                        });
                        upload.endHandler(v -> uploadEnded());
                        // NOTE: the upload buffer stream is NOT saved to the uploads
                        // directory; its bytes are captured in memory and forwarded
                        // to the next handler via the routing context.
                        Pump p = Pump.pump(upload, new UploadWriteStream(fileUploads, upload));
                        p.start();
                        upload.resume();
                    }
                });
            }
            context.request().exceptionHandler(t -> {
                deleteFileUploads();
                context.fail(t);
            });
        }

        /**
         * WriteStream sink that turns each pumped upload chunk into an in-memory
         * {@link MultipartFileUploadImpl} added to the routing context.
         */
        private class UploadWriteStream implements WriteStream<Buffer> {

            private Set<FileUpload> fileUploads;
            private HttpServerFileUpload upload;

            public UploadWriteStream(Set<FileUpload> fileUploads, HttpServerFileUpload upload) {
                this.fileUploads = fileUploads;
                this.upload = upload;
            }

            public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
                return this;
            }

            public WriteStream<Buffer> write(Buffer buffer) {
                byte[] data = buffer.getBytes();
                // Byte array of uploaded file data is attached to the routing context.
                // NOTE(review): each pumped chunk creates a separate FileUpload entry;
                // an upload split across chunks yields several entries — confirm callers
                // expect this (or reassemble per-upload before wrapping).
                MultipartFileUploadImpl fileUpload = new MultipartFileUploadImpl(null, upload, data);
                fileUploads.add(fileUpload);
                return this;
            }

            public void end() {
            }

            public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
                return this;
            }

            public boolean writeQueueFull() {
                // Never reports back-pressure: data is buffered in memory.
                return false;
            }

            public WriteStream<Buffer> drainHandler(Handler<Void> handler) {
                return this;
            }
        }

        /**
         * Sizes the body buffer: use Content-Length when known (capped at
         * MAX_PREALLOCATED_BODY_BUFFER_BYTES), otherwise a small default;
         * never preallocate beyond the configured body limit.
         */
        private void initBodyBuffer(long contentLength) {
            int initialBodyBufferSize;
            if (contentLength < 0) {
                initialBodyBufferSize = DEFAULT_INITIAL_BODY_BUFFER_SIZE;
            } else if (contentLength > MAX_PREALLOCATED_BODY_BUFFER_BYTES) {
                initialBodyBufferSize = MAX_PREALLOCATED_BODY_BUFFER_BYTES;
            } else {
                initialBodyBufferSize = (int) contentLength;
            }
            if (bodyLimit != -1) {
                initialBodyBufferSize = (int) Math.min(initialBodyBufferSize, bodyLimit);
            }
            this.body = Buffer.buffer(initialBodyBufferSize);
        }

        /** Accumulates one request-body chunk, enforcing the body limit. */
        @Override
        public void handle(Buffer buff) {
            if (failed) {
                return;
            }
            uploadSize += buff.length();
            if (bodyLimit != -1 && uploadSize > bodyLimit) {
                failed = true;
                context.fail(413);
                // enqueue a delete for the error uploads
                context.vertx().runOnContext(v -> deleteFileUploads());
            } else {
                // multipart requests will not end up in the request body
                // url encoded should also not, however jQuery by default
                // post in urlencoded even if the payload is something else
                if (!isMultipart /* && !isUrlEncoded */) {
                    body.appendBuffer(buff);
                }
            }
        }

        /** Called when one upload finishes; fires doEnd once all uploads are done. */
        void uploadEnded() {
            int count = uploadCount.decrementAndGet();
            // only if parsing is done and count is 0 then all files have been processed
            if (ended && count == 0) {
                doEnd();
            }
        }

        /** Called when request parsing ends; fires doEnd if no uploads are pending. */
        void end() {
            // this marks the end of body parsing, calling doEnd should
            // only be possible from this moment onwards
            ended = true;
            // only if parsing is done and count is 0 then all files have been processed
            if (uploadCount.get() == 0) {
                doEnd();
            }
        }

        /** Finalizes the body: merges form attributes, sets the body, continues the chain. */
        void doEnd() {
            if (failed) {
                deleteFileUploads();
                return;
            }
            if (deleteUploadedFilesOnEnd) {
                context.addBodyEndHandler(x -> deleteFileUploads());
            }
            HttpServerRequest req = context.request();
            if (mergeFormAttributes && req.isExpectMultipart()) {
                req.params().addAll(req.formAttributes());
            }
            context.setBody(body);
            context.next();
        }

        /**
         * Best-effort asynchronous deletion of any uploads that did reach disk.
         * Guarded by a CAS so cleanup runs at most once per request.
         */
        private void deleteFileUploads() {
            if (cleanup.compareAndSet(false, true) && handleFileUploads) {
                for (FileUpload fileUpload : context.fileUploads()) {
                    FileSystem fileSystem = context.vertx().fileSystem();
                    String uploadedFileName = fileUpload.uploadedFileName();
                    fileSystem.exists(uploadedFileName, existResult -> {
                        if (existResult.failed()) {
                            log.warn("Could not detect if uploaded file exists, not deleting: " + uploadedFileName, existResult.cause());
                        } else if (existResult.result()) {
                            fileSystem.delete(uploadedFileName, deleteResult -> {
                                if (deleteResult.failed()) {
                                    log.warn("Delete of uploaded file failed: " + uploadedFileName, deleteResult.cause());
                                }
                            });
                        }
                    });
                }
            }
        }
    }
}
...
package xxx.multipart;
import io.vertx.core.http.HttpServerFileUpload;
import io.vertx.ext.web.FileUpload;
/**
 * In-memory {@link FileUpload} wrapper: keeps the uploaded bytes in a byte
 * array and delegates all metadata queries to the underlying
 * {@link HttpServerFileUpload}.
 */
public class MultipartFileUploadImpl implements FileUpload {

    private final String uploadedFileName;
    private final HttpServerFileUpload upload;
    private final byte[] data;

    /**
     * @param uploadedFileName on-disk name, or {@code null} when the upload is memory-only
     * @param upload           the source upload providing all metadata
     * @param data             raw uploaded bytes (held by reference, not copied)
     */
    public MultipartFileUploadImpl(String uploadedFileName, HttpServerFileUpload upload, byte[] data) {
        this.uploadedFileName = uploadedFileName;
        this.upload = upload;
        this.data = data;
    }

    /** Raw uploaded bytes. Callers receive the internal array, not a copy. */
    public byte[] getData() {
        return data;
    }

    public String uploadedFileName() {
        return uploadedFileName;
    }

    public String name() {
        return upload.name();
    }

    public String fileName() {
        return upload.filename();
    }

    public long size() {
        return upload.size();
    }

    public String contentType() {
        return upload.contentType();
    }

    public String contentTransferEncoding() {
        return upload.contentTransferEncoding();
    }

    public String charSet() {
        return upload.charset();
    }
}
...
package xxx.multipart;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;
/**
 * Router handler that consumes request bodies, capturing multipart file
 * uploads in memory and exposing them on the routing context.
 * Obtain instances through the static {@code create} factories; all setters
 * return {@code this} for fluent configuration.
 */
public interface MultipartBodyHandler extends Handler<RoutingContext> {

    /** Default maximum body size in bytes; -1 means unlimited. */
    long DEFAULT_BODY_LIMIT = -1;

    /** Default directory for file uploads. */
    String DEFAULT_UPLOADS_DIRECTORY = "file-uploads";

    /** By default, form attributes are merged into the request parameters. */
    boolean DEFAULT_MERGE_FORM_ATTRIBUTES = true;

    /** By default, uploaded files are kept after the request ends. */
    boolean DEFAULT_DELETE_UPLOADED_FILES_ON_END = false;

    /** By default, the body buffer is not pre-sized from Content-Length. */
    boolean DEFAULT_PREALLOCATE_BODY_BUFFER = false;

    /** Creates a handler with default settings (file uploads enabled). */
    static MultipartBodyHandler create() {
        return new MultipartBodyHandlerImpl();
    }

    /** Creates a handler with file-upload handling switched on or off. */
    static MultipartBodyHandler create(boolean handleFileUploads) {
        return new MultipartBodyHandlerImpl(handleFileUploads);
    }

    /** Creates a handler using the given uploads directory. */
    static MultipartBodyHandler create(String uploadDirectory) {
        return new MultipartBodyHandlerImpl(uploadDirectory);
    }

    /** Enables or disables file-upload handling. */
    MultipartBodyHandler setHandleFileUploads(boolean handleFileUploads);

    /** Sets the maximum accepted body size in bytes (-1 for unlimited). */
    MultipartBodyHandler setBodyLimit(long bodyLimit);

    /** Sets the uploads directory. */
    MultipartBodyHandler setUploadsDirectory(String uploadsDirectory);

    /** Controls whether form attributes are merged into request params. */
    MultipartBodyHandler setMergeFormAttributes(boolean mergeFormAttributes);

    /** Controls whether uploaded files are deleted when the response ends. */
    MultipartBodyHandler setDeleteUploadedFilesOnEnd(boolean deleteUploadedFilesOnEnd);

    /** Controls whether the body buffer is pre-sized from Content-Length. */
    MultipartBodyHandler setPreallocateBodyBuffer(boolean isPreallocateBodyBuffer);
}
I was using Apache JMeter. Cheers
--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/3bace0e0-8128-4ae3-ba88-8a18f55d7678%40googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/8b407257-a49a-4083-a461-a835966e5f39%40googlegroups.com.
router.post().handler(ctx -> {
    HttpServerRequest request = ctx.request();
    request.setExpectMultipart(true);
    request.pause();
    ctx.next();
// }).blockingHandler(HandlerPost::handleStaticBlocking);
}).blockingHandler(ctx -> {
    HttpServerRequest request = ctx.request();
    HttpServerResponse response = ctx.response();
    request.uploadHandler(upload -> {
        Buffer buff = Buffer.buffer();
        upload.handler(chunk -> {
            buff.appendBuffer(chunk);
        });
        upload.endHandler(none -> {
            // everything is in buff
        });
    });
    request.endHandler(v -> {
        MultiMap formAttributes = request.formAttributes();
        formAttributes.iterator().forEachRemaining(rec -> {
            // get form attributes
        });
        if (!response.closed()) {
            response.end();
        }
    });
    request.resume();
});

Buffer buff = Buffer.buffer();
upload.handler(chunk -> {
    buff.appendBuffer(chunk);
});
upload.endHandler((none) -> {
    // everything in the buffer.
});
data = buff.getBytes();
Glad that it helps. It should be much simpler now.
As mentioned in https://vertx.io/docs/apidocs/io/vertx/core/buffer/Buffer.html#buffer--
Buffer has getBytes method.
Cheers and welcome to Vertx
upload.endHandler((none) -> {
data = buff.getBytes();
});
request.uploadHandler(upload -> {
Buffer buff = Buffer.buffer();
upload.handler(chunk -> {
buff.appendBuffer(chunk);
});
upload.endHandler((none) -> {
// everything in the buffer.
data = buff.getBytes();
if(log.isDebugEnabled()) log.debug( "uploadedFileData size: [{}]", data.length);
if(data == null || data.length == 0)
{
log.warn("uploaded file data size is 0 !!!!!!!");
}
dataContentType = upload.contentType();
if(log.isDebugEnabled()) log.debug( "dataContentType: [{}]", dataContentType);
});
});
request.endHandler(v -> {
MultiMap formAttributes = request.formAttributes();
requestContentString = formAttributes.get("d");
if(log.isDebugEnabled()) log.debug("requestContentString: [{}]", requestContentString);
// HERE!! data is null, because upload.endHandler() called asynchronously.
// if data length is 0.
if(data.length == 0)
{
...
}
// event log disruptor translator.
...
});
Buffer buff = Buffer.buffer();
request.uploadHandler(upload -> {
upload.handler(chunk -> {
buff.appendBuffer(chunk);
});
upload.endHandler((none) -> {
// everything in the buffer.
dataContentType = upload.contentType();
if(log.isDebugEnabled()) log.debug( "dataContentType: [{}]", dataContentType);
});
});
request.endHandler(v -> {
data = buff.getBytes();
// data length is 0 !!!
}
--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/44e9ace0-d94a-42a1-9347-499450b2e455%40googlegroups.com.
Hi Kidong, Glad that it helps. Vertx has peculiarities on async and callbacks; it took me a while too. Always welcome to have people embracing Vertx. Looking forward to your benchmark on multiple verticles. Cheers
--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/36f6cdb5-0dc3-4de9-a084-8ddacfc0240a%40googlegroups.com.
| Collector Node | AWS EC2 Instance m5.2xlarge(8 vCPU, 32 GB Memory) |
| Network Performance | 9.6 Gbit/s |
| Collector Configuration | http.threadSize=16 http.workerPoolSize=100 http.idleTimeoutInSeconds=300 http.sendBufferSize=4096 http.receiveBufferSize=4096 http.acceptBacklog=10000 http.verticleCount=6 |
| Collector JVM Configuration | wrapper.java.additional.1=-Dorg.tanukisoftware.wrapper.WrapperStartStopApp.maxStartMainWait=300 # Initial Java Heap Size (in MB) # Maximum Java Heap Size (in MB) |
| Load Test Client | AsyncHttpClient |
| Data Size | Case 1. Kafka Producing: false, Logging: false (TPS) | Case 2. Kafka Producing: true, Logging: true (TPS) |
|---|---|---|
| 1KB | 52544 | 38717 |
| 2KB | 47960 | 28140 |
| 5KB | 39028 | 22143 |
| 10KB | 31949 | 19078 |
| 20KB | 23463 | 15050 |
| 50KB | 13581 | 11053 |
| 100KB | 7947 | 6977 |
Hi Hasani,