HttpClientOptions options = new HttpClientOptions()
    .setDefaultHost(resolvedUrl.getHost())
    .setDefaultPort(resolvedUrl.getPort());
HttpClient client = vertx.createHttpClient(options);
HttpClientRequest httpClientRequest = client.get(resolvedUrl.getFile(), getHttpClientResponseHandler(asyncFile, filePath, message));
httpClientRequest.exceptionHandler(getExceptionHandler(url, message, asyncFile));
httpClientRequest.end();

private Handler<HttpClientResponse> getHttpClientResponseHandler(final AsyncFile asyncFile, final String filePath, Message<String> message) {
    return httpClientResponse -> {
        int statusCode = httpClientResponse.statusCode();
        // Pause until all handlers are registered (needed in Vert.x 2.x)
        httpClientResponse.pause();
        if (statusCode != 200) {
            String errorMessage = String.format("Could not download file. Status code was not 200, got: %s", statusCode);
            logger.error(errorMessage);
            message.fail(MESSAGE_RESPONSE_CODES.DOWNLOAD_FAILED_NOT_200.getCode(), errorMessage);
            asyncFile.close();
            return;
        }
        // When the response ends: flush, close, then reply with the file path
        httpClientResponse.endHandler(httpEndHandler -> {
            asyncFile.flush(flushResp -> {
                asyncFile.close(event -> {
                    if (event.succeeded()) {
                        logger.info(String.format("Closed file: %s", filePath));
                        message.reply(filePath);
                    } else {
                        logger.info(String.format("Could not close file: %s", filePath));
                    }
                });
            });
        });
        // Stream the response body into the file with flow control
        Pump pump = Pump.factory.pump(httpClientResponse, asyncFile);
        httpClientResponse.resume();
        pump.start();
    };
}
--
You received this message because you are subscribed to a topic in the Google Groups "vert.x" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/vertx/N_wSoQlvMMs/unsubscribe.
To unsubscribe from this group and all its topics, send an email to vertx+un...@googlegroups.com.
Visit this group at http://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/004f19bf-74f0-4891-867e-e9e8d37a5231%40googlegroups.com.
Is there nobody who downloads a file via HTTP using a Pump with Vert.x 3? :)
Greets Joan
Hey Tim, thanks for your help!
As I wrote before, you need to adjust the variable "url" in the class "DownloadVerticle" to point to a URL served by nginx. You also need to adjust the variable "filePath" in the same class (a path where the file should be stored).
Then compile the code with:
mvn clean install
Then start the app:
vertx run com.movingimage24.MainVerticle -cp vertx-pump-issue-1.0-SNAPSHOT-fat.jar
If you can't find it, search for:
final String filePath = File.separator + "opt/vertx-module" + File.separator + UUID.randomUUID().toString();
Yes, sorry... we deploy a jar to Vert.x without the Vert.x dependencies inside the jar. It's necessary for our build system here...
Hey Tim! As you can see in this post, Nat told me that the Pump does that.
It also fails when I close the file. pause() was necessary in Vert.x 2.x so that I don't start reading bytes before I have registered my handlers.
Yes, I can create and open the file with one method call, but that should not lead to a corrupt file.
Yes, parsing the URL is a bit complex, because the code is from a more complex application, and that should also not lead to a corrupt file.
Greets from Berlin,
Joan
I mean the pause/resume done internally by Pump, not the one Joan has in the code. Also, based on how the code looks, it is possible that endHandler will be called before all the buffers are consumed.
I refer to the code here:
private void doResume() {
    if (pausedChunks != null) {
        Buffer chunk;
        while ((chunk = pausedChunks.poll()) != null) {
            final Buffer theChunk = chunk;
            vertx.runOnContext(new VoidHandler() {
                @Override
                protected void handle() {
                    handleChunk(theChunk);
                }
            });
        }
    }
    if (hasPausedEnd) {
        final LastHttpContent theTrailer = pausedTrailer;
        vertx.runOnContext(new VoidHandler() {
            @Override
            protected void handle() {
                handleEnd(theTrailer);
            }
        });
        hasPausedEnd = false;
        pausedTrailer = null;
    }
}
All the chunks are enqueued to be called asynchronously. During that period, autoRead has been set to true. When the task time quota has run out, Netty will steal some CPU cycles to process IO events, and it might read LastHttpContent and schedule the call to handleEnd(). However, during that time period, some chunks may fill up the write queue (in AsyncFile) again and get enqueued into pausedChunks. Therefore, it could be that handleEnd() is called before all the chunks have been drained.
Thanks for your help!
Here is a link to Dropbox, which contains the original file and the file downloaded (the corrupted one) with the code from GitHub.
Greets, Joan
I don't know what else I can do.
If you use nginx, my code from GitHub, and the original file I uploaded to Dropbox, you can't reproduce it? What else can I do? I don't get it.
private void doResume() {
    vertx.runOnContext(new VoidHandler() {
        @Override
        protected void handle() {
            if (pausedChunks != null) {
                Buffer chunk;
                while ((chunk = pausedChunks.poll()) != null) {
                    final Buffer theChunk = chunk;
                    handleChunk(theChunk);
                    // Stop draining if the handler paused the response again
                    if (paused) {
                        break;
                    }
                }
            }
            // Deliver the end only once every paused chunk has been drained
            if (!paused && hasPausedEnd) {
                final LastHttpContent theTrailer = pausedTrailer;
                handleEnd(theTrailer);
                hasPausedEnd = false;
                pausedTrailer = null;
            }
        }
    });
}
Tim,
If the code looks something like above, it should address the issue.
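The idea behind the proposal above can be illustrated without Vert.x. This is only a sketch in plain Java: the class name, the String "chunks", and the consumer that pauses after every second chunk are assumptions made for illustration, and setting paused to false is folded into doResume() here for brevity. It shows that draining synchronously and checking the paused flag keeps the end marker behind all buffered chunks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DrainBeforeEndSketch {
    private final Deque<String> pausedChunks = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();
    private boolean paused = true;   // the sketch starts in the paused state
    private boolean hasPausedEnd;

    // A chunk that arrived while the response was paused.
    public void bufferWhilePaused(String chunk) {
        pausedChunks.add(chunk);
    }

    // The end of the response arrived while paused.
    public void endWhilePaused() {
        hasPausedEnd = true;
    }

    public List<String> delivered() {
        return delivered;
    }

    // Hypothetical consumer: pauses again after every second chunk,
    // similar to a Pump whose destination write queue is full.
    private void handleChunk(String chunk) {
        delivered.add(chunk);
        if (delivered.size() % 2 == 0) {
            paused = true;
        }
    }

    // Drain synchronously; stop as soon as the consumer pauses again.
    public void doResume() {
        paused = false;
        String chunk;
        while ((chunk = pausedChunks.poll()) != null) {
            handleChunk(chunk);
            if (paused) {
                break;
            }
        }
        // The end is delivered only when nothing is left behind it.
        if (!paused && hasPausedEnd) {
            hasPausedEnd = false;
            delivered.add("<end>");
        }
    }

    public static void main(String[] args) {
        DrainBeforeEndSketch s = new DrainBeforeEndSketch();
        s.bufferWhilePaused("a");
        s.bufferWhilePaused("b");
        s.bufferWhilePaused("c");
        s.endWhilePaused();
        s.doResume();                       // delivers a and b, then the consumer pauses
        System.out.println(s.delivered());
        s.doResume();                       // delivers c, then the end marker
        System.out.println(s.delivered());
    }
}
```

After the first doResume() the end marker is still held back because one chunk remains queued; it is only delivered by the second call.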
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;

import java.util.Arrays;
import java.util.Random;

public class Main {
    private static Vertx vertx;
    private static Buffer buffer;
    private static Buffer readBuffer;

    public static void main(String[] args) {
        VertxOptions o = new VertxOptions();
        vertx = Vertx.vertx(o);
        byte[] data = new byte[64 * 1024 * 1024];
        new Random().nextBytes(data);
        buffer = Buffer.buffer(data);
        readBuffer = Buffer.buffer(64 * 1024 * 1024);
        HttpServer httpServer = vertx.createHttpServer();
        httpServer.requestHandler(Main::handleWrite);
        httpServer.listen(10000);
        HttpClient httpClient = vertx.createHttpClient();
        HttpClientRequest clientRequest = httpClient.get(10000, "localhost", "/");
        clientRequest.handler(x -> {
            x.handler(y -> handleRead(x, y));
            x.endHandler(Main::handleDone);
        });
        clientRequest.end();
    }

    private static void handleDone(Void aVoid) {
        byte[] expectedData = buffer.getBytes();
        byte[] actualData = readBuffer.getBytes();
        System.out.println(Arrays.equals(expectedData, actualData));
    }

    private static void handleWrite(HttpServerRequest request) {
        request.response().setChunked(true);
        for (int i = 0; i < buffer.length() / 8192; i++) {
            request.response().write(buffer.slice(i * 8192, (i + 1) * 8192));
        }
        request.response().end();
    }

    private static void handleRead(HttpClientResponse response, Buffer buffer) {
        readBuffer.appendBuffer(buffer);
        for (int i = 0; i < 64; i++) {
            vertx.setTimer(1, n -> {
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        response.pause();
        vertx.setTimer(1, n -> response.resume());
    }
}
...
If the currently executing task which calls into HttpClientResponseImpl.resume is the 64th task executed in the NioEventLoop, and those 64 tasks have taken longer than the previous IO call, it will break out at line 363, exit from the runAllTasks(long) method, and cause the IO selector epoll to be executed again. In that case, it will read the latest data before the old chunk gets read by the handleChunk() method.
Order of execution:
1. HttpClientResponse.resume() is called.
2. HttpClientResponse.doResume() is called.
3. handleChunk is queued to be executed in the next event loop
On 16/08/15 23:30, Nat wrote:
If the currently executing task which calls into HttpClientResponseImpl.resume is the 64th task executed in the NioEventLoop, and those 64 tasks have taken longer than the previous IO call, it will break out at line 363, exit from the runAllTasks(long) method, and cause the IO selector epoll to be executed again. In that case, it will read the latest data before the old chunk gets read by the handleChunk() method.
Order of execution:
1. HttpClientResponse.resume() is called.
2. HttpClientResponse.doResume() is called.
3. handleChunk is queued to be executed in the next event loop
We only do this if hasPausedEnd = true, and this is only true if the last chunk of the response has been received. In normal pause/resume before the end of the response we do not call runOnContext() at all any more.
4. ClientConnection.doResume() is called and it sets up the selector for this connection
4. runAllTasks gets a break because it is the 64th task and so far the tasks have taken a significant amount of time
5. NioEventLoop.run() will continue the next loop at line 304
6. NioEventLoop.processSelectedKeys will be called and it will read the next chunk of data from ClientConnection
7. NioEventLoop.runAllTasks() will be called again.
8. At this point, the previously scheduled task to call handleChunk with the previously paused chunks will be called
And there you go, you get yourself a re-ordered buffer.
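The order of execution described above can be replayed in a toy model. This is a plain-Java sketch, not Netty or Vert.x code: the class, its fields, and the hard cap of 64 tasks per pass are simplifying assumptions (the thread describes the real check as time-based and triggered at the 64th task). Each loop iteration processes IO first and then runs queued tasks, as in the steps listed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ReorderSketch {
    // Assumption: a fixed budget stands in for Netty's time-based check.
    static final int TASK_BUDGET = 64;

    final List<String> delivered = new ArrayList<>();   // what the application sees
    final Deque<Runnable> tasks = new ArrayDeque<>();   // stand-in for the event loop task queue
    final Deque<String> socket = new ArrayDeque<>();    // stand-in for unread data on the connection
    boolean autoRead = false;

    // IO phase: with autoRead on, the next chunk is read and delivered directly.
    void processSelectedKeys() {
        if (autoRead) {
            String chunk = socket.poll();
            if (chunk != null) {
                delivered.add(chunk);
            }
        }
    }

    // Task phase: stops after TASK_BUDGET tasks even if more are queued.
    void runAllTasks() {
        for (int ran = 0; ran < TASK_BUDGET; ran++) {
            Runnable task = tasks.poll();
            if (task == null) {
                return;
            }
            task.run();
        }
    }

    public static List<String> simulate() {
        ReorderSketch loop = new ReorderSketch();
        loop.socket.add("chunk-2");                      // not read yet, autoRead is off
        for (int i = 0; i < TASK_BUDGET - 1; i++) {
            loop.tasks.add(() -> { });                   // 63 unrelated tasks
        }
        // The 64th task is the resume: it re-enables reading and schedules
        // the replay of "chunk-1", which was buffered while paused.
        loop.tasks.add(() -> {
            loop.autoRead = true;
            loop.tasks.add(() -> loop.delivered.add("chunk-1"));
        });
        for (int i = 0; i < 2; i++) {                    // two loop iterations
            loop.processSelectedKeys();
            loop.runAllTasks();
        }
        return loop.delivered;
    }

    public static void main(String[] args) {
        System.out.println(simulate());                  // prints [chunk-2, chunk-1]
    }
}
```

The replay task is still queued when the budget runs out, so the second iteration reads chunk-2 from the socket before the replay of chunk-1 runs.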
On Sunday, August 16, 2015 at 12:51:21 AM UTC-7, Tim Fox wrote:
Could you elaborate a bit?
On 16/08/15 6:10, Nat wrote:
Based on the fix, I think it might still cause a problem in the rare case where an IO task is executed before the scheduled job. The right fix is not to turn autoRead back on unless all the paused chunks are drained.
public synchronized HttpClientResponse resume() {
    if (paused) {
        paused = false;
        vertx.runOnContext(new VoidHandler() {
            @Override
            protected void handle() {
                doResume();
            }
        });
    }
    return this;
}

public void doResume() {
    if (pausedChunks != null) {
        Buffer chunk;
        while ((chunk = pausedChunks.poll()) != null) {
            final Buffer theChunk = chunk;
            handleChunk(theChunk);
            // Paused again while draining: leave autoRead off
            if (paused) {
                return;
            }
        }
    }
    // Turn reading back on only after all paused chunks are drained
    conn.doResume();
    if (hasPausedEnd) {
        final LastHttpContent theTrailer = pausedTrailer;
        handleEnd(theTrailer);
        hasPausedEnd = false;
        pausedTrailer = null;
    }
}
On Saturday, August 15, 2015 at 1:43:10 AM UTC-7, Tim Fox wrote:
That I do not know about, but you can see that IO events are *always* executed before waiting tasks here:
https://github.com/netty/netty/blob/4.0/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java#L349
https://github.com/netty/netty/blob/4.0/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java#L354
I.e. processSelectedKeys() is always executed before runAllTasks().
To make matters worse, if we call:
channel.setAutoRead(false);
there is no guarantee that some IO events will not come in afterwards - the semantics seem to mean "some time shortly after calling, no more IO events will be received."
All of this combined makes it hard for us to guarantee chunks are delivered to the user in the correct order when we use vertx.runOnContext() to "replay" events.
Anyway, to cut a long story short, I've fixed it locally by concatenating paused chunks into a larger chunk which is delivered when the next chunk (or the last one) after the response has been resumed is delivered. That way we can avoid vertx.runOnContext().
There are a couple of other places in the code that seem to suffer from the same issue (e.g. in NetSocketImpl and possibly ServerConnection), so I need to fix those too.
On 14/08/15 16:27, Nat wrote:
Correct. If there are more than 64 tasks sitting in the queue and it takes more than 1 ms by default, it will stop running the tasks and try to process IO instead. I think it's probably there to ensure that IO tasks have a chance to drain the queues.
On Friday, August 14, 2015 at 7:35:26 AM UTC-7, Tim Fox wrote:
I'm looking at it now. The problem seems to be that Netty always prioritises IO events over other events (such as events submitted using vertx.runOnContext()), which can result in chunks being reordered in the case of pause/resume.
On 14/08/15 15:13, Nat wrote:
Tim,
Did you get a chance to take a look?
On Thursday, August 13, 2015 at 11:11:59 AM UTC-7, Nat wrote:
What do you mean?
On Thursday, August 13, 2015 at 11:09:55 AM UTC-7, Tim Fox wrote:
Thanks, I assumed you and Joan were running in verticles; if you're running this embedded, it makes a lot more sense.
On 13/08/15 6:46 p.m., Nat wrote:
As Joan is probably sleeping, I managed to find a nice short repro: the Main class shown earlier in this thread.
On Wednesday, August 12, 2015 at 9:59:47 AM UTC-7, Tim Fox wrote:
Please bear with me. It's painful debugging this remotely, but we need to go through this process to isolate the issue.
On 12/08/15 17:42, Nat wrote:
:-) I think Alex's result seems to indicate that the packets were shuffled.
On Wednesday, August 12, 2015 at 9:04:44 AM UTC-7, Tim Fox wrote:
Ping :)
On 11/08/15 15:17, Tim Fox wrote:
I've added some diagnostic information in Vert.x which will tell if packets are getting unordered. Please build Vert.x from this branch
https://github.com/eclipse/vert.x/tree/httpclientresponsepump
and run your reproducer again... It will produce some output on stdout; please post that here. Thanks.
On 11/08/15 08:44, Joan R. wrote:
To Nat: If I use 1024 for setWriteQueueMaxSize, it's (not surprisingly) also corrupt. To be honest, I don't have a good diff tool for Mac, so I can't tell you whether the file created with a setWriteQueueMaxSize of 1024 is more corrupt than the file with the default settings. Perhaps Alexander Lehmann can have a look at these two files.
I uploaded these corrupt files to the Dropbox.
If anybody has a good binary diff tool, you're welcome. I use this
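For readers without a binary diff tool at hand, a rough check can be done with a few lines of plain Java. This is a minimal sketch, not a replacement for a real diff tool: the class name and command-line usage are made up for illustration, and it only reports the offset of the first differing byte, which is enough to see where a downloaded file starts to deviate from the original.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

public class FirstDifference {
    /**
     * Returns the offset of the first differing byte, or -1 if both streams
     * are identical. If one stream is a prefix of the other, the result is
     * the length of the shorter stream.
     */
    public static long firstDifference(InputStream a, InputStream b) throws IOException {
        long offset = 0;
        while (true) {
            int byteA = a.read();
            int byteB = b.read();
            if (byteA != byteB) {
                return offset;
            }
            if (byteA == -1) {
                return -1;   // both streams ended at the same position
            }
            offset++;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: java FirstDifference <original> <downloaded>");
            return;
        }
        // Buffered streams keep the byte-by-byte comparison reasonably fast.
        try (InputStream a = new BufferedInputStream(Files.newInputStream(Paths.get(args[0])));
             InputStream b = new BufferedInputStream(Files.newInputStream(Paths.get(args[1])))) {
            long offset = firstDifference(a, b);
            System.out.println(offset < 0
                    ? "files are identical"
                    : "first difference at byte offset " + offset);
        }
    }
}
```

Because reordered chunks leave the file length unchanged, the reported offset marks the start of the first misplaced chunk rather than a truncation point.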
Hey guys!
Again my question: when will this fix be available?
It's already fixed in the 2.x branch and will be in the next 2.x release (2.1.7)
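The approach Tim describes earlier in the thread (concatenating paused chunks into a larger chunk that is delivered together with the next chunk after resume, so nothing is replayed through runOnContext()) can be sketched in plain Java. This is not the actual Vert.x patch: the class and method names are hypothetical, chunks are plain byte arrays, and the sketch assumes the end of the response arrives after resume().

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class CoalescingResponseSketch {
    private final List<String> delivered = new ArrayList<>();
    private final ByteArrayOutputStream backlog = new ByteArrayOutputStream();
    private boolean paused;

    public void pause() {
        paused = true;
    }

    // Nothing is replayed or scheduled here; the backlog waits for the next IO event.
    public void resume() {
        paused = false;
    }

    // Called on the IO path for every chunk read from the connection.
    public void handleChunk(byte[] chunk) {
        backlog.write(chunk, 0, chunk.length);
        if (!paused) {
            flush();   // backlog plus the new chunk go out as one larger chunk
        }
    }

    // Called on the IO path when the response ends (assumed to happen after resume()).
    public void handleEnd() {
        flush();
        delivered.add("<end>");
    }

    private void flush() {
        if (backlog.size() > 0) {
            delivered.add(new String(backlog.toByteArray(), StandardCharsets.UTF_8));
            backlog.reset();
        }
    }

    private static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static List<String> demo() {
        CoalescingResponseSketch r = new CoalescingResponseSketch();
        r.handleChunk(bytes("A"));
        r.pause();
        r.handleChunk(bytes("B"));   // buffered
        r.handleChunk(bytes("C"));   // buffered
        r.resume();
        r.handleChunk(bytes("D"));   // delivered together with B and C
        r.handleEnd();
        return r.delivered;
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints [A, BCD, <end>]
    }
}
```

Since every delivery happens on the same path that reads from the connection, there is no second queue whose ordering relative to IO events would have to be guaranteed.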