public abstract class SwingWorkerRxOnSubscribe<ReturnType, ProcessType>
implements Observable.OnSubscribe<SwingWorkerDto<ReturnType, ProcessType>>,
Producer
{
private Subscriber<? super SwingWorkerDto<ReturnType, ProcessType>> observer;
private SwingWorkerDto<ReturnType, ProcessType> dto;
private long requestCount;
/**
* Manages the workflow of a SwingWorker by setting up the data
* transfer object {@link SwingWorkerDto <ReturnType, ProcessType>}
* that is used for sending progress, chunks of data and finally the async result
* to the observer. This class should not be overridden by subclasses. Therefore
* is is marked as {@link Deprecated}
*
* @param observer
* observer
*/
@Override
@Deprecated
public void call( Subscriber<? super SwingWorkerDto<ReturnType, ProcessType>> observer )
{
this.observer = observer;
try
{
if ( !this.observer.isUnsubscribed() )
{
observer.setProducer( this );
this.dto = new SwingWorkerDto<ReturnType, ProcessType>();
this.observer.onStart();
ReturnType asyncResult = doInBackground();
this.observer.onNext( this.dto.setResult( asyncResult ) );
this.observer.onCompleted();
}
}
catch ( Exception throwable )
{
observer.onError( throwable );
}
}
/**
* The maximum number of items that are produced can be for example influenced by
* calling the method {@link Observable#take(int)} on the observable.
* {@link this#requestCount} is being used to store the number of items being requested
* so that only the methods {@link this#publish(Object[])} and {@link this#setProgress(int)}
* are ignore in case that n is smaller than the number total number of items.<br>
* {@link this#requestCount} is used to guarantee that the
* final {@link SwingWorkerSubscriber#onNext(SwingWorkerDto)} is always called
*
* @param n
* maximum number of items
*/
@Override
public void request( long n )
{
requestCount = n;
}
/**
* To be implemented by subclasses. This method corresponds to
* the {@link SwingWorker#doInBackground()} method.
*
* @return the result of the asynchronous operation
* @throws Exception
* a general exception that could be thrown
*/
protected abstract ReturnType doInBackground() throws Exception;
/**
* Sends chunks of data to the observer ({@link SwingWorkerSubscriber}).
*
* @param chunks
* the data to be send
*/
protected void publish( ProcessType... chunks )
{
if ( requestCount > 1 )
{
requestCount--;
this.observer.onNext( this.dto.send( chunks ) );
}
}
/**
* Sends the progress to the observer ({@link SwingWorkerSubscriber}.
*
* @param progress
* progress to be sent
*/
protected void setProgress( int progress )
{
if ( requestCount > 1 )
{
requestCount--;
this.observer.onNext( this.dto.setProgress( progress ) );
}
}
/**
 * {@link Subscriber} that offers the {@link SwingWorker}-like operations declared
 * in {@link RxSwingWorkerAPI} on top of an observable emitting
 * {@link SwingWorkerDto} objects. Subclasses implement {@link #process(List)} and
 * {@link #done(Object)}.
 *
 * @param <ResultType>
 *            type of the final asynchronous result
 * @param <ProcessType>
 *            type of the intermediate chunks
 */
public abstract class SwingWorkerSubscriber<ResultType, ProcessType>
        extends Subscriber<SwingWorkerDto<ResultType, ProcessType>>
        implements RxSwingWorkerAPI<ResultType>
{
    private final Observable<SwingWorkerDto<ResultType, ProcessType>> observable;

    private Subscription subscription;

    private ResultType asyncResult;

    private final PropertyChangeSupport propertyChangeSupport;

    private AtomicInteger progress;

    private AtomicBoolean cancelled;

    private AtomicBoolean done;

    private SwingWorker.StateValue currentState;

    private CountDownLatch countDownLatch;

    /**
     * Constructor: each observer has a reference to the observable object.
     * The initial state is {@link SwingWorker.StateValue#PENDING}.
     *
     * @param observable
     *            the observable this observer subscribes to when executed
     */
    public SwingWorkerSubscriber( Observable<SwingWorkerDto<ResultType, ProcessType>> observable )
    {
        this.observable = observable;
        this.propertyChangeSupport = new PropertyChangeSupport( this );
        this.currentState = SwingWorker.StateValue.PENDING;
        initializeStates();
    }

    /**
     * Initializes the state of the asynchronous task every time
     * that this observer is subscribed.
     * <ol>
     * <li>progress = 0</li>
     * <li>cancelled = false</li>
     * <li>done = false</li>
     * <li>state = {@link SwingWorker.StateValue#STARTED}</li>
     * </ol>
     * <p>
     * The {@link CountDownLatch} is initially set to one. When
     * {@link #onCompleted()} is invoked the {@link CountDownLatch} is counted
     * down to 0 to indicate that a result is present. This is important for
     * {@link #get()} and {@link #get(long, TimeUnit)}.
     */
    @Override
    public void onStart()
    {
        initializeStates();
        this.countDownLatch = new CountDownLatch( 1 );
        setState( SwingWorker.StateValue.STARTED );
    }

    /**
     * Used to process the items after a
     * {@link SwingWorkerRxOnSubscribe#publish(Object[])} has been invoked. This
     * method is not supposed to be overridden by subclasses. Therefore
     * it was set as {@link Deprecated}. Use {@link #process(List)}
     * to process the items sent by
     * {@link SwingWorkerRxOnSubscribe#publish(Object[])}.
     *
     * @param dto
     *            Data transfer object for chunks and asyncResult
     */
    @Override
    @Deprecated
    public void onNext( SwingWorkerDto<ResultType, ProcessType> dto )
    {
        asyncResult = dto.getResult();
        if ( dto.isProgressValueAvailable() )
        {
            setProgress( dto.getProgressAndReset() );
        }
        // Work on a snapshot and remove exactly what was processed afterwards.
        List<ProcessType> processedChunks = dto.getChunks();
        process( processedChunks );
        dto.removeChunks( processedChunks );
    }

    /**
     * Counts the {@link CountDownLatch} down to 0, calls
     * {@link #done(Object)}, marks this observer as done and sets the state to
     * {@link SwingWorker.StateValue#DONE}. This method should not be
     * overridden by subclasses. Use {@link #done(Object)} instead.
     */
    @Override
    @Deprecated
    public void onCompleted()
    {
        countDownLatch.countDown();
        done( asyncResult );
        // isDone() reports true only once done(Object) has returned normally.
        done.set( true );
        setState( SwingWorker.StateValue.DONE );
    }

    /**
     * Executes the asynchronous task in {@link Schedulers#computation()}
     * as long as the observable is not subscribed. Otherwise the
     * invocation to this method is ignored. The operation is
     * observed in {@link SwingScheduler#getInstance()}.
     */
    @Override
    public void execute()
    {
        if ( !isSubscribed() )
        {
            Scheduler scheduler = Schedulers.computation();
            subscribeObservable( scheduler );
        }
    }

    /**
     * Executes the asynchronous task in {@link Schedulers#immediate()}
     * (the current thread) as long as the observable
     * is not subscribed. Otherwise the invocation to this method is ignored.
     * The operation is observed in {@link SwingScheduler#getInstance()}.
     */
    @Override
    public void run()
    {
        if ( !isSubscribed() )
        {
            Scheduler scheduler = Schedulers.immediate();
            subscribeObservable( scheduler );
        }
    }

    /**
     * Waits for the result of {@link SwingWorkerRxOnSubscribe#doInBackground()}.
     * The latch awaited here is created in {@link #onStart()}.
     *
     * @return return value of {@link SwingWorkerRxOnSubscribe#doInBackground()}
     * @throws InterruptedException
     *             if the thread is interrupted while waiting
     */
    @Override
    public ResultType get() throws InterruptedException
    {
        countDownLatch.await();
        return asyncResult;
    }

    /**
     * Waits for the result of {@link SwingWorkerRxOnSubscribe#doInBackground()}
     * given a timeout and a unit. If the timeout elapses first, a
     * {@link TimeoutException} is passed to {@code onError} and {@code null} is
     * returned.
     *
     * @param timeout
     *            timeout
     * @param unit
     *            time unit
     * @return return value of {@link SwingWorkerRxOnSubscribe#doInBackground()},
     *         or {@code null} if the timeout elapsed
     * @throws InterruptedException
     *             if the thread is interrupted while waiting
     */
    @Override
    public ResultType get( long timeout, TimeUnit unit ) throws InterruptedException
    {
        // await reports directly whether the latch reached zero in time.
        boolean completedInTime = countDownLatch.await( timeout, unit );
        if ( !completedInTime )
        {
            String message = this.getClass().getSimpleName() + ": Timeout exception. Result not present.";
            TimeoutException timeoutException = new TimeoutException( message );
            this.onError( timeoutException );
            return null;
        }
        return asyncResult;
    }

    /**
     * @return state of this observer
     */
    @Override
    public SwingWorker.StateValue getState()
    {
        return this.currentState;
    }

    /**
     * @return true if {@link #done(Object)} has successfully been
     *         executed. False otherwise.
     */
    @Override
    public boolean isDone()
    {
        return this.done.get();
    }

    /**
     * @return true if this observer has been cancelled. False otherwise
     */
    @Override
    public boolean isCancelled()
    {
        return this.cancelled.get();
    }

    /**
     * Unsubscribes this observer if it is subscribed and {@code mayInterruptIfRunning}
     * is true. The method returns false if this observer has already been cancelled, or
     * if the task has already completed normally.
     *
     * @param mayInterruptIfRunning
     *            to indicate whether a running observer should be
     *            unsubscribed or not
     * @return true if this observer was successfully unsubscribed, false otherwise
     */
    @Override
    public boolean cancel( boolean mayInterruptIfRunning )
    {
        if ( cancelled.get() || currentState.equals( SwingWorker.StateValue.DONE ) )
        {
            return false;
        }
        else if ( !this.isUnsubscribed() && mayInterruptIfRunning )
        {
            this.unsubscribe();
            this.cancelled.set( true );
            return true;
        }
        else
        {
            return false;
        }
    }

    /**
     * @return returns the current progress of the task. This progress
     *         must be set by {@link #setProgress(int)}
     */
    @Override
    public int getProgress()
    {
        return this.progress.get();
    }

    /**
     * Adds a property change listener to this observer.
     *
     * @param listener
     *            listener to be added
     */
    @Override
    public void addPropertyChangeListener( PropertyChangeListener listener )
    {
        synchronized ( this )
        {
            this.propertyChangeSupport.addPropertyChangeListener( listener );
        }
    }

    /**
     * Removes a property change listener from this observer.
     *
     * @param listener
     *            listener to be removed
     */
    @Override
    public void removePropertyChangeListener( PropertyChangeListener listener )
    {
        synchronized ( this )
        {
            this.propertyChangeSupport.removePropertyChangeListener( listener );
        }
    }

    /**
     * Fires a property change.
     *
     * @param propertyName
     *            name of the property
     * @param oldValue
     *            previous value of the property
     * @param newValue
     *            new value of the property
     */
    @Override
    public void firePropertyChange( String propertyName, Object oldValue, Object newValue )
    {
        synchronized ( this )
        {
            this.propertyChangeSupport.firePropertyChange( propertyName, oldValue, newValue );
        }
    }

    /**
     * @return the {@link PropertyChangeSupport} of this observer
     */
    @Override
    public PropertyChangeSupport getPropertyChangeSupport()
    {
        return propertyChangeSupport;
    }

    // ### Protected Methods ###

    /**
     * This method is invoked after {@link SwingWorkerRxOnSubscribe#doInBackground()} has completed.
     * The method is invoked in {@link SwingScheduler#getInstance()}.
     *
     * @param asyncResult
     *            result from {@link SwingWorkerRxOnSubscribe#doInBackground()}
     */
    protected abstract void done( ResultType asyncResult );

    /**
     * This method is invoked every time {@link SwingWorkerRxOnSubscribe#publish(Object[])} is invoked.
     *
     * @param chunks
     *            arguments passed to {@link SwingWorkerRxOnSubscribe#publish(Object[])}
     */
    protected abstract void process( List<ProcessType> chunks );

    /**
     * This method updates the progress of the observer. It can only
     * take values between 0 and 100. Setting the current value again is a no-op.
     *
     * @param progress
     *            progress to be set
     * @throws IllegalArgumentException
     *             if {@code progress} is outside the range 0..100
     */
    protected void setProgress( int progress )
    {
        if ( progress < 0 || progress > 100 )
        {
            throw new IllegalArgumentException( "the value should be from 0 to 100" );
        }
        synchronized ( this )
        {
            int oldProgress = this.progress.get();
            // Compare the int values; AtomicInteger.equals(Integer) is never true.
            if ( oldProgress == progress )
            {
                return;
            }
            // Store first so listeners calling getProgress() see the new value.
            this.progress.set( progress );
            propertyChangeSupport.firePropertyChange( "progress", oldProgress, progress );
        }
    }

    // ### Private Methods ###

    /** Resets progress to 0 and the cancelled/done flags to false. */
    private void initializeStates()
    {
        this.progress = new AtomicInteger( 0 );
        this.cancelled = new AtomicBoolean( false );
        this.done = new AtomicBoolean( false );
    }

    /** @return true while a subscription exists that has not been unsubscribed */
    private boolean isSubscribed()
    {
        return subscription != null && !subscription.isUnsubscribed();
    }

    /**
     * Subscribes this observer: the source is subscribed on the given scheduler and
     * its events are observed on {@link SwingScheduler#getInstance()}.
     */
    private void subscribeObservable( Scheduler scheduler )
    {
        this.subscription = this.observable
                .onBackpressureBuffer()
                .observeOn( SwingScheduler.getInstance() )
                .subscribeOn( scheduler )
                .subscribe( this );
    }

    /** Stores the new state and then notifies listeners of the "state" property. */
    private void setState( SwingWorker.StateValue state )
    {
        synchronized ( this )
        {
            SwingWorker.StateValue oldState = this.currentState;
            // Store first so listeners calling getState() see the new value.
            this.currentState = state;
            propertyChangeSupport.firePropertyChange( "state", oldState, state );
        }
    }
}
/**
 * Data transfer object that carries intermediate chunks, a progress value and
 * the final result from the producing side to the observing side.
 *
 * @param <ReturnType>
 *            type of the final asynchronous result
 * @param <ProcessType>
 *            type of the intermediate chunks
 */
public class SwingWorkerDto<ReturnType, ProcessType>
{
    /** Sentinel meaning "no progress value pending". */
    private static final int DEFAULT_PROGRESS = -1;

    private ReturnType asyncResult;

    private final List<ProcessType> chunks;

    private final AtomicInteger progress;

    SwingWorkerDto()
    {
        this.progress = new AtomicInteger( DEFAULT_PROGRESS );
        this.chunks = new ArrayList<ProcessType>();
        this.asyncResult = null;
    }

    /**
     * Appends the given chunks to the pending chunks.
     *
     * @param chunks
     *            chunks of data to be added
     * @return this object so that it can be passed on directly
     */
    SwingWorkerDto<ReturnType, ProcessType> send( ProcessType... chunks )
    {
        synchronized ( this.chunks )
        {
            this.chunks.addAll( Arrays.asList( chunks ) );
        }
        return this;
    }

    /**
     * Sets a result for the asynchronous operation.
     *
     * @param asyncResult
     *            result to be set
     * @return this object so that it can be used in {@link rx.Subscriber#onNext(Object)}.
     *         See {@link SwingWorkerRxOnSubscribe#call(Subscriber)}, {@link SwingWorkerRxOnSubscribe#setProgress(int)}
     *         and {@link SwingWorkerRxOnSubscribe#publish(Object[])}
     */
    SwingWorkerDto<ReturnType, ProcessType> setResult( ReturnType asyncResult )
    {
        synchronized ( this )
        {
            this.asyncResult = asyncResult;
        }
        return this;
    }

    /**
     * Getter to retrieve the chunks in {@link SwingWorkerSubscriber#onNext(SwingWorkerDto)}.
     *
     * @return a copy of the pending chunks; modifying it does not affect this object
     */
    List<ProcessType> getChunks()
    {
        synchronized ( this.chunks )
        {
            return new ArrayList<ProcessType>( this.chunks );
        }
    }

    /**
     * To remove chunks after they have been processed.
     * See {@link SwingWorkerSubscriber#onNext(SwingWorkerDto)}.
     * <p>
     * For every element of the given list exactly one (the first) equal pending
     * chunk is removed, so an equal chunk that was added after the snapshot was
     * taken stays pending.
     *
     * @param chunks
     *            the chunks that have been processed
     */
    void removeChunks( List<ProcessType> chunks )
    {
        synchronized ( this.chunks )
        {
            for ( ProcessType processed : chunks )
            {
                // remove(Object) drops a single occurrence, unlike removeAll.
                this.chunks.remove( processed );
            }
        }
    }

    /**
     * Retrieves the async result cached in this Dto.
     * See {@link SwingWorkerSubscriber#onNext(SwingWorkerDto)}.
     *
     * @return the result set by {@link #setResult(Object)}, or null if none was set
     */
    ReturnType getResult()
    {
        synchronized ( this )
        {
            return this.asyncResult;
        }
    }

    /**
     * To send the progress from {@link SwingWorkerRxOnSubscribe} to
     * {@link SwingWorkerSubscriber}.
     * See {@link SwingWorkerRxOnSubscribe#setProgress(int)}.
     *
     * @param progress
     *            the progress value to transfer
     * @return this object so that it can be passed on directly
     */
    SwingWorkerDto<ReturnType, ProcessType> setProgress( int progress )
    {
        this.progress.set( progress );
        return this;
    }

    /**
     * To identify whether a progress value was sent and not processed yet.
     * See {@link SwingWorkerSubscriber#onNext(SwingWorkerDto)}.
     *
     * @return true if a progress value is pending
     */
    boolean isProgressValueAvailable()
    {
        return progress.get() != DEFAULT_PROGRESS;
    }

    /**
     * To get the progress for processing and reset its value so it is
     * not processed multiple times.
     * See {@link SwingWorkerSubscriber#onNext(SwingWorkerDto)}.
     *
     * @return the pending progress value
     */
    int getProgressAndReset()
    {
        // Read and reset in one atomic step so a concurrent update is not lost.
        return this.progress.getAndSet( DEFAULT_PROGRESS );
    }
}
// Each SwingWorker call maps one-to-one onto the observer (replacement shown in the trailing comment).
swingWorker.execute(); /* -> observer.execute(); */
swingWorker.get( timeout, unit ); /* -> observer.get( timeout, unit ); */
swingWorker.getState(); /* -> observer.getState(); */
swingWorker.cancel( mayInterruptIfRunning ); /* -> observer.cancel( mayInterruptIfRunning ); */
// etc... All of those methods are implemented in the SwingWorkerSubscriber class from the last post
I implemented a SwingWorkerEmitter that does the same job as the previous SwingWorkerRxOnSubscribe. Is Observable.fromEmitter the recommended way to avoid backpressure and handling problems?
/**
 * {@link Action1} for {@code Observable.fromEmitter} that runs
 * {@link #doInBackground()} and emits its result, wrapped in a
 * {@link SwingWorkerDto}, followed by completion. Failures of
 * {@link #doInBackground()} are forwarded via {@code onError}.
 *
 * @param <ReturnType>
 *            type of the final asynchronous result
 * @param <ProcessType>
 *            type of the intermediate chunks
 */
public abstract class SwingWorkerEmitter<ReturnType, ProcessType>
        implements Action1<Emitter<SwingWorkerDto<ReturnType, ProcessType>>>
{
    private Emitter<? super SwingWorkerDto<ReturnType, ProcessType>> emitter;

    private SwingWorkerDto<ReturnType, ProcessType> dto;

    /** True while a {@link #call(Emitter)} is in progress. */
    private final AtomicBoolean running = new AtomicBoolean( false );

    /**
     * Runs the background task for the given emitter. If a run is already in
     * progress the invocation returns immediately without emitting anything.
     * This method should not be overridden by subclasses. Therefore it is
     * marked as {@link Deprecated}.
     *
     * @param emitter
     *            receives the result, completion or error of the task
     */
    @Override
    @Deprecated
    public void call( Emitter<SwingWorkerDto<ReturnType, ProcessType>> emitter )
    {
        // Claim the "running" flag atomically; a separate get() followed by
        // set(true) would let two concurrent callers both pass the check.
        if ( !running.compareAndSet( false, true ) )
        {
            // NOTE(review): the rejected emitter receives no terminal event — confirm this is intended.
            return;
        }
        this.emitter = emitter;
        try
        {
            this.dto = new SwingWorkerDto<ReturnType, ProcessType>();
            ReturnType asyncResult = doInBackground();
            this.emitter.onNext( this.dto.setResult( asyncResult ) );
            this.emitter.onCompleted();
        }
        catch ( Exception throwable )
        {
            this.emitter.onError( throwable );
        }
        finally
        {
            running.set( false );
        }
    }

    /**
     * To be implemented by subclasses with the work to perform.
     *
     * @return the result of the asynchronous operation
     * @throws Exception
     *             a general exception that could be thrown
     */
    protected abstract ReturnType doInBackground() throws Exception;
    //... same methods as before
}
// Example: an observable whose emitter publishes two consecutive integers per work unit
// and finally returns a String result; BUFFER is the chosen backpressure mode.
Observable<SwingWorkerDto<String, Integer>> observable = Observable.fromEmitter(
    new SwingWorkerEmitter<String, Integer>()
    {
        @Override
        protected String doInBackground() throws Exception
        {
            PrintUtils.printMessage( "Entering doInBackground() method" );
            for ( int first = 0; first < amountOfWork * 2; first += 2 )
            {
                int second = first + 1;
                publish( first, second );
                Thread.sleep( TIME_FOR_WORK_UNIT );
            }
            PrintUtils.printMessage( "doInBackground() finished successfully" );
            return "Async Result";
        }
    },
    Emitter.BackpressureMode.BUFFER );
// Subscribes this observer: the source is subscribed on the given scheduler (subscribeOn)
// and its events are observed on SwingScheduler.getInstance() (observeOn).
this.subscription = this.observable
.onBackpressureBuffer() // <--- NOTE(review): presumably the operator in question; likely redundant when the source already uses Emitter.BackpressureMode.BUFFER — confirm
.observeOn( SwingScheduler.getInstance() )
.subscribeOn( scheduler )
.subscribe( this );
Thanks again for your time. Knowing that something is incorrect is also very helpful. I’ll keep reading about Rx to understand its concepts better.
Thanks!