doOnError, doOnCompleted not called

897 views
Skip to first unread message

Steven Marcus

unread,
Feb 12, 2015, 11:55:28 PM2/12/15
to rxj...@googlegroups.com

Hello,

I'm using RxScala 0.23.1 and my doOnError, doOnCompleted handlers are never called:

    ffmpeg.
      exec(preset).
      doOnError {t ⇒
      if (tmpPath.toFile.delete)
        logger.warn("deleting: {} -> {}", tmpPath)
      else
        logger.error("delete failed: {} -> {}", tmpPath, outPath)
    }.
      doOnCompleted{ () =>
        if (tmpPath.toFile.renameTo(outPath.toFile))
          logger.debug("rename: {} -> {}", tmpPath, outPath)
        else
          logger.error("rename failed: {} -> {}", tmpPath, outPath)
      }.
      toBlocking.
      single

Here's the ffmpeg.exec function:

  override def exec(params: Seq[String]): Observable[JsValue] =
    Observable[JsValue](subscriber ⇒
      FFMPEG(params).exec() match {
        case Success(info) if !subscriber.isUnsubscribed ⇒
          subscriber.onNext(info)
          subscriber.onCompleted()
        case Failure(e) if !subscriber.isUnsubscribed ⇒
          subscriber.onError(e)
        case _ ⇒
        // ignore unsubscribed...
      }).subscribeOn(IOScheduler())

As you can see, the code is trying to be a good Observable by not calling the observer if unsubscribed. I saw this pattern in the example code somewhere.
But the do() handlers should always be called -- which makes me believe that I don't understand how to make this work in the general case.

But in the base case when testing without unsubscribing before error/complete, the do handlers are still not called.

Any help greatly appreciated. TIA
Steven

Shixiong Zhu

unread,
Feb 13, 2015, 1:23:41 AM2/13/15
to Steven Marcus, rxj...@googlegroups.com
But the do() handlers should always be called

`doOnError` and `doOnCompleted` may not be called if it's unsubscribed.

Do you want to do some clean work? If so, you can put the work in Subscription. Such as,

    Observable[Int](subscriber => {
      subscriber.add {
        // delete your temp files
        println("clean up")
      }
      val data: Try[Int] = Success(1)
      data match {
        case Success(v) if !subscriber.isUnsubscribed ⇒
          subscriber.onNext(v)
          subscriber.onCompleted()
        case Failure(e) if !subscriber.isUnsubscribed ⇒
          subscriber.onError(e)
        case _ =>
        // ignore unsubscribed...
      }
    }).subscribeOn(IOScheduler()).doOnError { e =>
      println("doOnError")
    }.doOnCompleted {
      println("doOnCompleted")
    }.toBlocking.single




Best Regards,

Shixiong Zhu
Reply all
Reply to author
Forward
0 new messages