Getting a timeout error when testing a stream with mapAsync stage

482 views
Skip to first unread message

Jakub Kahovec

unread,
Feb 16, 2018, 3:02:10 AM2/16/18
to Akka User List
Hi,


when using Akka Streams (2.5.9)  I've recently bumped into a problem when testing a stream with a mapAsync stage. It unexpectedly ends with an assertion failing on "timeout (3 seconds) during expectMsg while waiting for OnComplete"


To demonstrate it I've created a simple example. 


The code below works as expected.


val sourceUnderTest = Source(1 to 2).mapAsync(2)(i => Future.successful(i * 2))

val c
= TestSubscriber.manualProbe[Int]()
val p
= sourceUnderTest.to(Sink.fromSubscriber(c)).run()
val
sub = c.expectSubscription()

sub.request(2)
c
.expectNextN(2)   // List(2,4)
c
.expectComplete()   // akka.stream.testkit.TestSubscriber$ManualProbe@fc258b1


However, when I add another mapping function (bold code), the test fails with a timeout. When I call additional c.request(1) (commented code) it ends correctly. So it looks like the mapping function adds an additional item into the stream, which seems strange.

  

val sourceUnderTest = Source(1 to 2).mapAsync(2)(i => Future.successful(i * 2).map(identity))

val c
= TestSubscriber.manualProbe[Int]()
val p
= sourceUnderTest.to(Sink.fromSubscriber(c)).run()
val
sub = c.expectSubscription()

sub.request(2)
c
.expectNextN(2)   // List(2,4)
// c.request(1)
c
.expectComplete()   // ends with java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg while waiting for OnComplete


Can anyone explain this strange behaviour ?


Thanks

Jakub

Patrik Nordwall

unread,
Feb 16, 2018, 3:55:45 PM2/16/18
to akka...@googlegroups.com
It is allowed to ”delay” delivery of the complete signal when there is no demand. This behavior is undefined, i.e. some stages deliver it immediately, some only when demand is requested. This is as expected and in such test you have to request enough to be sure to get the completed signal.

/Patrik
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Jakub Kahovec

unread,
Feb 17, 2018, 11:57:17 AM2/17/18
to Akka User List
I see. Thank you Patrik for the explanation.

Jakub

Jakub Kahovec

unread,
Feb 19, 2018, 1:40:05 AM2/19/18
to Akka User List
I had another thought about this behaviour and two things came to my mind. First, it might be worth mentioning this somewhere in the documentation about testing as other users might also experience this and I think didn't see this mentioned anywhere in the docs. And second, is it safe to use expectComplete at all if the behaviour is undefined and any such change might break the tests unexpectedly ?

Jakub

On Friday, February 16, 2018 at 9:55:45 PM UTC+1, Patrik Nordwall wrote:

Patrik Nordwall

unread,
Feb 19, 2018, 6:13:06 AM2/19/18
to akka...@googlegroups.com
Doc improvement is welcome. If you add request(1) before the expectComplete it’s not undefined, then you are supposed to get it.

/Patrik

Patrik Nordwall

unread,
Feb 19, 2018, 6:18:50 AM2/19/18
to akka...@googlegroups.com
btw, probably easier to use .runWith(TestSink.probe) in that example, or .runWith(Sink.seq).futureValue

To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.

To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--

Patrik Nordwall
Akka Tech Lead
Lightbend -  Reactive apps on the JVM
Twitter: @patriknw

Jakub Kahovec

unread,
Feb 19, 2018, 6:32:15 AM2/19/18
to akka...@googlegroups.com
Thank you. So I'll add the request(1) before the expectComplete(). I actually use the .runWih(TesSink.probe) in my test, I used the TestSubscriber.manualProbe just in the demonstration example to be able to print the intermediate results.

Jakub

You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/KXFPDLE4jpI/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+unsubscribe@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages