vertx 3 - how to write to file system using rxjava or observables


Jazz

Mar 31, 2015, 1:02:25 AM
to ve...@googlegroups.com


Hi,

I am trying to write the content of "httpClientResponse" to the file system but I don't know how to do that using observables.

I thought of using:
ReadStream rs = httpClientResponse;
WriteStream ws = asyncFile;
Pump.pump(rs, ws);

but how can I know when the pump is done, so that I can return the final result (true, in this example)?

I also thought of using "reduce", but how can I combine it with "asyncFile.writeObservable"?

I feel like I am missing something here! :)

Any help will be appreciated, thanks!




   
/**
     *
     * @param filePath
     * @return true if file is saved successfully
     */

   
public Observable<Boolean> toObservableSaveToFileSystem(String filePath) {
       
FileSystem fs = vertx.fileSystem();
       
OpenOptions openOptions = new OpenOptions();
        openOptions
.setCreateNew(true);
        openOptions
.setWrite(true);
       
       
return fs.openObservable(filePath, openOptions)
       
.flatMap(asyncFile -> {

           
           
return httpClientResponse.toObservable()
           
.reduce(Buffer.buffer(), (b1,b2) -> {
               
                asyncFile
.writeObservable(b1, 0);
               
               
return b2;
               
           
})

           
           
.flatMap(buf -> {

               
return asyncFile.closeObservable()
               
.flatMap(v -> {
                   
return Observable.just(true);
               
});

           
});

           
           
       
});
   
}



 


Julien Viet

Mar 31, 2015, 2:48:59 AM
to ve...@googlegroups.com, Jazz
Hi,

You cannot use Pump with RxJava, because RxJava has a different flow control system than Vert.x and we don’t integrate RxJava with Vert.x flow control.

The normal way to do it would be to transform the WriteStream into a Subscriber<Buffer>, subscribe it to the Observable<Buffer>, and let RxJava do the job.

I have code for this locally that adds a toSubscriber() method to any WriteStream (like toObservable() on a ReadStream), but it is not finalised yet.

In the meantime, you can write a static method yourself that transforms the WriteStream into a Subscriber:

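public static <T> Subscriber<T> toSubscriber(WriteStream<T> ws) {
   return new Subscriber<T>() {
      @Override
      public void onNext(T next) {
         ws.write(next);
      }
      @Override
      public void onError(Throwable e) {
         // up to the caller: report the failure and clean up the stream
      }
      @Override
      public void onCompleted() {
         // up to the caller: the generic WriteStream cannot be ended here
      }
   };
}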

Keep in mind that you cannot end the WriteStream in this generic way, because it does not have an end() method, so you need to take care of that outside the subscriber.
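
To make the "take care of it outside" part concrete, here is a minimal sketch of one way to learn when the writing is done. It is not the Vert.x or RxJava API: ClosableWriteStream, SimpleSubscriber and RecordingStream are hypothetical stand-ins so that the example runs with the JDK alone, and close() is synchronous here, whereas closing a real AsyncFile is asynchronous. The idea is only that onCompleted closes the stream and then signals the final result (true), while onError closes it and propagates the failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WriteStreamSubscriberSketch {

    // Hypothetical stand-in for a write stream that can also be closed (as an AsyncFile can).
    interface ClosableWriteStream<T> {
        void write(T data);
        void close();
    }

    // Hypothetical stand-in for the three callbacks of an RxJava 1 Subscriber.
    interface SimpleSubscriber<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    // Adapts the stream to a subscriber; 'done' completes once the stream has been closed.
    static <T> SimpleSubscriber<T> toSubscriber(ClosableWriteStream<T> ws,
                                                CompletableFuture<Boolean> done) {
        return new SimpleSubscriber<T>() {
            public void onNext(T item) {
                ws.write(item);
            }
            public void onError(Throwable error) {
                ws.close();
                done.completeExceptionally(error);
            }
            public void onCompleted() {
                ws.close();
                done.complete(true);
            }
        };
    }

    // Hypothetical in-memory stream that records what was written, for demonstration only.
    static class RecordingStream implements ClosableWriteStream<String> {
        final List<String> written = new ArrayList<>();
        boolean closed;
        public void write(String data) { written.add(data); }
        public void close() { closed = true; }
    }

    public static void main(String[] args) {
        RecordingStream file = new RecordingStream();
        CompletableFuture<Boolean> done = new CompletableFuture<>();
        SimpleSubscriber<String> subscriber = toSubscriber(file, done);

        // Simulate the source emitting two chunks and then completing.
        subscriber.onNext("chunk-1");
        subscriber.onNext("chunk-2");
        subscriber.onCompleted();

        // prints: [chunk-1, chunk-2] closed=true done=true
        System.out.println(file.written + " closed=" + file.closed + " done=" + done.join());
    }
}
```

With the real types, the same shape would presumably apply: the subscriber's completion callback closes the file, and the "saved" result is emitted only from the close callback.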