The mapAsync and the mapAsyncUnordered an unexpected behavior

213 views
Skip to first unread message

Boris Lopukhov

unread,
Feb 27, 2015, 4:41:57 AM2/27/15
to akka...@googlegroups.com
I have a simple actor publisher:

class TestActor extends ActorPublisher[Int] {

  println
("RUN")

 
def receive = {
   
case Request(elements) => while (totalDemand > 0 && isActive) {
      onNext
(1)
   
}
   
case Cancel => println("CANCEL")
 
}
}

and i found a strange feature:

  Source[Int](Props[TestActor])
   
.map { x => throw new Exception(); x }
   
.runForeach { x => }

printed RUN and CANCEL

  Source[Int](Props[TestActor])
   
.mapAsync(Future.successful)
   
.map { x => throw new Exception(); x }
   
.runForeach { x => }

printed only RUN

  Source[Int](Props[TestActor])
   
.mapAsyncUnordered(Future.successful)
   
.map { x => throw new Exception(); x }
   
.runForeach { x => }
 
throwed java.lang.OutOfMemoryError

i use "1.0-M3" version of akka-streams

Endre Varga

unread,
Feb 27, 2015, 4:54:35 AM2/27/15
to akka...@googlegroups.com
Hi Boris,

On Fri, Feb 27, 2015 at 10:41 AM, Boris Lopukhov <89b...@gmail.com> wrote:
I have a simple actor publisher:

class TestActor extends ActorPublisher[Int] {

  println
("RUN")

 
def receive = {
   
case Request(elements) => while (totalDemand > 0 && isActive) {
      onNext
(1)
   
}
   
case Cancel => println("CANCEL")
 
}
}

and i found a strange feature:

  Source[Int](Props[TestActor])
   
.map { x => throw new Exception(); x }
   
.runForeach { x => }

printed RUN and CANCEL

This is itself fine, you get the error in the Future returned by runForeach.
 

  Source[Int](Props[TestActor])
   
.mapAsync(Future.successful)
   
.map { x => throw new Exception(); x }
   
.runForeach { x => }

printed only RUN

This is not fine though.
 

  Source[Int](Props[TestActor])
   
.mapAsyncUnordered(Future.successful)
   
.map { x => throw new Exception(); x }
   
.runForeach { x => }
 
throwed java.lang.OutOfMemoryError

This is also not fine. I suspect there is a problem handling already completed Futures. You should file a ticket, this seems like a bug. If you could also try using a not already completed Future that would help.

-Endre
 

i use "1.0-M3" version of akka-streams

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Boris Lopukhov

unread,
Feb 27, 2015, 7:43:15 AM2/27/15
to akka...@googlegroups.com
Thanks for response!

I created issue https://github.com/akka/akka/issues/16959

пятница, 27 февраля 2015 г., 13:54:35 UTC+4 пользователь drewhk написал:
Reply all
Reply to author
Forward
0 new messages