I am trying to write a Kafka consumer to websocket flow using reactive-kafka, akka-http and akka-stream.
val publisherActor = actorSystem.actorOf(CommandPublisher.props)
val publisher = ActorPublisher[String](publisherActor)
val commandSource = Source.fromPublisher(publisher) map toMessage
def toMessage(c: String): Message = TextMessage.Strict(c)
class CommandPublisher extends ActorPublisher[String] {
override def receive = {
case cmd: String =>
if (isActive && totalDemand > 0)
onNext(cmd)
}
}
object CommandPublisher {
def props: Props = Props(new CommandPublisher())
}
// This is the route
def mainFlow(): Route = {
path("ws" / "commands" ) {
handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource))
}
}
From the kafka consumer (omitted here), I do a publisherActor ! commandString
to dynamically add content to the websocket.
However, I run into this exception in the backend when I start multiple clients to the websocket:
[ERROR] [03/31/2016 21:17:10.335] [KafkaWs-akka.actor.default-dispatcher-3][akka.actor.ActorSystemImpl(KafkaWs)] WebSocket handler failed with can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
java.lang.IllegalStateException: can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
at akka.stream.impl.ReactiveStreamsCompliance$.canNotSubscribeTheSameSubscriberMultipleTimesException(ReactiveStreamsCompliance.scala:35)
at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:295)
...
Can't one flow be used for all websocket clients? Or should the flow/publisher actor be created per client?
Here, I intend to send out "current"/ "live" notifications to all websocket clients. History of notifications is irrelevant and needs to be ignored for new clients.