Issues using StreamTestKit


Jason Steenstra-Pickens

Aug 30, 2016, 7:04:30 PM
to Akka User List
Hi,

I'm trying to use the probes in StreamTestKit but I have hit a few issues.

The main one is that for all types of probes (Publisher/Subscriber, ManualProbe/Probe) the actual TestProbe is encapsulated as a private member and most of the functions just delegate to this inner probe. The problem is that there is no way to set the end time, which would normally be done via the within construct.

The second issue is a combination of two things: TestPublisher.Probe#ensureSubscription does not return the underlying Subscription, and the class does not override expectSubscription to initialise the lazy val. As a consequence, you cannot get hold of the Subscription and then use any of the other functions that rely on the lazy val internally, because the probe will then expect a second subscription.

Lastly, a relatively minor but frustrating issue: ManualProbe#expectSubscription returns a PublisherProbeSubscription, which has some useful functions for working on a per-subscription basis rather than on the whole Publisher. However, StreamTestKit is private, so even though PublisherProbeSubscription escapes, you can't refer to its type, which makes writing helper functions and the like impossible.

I was just hoping to raise these and get some feedback on whether anyone else has noticed them, or whether I'm just using it wrong.

I really like Akka Streams and I'm hoping to get into a better position to start contributing back. Keep up the good work!


Cheers,
Jason

Dagny T

Aug 30, 2016, 7:43:57 PM
to Akka User List
Hi Jason,

I'm a newbie pursuing Streams testing as well.

I was wondering whether you had run into, or found a way through, a couple of the build.sbt dependency issues I've been hitting with the v2.9 docs on Akka WebSocket support with Streams. (Please see my recent posts!)

If so, would you mind sharing your insights by replying to my posts with links to any information that would help answer my questions?
I'd already found a build.sbt here, but it looks like the wrong one, as it's too general and complex for the specific dependency set I need:

Thanks in advance for any help!
Dagny T

Jason Steenstra-Pickens

Aug 30, 2016, 10:49:19 PM
to Akka User List
Here is my really hacky solution to the execution bounds problem:
import java.lang.reflect.Field

import akka.actor.ActorSystem
import akka.stream.testkit.{TestPublisher, TestSubscriber}
import akka.testkit.{TestKit, TestProbe}

import scala.concurrent.duration.FiniteDuration

/**
  * Really hacky stuff to work around limitations in Akka TestKit.
  */
trait AkkaHacker {
  this: TestKit =>

  /**
    * Creates a new [[TestPublisher.Probe]] that shares the execution bounds with the enclosing [[TestKit]].
    */
  def pubProbe[T](initialPendingRequests: Long = 0)(implicit system: ActorSystem): TestPublisher.Probe[T] = {
    val probe = TestPublisher.probe[T](initialPendingRequests)
    probe.innerProbe = new SharedTimeFactorProbe(system)
    probe
  }

  /**
    * Creates a new [[TestSubscriber.Probe]] that shares the execution bounds with the enclosing [[TestKit]].
    */
  def subProbe[T]()(implicit system: ActorSystem): TestSubscriber.Probe[T] = {
    val probe = TestSubscriber.probe[T]()
    probe.innerProbe = new SharedTimeFactorProbe(system)
    probe
  }

  /**
    * Creates a [[TestProbe]] that shares the execution bounds with the enclosing [[TestKit]].
    *
    * @param system the actor system for the probe
    */
  class SharedTimeFactorProbe(system: ActorSystem) extends TestProbe(system) {

    // Delegate all deadline handling to the enclosing TestKit.
    override def remaining: FiniteDuration = AkkaHacker.this.remaining

    override def remainingOr(duration: FiniteDuration): FiniteDuration = AkkaHacker.this.remainingOr(duration)

    override def remainingOrDefault: FiniteDuration = AkkaHacker.this.remainingOrDefault

    override def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T = AkkaHacker.this.within(min, max)(f)

    override def within[T](max: FiniteDuration)(f: => T): T = AkkaHacker.this.within(max)(f)
  }

  trait InnerProbe {

    protected def outerProbe: Any

    protected def probeField: Field = {
      // Need the declared class not the runtime class.
      val outerProbeClass = this.getClass.getDeclaredMethod("outerProbe").getReturnType
      val field = outerProbeClass.getDeclaredFields.find(_.getType == classOf[TestProbe])
        .getOrElse(throw new NoSuchFieldException(s"No TestProbe field in ${outerProbeClass.getName}"))
      field.setAccessible(true)
      field
    }

    def innerProbe: TestProbe = probeField.get(outerProbe).asInstanceOf[TestProbe]

    def innerProbe_=(probe: TestProbe): Unit = probeField.set(outerProbe, probe)

    def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T = innerProbe.within(min, max)(f)

    def within[T](max: FiniteDuration)(f: => T): T = innerProbe.within(max)(f)
  }

  implicit class RichPubProbe[I](override val outerProbe: TestPublisher.ManualProbe[I]) extends InnerProbe

  implicit class RichSubProbe[I](override val outerProbe: TestSubscriber.ManualProbe[I]) extends InnerProbe

}

This is definitely going to come back and bite me, but oh well, at least it seems to work for now (e.g. this doesn't restore the end time once we leave the within block, but too bad...).
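
For anyone who wants to see the reflection part of the trick on its own, here is a minimal sketch without any Akka dependencies. Inner, Outer and ReflectionSwap are hypothetical stand-ins (Inner plays the role of TestProbe, Outer the role of a stream probe hiding it), and the hidden field is a var here, so this only illustrates the idea of locating a private field by its type and swapping its value:

```scala
import java.lang.reflect.Field

// Hypothetical stand-in for TestProbe.
class Inner(val name: String)

// Hypothetical stand-in for a stream probe that hides its inner probe.
class Outer {
  private var hidden: Inner = new Inner("original")
  def describe: String = hidden.name
}

object ReflectionSwap {
  // Find the first declared field of the given type and make it accessible.
  def fieldOfType(owner: Class[_], fieldType: Class[_]): Field = {
    val field = owner.getDeclaredFields
      .find(_.getType == fieldType)
      .getOrElse(throw new NoSuchFieldException(
        s"No ${fieldType.getName} field in ${owner.getName}"))
    field.setAccessible(true)
    field
  }

  def main(args: Array[String]): Unit = {
    val outer = new Outer
    val field = fieldOfType(classOf[Outer], classOf[Inner])
    // Replace the private member from the outside.
    field.set(outer, new Inner("replaced"))
    println(outer.describe) // prints "replaced"
  }
}
```

Looking the field up by type rather than by name is what lets the same helper work for both the publisher and the subscriber probe, at the cost of breaking if a class ever declares more than one field of that type.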

Now in my test I could do something along the lines of:
import scala.concurrent.duration._ // needed for 1.second

within(1.second) {
  sourceProbe.expectNoMsg()
  sinkProbe.expectNoMsg()
  expectNoMsg()
}

And they all share the timeout.