import java.lang.reflect.Field
import akka.actor.ActorSystem
import akka.stream.testkit.{TestPublisher, TestSubscriber}
import akka.testkit.{TestKit, TestProbe}
import scala.concurrent.duration.FiniteDuration
/**
* Really hacky stuff to workaround limitations in Akka TestKit.
*/
trait AkkaHacker {
this: TestKit =>
/**
* Creates a new [[TestPublisher.Probe]] that shares the execution bounds with the enclosing [[TestKit]].
*/
def pubProbe[T](initialPendingRequests: Long = 0)(implicit system: ActorSystem): TestPublisher.Probe[T] = {
val probe = TestPublisher.probe[T](initialPendingRequests)
probe.innerProbe = new SharedTimeFactorProbe(system)
probe
}
/**
* Creates a new [[TestSubscriber.Probe]] that shares the execution bounds with the enclosing [[TestKit]].
*/
def subProbe[T]()(implicit system: ActorSystem): TestSubscriber.Probe[T] = {
val probe = TestSubscriber.probe[T]()
probe.innerProbe = new SharedTimeFactorProbe(system)
probe
}
/**
* Creates a [[TestProbe]] that shares the execution bounds with the enclosing [[TestKit]].
*
* @param system the actor system for the probe
*/
class SharedTimeFactorProbe(system: ActorSystem) extends TestProbe(system) {
override def remaining = AkkaHacker.this.remaining
override def remainingOr(duration: FiniteDuration) = AkkaHacker.this.remainingOr(duration)
override def remainingOrDefault: FiniteDuration = AkkaHacker.this.remainingOrDefault
override def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T = AkkaHacker.this.within(min, max)(f)
override def within[T](max: FiniteDuration)(f: => T): T = AkkaHacker.this.within(max)(f)
}
trait InnerProbe {
protected def outerProbe: Any
protected def probeField: Field = {
// Need the declared class not the runtime class.
val outerProbeClass = this.getClass.getDeclaredMethod("outerProbe").getReturnType
val field = outerProbeClass.getDeclaredFields.find(_.getType == classOf[TestProbe]).get
field.setAccessible(true)
field
}
def innerProbe: TestProbe = probeField.get(outerProbe).asInstanceOf[TestProbe]
def innerProbe_=(probe: TestProbe) = probeField.set(outerProbe, probe)
def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T = innerProbe.within(min, max)(f)
def within[T](max: FiniteDuration)(f: => T): T = innerProbe.within(max)(f)
}
implicit class RichPubProbe[I](override val outerProbe: TestPublisher.ManualProbe[I]) extends InnerProbe
implicit class RichSubProbe[I](override val outerProbe: TestSubscriber.ManualProbe[I]) extends InnerProbe
}