Flatten and grouping streams yields 'exceeding it's subscription-timeout'

Muki

Oct 4, 2015, 9:52:41 AM
to Akka User List
Hi,

My use case: I get a List of elements that contain ids used to download stuff. I want to download everything and afterwards
group it back together with the corresponding elements. So I tried doing this with a simple example:

  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl._

  implicit val system = ActorSystem("test-streams")
  implicit val materializer = ActorMaterializer()
  implicit val context = system.dispatcher

  val source: Source[List[Int], Unit] = Source(List(1, 2, 3, 4, 5).map(n => List.fill(n)(n)))

  source
    // flatten into a stream of single ints
    .mapConcat(identity)
    // regroup equal ints, one substream per key (recreating n => List.fill(n)(n))
    .groupBy(identity)
    // fold each substream's values into a single list
    .map {
      case (key, substream) => substream.fold(List.empty[Int])(_ :+ _)
    }
    // flatten the multiple sources into the stream
    .flatten(FlattenStrategy.concat)
    // print them out
    .runForeach(println)
    // shutdown everything
    .onComplete { result =>
      // Failure(akka.stream.impl.StreamSubscriptionTimeoutSupport$$anon$2: Publisher (akka.stream.impl.MultiStreamOutputProcessor$SubstreamOutput@45963726) you are trying to subscribe to has been shut-down because exceeding it's subscription-timeout.)
      println(result)
      system.shutdown()
      system.awaitTermination()
    }

This is not working as expected, and I'm not sure where I'm messing things up. The sequence of operations mapConcat -> groupBy (and fold) -> flatten doesn't seem to work.
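For reference, the result Muki expects can be sketched with ordinary, strict Scala collections. This is a plain-collections analogue (the object name `IntendedResult` is hypothetical), not Akka Streams code: it works only because the whole input is finite and already in memory, which a stream cannot assume.

```scala
object IntendedResult {
  // Same input as the stream example: List(1), List(2, 2), ..., List(5, 5, 5, 5, 5)
  val input: List[List[Int]] = List(1, 2, 3, 4, 5).map(n => List.fill(n)(n))

  // Counterpart of mapConcat(identity): one flat list of ints
  val flat: List[Int] = input.flatten

  // Counterpart of groupBy + fold; a Map has no defined order, so sort by key
  val regrouped: List[List[Int]] =
    flat.groupBy(identity).toList.sortBy(_._1).map { case (_, values) => values }

  def main(args: Array[String]): Unit = regrouped.foreach(println)
}
```

Running `main` prints `List(1)` through `List(5, 5, 5, 5, 5)`, one per line.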

thanks,
Muki

Endre Varga

Oct 5, 2015, 5:15:41 AM
to akka...@googlegroups.com
Hi Muki

The sequence groupBy->flatten is inherently dangerous, as it is an operation that cannot be done in bounded memory for arbitrary inputs. Very probably your stream deadlocked.

To get a picture of the underlying problem, imagine the following:
 - you have a source of infinite pseudorandom numbers
 - you use groupBy to divide the stream into odd and even numbers
 - then you concatenate these streams

In the above example, the framework is instructed to: "take an infinite stream of arbitrary numbers and transform it into a stream of all of its even numbers first, then all of its odd numbers, without dropping any number, of course". (*)
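A rough way to see the memory problem, assuming the even group happens to be consumed first and using a finite prefix as a stand-in for the infinite source: every odd number seen so far has to be held back until the even group completes. The names below are hypothetical, and this is a plain Scala model of the buffering requirement, not how groupBy or concat are implemented.

```scala
import scala.util.Random

object ConcatGroupsMemory {
  // Number of elements that must be buffered by the end of `prefix` if the
  // even group is emitted first: every odd number waits for the even group
  // to complete. `% 2 != 0` also classifies negative odd numbers correctly.
  def heldBack(prefix: Iterator[Int]): Int =
    prefix.count(_ % 2 != 0)

  def main(args: Array[String]): Unit = {
    val rng = new Random(42) // fixed seed stands in for the pseudorandom source
    for (n <- Seq(1000, 10000, 100000)) {
      val prefix = Iterator.fill(n)(rng.nextInt())
      println(s"first $n numbers: ${heldBack(prefix)} odd numbers held back")
    }
  }
}
```

The count grows linearly with the prefix length, and because the even group of an infinite source never completes, that buffer is never released.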

That is obviously not possible to do in bounded memory. There is a similar pattern in the cookbook that shows how to safely handle this:

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html#Implementing_reduce-by-key

In short, you have to add enough buffer space so that all of the groups can start being consumed, and fail properly instead of deadlocking if there are more groups than allowed/expected.
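The idea of sizing for a known maximum number of groups and failing beyond it can be sketched with plain Scala collections. This is an analogue under stated assumptions, not the cookbook's Akka Streams code: `BoundedReduceByKey`, `reduceByKey`, `maxGroups` and `TooManyGroups` are hypothetical names, and `maxGroups` plays the role of the capacity that has to be provided for all groups.

```scala
import scala.collection.mutable

object BoundedReduceByKey {
  // Raised instead of waiting forever when more groups appear than planned for.
  final case class TooManyGroups(limit: Int)
      extends RuntimeException(s"more than $limit distinct groups")

  // Folds each group's values into one result, keeping groups in first-seen
  // order, and fails as soon as a new key would exceed `maxGroups`.
  def reduceByKey[K, V, A](xs: Iterator[V], maxGroups: Int)(key: V => K)(zero: A)(
      f: (A, V) => A): List[(K, A)] = {
    val acc = mutable.LinkedHashMap.empty[K, A]
    xs.foreach { v =>
      val k = key(v)
      if (!acc.contains(k) && acc.size >= maxGroups) throw TooManyGroups(maxGroups)
      acc(k) = f(acc.getOrElse(k, zero), v)
    }
    acc.toList
  }

  def main(args: Array[String]): Unit = {
    val input = List(1, 2, 3, 4, 5).flatMap(n => List.fill(n)(n))
    reduceByKey(input.iterator, maxGroups = 5)(identity)(List.empty[Int])(_ :+ _)
      .foreach { case (_, group) => println(group) }
  }
}
```

With `maxGroups = 4` the same input fails with `TooManyGroups(4)` when the key `5` first appears, rather than hanging.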

-Endre.

(*) of course groupBy in that example will not guarantee that even numbers come first, it can be the odd ones just as well.


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Viktor Klang

Oct 5, 2015, 5:24:46 AM
to Akka User List
Would an `advice` setting on the Materializer, warning about these potential issues, be useful?
--
Cheers,

Endre Varga

Oct 5, 2015, 5:41:07 AM
to akka...@googlegroups.com
WDYM, autodetecting such patterns? That is unfortunately not a trivial matter. It is easier to *detect* deadlock situations (given that all the participating stages are Akka-provided ones), although with some false positives. I guess it is doable if the false-positive cases can be whitelisted.

-Endre

Viktor Klang

Oct 5, 2015, 5:45:41 AM
to Akka User List
Well, the materializer can see whether there's both a groupBy and a flatten in the same graph and log a warning if `advice` is enabled, in the same spirit as an exhaustiveness warning is emitted even when a specific piece of code will never trip the MatchError.
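Viktor's suggestion amounts to a purely syntactic check. A minimal sketch, assuming a hypothetical, simplified list-of-stages description of the pipeline (the real materializer works on a graph of modules, and none of these names exist in Akka):

```scala
object AdviceCheck {
  // Hypothetical, simplified description of a pipeline's stages.
  sealed trait Stage
  case object GroupBy extends Stage
  case object Flatten extends Stage
  final case class Other(name: String) extends Stage

  // Warns when groupBy and flatten appear in the same pipeline and advice is
  // enabled, regardless of whether this particular input would deadlock.
  def advice(stages: Seq[Stage], adviceEnabled: Boolean): Option[String] =
    if (adviceEnabled && stages.contains(GroupBy) && stages.contains(Flatten))
      Some("advice: groupBy and flatten in the same graph may need unbounded buffering or deadlock")
    else None

  def main(args: Array[String]): Unit = {
    val pipeline = List(Other("mapConcat"), GroupBy, Other("map"), Flatten, Other("foreach"))
    advice(pipeline, adviceEnabled = true).foreach(println)
  }
}
```

Like an exhaustiveness warning, it fires whenever both stages are present, even for inputs that would never actually deadlock.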

Endre Varga

Oct 5, 2015, 5:51:56 AM
to akka...@googlegroups.com
We can try to add such checkers, but the problem is that this pattern is just one of many. There are other commonly found ones, like bcast->zip where one edge is not strictly one-to-one (which can be hidden, e.g. by a Resume directive). The problem with those is that detecting them requires graph pattern matching, which is costly and still has a high false-positive rate.
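The bcast->zip hazard can be illustrated with a toy model. Assumptions: broadcast pushes an element only when both branch buffers have room, branch B drops some elements (an explicit filter here; a stage that resumes after an error drops elements in the same way), and zip emits a pair whenever both branches have one. All names are hypothetical and the semantics are simplified, not Akka's actual stages.

```scala
import scala.collection.mutable

object BroadcastZipModel {
  sealed trait Outcome
  final case class Completed(pairs: List[(Int, Int)]) extends Outcome
  final case class Deadlocked(atInputIndex: Int) extends Outcome

  // `input` is broadcast to branch A (identity) and branch B (keeps only
  // elements satisfying `keepInB`); each branch buffers at most `bufferSize`
  // elements waiting for zip.
  def run(input: List[Int], keepInB: Int => Boolean, bufferSize: Int): Outcome = {
    val a = mutable.Queue.empty[Int]
    val b = mutable.Queue.empty[Int]
    val pairs = List.newBuilder[(Int, Int)]
    def drainZip(): Unit =
      while (a.nonEmpty && b.nonEmpty) pairs += ((a.dequeue(), b.dequeue()))

    var rest = input
    var index = 0
    while (rest.nonEmpty) {
      drainZip()
      // Broadcast needs room in both branches; if A is full, B is empty here
      // (zip just drained), so neither broadcast nor zip can make progress.
      if (a.size >= bufferSize || b.size >= bufferSize) return Deadlocked(index)
      val x = rest.head
      a.enqueue(x)
      if (keepInB(x)) b.enqueue(x)
      rest = rest.tail
      index += 1
    }
    drainZip()
    Completed(pairs.result())
  }

  def main(args: Array[String]): Unit = {
    println(run(List(1, 2, 3, 4), _ => true, bufferSize = 16))
    println(run(List(1, 2, 3, 4, 5, 6), _ % 2 == 0, bufferSize = 16))
    println(run((1 to 10).toList, _ > 100, bufferSize = 3))
  }
}
```

With `_ => true` the pairs line up; with a filter that keeps only some elements the pairs are silently misaligned, and once branch A's buffer fills while B has nothing to offer, the model reports a deadlock.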

-Endre