Routing Kafka messages to websocket

瀏覽次數:522 次
跳到第一則未讀訊息

Vish Ramachandran

未讀,
2016年4月3日 凌晨2:50:222016/4/3
收件者:Akka User List
This question was not answered at https://stackoverflow.com/questions/36348020/kafka-message-to-websocket so I ask here.
Thanks
Vish
===============

I am trying to write a Kafka consumer to websocket flow using reactive-kafka, akka-http and akka-stream.

  val publisherActor = actorSystem.actorOf(CommandPublisher.props)
  val publisher = ActorPublisher[String](publisherActor)
  val commandSource = Source.fromPublisher(publisher) map toMessage
  def toMessage(c: String): Message = TextMessage.Strict(c)

  class CommandPublisher extends ActorPublisher[String] {
    override def receive = {
      case cmd: String =>
        if (isActive && totalDemand > 0)
          onNext(cmd)
    }
  }

  object CommandPublisher {
    def props: Props = Props(new CommandPublisher())
  }

  // This is the route 
  def mainFlow(): Route = {
    path("ws" / "commands" ) {
       handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource))
    } 
  }

From the kafka consumer (omitted here), I do a publisherActor ! commandString to dynamically add content to the websocket.

However, I run into this exception in the backend when I start multiple clients to the websocket:

[ERROR] [03/31/2016 21:17:10.335] [KafkaWs-akka.actor.default-dispatcher-3][akka.actor.ActorSystemImpl(KafkaWs)] WebSocket handler failed with can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
java.lang.IllegalStateException: can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
  at akka.stream.impl.ReactiveStreamsCompliance$.canNotSubscribeTheSameSubscriberMultipleTimesException(ReactiveStreamsCompliance.scala:35)
  at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:295)
  ...

Can't one flow be used for all websocket clients? Or should the flow/publisher actor be created per client?

Here, I intend to send out "current"/ "live" notifications to all websocket clients. History of notifications is irrelevant and needs to be ignored for new clients.




Akka Team

未讀,
2016年4月6日 清晨7:40:432016/4/6
收件者:Akka User List
Hi Vish,

There currently is no stage in Akka Stream that will allow you to dynamically add and remove listeners, we have some ideas but nothing implemented or planned yet (see ticket https://github.com/akka/akka/issues/19478). So you would have to implement your own stage for this.

The ActorPublisher needs to be one actor instance per materialization , it cannot be shared between the streams, but with websockets you will materialize a new stream for each connection, so if using that you will have to push the pubsub to the other side of it, create it upon the incoming request and have it register itself somewhere where you will do the actual publishing.

You can see something like that (albeit two-ways) in this workshop I did with the Scala Usergroup here in Stockholm a while ago: https://github.com/johanandren/scala-stockholm-cluster-message-broker/tree/master/src/main/scala

--
Johan Andrén
Akka Team, Lightbend Inc.
回覆所有人
回覆作者
轉寄
0 則新訊息