Odd behaviour of Broadcast hub

Wiktor Tychulski

Oct 3, 2017, 12:08:47 PM
to Akka User List
Simplified graph:
| auth source | ----> | zipping sources | <---- | optional merge / concat / prepend | <---- | Broadcast | <---- | data source |
    
This is a simplified graph of the real solution. In reality the auth source is an incoming websocket message carrying a token, which is checked by a service. If authentication succeeds, I would like to start sending data through from the data source. If not, I would like to send an error message and cancel upstream (this is accomplished by takeWhile).
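To make the takeWhile step concrete, here is a minimal sketch that uses plain Scala collections instead of Akka Streams. The helper name takeWhileInclusive and the sample values are made up for illustration. It only models the intended semantics of takeWhile(_.isRight, inclusive = true): elements pass while they are Right, the first Left is still emitted, and nothing after it is.

```scala
object TakeWhileInclusiveSketch {
  // Collections-only model of takeWhile(p, inclusive = true):
  // keep elements while p holds, then also keep the first element that fails p.
  def takeWhileInclusive[A](xs: List[A])(p: A => Boolean): List[A] = {
    val (passing, rest) = xs.span(p)
    passing ++ rest.take(1)
  }

  // Hypothetical sample data: Right = data for an authorised client, Left = auth error.
  val authorised: List[Either[String, Int]] = List(Right(0), Right(1), Right(2))
  val rejected: List[Either[String, Int]] = List(Left("auth failed"), Right(0), Right(1))

  def main(args: Array[String]): Unit = {
    println(takeWhileInclusive(authorised)(_.isRight)) // List(Right(0), Right(1), Right(2))
    println(takeWhileInclusive(rejected)(_.isRight))   // List(Left(auth failed))
  }
}
```

For a rejected client the error is the only element that gets through, which is the "send an error message, then stop" behaviour the real stream is meant to have.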
    
The broadcast data source may not emit any elements for some time, and the zip cannot emit anything until both of its inputs have an element. I need to know the outcome of authentication straight away so that I can notify the user about an error. The only way to make sure this happens is to prepend the data source (the broadcast data source) with one element: an initial message.

I tried to accomplish this with various methods (merge / merge preferred / concat / prepend). It always works if all clients are authorised and I start emitting data from the BroadcastHub, or if no clients are authorised and upstream is cancelled straight away. But if half of the users are authorised and half are not, the consumer streams are not finished properly (they stay alive but do not consume), the BroadcastHub keeps buffering, and once its internal buffer has filled up the whole stream is blocked.
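The blocking described above can be pictured with a deliberately simplified model. This is an assumption about how a bounded broadcast buffer behaves, not Akka's actual implementation: the producer may run at most bufferSize elements ahead of the slowest registered consumer. The object HubBufferSketch, the function published and its parameters are hypothetical names used only for this sketch.

```scala
object HubBufferSketch {
  // consumerLimits(i) is how many elements consumer i is willing to read:
  // Int.MaxValue = keeps reading, 0 = registered but never reads ("live but not consuming").
  // Returns how many of the `total` elements the producer manages to publish.
  def published(total: Int, bufferSize: Int, consumerLimits: Seq[Int]): Int = {
    require(consumerLimits.nonEmpty, "this model needs at least one consumer")
    val read = Array.fill(consumerLimits.size)(0) // elements read so far, per consumer
    var produced = 0
    var progressed = true
    while (progressed) {
      progressed = false
      // The producer may publish only while it is less than bufferSize ahead of the slowest consumer.
      if (produced < total && produced - read.min < bufferSize) {
        produced += 1
        progressed = true
      }
      // Each consumer reads one element if one is available and it still wants more.
      for (i <- read.indices if read(i) < produced && read(i) < consumerLimits(i)) {
        read(i) += 1
        progressed = true
      }
    }
    produced
  }

  def main(args: Array[String]): Unit = {
    println(published(12, 8, Seq(Int.MaxValue, Int.MaxValue))) // 12
    println(published(12, 8, Seq(Int.MaxValue, 0)))            // 8
  }
}
```

In this model a single consumer that never reads caps the producer at 8 of the 12 elements, so a probe expecting 12 elements would wait forever. That is the same shape as the symptom in the half-and-half tests, although the model says nothing about why the real consumer streams fail to finish.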
   
I created a bunch of tests showing the behaviour. Please note that for each method of adding the initial message I create 100 consumers. For each method there are 3 tests: one with all clients authorised to see data, one with no clients authorised, and one with half and half.

Here is the test code for this odd behaviour:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.scaladsl.{
  BroadcastHub,
  Flow,
  GraphDSL,
  Keep,
  MergePreferred,
  Sink,
  Source,
  ZipWith
}
import akka.stream.testkit.scaladsl.TestSink
import akka.stream._
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.FlatSpec

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class TestFlow extends FlatSpec {

  val cfg: Config = ConfigFactory.load()
  implicit val sys: ActorSystem = ActorSystem("Test-Server", cfg)
  implicit val dispatcher: ExecutionContext = sys.dispatcher
  implicit val mat: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(sys))

  "Broadcast consumers" should "consume if there is only zip in consumer flow and all clients are authenticated" in {
    val broadcast = createBroadcast()
    val probe = broadcast
      .toMat(TestSink.probe[MsgTick])(Keep.right)
      .run()

    addNConsumersClients(100, _ ⇒ true, simpleFlow(broadcast))

    probe.request(12).expectNextN(12)
    probe.cancel()
  }

  it should "consume if there is only zip in consumer flow and all clients are NOT authenticated" in {
    val broadcast = createBroadcast()
    val probe = broadcast
      .toMat(TestSink.probe[MsgTick])(Keep.right)
      .run()

    addNConsumersClients(100, auth = _ ⇒ false, simpleFlow(broadcast))

    probe.request(12).expectNextN(12)
    probe.cancel()
  }

  it should "consume if there is only zip in consumer flow and half of the clients are authenticated and half are NOT" in {
    val broadcast = createBroadcast()
    val probe = broadcast
      .toMat(TestSink.probe[MsgTick])(Keep.right)
      .run()

    addNConsumersClients(100, _ % 2 == 0, simpleFlow(broadcast))

    // this works because there are no merge-like components, just a simple zip
    probe.request(12).expectNextN(12)
    probe.cancel()
  }

  it should "consume if there is concat in consumer flow and all clients are authenticated" in {
    val broadcast = createBroadcast()
    val probe = broadcast
      .toMat(TestSink.probe[MsgTick])(Keep.right)
      .run()

    addNConsumersClients(100, _ ⇒ true, flowWithConcat(broadcast))

    probe.request(12).expectNextN(12)
    probe.cancel()
  }

  it should "consume if there is concat in consumer flow and all clients are NOT authenticated" in {
    val broadcast = createBroadcast()
    val probe = broadcast
      .toMat(TestSink.probe[MsgTick])(Keep.right)
      .run()

    addNConsumersClients(100, auth = _ ⇒ false, flowWithConcat(broadcast))

    probe.request(12).expectNextN(12)
    probe.cancel()
  }

  it should "consume if there is concat in consumer flow and half of the clients are authenticated and half are NOT" in {
    val broadcast = createBroadcast()
    val probe = broadcast
      .toMat(TestSink.probe[MsgTick])(Keep.right)
      .run()

    addNConsumersClients(100, _ % 2 == 0, flowWithConcat(broadcast))

    // this fails because the broadcast hub buffer is set to 8 and client streams are not finishing properly
    probe.request(12).expectNextN(12)
    probe.cancel()
  }

  it should "consume if there is merge preferred in consumer flow and all clients are authenticated" in {
    val broadcast = createBroadcast()
    val probe = broadcast
      .toMat(TestSink.probe[MsgTick])(Keep.right)
      .run()

    addNConsumersClients(100, _ ⇒ true, flowWithMergePreffered(broadcast))

    probe.request(12).expectNextN(12)
    probe.cancel()
  }

  it should "consume if there is merge preferred in consumer flow and all clients are NOT authenticated" in {
    val broadcast = createBroadcast()
    val probe = broadcast
      .toMat(TestSink.probe[MsgTick])(Keep.right)
      .run()

    addNConsumersClients(100, auth = _ ⇒ false, flowWithMergePreffered(broadcast))

    probe.request(12).expectNextN(12)
    probe.cancel()
  }

  it should "consume if there is merge preferred in consumer flow and half of the clients are authenticated and half are NOT" in {
    val broadcast = createBroadcast()
    val probe = broadcast
      .toMat(TestSink.probe[MsgTick])(Keep.right)
      .run()

    addNConsumersClients(100, _ % 2 == 0, flowWithMergePreffered(broadcast))

    // this fails because the broadcast hub buffer is set to 8 and client streams are not finishing properly
    probe.request(12).expectNextN(12)
    probe.cancel()
  }

  it should "consume if there is prepend in consumer flow and all clients are authenticated" in {
    val broadcast = createBroadcast()
    val probe = broadcast
      .toMat(TestSink.probe[MsgTick])(Keep.right)
      .run()

    addNConsumersClients(100, _ ⇒ true, flowWithPrepend(broadcast))

    probe.request(12).expectNextN(12)
    probe.cancel()
  }

  it should "consume if there is prepend in consumer flow and all clients are NOT authenticated" in {
    val broadcast = createBroadcast()
    val probe = broadcast
      .toMat(TestSink.probe[MsgTick])(Keep.right)
      .run()

    addNConsumersClients(100, auth = _ ⇒ false, flowWithPrepend(broadcast))

    probe.request(12).expectNextN(12)
    probe.cancel()
  }

  it should "consume if there is prepend in consumer flow and half of the clients are authenticated and half are NOT" in {
    val broadcast = createBroadcast()
    val probe = broadcast
      .toMat(TestSink.probe[MsgTick])(Keep.right)
      .run()

    addNConsumersClients(100, _ % 2 == 0, flowWithPrepend(broadcast))

    // this fails because the broadcast hub buffer is set to 8 and client streams are not finishing properly
    probe.request(12).expectNextN(12)
    probe.cancel()
  }

  // 12 ticks, one every 100 ms, published through a BroadcastHub with a buffer of 8
  private def createBroadcast() = {
    val dataSource =
      Source(List.range(0, 12))
        .map(MsgTick)
        .throttle(1, 100.millis, 1, ThrottleMode.Shaping)

    dataSource
      .toMat(BroadcastHub.sink[MsgTick](8))(Keep.right)
      .run()
  }

  // starts one client stream per consumer; auth(i) decides whether client i is authorised
  private def addNConsumersClients(
      numberOfConsumers: Int,
      auth: (Int) ⇒ Boolean,
      mergingFlow: Flow[Either[Exception, User], Either[Exception, MsgTick], NotUsed]): Unit = {
    for (i ← 0 until numberOfConsumers) {
      clientFlow(i, auth(i), mergingFlow)
    }
  }

  private def clientFlow(
      clientId: Int,
      auth: Boolean,
      mergingFlow: Flow[Either[Exception, User], Either[Exception, MsgTick], NotUsed]) = {
    Source
      .single[Either[Exception, User]](
        if (auth) Right(User("Admin")) else Left(new Exception("asdasd")))
      // keep the auth source open and repeat the auth result for every data element
      .concat(Source.maybe[Either[Exception, User]])
      .expand(Iterator.continually(_))
      .via(mergingFlow)
      // let the first Left (the error) through, then complete and cancel upstream
      .takeWhile(_.isRight, inclusive = true)
      .log(s"sinking [$clientId]")
      .to(Sink.ignore)
      .withAttributes(
        Attributes
          .logLevels(
            onFinish = Logging.InfoLevel,
            onFailure = Logging.InfoLevel,
            onElement = Logging.InfoLevel))
      .run()
  }

  private def simpleFlow(source: Source[MsgTick, NotUsed])
    : Flow[Either[Exception, User], Either[Exception, MsgTick], NotUsed] =
    Flow[Either[Exception, User]]
      .zipWith(source)((authResult, dataSourceElement) ⇒ {
        authResult.right.map(_ ⇒ dataSourceElement)
      })

  def flowWithConcat(source: Source[MsgTick, NotUsed])
    : Flow[Either[Exception, User], Either[Exception, MsgTick], NotUsed] = {
    // concat
    val sourceWithInitElement = Source.single(MsgTick(-1)).concat(source)

    Flow[Either[Exception, User]]
      .zipWith(sourceWithInitElement)((authResult, dataSourceElement) ⇒ {
        authResult.right.map(_ ⇒ dataSourceElement)
      })
  }

  private def flowWithPrepend(source: Source[MsgTick, NotUsed])
    : Flow[Either[Exception, User], Either[Exception, MsgTick], NotUsed] = {
    // prepending
    val sourceWithInitElement = source.prepend(Source.single(MsgTick(-1)))

    Flow[Either[Exception, User]]
      .zipWith(sourceWithInitElement)((authResult, dataSourceElement) ⇒ {
        authResult.right.map(_ ⇒ dataSourceElement)
      })
  }

  private def flowWithMergePreffered(source: Source[MsgTick, NotUsed])
    : Flow[Either[Exception, User], Either[Exception, MsgTick], NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val zipWith = b.add(
        ZipWith[Either[Exception, User], MsgTick, Either[Exception, MsgTick]](
          (authResult, element) ⇒ authResult.right.map(_ ⇒ element)))

      val merge = b.add(MergePreferred[MsgTick](1))

      // format: off
      // initial element
      Source.single(MsgTick(-1)) ~> merge.preferred
      source                     ~> merge
      merge                      ~> zipWith.in1
      // format: on

      FlowShape(zipWith.in0, zipWith.out)
    })
  }

  case class MsgTick(id: Int)
  case class User(name: String)
}

Wiktor Tychulski

Oct 17, 2017, 8:34:23 AM
to Akka User List
Hello all,
Can anyone help with this? Am I doing something wrong here, or is this an issue in the Akka library that I should report? Any help would be really appreciated.