loop with RxJava

uppsax

Oct 28, 2013, 1:59:47 PM
to ve...@googlegroups.com
Hi

Could you explain how to implement loop with Rx, please?
I was wondering if there is a standard way to do it.

What I want to do is the equivalent of

foreach (each : list) {
     func();
}

and 

while (condition) {
    func();
}

where func() is Func1.

Thank you in advance.
-uppsax

uppsax

Oct 28, 2013, 7:06:09 PM
to ve...@googlegroups.com
Hi

To add to my previous question, here is my own attempt at a loop that calls Func1.
It has a problem, though, so I am hoping someone can answer this for me as well, thanks.

...
return Observable.from(list)
    .reduce(new Func2<String, String, String>() {
        @Override
        public String call(String arg0, String arg1) {
            // (A)
            return Observable.just(arg0)
                .flatMap(new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(String arg0) {
                        return Observable.just(arg0);
                    }
                });    // (C)
            // (B)
            // return arg0 + arg1;
        }
    })
...

On each iteration, reduce() requires the function to return a String, so (B) works (without (A)).
What I am trying to do with (A) instead of (B) is to call another Func1 from within reduce().
The problem is that I cannot return a String after calling Func1, which reduce() requires.
Is there a way to convert Observable<String> to String?
What function can I append at (C) to get a String, so that reduce() receives that String?
Or, forgetting all of the above, how can I iterate over a list and call other Func1s during the iteration?

Thank you.
-uppsax
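
One way to read the question above: reduce() wants a plain value at each step, but the step itself is asynchronous. Below is a minimal sketch of that shape. It deliberately uses the JDK's CompletableFuture as a stand-in rather than RxJava, so that it runs without dependencies. The accumulator is kept as a future, and each step is started only once the previous one has completed. The names AsyncReduce, reduce() and step() are hypothetical and made up for illustration; they are not RxJava or mod-rxvertx API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

final class AsyncReduce {
    // Hypothetical async step standing in for a Func1 that makes a redis/DB call:
    // it joins the accumulator and the next element on another thread.
    static CompletableFuture<String> step(String acc, String next) {
        return CompletableFuture.supplyAsync(() -> acc + next);
    }

    // Folds the list left to right. Each step starts only after the previous
    // step's future has completed, so the step always receives a plain value.
    static <T> CompletableFuture<T> reduce(List<T> items,
                                           BiFunction<T, T, CompletableFuture<T>> step) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("cannot reduce an empty list");
        }
        CompletableFuture<T> acc = CompletableFuture.completedFuture(items.get(0));
        for (T item : items.subList(1, items.size())) {
            acc = acc.thenCompose(value -> step.apply(value, item));
        }
        return acc;
    }

    public static void main(String[] args) {
        String result = reduce(Arrays.asList("a", "b", "c"), AsyncReduce::step).join();
        System.out.println(result); // prints "abc"
    }
}
```

The join() in main blocks only to print the result; inside an event loop one would attach a callback to the returned future instead.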


petermd

Oct 29, 2013, 8:18:38 AM
to ve...@googlegroups.com
Hi Uppsax,

usually a Func1 is used to map a value rather than just call a procedure with no side-effects. do you need to do anything with the result of func() or do you just need to wait for all the calls to complete?

Observable.toObservable(list).map(func).toList()

would return an Observable<List> that would call func() for each element (assuming func() returned the input argument if it didn't need to change it). I suspect there's more to it than this though?

-Peter

uppsax

Oct 29, 2013, 9:04:44 AM
to ve...@googlegroups.com
Thank you Peter

> Observable.toObservable(list).map(func).toList()

I will try this today, thanks.  Allow me to explain a bit where I am coming from...

Imagine that there is totally procedural code and you have to translate it using asynchronous redis/db APIs... this is, sadly, what I have been doing.  I wish I could write the code with Rx from scratch.  Therefore, all subroutines are Func1s that get called using flatMap.  As you suggested to me several months ago, I can handle 'if and else' using flatMap/Func1, thank you.

The final piece of the puzzle is the loop. The functions called within the loop can be a bit complex, including updating variables outside the loop and all that.  However, I first want to set up the simplest form of loop using Rx.  After a struggle, the code above with reduce() is my first attempt.  So,

Q1) Do you have an idea to make my code work?  If I could just feed String to reduce() rather than Observable<String>...

Q2) In a normal situation, you can use all sorts of fabulous-ness of Rx, I know that. But in my abnormal situation, how would you implement a dead simple for-loop or while-loop within which a function is called?

Thanks, Peter
-uppsax
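
For the while-loop half of the question, here is a minimal sketch of an asynchronous "while (condition) func()" in which every iteration waits for the previous one. Again it uses the JDK's CompletableFuture as a stand-in rather than RxJava, and AsyncWhile, loop() and the increment function are hypothetical names chosen for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

final class AsyncWhile {
    // Repeats body while condition holds for the current state. The next
    // iteration is scheduled only after the previous body's future completes.
    static <S> CompletableFuture<S> loop(S state,
                                         Predicate<S> condition,
                                         Function<S, CompletableFuture<S>> body) {
        if (!condition.test(state)) {
            return CompletableFuture.completedFuture(state);
        }
        return body.apply(state).thenCompose(next -> loop(next, condition, body));
    }

    public static void main(String[] args) {
        // Hypothetical loop body: increments a counter on another thread.
        Function<Integer, CompletableFuture<Integer>> increment =
                n -> CompletableFuture.supplyAsync(() -> n + 1);
        int result = loop(0, n -> n < 5, increment).join();
        System.out.println(result); // prints 5
    }
}
```

The loop state is passed from one iteration to the next instead of being mutated outside the loop. One caveat of this recursive form: if the body's futures complete synchronously, a very long loop can nest deeply on the call stack.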

petermd

Oct 29, 2013, 6:39:53 PM
to ve...@googlegroups.com
Hi Uppsax,

sounds like an interesting project!

flatMap is only needed if the result of the operation is possibly async (like a redis / db call) - if you already have a list to transform then i don't think you need to rx'ify the list processing, you can just use for..each as normal, unless you wanted each element to be used to make a separate async call?

wrt how you handle the response, do you need to combine all the results or do you just need to ensure that all of them have completed? you can just subscribe to the onComplete notification from the flatMap and it will fire when all the requests have completed (there's no need to handle the individual responses in an onNext handler)

perhaps a more detailed gist would help?

-Peter
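
The "just wait for all of them to complete" case described above can be sketched as follows. This is a JDK-only illustration that uses CompletableFuture.allOf as a stand-in for subscribing to the completion notification of a flatMap. AwaitAll and forEachAsync() are hypothetical names, and the async call here merely counts invocations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class AwaitAll {
    // Starts one async call per element right away and returns a future that
    // completes once every call has completed; individual results are ignored.
    static <T> CompletableFuture<Void> forEachAsync(List<T> items,
                                                    Function<T, CompletableFuture<?>> call) {
        List<CompletableFuture<?>> pending = new ArrayList<>();
        for (T item : items) {
            pending.add(call.apply(item));
        }
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]));
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical async call: just counts invocations on another thread.
        forEachAsync(Arrays.asList("a", "b", "c"),
                item -> CompletableFuture.runAsync(calls::incrementAndGet))
            .thenRun(() -> System.out.println("all " + calls.get() + " calls completed"))
            .join(); // prints "all 3 calls completed"
    }
}
```

Note that all calls are in flight at once here, so this shape gives no ordering between iterations.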

uppsax

Oct 30, 2013, 8:02:22 AM
to ve...@googlegroups.com
Hi Peter

> flatMap is only needed if the result of the operation is possibly async (like a redis / db call) - if you already have a list to transform then i don't think you need to rx'ify the list processing, you can just use for..each as normal, unless you wanted each element to be used to make a separate async call?

In the original procedural code, redis/DB calls are scattered everywhere, at every depth of function call.  Therefore, I have to use flatMap for every function call, unless I am definitely sure that a leaf function is never going to make any async calls in the future... in which case I use a normal Java for loop.

I am trying Observable.from(list).map().toList() at the moment.

Q1) Is this map() guaranteed to process the elements sequentially, meaning that one element is fully processed before the next one starts, in the order of the original list?
The RxJava manual suggests so, but I want to be sure that this map behaves exactly the same as a traditional for loop.
By the way, Func1 will be called from within map() many times (and therefore many async calls will be made).

Q2) Is your answer for Q1 the same for Observable.from(list).flatMap() ?


Thanks Peter

petermd

Oct 30, 2013, 9:53:02 AM
to ve...@googlegroups.com
Hi Uppsax,

You'd need to use flatMap() rather than map() if you are issuing lots of async calls.

It also won't be done serially (i.e. with the completed async response for one func received before the next request is initiated) - is that the behavior you need? It's a little more complex and you may have some issues with the RxEventBus api (the Observables returned by send() don't support execute on subscribe, which would be required for e.g. concat() to work) - it should be straightforward to add support though.

Regards,
Peter
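
The point about "execute on subscribe" can be illustrated with a small JDK-only sketch. It uses CompletableFuture as a stand-in and is not the RxEventBus API. The key is that the call to send() sits inside the chained callback, so a request is not initiated until the previous reply has arrived. If send() were called up front for every element, all requests would already be in flight and chaining would only order the handling of replies. SequentialSend, send() and sendInSequence() are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class SequentialSend {
    // Hypothetical async request: records when it is initiated and when its
    // reply arrives, then echoes the message back as the reply.
    static CompletableFuture<String> send(String msg, List<String> log) {
        log.add("send " + msg);
        return CompletableFuture.supplyAsync(() -> {
            log.add("reply " + msg);
            return msg;
        });
    }

    // Sends the messages one at a time. send() is invoked lazily, inside the
    // callback, so each request starts only after the previous reply.
    static CompletableFuture<Void> sendInSequence(List<String> msgs, List<String> log) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (String msg : msgs) {
            chain = chain.thenCompose(done -> send(msg, log).thenAccept(reply -> { }));
        }
        return chain;
    }

    public static void main(String[] args) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        sendInSequence(Arrays.asList("a", "b", "c"), log).join();
        System.out.println(log);
        // prints [send a, reply a, send b, reply b, send c, reply c]
    }
}
```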

Ryu Sasai

Oct 30, 2013, 11:07:08 AM
to ve...@googlegroups.com
Hi Peter

Yes, that's what I need.  There are cases where an iteration requires the result of the last iteration.  It needs to be serial like other sequential loops.

I was about to try calling another Func1 from onNext, but you are saying that that does not work, is that right?
It would be great to be able to do so, because that would solve my problems.

Also I am using flatMap rather than map.

Here is my pseudo code so far...

Basically, for each iteration (for each element of list_wrap below), there will be lots of nested redis async calls using flatMap()s.


...

return Observable.from(list_wrap)
    .flatMap(new Func1<xx, xx>() {
        public xx call(xx arg0) {
            ...
            return xx
                .flatMap(new Func1<xx, xx>() {
                    public xx call(xx args) {
                        return xx;
                    }

...

    .subscribe(
        new Action1<xx>() {
            public void call(xx args) {
                System.out.println("onNext");
                /*
                 I want to do something here before the next iteration
                 */
            }
        },
        new Action1<java.lang.Exception>() {
            ...
            }
        },
        new Action0() {
            public void call() {
                System.out.println("onComplete: loop completed");
            }
        });






uppsax

Nov 4, 2013, 10:27:05 AM
to ve...@googlegroups.com
Hi Peter

> (the Observables returned by send() don't support execute on subscribe which would be required for eg concat() to work) - it should be straightforward to add support though.

Could I please ask you to implement this relatively soon?
This would be brilliant...

-uppsax

uppsax

Dec 2, 2013, 7:04:54 AM
to ve...@googlegroups.com
Hi Peter

In case you haven't read the previous post, please allow me to ask you this again...

Could you please implement concat()?  It would be greater than great for me.

Thank you very much.

-Uppsax

petermd

Dec 2, 2013, 10:11:05 AM
to ve...@googlegroups.com
Hi Uppsax,

Sorry for not replying to your post. I've made the changes to the EventBus to support this, so you can now use RxEventBus.observeSend()/observeSendWithTimeout() to create an Observable that only sends the message on subscribe.

https://github.com/vert-x/mod-rxvertx/blob/master/src/main/java/io/vertx/rxcore/java/eventbus/RxEventBus.java

I've also updated the UnitTest to verify this works as intended using concat()

https://github.com/vert-x/mod-rxvertx/blob/master/src/test/java/io/vertx/rxcore/test/integration/java/EventBusIntegrationTest.java#L58

Would be great to get your feedback on the changes

-Peter

uppsax

Dec 3, 2013, 4:43:51 AM
to ve...@googlegroups.com
Hi Peter

That is brilliant.  I will try that and let you know how it goes.

Thank you very much.
-Uppsax