RxJava: Observable.create(...)


Grebiel Jose Ifill Brito

Dec 14, 2016, 1:10:55 PM
to RxJava
Hello,

I have been looking at the different ways RxJava offers to create rx.Observable objects, and I noticed the warning “advanced use only” on the method “Observable.create(...)” at https://github.com/ReactiveX/RxJava/wiki/Creating-Observables.

I then followed the link http://reactivex.io/documentation/operators/create.html and I found this information:
“A well-formed finite Observable must attempt to call either the observer’s onCompleted method exactly once or its onError method exactly once, and must not thereafter attempt to call any of the observer’s other methods.”
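
To make the quoted rule concrete, here is a minimal sketch in plain Java with no RxJava dependency. The `Events` interface and the `TerminalGuard` class are hypothetical names invented for this illustration; they only mimic the three observer callbacks and are not library types. The guard forwards the first terminal event and drops whatever arrives afterwards. It does not serialize concurrent onNext calls, which would need additional work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class TerminalGuardDemo {

    /** Hypothetical stand-in with the same three callbacks as an RxJava 1.x observer. */
    interface Events<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable error);
    }

    /** Forwards events until the first terminal event, then drops everything else. */
    static final class TerminalGuard<T> implements Events<T> {
        private final Events<T> downstream;
        private final AtomicBoolean terminated = new AtomicBoolean(false);

        TerminalGuard(Events<T> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void onNext(T item) {
            if (!terminated.get()) {
                downstream.onNext(item);
            }
        }

        @Override
        public void onCompleted() {
            // Only the first terminal call wins the compare-and-set.
            if (terminated.compareAndSet(false, true)) {
                downstream.onCompleted();
            }
        }

        @Override
        public void onError(Throwable error) {
            if (terminated.compareAndSet(false, true)) {
                downstream.onError(error);
            }
        }
    }

    /** Drives the guard with a deliberately misbehaving sequence and records what gets through. */
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        Events<Integer> recorder = new Events<Integer>() {
            @Override public void onNext(Integer item) { log.add("next " + item); }
            @Override public void onCompleted() { log.add("completed"); }
            @Override public void onError(Throwable error) { log.add("error " + error.getMessage()); }
        };
        Events<Integer> guarded = new TerminalGuard<>(recorder);
        guarded.onNext(1);
        guarded.onCompleted();
        guarded.onNext(2);                              // dropped: already terminated
        guarded.onError(new RuntimeException("late"));  // dropped: second terminal event
        guarded.onCompleted();                          // dropped: third terminal event
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next 1, completed]
    }
}
```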

I would like to know whether there is anything special that developers should consider when using the “create” method, besides not calling onCompleted and onError multiple times and making sure that the communication between the observable (rx.Observable) and the observer (rx.Subscriber) is thread-safe.

The communication that I mentioned will be in one direction only, from observable to observer. I’m simply passing an object to the observer using the onNext method. Since the object has many fields, I’m using, e.g., AtomicInteger data types or locks to make the read and write operations thread-safe.

I already have an implementation and it seems to work fine, but I wanted to make sure that I’m not missing an important aspect that should be considered when using the “Observable.create(...)” method.

Thanks in advance.

Kind Regards,
Grebiel

Dávid Karnok

Dec 14, 2016, 1:22:48 PM
to Grebiel Jose Ifill Brito, RxJava
There is a wiki page about creating custom Observables and operators:

--
Best regards,
David Karnok

Grebiel Jose Ifill Brito

Dec 15, 2016, 7:20:11 AM
to RxJava, gre...@gmail.com
Thanks a lot for the link. I found two things that I hadn't considered: I wasn't aware of Producers, and I was not using "onBackpressureBuffer()". The reason I asked is that I'm developing a refactoring tool for my thesis that converts the javax.swing.SwingWorker async constructs into RxJava. Basically, I would like to have an observable that executes the async operation and an observer (rx.Subscriber) that reacts to the publish(...) and setProgress(...) calls in the observable. Additionally, the observer should be able to perform an operation once the async operation has completed. The idea is that the code after refactoring looks pretty much the same as the code before refactoring. The advantage of the refactoring would be to enable all the RxJava operators that are not available in SwingWorkers.

Below is an image showing how the code would look before (left side) and after (right side) refactoring. After the image you'll find the source code of the three classes that I'm using to achieve this. Could you take a quick look at this and tell me whether it makes any sense to you? In other words, am I going in the right direction, or am I misusing RxJava?

I have compared the console output of the SwingWorker and the RxJava implementations for this particular example, and they match.



Class used to create the observable:
public abstract class SwingWorkerRxOnSubscribe<ReturnType, ProcessType>
       implements Observable.OnSubscribe<SwingWorkerDto<ReturnType, ProcessType>>, Producer
{
   private Subscriber<? super SwingWorkerDto<ReturnType, ProcessType>> observer;
   private SwingWorkerDto<ReturnType, ProcessType> dto;
   private long requestCount;

   /**
    * Manages the workflow of a SwingWorker by setting up the data
    * transfer object {@link SwingWorkerDto <ReturnType, ProcessType>}
    * that is used for sending progress, chunks of data and finally the async result
    * to the observer. This method should not be overridden by subclasses. Therefore
    * it is marked as {@link Deprecated}.
    *
    * @param observer
    *            observer
    */
   @Override
   @Deprecated
   public void call( Subscriber<? super SwingWorkerDto<ReturnType, ProcessType>> observer )
   {
      this.observer = observer;
      try
      {
         if ( !this.observer.isUnsubscribed() )
         {
            observer.setProducer( this );
            this.dto = new SwingWorkerDto<ReturnType, ProcessType>();
            this.observer.onStart();
            ReturnType asyncResult = doInBackground();
            this.observer.onNext( this.dto.setResult( asyncResult ) );
            this.observer.onCompleted();
         }
      }
      catch ( Exception throwable )
      {
         observer.onError( throwable );
      }
   }

   /**
    * The maximum number of items that are produced can be influenced, for example, by
    * calling the method {@link Observable#take(int)} on the observable.
    * {@link #requestCount} stores the number of items being requested
    * so that only the methods {@link #publish(Object[])} and {@link #setProgress(int)}
    * are ignored in case n is smaller than the total number of items.<br>
    * {@link #requestCount} is used to guarantee that the
    * final {@link SwingWorkerSubscriber#onNext(SwingWorkerDto)} is always called.
    *
    * @param n
    *            maximum number of items
    */
   @Override
   public void request( long n )
   {
      requestCount = n;
   }

   /**
    * To be implemented by subclasses. This method corresponds to
    * the {@link SwingWorker#doInBackground()} method.
    *
    * @return the result of the asynchronous operation
    * @throws Exception
    *             a general exception that could be thrown
    */
   protected abstract ReturnType doInBackground() throws Exception;

   /**
    * Sends chunks of data to the observer ({@link SwingWorkerSubscriber}).
    *
    * @param chunks
    *            the data to be sent
    */
   protected void publish( ProcessType... chunks )
   {
      if ( requestCount > 1 )
      {
         requestCount--;
         this.observer.onNext( this.dto.send( chunks ) );
      }
   }

   /**
    * Sends the progress to the observer ({@link SwingWorkerSubscriber}).
    *
    * @param progress
    *            progress to be sent
    */
   protected void setProgress( int progress )
   {
      if ( requestCount > 1 )
      {
         requestCount--;
         this.observer.onNext( this.dto.setProgress( progress ) );
      }
   }
}

Subscriber used to imitate the behavior of the SwingWorker:
public abstract class SwingWorkerSubscriber<ResultType, ProcessType>
       extends Subscriber<SwingWorkerDto<ResultType, ProcessType>>
       implements RxSwingWorkerAPI<ResultType>
{

   private Observable<SwingWorkerDto<ResultType, ProcessType>> observable;
   private Subscription subscription;
   private ResultType asyncResult;
   private PropertyChangeSupport propertyChangeSupport;
   private AtomicInteger progress;
   private AtomicBoolean cancelled;
   private AtomicBoolean done;
   private SwingWorker.StateValue currentState;
   private CountDownLatch countDownLatch;

   /**
    * Constructor: each observer has a reference to the observable object
    *
    * @param observable
    */
   public SwingWorkerSubscriber( Observable<SwingWorkerDto<ResultType, ProcessType>> observable )
   {
      this.observable = observable;
      this.propertyChangeSupport = new PropertyChangeSupport( this );
      this.currentState = SwingWorker.StateValue.PENDING;
      initializeStates();
   }

   /**
    * Initializes the state of the asynchronous task every time
    * that this observer is subscribed.
    * <ol>
    * <li>progress = 0</li>
    * <li>cancelled = false</li>
    * <li>done = false</li>
    * <li>state = {@link SwingWorker.StateValue#STARTED}</li>
    * </ol>
    * <p>
    * The {@link CountDownLatch}
    * is initially set to one. When {@link #onCompleted()} is
    * invoked the {@link CountDownLatch} is set to 0 to indicate
    * that a result is present. This is important for {@link #get()}
    * and {@link #get(long, TimeUnit)}
    */
   @Override
   public void onStart()
   {
      initializeStates();
      this.countDownLatch = new CountDownLatch( 1 );
      setState( SwingWorker.StateValue.STARTED );
   }

   /**
    * Used to process the items after a
    * {@link SwingWorker#publish(Object[])} has been invoked. This
    * method is not supposed to be overridden by subclasses. Therefore
    * it is marked as {@link Deprecated}. Use {@link #process(List)}
    * to process the items sent by
    * {@link SwingWorkerRxOnSubscribe#publish(Object[])}
    *
    * @param dto
    *            Data transfer object for chunks and asyncResult
    */
   @Override
   @Deprecated
   public void onNext( SwingWorkerDto<ResultType, ProcessType> dto )
   {
      asyncResult = dto.getResult();
      if ( dto.isProgressValueAvailable() )
      {
         setProgress( dto.getProgressAndReset() );
      }
      List<ProcessType> processedChunks = dto.getChunks();
      process( processedChunks );
      dto.removeChunks( processedChunks );
   }

   /**
    * Updates the {@link CountDownLatch}, setting it to 0. Calls
    * {@link #done(Object)} and sets the state to
    * {@link SwingWorker.StateValue#DONE}. This method should not be
    * overridden by subclasses. Use {@link #done(Object)} instead.
    */
   @Override
   @Deprecated
   public void onCompleted()
   {
      countDownLatch.countDown();
      done( asyncResult );
      setState( SwingWorker.StateValue.DONE );
   }

   
   /**
    * Executes the asynchronous task in {@link Schedulers#computation()}
    * as long as the {@link #observable} is not subscribed. Otherwise the
    * invocation of this method is ignored. The operation is
    * observed in {@link SwingScheduler#getInstance()}
    */
   @Override
   public void execute()
   {
      if ( !isSubscribed() )
      {
         Scheduler scheduler = Schedulers.computation();
         subscribeObservable( scheduler );
      }
   }

   /**
    * Executes the asynchronous task in {@link Schedulers#immediate()}
    * (the current thread) as long as the {@link #observable}
    * is not subscribed. Otherwise the invocation of this method is ignored.
    * The operation is observed in {@link SwingScheduler#getInstance()}
    */
   @Override
   public void run()
   {
      if ( !isSubscribed() )
      {
         Scheduler scheduler = Schedulers.immediate();
         subscribeObservable( scheduler );
      }
   }

   /**
    * Waits for the result of {@link SwingWorkerRxOnSubscribe#doInBackground()}
    *
    * @return return value of {@link SwingWorkerRxOnSubscribe#doInBackground()}
    * @throws InterruptedException
    *             if the thread is interrupted while waiting
    */
   @Override
   public ResultType get() throws InterruptedException
   {
      countDownLatch.await();
      return asyncResult;
   }

   /**
    * Waits for the result of {@link SwingWorkerRxOnSubscribe#doInBackground()}
    * given a timeout and a unit
    *
    * @param timeout
    *            timeout
    * @param unit
    *            time unit
    * @return return value of {@link SwingWorkerRxOnSubscribe#doInBackground()}
    * @throws InterruptedException
    *             if the thread is interrupted while waiting
    */
   @Override
   public ResultType get( long timeout, TimeUnit unit ) throws InterruptedException
   {
      countDownLatch.await( timeout, unit );
      boolean timeExpired = countDownLatch.getCount() > 0;
      if ( timeExpired )
      {
         String message = this.getClass().getSimpleName() + ": Timeout exception. Result not present.";
         TimeoutException timeoutException = new TimeoutException( message );
         this.onError( timeoutException );
         return null;
      }
      return asyncResult;
   }

   
   /**
    * @return state of this observer
    */
   @Override
   public SwingWorker.StateValue getState()
   {
      return this.currentState;
   }

   /**
    *
    * @return true if {@link #done(Object)} has successfully been
    *         executed. False otherwise.
    */
   @Override
   public boolean isDone()
   {
      return this.done.get();
   }

   /**
    *
    * @return true if this observer has been cancelled. False otherwise
    */
   @Override
   public boolean isCancelled()
   {
      return this.cancelled.get();
   }

   /**
    * Unsubscribes this observer if it is subscribed and {@code mayInterruptIfRunning}
    * is true. The method returns false if this observer has already been cancelled, or
    * if the task has already completed normally.
    *
    * @param mayInterruptIfRunning
    *            to indicate whether a running observer should be
    *            unsubscribed or not
    * @return true if this observer was successfully unsubscribed, false otherwise
    */
   @Override
   public boolean cancel( boolean mayInterruptIfRunning )
   {
      if ( cancelled.get() || currentState.equals( SwingWorker.StateValue.DONE ) )
      {
         return false;
      }
      else if ( !this.isUnsubscribed() && mayInterruptIfRunning )
      {
         this.unsubscribe();
         this.cancelled.set( true );
         return true;
      }
      else
      {
         return false;
      }
   }

   /**
    *
    * @return returns the current progress of the task. This progress
    *         must be set by {@link #setProgress(int)}
    */
   @Override
   public int getProgress()
   {
      return this.progress.get();
   }

   /**
    * Adds a property change listener to this observer
    *
    * @param listener
    *            listener to be added
    */
   @Override
   public void addPropertyChangeListener( PropertyChangeListener listener )
   {
      synchronized ( this )
      {
         this.propertyChangeSupport.addPropertyChangeListener( listener );
      }
   }

   /**
    * Removes a property change listener from this observer
    *
    * @param listener
    *            listener to be removed
    */
   @Override
   public void removePropertyChangeListener( PropertyChangeListener listener )
   {
      synchronized ( this )
      {
         this.propertyChangeSupport.removePropertyChangeListener( listener );
      }
   }

   /**
    * Fires a property change
    *
    * @param propertyName
    *            name of the property
    * @param oldValue
    *            previous value of the property
    * @param newValue
    *            new value of the property
    */
   @Override
   public void firePropertyChange( String propertyName, Object oldValue, Object newValue )
   {
      synchronized ( this )
      {
         this.propertyChangeSupport.firePropertyChange( propertyName, oldValue, newValue );
      }
   }

   /**
    *
    * @return the {@link PropertyChangeSupport} of this observer
    */
   @Override
   public PropertyChangeSupport getPropertyChangeSupport()
   {
      return propertyChangeSupport;
   }

   
   // ### Protected Methods ###

   /**
    * This method is invoked after {@link SwingWorkerRxOnSubscribe#doInBackground()} has completed.
    * The method is invoked in {@link SwingScheduler#getInstance()}
    *
    * @param asyncResult
    *            result from {@link SwingWorkerRxOnSubscribe#doInBackground()}
    */
   protected abstract void done( ResultType asyncResult );

   /**
    * This method is invoked every time {@link SwingWorkerRxOnSubscribe#publish(Object[])} is invoked.
    *
    * @param chunks
    *            arguments passed to {@link SwingWorkerRxOnSubscribe#publish(Object[])}
    */
   protected abstract void process( List<ProcessType> chunks );

   /**
    * This method updates the progress of the observer. It can only
    * take values between 0 and 100.
    *
    * @param progress
    *            progress to be set
    */
   protected void setProgress( int progress )
   {
      if ( progress < 0 || progress > 100 )
      {
         throw new IllegalArgumentException( "the value should be from 0 to 100" );
      }
      // Compare the int values; AtomicInteger.equals( Integer ) is never true.
      if ( this.progress.get() == progress )
      {
         return;
      }
      synchronized ( this )
      {
         int oldProgress = this.progress.get();
         propertyChangeSupport.firePropertyChange( "progress", oldProgress, progress );
         this.progress.set( progress );
      }
   }

   // ### Private Methods ###

   private void initializeStates()
   {
      this.progress = new AtomicInteger( 0 );
      this.cancelled = new AtomicBoolean( false );
      this.done = new AtomicBoolean( false );
   }

   private boolean isSubscribed()
   {
      return subscription != null && !subscription.isUnsubscribed();
   }

   private void subscribeObservable( Scheduler scheduler )
   {
      this.subscription = this.observable
            .onBackpressureBuffer()
            .observeOn( SwingScheduler.getInstance() )
            .subscribeOn( scheduler )
            .subscribe( this );
   }

   private void setState( SwingWorker.StateValue state )
   {
      synchronized ( this )
      {
         SwingWorker.StateValue oldState = this.currentState;
         propertyChangeSupport.firePropertyChange( "state", oldState, state );
         this.currentState = state;
      }
   }
}

Thread Safe Data Transfer Object used to send information from Observable to Observer.
public class SwingWorkerDto<ReturnType, ProcessType>
{
   private static final int DEFAULT_PROGRESS = -1;
   private ReturnType asyncResult;
   private List<ProcessType> chunks;
   private AtomicInteger progress;

   SwingWorkerDto()
   {
      this.progress = new AtomicInteger( DEFAULT_PROGRESS );
      this.chunks = new ArrayList<ProcessType>();
      this.asyncResult = null;
   }

   /**
    * Updates the chunk
    *
    * @param chunks
    *            chunks of data to be added
    * @return
    */
   SwingWorkerDto<ReturnType, ProcessType> send( ProcessType... chunks )
   {
      synchronized ( this.chunks )
      {
         this.chunks.addAll( Arrays.asList( chunks ) );
      }
      return this;
   }

   /**
    * Sets a result for the asynchronous operation
    *
    * @param asyncResult
    *            result to be set
    * @return this object so that it can be used in {@link rx.Subscriber#onNext(Object)}.
    *         See {@link SwingWorkerRxOnSubscribe#call(Subscriber)}, {@link SwingWorkerRxOnSubscribe#setProgress(int)}
    *         and {@link SwingWorkerRxOnSubscribe#publish(Object[])}
    */
   SwingWorkerDto<ReturnType, ProcessType> setResult( ReturnType asyncResult )
   {
      synchronized ( this )
      {
         this.asyncResult = asyncResult;
      }
      return this;
   }

   /**
    * Getter to retrieve the chunks in {@link SwingWorkerSubscriber#onNext(SwingWorkerDto)}
    *
    * @return the chunks as list
    */
   List<ProcessType> getChunks()
   {
      synchronized ( this.chunks )
      {
         List<ProcessType> chunksCloned = new ArrayList<ProcessType>();
         chunksCloned.addAll( chunks );
         return chunksCloned;
      }
   }

   /**
    * To remove chunks after they have been processed.
    * See {@link SwingWorkerSubscriber#onNext(SwingWorkerDto)}
    *
    * @param chunks
    */
   void removeChunks( List<ProcessType> chunks )
   {
      synchronized ( this.chunks )
      {
         this.chunks.removeAll( chunks );
      }
   }

   /**
    * Retrieves the async result cached in this Dto.
    * See {@link SwingWorkerSubscriber#onNext(SwingWorkerDto)}
    *
    * @return
    */
   ReturnType getResult()
   {
      synchronized ( this )
      {
         return this.asyncResult;
      }
   }

   /**
    * To send the progress from {@link SwingWorkerRxOnSubscribe} to
    * {@link SwingWorkerSubscriber}.
    * See {@link SwingWorkerRxOnSubscribe#setProgress(int)}
    *
    * @param progress
    * @return
    */
   SwingWorkerDto<ReturnType, ProcessType> setProgress( int progress )
   {
      this.progress.set( progress );
      return this;
   }

   /**
    * To identify whether a progress value was sent and processed.
    * See {@link SwingWorkerSubscriber#onNext(SwingWorkerDto)}
    *
    * @return
    */
   boolean isProgressValueAvailable()
   {
      return progress.get() != DEFAULT_PROGRESS;
   }

   /**
    * To get the progress for processing and reset its value so it is
    * not processed multiple times.
    * See {@link SwingWorkerSubscriber#onNext(SwingWorkerDto)}
    *
    * @return
    */
   int getProgressAndReset()
   {
      int progress = this.progress.get();
      this.progress.set( DEFAULT_PROGRESS ); // reset progress after each get
      return progress;
   }
}
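
As a point of comparison, a design that emits a fresh immutable object per event needs neither the locks nor the reset and remove steps of the DTO above, because nothing is mutated after the object has been handed to onNext. This is only a sketch in plain Java: `WorkerEvent`, its `Kind` enum and its factory methods are hypothetical names for illustration and are not part of the posted code or of RxJava.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ImmutableEventDemo {

    /** Hypothetical immutable event; one instance per onNext call, never mutated afterwards. */
    static final class WorkerEvent<R, P> {
        enum Kind { CHUNKS, PROGRESS, RESULT }

        final Kind kind;
        final List<P> chunks;   // unmodifiable; empty unless kind == CHUNKS
        final int progress;     // -1 unless kind == PROGRESS
        final R result;         // null unless kind == RESULT

        private WorkerEvent(Kind kind, List<P> chunks, int progress, R result) {
            this.kind = kind;
            this.chunks = chunks;
            this.progress = progress;
            this.result = result;
        }

        static <R, P> WorkerEvent<R, P> chunks(List<P> chunks) {
            // Defensive copy so later changes to the caller's list are not visible.
            List<P> copy = Collections.unmodifiableList(new ArrayList<>(chunks));
            return new WorkerEvent<>(Kind.CHUNKS, copy, -1, null);
        }

        static <R, P> WorkerEvent<R, P> progress(int progress) {
            return new WorkerEvent<>(Kind.PROGRESS, Collections.<P>emptyList(), progress, null);
        }

        static <R, P> WorkerEvent<R, P> result(R result) {
            return new WorkerEvent<>(Kind.RESULT, Collections.<P>emptyList(), -1, result);
        }
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        source.add(0);
        source.add(1);
        WorkerEvent<String, Integer> event = WorkerEvent.chunks(source);
        source.clear(); // the producer moves on; the event keeps its own snapshot
        System.out.println(event.kind + " " + event.chunks); // prints CHUNKS [0, 1]
    }
}
```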






Dávid Karnok

Dec 15, 2016, 7:42:03 AM
to Grebiel Jose Ifill Brito, RxJava
Your OnSubscribe is incorrect; there are backpressure accounting and handling problems with it. Also, it is supposed to be single-use only, but there is no indication of that. Do you have to stick to the SwingWorker API, or can you rewrite the body to use Observable.fromEmitter?
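
One plausible reading of the accounting remark: in RxJava 1.x, every Producer.request(n) call adds to the outstanding demand, whereas the posted request(long) overwrites requestCount on each call and decrements it without synchronization. The following is a minimal sketch of additive accounting in plain Java. `RequestCounter` and its methods are hypothetical names for this illustration, not the library's implementation; treating Long.MAX_VALUE as "unbounded" follows the RxJava convention.

```java
import java.util.concurrent.atomic.AtomicLong;

class RequestAccountingDemo {

    /** Hypothetical demand counter: requests accumulate, emissions consume. */
    static final class RequestCounter {
        private final AtomicLong requested = new AtomicLong();

        /** Adds n to the outstanding demand, saturating at Long.MAX_VALUE. */
        void request(long n) {
            if (n <= 0) {
                throw new IllegalArgumentException("n > 0 required but it was " + n);
            }
            for (;;) {
                long current = requested.get();
                long next = current + n;
                if (next < 0) {
                    next = Long.MAX_VALUE; // overflow means "unbounded"
                }
                if (requested.compareAndSet(current, next)) {
                    return;
                }
            }
        }

        /** Consumes one unit of demand; returns false if nothing is outstanding. */
        boolean tryEmit() {
            for (;;) {
                long current = requested.get();
                if (current == 0) {
                    return false;
                }
                if (current == Long.MAX_VALUE) {
                    return true; // unbounded demand is never decremented
                }
                if (requested.compareAndSet(current, current - 1)) {
                    return true;
                }
            }
        }

        long outstanding() {
            return requested.get();
        }
    }

    public static void main(String[] args) {
        RequestCounter counter = new RequestCounter();
        counter.request(2);
        counter.request(3);
        System.out.println(counter.outstanding()); // prints 5
        counter.tryEmit();
        System.out.println(counter.outstanding()); // prints 4
    }
}
```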

Grebiel Jose Ifill Brito

Dec 15, 2016, 10:08:17 AM
to RxJava, gre...@gmail.com
Thanks again for your answer. I don’t really have to stick to the SwingWorker API, I just thought that it might be easier to apply automated refactoring if the same API is used, because I would only need to replace a node in the AST and do some other minor changes. The rest of the “magic” would be in those three classes.

The goal of the refactoring tool is to replace all SwingWorker objects in a project and their method invocations by RxJava. I was thinking about managing all operations from the observer (SwingWorkerSubscriber), so that the refactoring would look like:

swingWorker.execute(); /* -> observer.execute(); */
swingWorker.get( timeout, unit ); /* -> observer.get( timeout, unit ); */
swingWorker.getState(); /* -> observer.getState(); */
swingWorker.cancel( mayInterruptIfRunning ); /* -> observer.cancel( mayInterruptIfRunning ); */
// etc... All of those methods are implemented in SwingWorkerSubscriber class from the last post

I implemented a SwingWorkerEmitter that does the same as the previous SwingWorkerRxOnSubscribe. Is Observable.fromEmitter recommended to avoid backpressure and handling problems?
I also added a flag to make sure that the observable only runs one time. However, if the async operation has completed, then the observable can be used again. This is not how the SwingWorker works, but I thought that it wouldn’t hurt.
Here is the implementation of the Emitter. I decided to implement Action1<Emitter<...>> so that I can still use the same name doInBackground when creating the observable.

public abstract class SwingWorkerEmitter<ReturnType, ProcessType>
       implements Action1<Emitter<SwingWorkerDto<ReturnType, ProcessType>>>
{
   private Emitter<? super SwingWorkerDto<ReturnType, ProcessType>> emitter;

   private SwingWorkerDto<ReturnType, ProcessType> dto;

   private AtomicBoolean running = new AtomicBoolean( false );

   @Override
   @Deprecated
   public void call( Emitter<SwingWorkerDto<ReturnType, ProcessType>> emitter )
   {
      if ( running.get() )
      {
         return;
      }
      running.set(true);

      this.emitter = emitter;
      try
      {
         this.dto = new SwingWorkerDto<ReturnType, ProcessType>();

         ReturnType asyncResult = doInBackground();
         this.emitter.onNext( this.dto.setResult( asyncResult ) );
         this.emitter.onCompleted();
      }
      catch ( Exception throwable )
      {
         this.emitter.onError( throwable );
      }
      finally
      {
         running.set( false );
      }
   }

   protected abstract ReturnType doInBackground() throws Exception;

   //... same methods as before
}
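
The `running` flag above is read with get() and then written with set(true) in two separate steps, so two threads subscribing at the same moment could both pass the check. A minimal sketch of an atomic variant using compareAndSet follows, in plain Java. `RunGuard` and `runExclusively` are hypothetical names for this illustration; the sketch keeps the posted behavior of allowing a new run once the previous one has finished.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class RunOnceDemo {

    /** Hypothetical guard: lets at most one task run at a time. */
    static final class RunGuard {
        private final AtomicBoolean running = new AtomicBoolean(false);

        /** Runs the task unless another run is in progress; returns whether it ran. */
        boolean runExclusively(Runnable task) {
            // Check and set happen as one atomic step.
            if (!running.compareAndSet(false, true)) {
                return false;
            }
            try {
                task.run();
                return true;
            } finally {
                running.set(false); // allow a later run, as in the posted emitter
            }
        }
    }

    public static void main(String[] args) {
        final RunGuard guard = new RunGuard();
        final boolean[] nestedRan = { true };
        // The nested attempt happens while the outer task is still running.
        boolean outerRan = guard.runExclusively(
                () -> nestedRan[0] = guard.runExclusively(() -> { }));
        System.out.println(outerRan);                        // prints true
        System.out.println(nestedRan[0]);                    // prints false
        System.out.println(guard.runExclusively(() -> { })); // prints true
    }
}
```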

Usage of the emitter:
Observable<SwingWorkerDto<String, Integer>> observable = Observable.fromEmitter(new SwingWorkerEmitter<String, Integer>()
{
   @Override
   protected String doInBackground() throws Exception
   {
      PrintUtils.printMessage( "Entering doInBackground() method" );
      for ( int i = 0; i < amountOfWork * 2; i = i + 2 )
      {
         publish( i, i + 1 );
         Thread.sleep( TIME_FOR_WORK_UNIT );
      }
      PrintUtils.printMessage( "doInBackground() finished successfully" );
      return "Async Result";
   }
}, Emitter.BackpressureMode.BUFFER);


Would not the usage of observable.onBackpressureBuffer() have solved the backpressure problems?

this.subscription = this.observable
            .onBackpressureBuffer() // <---
            .observeOn( SwingScheduler.getInstance() )
            .subscribeOn( scheduler )
            .subscribe( this );


Thanks again for your time!

Dávid Karnok

Dec 15, 2016, 10:26:03 AM
to Grebiel Jose Ifill Brito, RxJava
> Is Observable.fromEmitter recommended to avoid backpressure and handling problems?

Yes, but there are certain aspects that may hit your particular case: https://github.com/ReactiveX/RxJava/issues/4735

> Would not the usage of observable.onBackpressureBuffer() have solved the backpressure problems?

It prevents overflow but may lead to out-of-memory conditions.
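
The trade-off can be pictured without RxJava. In the sketch below, a producer offers items while nothing drains the queue: an unbounded queue accepts everything and keeps growing, while a bounded one starts rejecting once it is full. `BufferGrowthDemo` and `fill` are hypothetical names, and the two JDK queues only stand in for the buffering idea; they are not what onBackpressureBuffer uses internally.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

class BufferGrowthDemo {

    /** Offers count items with no consumer draining the queue; returns how many were rejected. */
    static int fill(Queue<Integer> queue, int count) {
        int rejected = 0;
        for (int i = 0; i < count; i++) {
            if (!queue.offer(i)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        Queue<Integer> unbounded = new ConcurrentLinkedQueue<>();
        Queue<Integer> bounded = new ArrayBlockingQueue<>(16);

        int rejectedUnbounded = fill(unbounded, 1000);
        int rejectedBounded = fill(bounded, 1000);

        // The unbounded buffer never rejects, so its memory use follows the producer.
        System.out.println(unbounded.size() + " held, " + rejectedUnbounded + " rejected"); // prints 1000 held, 0 rejected
        // The bounded buffer caps memory but has to signal overflow somehow.
        System.out.println(bounded.size() + " held, " + rejectedBounded + " rejected");     // prints 16 held, 984 rejected
    }
}
```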


I'd really love to help you, but that would require me to implement it for you, which defeats half of the purpose of your thesis work.

Grebiel Jose Ifill Brito

Dec 15, 2016, 10:39:56 AM
to RxJava, gre...@gmail.com

Thanks again for your time. Knowing that something is incorrect is also very helpful. I’ll keep reading about Rx to understand its concepts better.


Thanks!
