I would like some advice from the Akka experts. This month I have been working on a use case in which I fetch data from Kafka. I have a set of devices (which will grow over time) pushing their data to Kafka, and I need to fetch it. I know there is a library (akka-reactive-kafka) that does the job, but because I'm a newbie in Akka, I need and want to fully understand Akka before using a library built on it.
I'm looking for a way to scale out my application, because the number of devices in the application's scope will grow. Below is the current architecture of my app:
ActorSystem -> KafkaListener -> Data_Buffer -> Block_Buffer -> WriteToDb.
Each "->" describes a "parent -> child" relation, and each actor holds its own caches. For example, in the Data_Buffer actor I have a Map[Device, List[Sample]] for the device samples, and so on. I know that Akka scales by default, but lately I have become interested in routers, and more particularly in the consistent hashing router. I wrote a POC to illustrate the idea I have for using the hashing router:
package com.bioserenity.test.hashingrouter

import akka.actor._
import akka.routing.ConsistentHashingPool
import akka.routing.ConsistentHashingRouter.ConsistentHashable
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
import akka.testkit.{ TestKit, ImplicitSender }
import org.scalatest.WordSpecLike
import org.scalatest.Matchers
import org.scalatest.BeforeAndAfterAll

object Cache {
  def props(actor: ActorRef): Props = Props(new Cache(actor))
}

// Front actor: wraps each request in an envelope so the pool routes it by key.
class Cache(actor: ActorRef) extends Actor {
  def hashMapping: ConsistentHashMapping = {
    case Evict(key) => key
  }

  // Pool of 10 SubCache routees behind a consistent hashing router.
  val cache: ActorRef =
    context.actorOf(ConsistentHashingPool(10, hashMapping = hashMapping).
      props(Props[SubCache]), name = "cache")

  def receive = {
    case m: Entry =>
      cache ! ConsistentHashableEnvelope(message = m, hashKey = m.key)
    case m: Get =>
      cache ! ConsistentHashableEnvelope(message = m, hashKey = m.key)
    case m: Evict =>
      cache ! ConsistentHashableEnvelope(message = m, hashKey = m.key)
    // The element type is erased at runtime, so match on List[_].
    case l: List[_] =>
      actor ! l
      println("receive " + l.toString)
  }
}

class SubCache extends Actor {
  // Note: one list per routee, shared by every key that hashes to this routee.
  var l = scala.collection.mutable.ListBuffer[String]()

  def receive = {
    case Entry(key, value) =>
      println(self.path + " receive new request " + key)
      l += value
      println(l)
    case Get(key) =>
      println("Ask data of " + key)
      sender() ! l.toList
    case Evict(key) => l.clear()
  }
}

final case class Evict(key: String)
final case class Get(key: String) extends ConsistentHashable {
  override def consistentHashKey: Any = key
}
final case class Entry(key: String, value: String)

class TEST_HashingRouter
  extends TestKit(ActorSystem("foo"))
  with ImplicitSender
  with WordSpecLike with Matchers with BeforeAndAfterAll {

  // Stop the actor system once the suite has finished.
  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  def hashMapping: ConsistentHashMapping = {
    case Evict(key) => key
  }

  val cache: ActorRef =
    system.actorOf(Cache.props(testActor), name = "cache")

  "cache" must {
    "put values in the cache" in {
      cache ! Entry("hello", "HELLO")
      cache ! Entry("hello", "HELLO2")
      cache ! Entry("hi", "HI")
      cache ! Entry("hi", "HI3")
      cache ! Get("hello")
      expectMsg(List("HELLO", "HELLO2"))
      cache ! Get("hi")
      expectMsg(List("HI", "HI3"))
      cache ! Evict("hi")
      cache ! Get("hi")
      expectMsg(List())
      cache ! Entry("hI", "bar")
      expectNoMsg
    }
  }
}
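
To make the routing idea in the POC above more concrete, here is a minimal plain-Scala sketch of consistent hashing with per-key state. It is not Akka's implementation and does not use Akka at all: HashRing, RoutedBuffers, the "routee-N" names and the number of virtual nodes are all hypothetical, chosen only for illustration. It shows two things: the same key is always mapped to the same node, and keeping the samples keyed by device keeps two devices separate even when they land on the same node.

```scala
import scala.collection.immutable.TreeMap
import scala.collection.mutable
import scala.util.hashing.MurmurHash3

// Hypothetical helper, not part of Akka: a tiny consistent-hash ring.
final class HashRing(nodes: Seq[String], virtualNodes: Int = 16) {
  require(nodes.nonEmpty, "the ring needs at least one node")

  // Each node is placed on the ring several times to spread keys more evenly.
  private val ring: TreeMap[Int, String] = {
    val points = for {
      node <- nodes
      i    <- 0 until virtualNodes
    } yield MurmurHash3.stringHash(s"$node#$i") -> node
    TreeMap(points: _*)
  }

  // Walk clockwise from the key's hash to the next point, wrapping at the end.
  def nodeFor(key: String): String = {
    val it = ring.iteratorFrom(MurmurHash3.stringHash(key))
    if (it.hasNext) it.next()._2 else ring.head._2
  }
}

// Hypothetical stand-in for the routees' state: samples are stored per
// (node, device), so devices that share a node do not share a list.
final class RoutedBuffers(ring: HashRing) {
  private val samples = mutable.Map.empty[(String, String), Vector[String]]

  def put(device: String, sample: String): Unit = {
    val slot = (ring.nodeFor(device), device)
    samples(slot) = samples.getOrElse(slot, Vector.empty) :+ sample
  }

  def get(device: String): Vector[String] =
    samples.getOrElse((ring.nodeFor(device), device), Vector.empty)

  def evict(device: String): Unit =
    samples -= ((ring.nodeFor(device), device))
}

object HashRingDemo {
  def main(args: Array[String]): Unit = {
    val ring    = new HashRing((1 to 10).map(i => s"routee-$i"))
    val buffers = new RoutedBuffers(ring)
    buffers.put("hello", "HELLO")
    buffers.put("hello", "HELLO2")
    buffers.put("hi", "HI")
    println("hello -> " + ring.nodeFor("hello") + " " + buffers.get("hello"))
    println("hi    -> " + ring.nodeFor("hi") + " " + buffers.get("hi"))
  }
}
```

With this layout, the result of a lookup no longer depends on whether two keys happen to hash to the same node, which is the situation the single ListBuffer in SubCache would be sensitive to.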