Akka Streams Stops Processing After Receiving the First Element

Joe San

Jan 17, 2018, 5:50:48 AM
to Akka User List

I have a project where I have a bunch of Actors pushing messages to Kafka. I have an application.conf where I register all my Kafka-related configuration. For example, here is how I define my producers:

producer {
      //The number of unique Kafka producers producing to individual topics
      num-producers = "1"

      p1 {
        # Flag to enable or disable a particular producer; it defaults to false, meaning that if this
        # flag is not set for a producer, the corresponding producer will be disabled
        isEnabled = true
        bootstrap-servers = "localhost:9092"
        publish-topic = "DefaultMessageTopic"

        #MessageType to convert internal case class from to JSON
        message-type = "DefaultMessage"

        # Tuning parameter of how many sends that can run in parallel.
        parallelism = 100

        # How long to wait for `KafkaProducer.close`
        close-timeout = 60s

        # Fully qualified config path which holds the dispatcher configuration
        # to be used by the producers stages. Some blocking may occur.
        # When this value is empty, the dispatcher configured for the stream
        # will be used.
        use-dispatcher = "akka.kafka.default-dispatcher"

        request.required.acks = "1"
        //Below is necessary for having multiple consumers consume the same topic
        //num.partitions has to be geq num of consumers
        //auto.offset.reset = "smallest"
        num.partitions = "5"
      }
    }
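
For what it's worth, I flatten such a producer block into a plain Map[String, String] via Typesafe Config, roughly along these lines (just a simplified sketch; the real loading code also picks up things like the appName and iterates over all the producer blocks):

import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._

// Simplified sketch: flatten the producer.p1 block into a Map[String, String]
val config = ConfigFactory.load()
val p1Cfg = config.getConfig("producer.p1")
val producerProperties: Map[String, String] =
  p1Cfg.entrySet().asScala.map(entry => entry.getKey -> p1Cfg.getString(entry.getKey)).toMap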

As can be seen from the configuration, based on the message-type (DefaultMessage in the case above), I have a case class representation:

trait Event
object Event {
  case class ProducerStreamActivated[T <: KafkaMessage](kafkaTopic: String, stream: SourceQueueWithComplete[T]) extends Event
}

trait KafkaMessage
object KafkaMessage {

  case class DefaultMessage(message: String, timestamp: DateTime) extends KafkaMessage {
    def this() = this("DEFAULT-EMPTY-MESSAGE", DateTime.now(DateTimeZone.UTC))
  }

  case class DefaultMessageBundle(messages: Seq[DefaultMessage], timeStamp: DateTime) extends KafkaMessage {
    def this() = this(Seq.empty, DateTime.now(DateTimeZone.UTC))
  }
}

When I start my application, I read this application.conf and pass the producer-related configuration to a SupervisorActor, the relevant parts of which look like this:

  def fromString[M <: KafkaMessage](messageType: String)(implicit m: reflect.Manifest[M]): KafkaMessage = {
    messageType match {
      case "DefaultMessage" => classOf[DefaultMessage].newInstance()
      case "DefaultMessageBundle" => classOf[DefaultMessageBundle].newInstance()
    }
  }

override def preStart() = {
    super.preStart()

    // 1. We go over the KafkaProducerConfig and for each message type, we start a stream
    kafkaCfg.foreach {
      case (messageType, kafkaProducerProperties) =>
        self ! InitiateProducerStream(messageType, kafkaProducerProperties)
    }
  }

  // Utility to create Child actors
  def actorFor(props: Props, actorName: String) =
    context.actorOf(props, actorName)

  override def receive = /* super.receive.orElse */ {
    case InitiateProducerStream(messageType, producerProperties) =>
      KafkaProducerSupervisor.fromString(messageType) match {
        case _ : DefaultMessage =>
          val actorRef = actorFor(KafkaPublisher.props[DefaultMessage], s"${producerProperties("appName")}-$messageType")
          val stream = producerStream[DefaultMessage](producerProperties)
          actorRef ! ProducerStreamActivated[DefaultMessage](producerProperties("publish-topic"), stream)
        case _: DefaultMessageBundle =>
          val actorRef = actorFor(KafkaPublisher.props[DefaultMessageBundle], s"${producerProperties("appName")}-$messageType")
          val stream = producerStream[DefaultMessageBundle](producerProperties)
          actorRef ! ProducerStreamActivated[DefaultMessageBundle](producerProperties("publish-topic"), stream)
      }
  }

  def producerStream[T: Converter](producerProperties: Map[String, String]): SourceQueueWithComplete[T] = {
    if (Try(producerProperties("isEnabled").toBoolean).getOrElse(false)) {
      log.info(s"Kafka is enabled for topic ${producerProperties("publish-topic")}")
      val streamFlow = flowToKafka[T](producerProperties)
      val streamSink = sink(producerProperties)
      source[T].via(streamFlow).to(streamSink).run()
    } else {
      // We just log to the console and bypass all Kafka communication
      log.info(s"Kafka is disabled for topic ${producerProperties("publish-topic")}")
      source[T].via(flowToLog[T](log)).to(Sink.ignore).run()
    }
  }

As can be seen, on startup I go through all the producer-related configurations and, for each producer configuration, I create an Akka stream together with a child Actor instance that will eventually push messages to the stream, which in turn pushes them to Kafka. Here is what the streams and my child Actor look like:

My ProducerStream.scala:

trait ProducerStream extends AkkaStreams {

  implicit val system: ActorSystem

  def source[T] = {
    Source.queue[T](Int.MaxValue, OverflowStrategy.backpressure) // TODO: Get the overflow strategy from properties
  }

  def sink(producerProperties: Map[String, String]) = {
    val brokers = producerProperties("bootstrap-servers")
    val producerSettings =
      ProducerSettings(
        system,
        new ByteArraySerializer,
        new StringSerializer
      ).withBootstrapServers(brokers)

    Producer.plainSink(producerSettings)
  }

  def flowToLog[T: Converter](log: LoggingAdapter) = {
    Flow[T].map {
      elem =>
        log.info(s":: START :: $elem")
        val stringJSONMessage = Try(Converter[DefaultMessage].convertToJson(elem.asInstanceOf[DefaultMessage]))

        stringJSONMessage match {
          case Success(json) =>
            log.info(s"Success ******* $json")
          case Failure(ex) =>
            log.info(s"Failure ${ex.getMessage}")
        }

        log.info(s"${Converter[T].convertToJson(elem)}")
        log.info(s":: END ::")
        elem
    }
  }

  def flowToKafka[T: Converter](producerProperties: Map[String, String]) = {
    val numberOfPartitions = producerProperties("num.partitions").toInt - 1
    val topicToPublish = producerProperties("publish-topic")
    val rand = new scala.util.Random
    val range = 0 to numberOfPartitions

    Flow[T].map { msg =>
      val partition = range(rand.nextInt(range.length))
      val stringJSONMessage = Converter[T].convertToJson(msg)
      new ProducerRecord[Array[Byte], String](topicToPublish, partition, null, stringJSONMessage)
    }
  }
}
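
As a side note, a minimal way to exercise flowToLog in isolation against a finite source would be something like the following (just a sketch, assuming an implicit materializer and the log adapter are in scope, as they are where the stream is materialized):

import org.joda.time.{DateTime, DateTimeZone}

// Sketch: run flowToLog against a small, finite source to see whether the
// conversion stage itself runs for more than one element.
val probeMessages = List(
  DefaultMessage("probe 1", DateTime.now(DateTimeZone.UTC)),
  DefaultMessage("probe 2", DateTime.now(DateTimeZone.UTC))
)
Source(probeMessages).via(flowToLog[DefaultMessage](log)).runWith(Sink.ignore)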

Here is my child Actor (which is created by the SupervisorActor):

class KafkaPublisher[T <: KafkaMessage: ClassTag] extends Actor {

  implicit val system = context.system
  val log = Logging(system, this.getClass.getName)

  override final def receive = {
    case ProducerStreamActivated(_, stream) =>
      log.info(s"Activated stream for Kafka Producer with ActorName >> ${self.path.name} << ActorPath >> ${self.path} <<")
      context.become(active(stream))

    case other =>
      log.warning("KafkaPublisher got some unknown message while producing: " + other)
  }

  def active(stream: SourceQueueWithComplete[KafkaMessage]): Receive = {
    case msg: T =>
      stream.offer(msg)

    case other =>
      log.warning("KafkaPublisher got the unknown message while producing: " + other)
  }
}
object KafkaPublisher {

  def props[T <: KafkaMessage: ClassTag] =
    Props(new KafkaPublisher[T])
}
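
In case it is relevant: I am aware that stream.offer(msg) returns a Future[QueueOfferResult], and a sketch of how the offer result and the stream's termination could be inspected looks like this (offerAndLog is just an illustrative helper, not something that exists in my code):

import akka.stream.QueueOfferResult
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Illustrative helper: log what happens to each offer and whether the
// materialized stream has terminated (normally or with a failure).
def offerAndLog[T](queue: SourceQueueWithComplete[T], msg: T): Unit = {
  queue.offer(msg).onComplete {
    case Success(QueueOfferResult.Enqueued) => println(s"enqueued: $msg")
    case Success(other)                     => println(s"offer result: $other")
    case Failure(ex)                        => println(s"offer failed: ${ex.getMessage}")
  }
  queue.watchCompletion().onComplete(done => println(s"stream terminated: $done"))
}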

Now when I run a unit test that looks like this:

"KafkaProducerSupervisor" should {
    "stream DefaultMessage to the corresponding message stream via the corresponding Actor " in {

      // 0. Check if there is a ProducerConfig, if not fail the test as we cannot proceed further
      appCfg.kafkaConfig.producerCfg match {
        case Some(cfg) =>
          // 1. Create the ProducerSupervisorActor instance (rest is taken care by the Supervisor)
          system.actorOf(KafkaProducerSupervisor.props(cfg), s"${appCfg.appName}-supervisor")

          // We wait a bit just so that the Actor has some time to initialise itself
          within(4.seconds) {
            expectNoMsg()
          }

          // 2. Identify the child Actor for the DefaultMessage type
          actorFor("DefaultMessage").foreach {
            case None =>
              fail("SHIT SHIT SHIT !!!! BANG !!!!")
            case Some(actorRef) =>
              // 3. Send some messages that will be pushed to the corresponding stream
              actorRef ! DefaultMessage("Message 1", DateTime.now(DateTimeZone.UTC))
              actorRef ! DefaultMessage("Message 2", DateTime.now(DateTimeZone.UTC))
          }
        case _ => fail("SHIT SHIT SHIT !!!! BANG !!!")
      }
    }
  }
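
For completeness, the actorFor(...) helper used above resolves the child actor by its path, something along these lines (sketched here from memory, the actual helper may differ slightly):

import akka.actor.ActorRef
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.control.NonFatal
import system.dispatcher

// Sketch of the test helper: resolve the child actor that the supervisor
// created for the given message type (the path shown here is simplified).
def actorFor(messageType: String): Future[Option[ActorRef]] =
  system.actorSelection(s"/user/${appCfg.appName}-supervisor/${appCfg.appName}-$messageType")
    .resolveOne(3.seconds)
    .map(Some(_))
    .recover { case NonFatal(_) => None }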

I see the following log messages:

[DEBUG] [01/17/2018 10:14:33.199] [ScalaTest-run] [EventStream(akka://KafkaProducerSupervisorSpec)] logger log1-TestEventListener started
[DEBUG] [01/17/2018 10:14:33.201] [ScalaTest-run] [EventStream(akka://KafkaProducerSupervisorSpec)] Default Loggers started
[INFO] [01/17/2018 10:14:33.791] [KafkaProducerSupervisorSpec-akka.actor.default-dispatcher-5] [com.eon.pm.producers.KafkaProducerSupervisor(akka://KafkaProducerSupervisorSpec)] Kafka is disabled for topic DefaultMessageChannel
[INFO] [01/17/2018 10:14:33.910] [KafkaProducerSupervisorSpec-akka.actor.default-dispatcher-2] [com.eon.pm.producers.KafkaPublisher(akka://KafkaProducerSupervisorSpec)] Activated stream for Kafka Producer with ActorName >> haw-data-ingestion-DefaultMessage << ActorPath >> akka://KafkaProducerSupervisorSpec/user/haw-data-ingestion-supervisor/haw-data-ingestion-DefaultMessage <<
[INFO] [01/17/2018 10:14:37.597] [KafkaProducerSupervisorSpec-akka.actor.default-dispatcher-5] [com.eon.pm.producers.KafkaProducerSupervisor(akka://KafkaProducerSupervisorSpec)] :: START :: DefaultMessage(Message 1,2018-01-17T09:14:37.591Z)

As you can see, the flowToLog stage logs :: START :: for the first message, but then neither throws any error nor completes the conversion of the DefaultMessage case class to its JSON representation. I don't understand why. Here are my Converters:

object JsonMessageConversion {
  implicit val resolveTimeout: Timeout = Timeout(3.seconds)

  case class FailedMessageConversion(kafkaTopic: String, msg: String, msgType: String)

  trait Converter[T] {
      def convertFromJson(msg: String): Either[FailedMessageConversion, T]
      def convertToJson(msg: T): String
  }

  //Here is where we create implicit objects for each Message Type you wish to convert to/from JSON
  object Converter {

    val dateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
    implicit val jodaDateTimeReads: Reads[DateTime] = Reads[DateTime](js =>
      js.validate[String].map[DateTime](dt =>
        DateTime.parse(dt, DateTimeFormat.forPattern(dateFormat))
      )
    )

    implicit val jodaDateTimeWrites: Writes[DateTime] = new Writes[DateTime] {
      def writes(dt: DateTime): JsValue = JsString(dt.toString())
    }

    implicit object DefaultMessageConverter extends Converter[DefaultMessage] {

      implicit val defaultMessageReads: Reads[DefaultMessage] = (
        (__ \ "message").read[String] and
        (__ \ "timestamp").read[DateTime](jodaDateTimeReads)
      )(DefaultMessage.apply _)

      implicit val defaultMessageWrites: Writes[DefaultMessage] = (
        (__ \ "message").write[String] and
        (__ \ "timestamp").write[DateTime](jodaDateTimeWrites)
      )(unlift(DefaultMessage.unapply))

      implicit val defaultMessageFormat: Format[DefaultMessage] =
        Format(defaultMessageReads, defaultMessageWrites)

      override def convertFromJson(msg: String): Either[FailedMessageConversion, DefaultMessage] = {
        Json.parse(msg).validate[DefaultMessage] match {
          case s: JsSuccess[DefaultMessage] => Right(s.value)
          case _: JsError => Left(FailedMessageConversion("kafkaTopic", msg, "to: DefaultMessage"))
        }
      }

      override def convertToJson(msg: DefaultMessage): String = {
        Json.toJson(msg).toString()
      }
    }

    implicit object DefaultMessageBundleConverter extends Converter[DefaultMessageBundle] {

      implicit val defaultMessageReads = Converter.DefaultMessageConverter.defaultMessageReads
      implicit val defaultMessageWrites = Converter.DefaultMessageConverter.defaultMessageWrites

      implicit val defaultMessageBundleReads: Reads[DefaultMessageBundle] = (
        (__ \ "messages").read[Seq[DefaultMessage]] and
          (__ \ "timestamp").read[DateTime](jodaDateTimeReads)
        )(DefaultMessageBundle.apply _)

      implicit val defaultMessageBundleWrites: Writes[DefaultMessageBundle] = (
        (__ \ "messages").write[Seq[DefaultMessage]] and
          (__ \ "timestamp").write[DateTime](jodaDateTimeWrites)
        )(unlift(DefaultMessageBundle.unapply))

      implicit val defaultMessageFormat: Format[DefaultMessageBundle] =
        Format(defaultMessageBundleReads, defaultMessageBundleWrites)

      override def convertFromJson(msg: String): Either[FailedMessageConversion, DefaultMessageBundle] = {
        Json.parse(msg).validate[DefaultMessageBundle] match {
          case s: JsSuccess[DefaultMessageBundle] => Right(s.value)
          case _: JsError => Left(FailedMessageConversion("kafkaTopic", msg, "to: DefaultMessageBundle"))
        }
      }

      override def convertToJson(msg: DefaultMessageBundle): String = {
        Json.toJson(msg).toString()
      }
    }

    def apply[T: Converter] : Converter[T] = implicitly
  }
}
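
To rule out the converter itself, this is the kind of quick isolated check that can be run, e.g. from a REPL or a tiny test (sketch):

import JsonMessageConversion.Converter
import KafkaMessage.DefaultMessage
import org.joda.time.{DateTime, DateTimeZone}

// Convert a single DefaultMessage to JSON outside of any stream or actor
val msg  = DefaultMessage("Message 1", DateTime.now(DateTimeZone.UTC))
val json = Converter[DefaultMessage].convertToJson(msg)
println(json) // expected to print something like {"message":"Message 1","timestamp":"..."}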

I'm not sure what exactly the problem is, or why my flowToLog is not able to process anything beyond the first message. Any help is much appreciated!
