Hashing Router vs Actor System with children

Kilic Ali-Firat

Nov 24, 2016, 8:41:15 AM
to Akka User List
Hi everyone,

I would like some advice from the Akka experts. This month I have been working on a use case in which I fetch data from Kafka. I have a set of devices (which will grow over time) pushing their data to Kafka, and I need to fetch that data. I know there is a library (akka-reactive-kafka) that does the job, but because I'm new to Akka, I need and want to fully understand Akka before using a library built on it.

I'm looking for a way to scale out my application because the number of devices in the application scope will grow. Below is the current architecture of my app:
ActorSystem -> KafkaListener -> Data_Buffer -> Block_Buffer -> WriteToDb

Each "->" describes a "parent -> child" relation, and each actor holds caches. For example, in the Data_Buffer actor I have a Map[Device, List[Sample]] for the device samples, and so on. I know that Akka scales by default, but lately I have become interested in routers, and particularly in the hashing router. I wrote a POC to illustrate the idea I have for using the hashing router:

package com.bioserenity.test.hashingrouter

import akka.actor._
import akka.routing.ConsistentHashingRouter.ConsistentHashable
import akka.actor.Props
import akka.routing.ConsistentHashingPool
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope

import akka.testkit.{ TestActors, TestKit, ImplicitSender }
import org.scalatest.WordSpecLike
import org.scalatest.Matchers
import org.scalatest.BeforeAndAfterAll

object Cache {

  def props(actor : ActorRef) : Props = Props(new Cache(actor))
}
class Cache(actor : ActorRef) extends Actor {

  def hashMapping: ConsistentHashMapping = {
    case Evict(key) => key
  }

  val cache: ActorRef =
    context.actorOf(ConsistentHashingPool(10, hashMapping = hashMapping).
      props(Props[SubCache]), name = "cache")

  def receive = {
    case m : Entry => 
      cache ! ConsistentHashableEnvelope(message = m,  hashKey = m.key)
    case m : Get => 
      cache ! ConsistentHashableEnvelope(message = m, hashKey = m.key)
    case m : Evict => 
      cache ! ConsistentHashableEnvelope(message = m, hashKey = m.key)
    case l : List[_] => // the element type is erased at runtime, so match on List[_]
      actor ! l
      println("receive " + l.toString)
  }
}


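class SubCache extends Actor {

  // Several keys can hash to the same routee, so values are stored per key.
  val entries = scala.collection.mutable.Map.empty[String, List[String]]

  def receive = {
    case Entry(key, value) =>
      println(self.path + " receive new request " + key)
      entries(key) = entries.getOrElse(key, Nil) :+ value
      println(entries(key))
    case Get(key)          =>
      println("Ask data of " + key)
      sender() ! entries.getOrElse(key, Nil)
    case Evict(key)        => entries -= key
  }
}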


final case class Evict(key: String)

final case class Get(key: String) extends ConsistentHashable {
  override def consistentHashKey: Any = key
}

final case class Entry(key: String, value: String)




class TEST_HashingRouter
    extends TestKit(ActorSystem("foo"))
    with ImplicitSender
    with WordSpecLike with Matchers with BeforeAndAfterAll {

  def hashMapping: ConsistentHashMapping = {
    case Evict(key) => key
  }

  val cache: ActorRef =
    system.actorOf(Cache.props(testActor), name = "cache")

  "cache" must {
    "put values in the cache" in {
      cache ! Entry("hello", "HELLO")
      cache ! Entry("hello", "HELLO2")
      cache ! Entry("hi", "HI")

      cache ! Entry("hi", "HI3")

      cache ! Get("hello")
      expectMsg(List("HELLO", "HELLO2"))

      cache ! Get("hi")
      expectMsg(List("HI", "HI3"))

      cache ! Evict("hi")
      cache ! Get("hi")
      expectMsg(List())

      cache ! Entry("hI", "bar")
      expectNoMsg
    }
  }
}


I want to use a pool of actors, with one actor per device, and manage the lifecycle of each device over time. Because I can configure the number of actors in the hashing router pool, I can first create a pool of 1000 actors (so devices), each of which automatically manages its device's data. Given my current knowledge of Akka, my question is: do I need the second architecture with the hashing router to scale to a large number of devices, or is my current architecture enough to scale out the application even for a large number of devices?

Cheers,
Alifirat Kilic.

Guido Medina

Nov 27, 2016, 4:52:44 PM
to Akka User List
If you have one actor per device, why do you need a pool? I believe you are trying to use many well-known, good concepts together, which might over-complicate things.
You could simply use one actor per device that holds its own state and expires after being idle for some time. That way you have an expiring cache made of actors,
with one actor per device.
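
The expiry rule described above can be modelled without Akka. The following is only a sketch: `DeviceStates`, `record`, and `expireIdle` are hypothetical names, and the clock is passed in explicitly so the behaviour is deterministic. Inside a real device actor the same effect is usually obtained with a receive timeout (`context.setReceiveTimeout` and stopping the actor on `ReceiveTimeout`) rather than with a shared registry like this one.

```scala
import scala.collection.mutable

// Hypothetical model: per-device state that is dropped once the device has been idle.
final class DeviceStates(idleTimeoutMillis: Long) {
  private case class State(samples: Vector[String], lastSeenMillis: Long)
  private val states = mutable.Map.empty[String, State]

  // Store a sample and refresh the device's last-seen time.
  def record(deviceId: String, sample: String, nowMillis: Long): Unit = {
    val previous = states.get(deviceId).map(_.samples).getOrElse(Vector.empty)
    states(deviceId) = State(previous :+ sample, nowMillis)
  }

  // Samples currently held for a device (empty if unknown or already expired).
  def samples(deviceId: String): Vector[String] =
    states.get(deviceId).map(_.samples).getOrElse(Vector.empty)

  // Remove every device idle for at least the timeout; return the removed IDs.
  def expireIdle(nowMillis: Long): Set[String] = {
    val idle = states.collect {
      case (id, s) if nowMillis - s.lastSeenMillis >= idleTimeoutMillis => id
    }.toSet
    states --= idle
    idle
  }
}
```

An expired device simply starts again from empty state the next time it sends data, which is the "lazy reload" behaviour of a cache.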

How to scale? Let's say you have multiple JVMs. Hashing establishes a predictable order per node, so you can distribute devices among JVMs using their hash code:
So, Device_N => Node [Device_N.hashCode() % node count]
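
A minimal sketch of that mapping, assuming devices are identified by a string ID (`NodeAssignment` and `nodeFor` are hypothetical names, not part of Akka). Because `hashCode` can be negative, the sketch uses `Math.floorMod` instead of `%` so the index always falls in the valid range.

```scala
// Hypothetical helper: maps a device ID to one of `nodeCount` nodes.
object NodeAssignment {
  def nodeFor(deviceId: String, nodeCount: Int): Int = {
    require(nodeCount > 0, "nodeCount must be positive")
    // floorMod keeps the result in [0, nodeCount) even for negative hash codes.
    Math.floorMod(deviceId.hashCode, nodeCount)
  }
}
```

Note that with plain modulo hashing, changing the node count changes the node for most devices, which is why a change in cluster membership forces devices to be reloaded elsewhere.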

Assuming each JVM has a list of hosted devices, if the cluster is modified (a node is added or removed), drop all device actors and let them lazily load again.
Because each device actor expires, you could say each device actor behaves like a cache.

Modifying the cluster is the most complicated action, as some coordination is required: a device can be modified between shutting down its actor on one node and bringing it up on another.
But that's a different problem, to be solved once you have committed to an architecture.

HTH,

Guido.

Kilic Ali-Firat

Dec 4, 2016, 4:10:32 AM
to Akka User List
Hi,

The purpose of the pool is to have a cluster of N nodes and, for each node, a fixed number of instances (so devices) to process.

Let's say we have 10 devices and a cluster of 2 nodes: using a hashing router pool with a maximum of 5 instances per node ensures that I can handle all the streaming devices. It allows me to scale using the Akka toolkit.

In your answer to my post, are you saying that instead of using a pool, I can also use remote actors if I have a cluster with several nodes?

Guido Medina

Dec 4, 2016, 7:39:50 AM
to Akka User List
If the purpose of the pool is to deliver messages to the actual device actor, then you could simply use the cluster-sharding extension.
It will deliver each message remotely to the actor that hosts that device, and the device actor will live in its corresponding shard based on its hash.

Cluster-sharding can handle that for you; you only need a hashing function. If it is based on Integer IDs, you can simply return the ID as the hash code
and let cluster-sharding host the device on some node. You could also host devices on nodes that have a specific cluster role.
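
As a rough illustration of the hashing function mentioned above, here is a sketch of the two extractor functions in plain Scala. `DeviceMessage`, `DeviceSharding`, and the shard count of 20 are assumptions made for this example. The functions have the same shape as `ShardRegion.ExtractEntityId` and `ShardRegion.ExtractShardId` in classic Akka cluster sharding, where they would be passed to `ClusterSharding(system).start(...)`; check the documentation for your Akka version before relying on that.

```scala
// Hypothetical envelope: every message carries the Integer ID of its device.
final case class DeviceMessage(deviceId: Int, payload: String)

object DeviceSharding {
  // Assumed shard count for this sketch; not a value taken from the thread.
  val numberOfShards = 20

  // Shape of ShardRegion.ExtractEntityId: message => (entityId, message to deliver).
  val extractEntityId: PartialFunction[Any, (String, Any)] = {
    case msg @ DeviceMessage(id, _) => (id.toString, msg)
  }

  // Shape of ShardRegion.ExtractShardId: message => shardId.
  // The Integer ID is used directly as the hash, as suggested above.
  val extractShardId: Any => String = {
    case DeviceMessage(id, _) => Math.floorMod(id, numberOfShards).toString
  }
}
```

With these two functions, every message for a given device resolves to the same entity ID and the same shard, so it always reaches the single actor that owns that device.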

It can also handle persistent state for you. Read more about it and find out whether it does everything you need:

HTH,

Guido.

Kilic Ali-Firat

Dec 5, 2016, 5:47:46 PM
to Akka User List
Hi Guido,

Thank you very much for your answers and for pointing me to cluster-sharding, which should suit my use case!