Actor Modelling

pvis

Sep 26, 2011, 4:30:39 PM
to Akka User List
Hello Akka Users,

I am new to the Akka framework.

The problem I am currently working on has these three high-level
abstractions:

DataSource

DomainObjectBuilder

DatabaseService

The DataSource is an Iterator and it can yield more than 40 million
records. Each record retrieved from the Iterator, when passed to the
DomainObjectBuilder, gives us a domain object.
I can only create one instance of each of these abstractions:
DomainObjectBuilder uses a thread pool, DatabaseService uses a
connection pool, and getting an instance of the filtered iterator takes
a long time. I have to use class instances rather than singleton
objects to represent the builder and the database service, as I may
have to process multiple data sources in parallel at some point.

How do I model this problem using actors so that the data source can
be processed easily? I want multiple worker actors to be running, but
they need to use the above abstractions. If I model each of the three
abstractions as a separate actor and create multiple instances of the
DomainObjectBuilder and DatabaseService actors, they might end up
creating multiple instances of the builder and service objects and
cause out-of-memory errors. How do I maintain a single instance of
DomainObjectBuilder and DatabaseService and yet have multiple actors
making use of these services in parallel?

Please advise.

Thanks
Prakash

Jonas Bonér

Sep 27, 2011, 2:31:56 AM
to akka...@googlegroups.com
I guess you can create a facade object.

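object DatabaseService {
  // One shared actor behind the facade (Akka 1.x actors must be started)
  private val instance = actorOf[DatabaseServiceActor].start()

  // Fire-and-forget
  def foo() = instance ! Foo

  // Request-reply: waits for the answer to arrive or the future to time out
  def fooBar(): Option[Bar] = (instance ? Foo).as[Bar]
  ... // etc.
}

Then use it:

DatabaseService.foo()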


--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.




--
Jonas Bonér
CTO
Typesafe <http://www.typesafe.com/> - Enterprise-Grade Scala from the
Experts
Phone: +46 733 777 123
Twitter: @jboner <http://twitter.com/jboner>
Google+: http://gplus.to/jboner



Roland Kuhn

Sep 27, 2011, 5:02:19 AM
to akka...@googlegroups.com
He might also have meant the reverse, i.e. wrapping a DomainObjectBuilder inside an actor and then having multiple other actors communicate with it:

class BuilderActor extends Actor {
  private val builder: DomainObjectBuilder = …
  def receive = {
    case SomeRequest(data) => self.reply(builder.doWhateverNecessary) // more work if this is asynchronous
  }
}

class SomeWorker(val builder: ActorRef) extends Actor {
  def receive = {
    case whatever => (builder ? SomeRequest(whatever)) onResult (doFurtherProcessing)
  }
}

but without knowing the use-case better, this is all guess-work.

Regards,

Roland

Roland Kuhn
Typesafe – Enterprise-Grade Scala from the Experts
twitter: @rolandkuhn


pvis

Sep 27, 2011, 10:21:52 AM
to Akka User List
Thanks, Jonas and Roland, for trying to answer my question.

Roland, yes, I meant wrapping the DomainObjectBuilder inside an actor
and making multiple actors use it, thank you.

builder.build(builder.doWhateverNecessary) is a synchronous call; I
have used a CountDownLatch inside the builder to make it synchronous.
HawtDispatch has the limitation that one must not use blocking code. I
read the Akka documentation and found no such limitation; please let
me know if I have missed something.

Apologies for asking so many questions; I am trying to understand and
use the actor concept properly. I will be passing a single instance of
BuilderActor and DBServiceActor to millions of worker actors, correct?
And would this increase performance?

Builder.build(data) is purely CPU bound. DBService uses JDBC batch
updates, so a call to this service is not always IO bound: only every
500th call to the DBService results in a write to the DB, so I don’t
know whether I should treat this service as purely IO bound.

When I use plain Java threads, for CPU-bound tasks (DomainBuilder) I
should have as many threads as cores, whereas for IO-bound tasks I
should have many more threads than cores. Does this change when we use
actors?

I have added some more pseudocode based on the discussion above:

class BuilderActor extends Actor {
  private val builder: DomainObjectBuilder = …
  def receive = {
    case SomeRequest(data) => self.reply(builder.doWhateverNecessary)
  }
}

class DBServiceActor extends Actor {
  private val dbService: DBService = ..
  def receive = {
    case Write(domainObject) => dbService.write(domainObject)
  }
}

class SomeWorker(val builder: ActorRef, val service: ActorRef) extends Actor {
  def receive = {
    // Ask the shared builder, then wrap the reply in the message DBServiceActor expects
    case whatever => (builder ? SomeRequest(whatever)) onResult {
      case domainObject => service ! Write(domainObject)
    }
  }
}

def main(args: Array[String]): Unit = {
  val builderActor = Actor.actorOf[BuilderActor].start()
  val dbServiceActor = Actor.actorOf[DBServiceActor].start()
  val datasource = ..

  // One worker actor per record
  while (datasource.hasNext) {
    val worker = Actor.actorOf(new SomeWorker(builderActor, dbServiceActor)).start()
    worker ! datasource.next()
  }
}

When processing more than 40 million records, will the above code
result in out-of-memory errors, given that the documentation says we
can create ~4.5 million actors with 4GB RAM? I will be running this
process on a quad-core machine with 4GB RAM. How do I avoid creating
more workers than my machine can handle?

Please advise.

Thanks
Prakash
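
On the thread-count question in the post above: actors still execute on a dispatcher's thread pool, so the usual reasoning about CPU-bound versus blocking work carries over to sizing that pool. The following is only a minimal sketch of a common starting heuristic (threads ≈ cores × (1 + wait time / compute time)); the object name, the function names, and the `waitToComputeRatio` parameter are hypothetical, and the right numbers still have to be found by measuring.

```scala
object PoolSizingSketch {
  // CPU-bound work (the builder): roughly one thread per core,
  // since extra threads mostly add context switching.
  def cpuBoundPoolSize(cores: Int): Int = math.max(1, cores)

  // Blocking work (the database service): scale the pool by how long a task
  // waits relative to how long it computes. The ratio is an assumed, measured input.
  def blockingPoolSize(cores: Int, waitToComputeRatio: Double): Int =
    math.max(1, math.ceil(cores * (1 + waitToComputeRatio)).toInt)
}
```

For example, on a quad-core machine a service that waits nine times longer than it computes would start at `blockingPoolSize(4, 9.0)`, i.e. 40 threads, while the builder would start at 4. A service that only blocks on every 500th call will have a much lower ratio than a purely IO-bound one.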


Roland Kuhn

Sep 27, 2011, 5:01:46 PM
to akka...@googlegroups.com
Hi Prakash,

your pseudo-code suggests that you just want to keep the Builder running at full speed while shoveling the results into the database. Why should it be necessary to start one actor per message? If you have problems keeping those in memory, just send your “whatever” from the datasource to the builder, which forwards the result to the database. The work packages would then be queued in the mailbox instead of as individual actors, which is much more efficient. Unless you can run the builder in parallel (i.e. using an actor pool in this case), I don’t see how anything could help you scale it up. Concerning the size of the pools for Builder and Service, you’ll unfortunately have to experiment yourself (blocking in non-Hawt dispatchers is allowed, but must of course be taken into account).

Regards,

Roland
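
The shape suggested in the message above (one stage per abstraction, with records queued between stages instead of one actor per record) can be sketched with the JDK alone. This is not Akka code: `PipelineSketch`, `DomainObject`, `build`, and `process` are hypothetical stand-ins, and the queue is bounded by an assumed `capacity`, which an actor mailbox is not by default. It is meant only to show why queued work items are cheaper than millions of short-lived actors.

```scala
import java.util.concurrent.ArrayBlockingQueue

object PipelineSketch {
  final case class DomainObject(value: String)

  // Sentinel telling the writer stage that the source is exhausted.
  private val Done = DomainObject("<done>")

  // Stand-in for DomainObjectBuilder: a pure, CPU-bound transformation.
  def build(record: Int): DomainObject = DomainObject("record-" + record)

  /** Streams `source` through a builder stage and a writer stage; returns what was "written". */
  def process(source: Iterator[Int], capacity: Int): Vector[DomainObject] = {
    // The queue plays the role of the mailbox between builder and database service.
    val queue = new ArrayBlockingQueue[DomainObject](capacity)
    val written = Vector.newBuilder[DomainObject]

    val writer = new Thread(new Runnable {
      def run(): Unit = {
        var next = queue.take()
        while (next ne Done) {
          written += next // stand-in for DatabaseService.write
          next = queue.take()
        }
      }
    })
    writer.start()

    // put() blocks while the queue is full, so the source cannot run ahead of the writer.
    source.foreach(record => queue.put(build(record)))
    queue.put(Done)
    writer.join() // after join, everything the writer appended is visible here
    written.result()
  }
}
```

Only `capacity` built objects exist at any time, regardless of how many records the iterator yields.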


pvis

Sep 28, 2011, 12:13:28 AM
to Akka User List
Hi Roland,

I came up with this (the addition is the data source as an actor). It
won't run out of memory, but I am not sure whether it is scalable:

class DBServiceActor extends Actor {
  private val dbService: DBService = ..
  def receive = {
    case Write(domainObject) => dbService.write(domainObject)
  }
}

class BuilderActor extends Actor {
  private val builder: DomainObjectBuilder = …
  def receive = {
    case GetDomainObject(data) => self.reply(Write(builder.doWhateverNecessary))
  }
}

class DataSourceActor extends Actor {
  private val iterator: Iterator = ..
  def receive = {
    // Hand out the next record, or Over once the iterator is exhausted
    case GetData =>
      if (iterator.hasNext) self.reply(GetDomainObject(iterator.next))
      else self.reply(Over)
  }
}

class SomeWorker(val dataSource: ActorRef, val builder: ActorRef,
                 val service: ActorRef, val agent: Agent[Int]) extends Actor {
  // The agent counts the workers that are still running
  override def preStart() = agent send (_ + 1)
  def next() = self ! Run

  def receive = {
    case Run =>
      val data = dataSource ? GetData
      data onResult {
        case GetDomainObject(data) =>
          val domainFuture = builder ? GetDomainObject(data)
          domainFuture onResult {
            case Write(domain) =>
              service ! Write(domain)
              next()
          }
        case Over => self ! PoisonPill
      }
  }

  // Decrement on stop so that the count returns to zero
  override def postStop() = agent send (_ - 1)
}

def main(args: Array[String]): Unit = {
  val builderActor = Actor.actorOf[BuilderActor].start()
  val dbServiceActor = Actor.actorOf[DBServiceActor].start()
  val dataSourceActor = ..
  val agent = Agent(0)

  val workers = Vector.fill(nrOfWorkers)(actorOf(new SomeWorker(
    dataSourceActor, builderActor, dbServiceActor, agent)).start())
  workers foreach (_ ! Run)

  // Poll until every worker has stopped
  while (agent.get() != 0) {
    Thread.sleep(5000)
  }

  // Shut down the shared actors here (main is not an actor, so there is no self)
}

I have some follow-up questions:
Is there a limit to the number of messages an actor can take?
If I introduce a thread pool in the DomainBuilder, will this
reintroduce the concurrency concerns that we tried to avoid with the
actor model?

class BuilderActor extends Actor {
  self.dispatcher = ...
  private val builder: DomainObjectBuilder = …
  def receive = {
    case GetDomainObject(data) => self.reply(Write(builder.buildDomain(data)))
  }
}

Thanks for your time and advice.

Regards
Prakash

√iktor Ҡlang

Sep 28, 2011, 3:24:06 AM
to akka...@googlegroups.com
Hi Prakash,


"val data = dataSource ? GetData
 data onResult {
   case GetDomainObject(data) =>
     val domainFuture = builder ? GetDomainObject(data)
     domainFuture onResult {
       case Write(domain) => service ! Write(domain)
       next()
     }"

This will break whenever the GetData times out, since there is no result then.

Cheers,
Viktor Klang

Akka Tech Lead
Typesafe - Enterprise-Grade Scala from the Experts

Twitter: @viktorklang
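
To illustrate the point about timeouts: a reply can end in a value, a timeout, or a failure, and the quoted code only continues in the first case, so a worker that hits one of the other two simply stalls. The sketch below uses the Scala standard library's `scala.concurrent` rather than the Akka 1.x future API discussed in this thread, and it blocks on the reply purely to keep the example short. `TimeoutSketch`, `Outcome`, and `awaitReply` are hypothetical names.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success, Try}

object TimeoutSketch {
  sealed trait Outcome
  final case class Got(data: String) extends Outcome
  case object TimedOut extends Outcome
  final case class Failed(error: Throwable) extends Outcome

  /** Waits for `reply` for at most `timeout` and maps every possible ending to an Outcome. */
  def awaitReply(reply: Future[String], timeout: FiniteDuration): Outcome =
    Try(Await.result(reply, timeout)) match {
      case Success(data)                => Got(data)     // the normal path
      case Failure(_: TimeoutException) => TimedOut      // no reply in time: retry or stop
      case Failure(error)               => Failed(error) // the reply itself failed
    }
}
```

A worker built this way can decide explicitly what to do on `TimedOut` or `Failed` (retry, skip the record, or stop) instead of silently never calling `next()`.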

Roland Kuhn

Sep 28, 2011, 5:20:51 AM
to akka...@googlegroups.com
Hi Prakash,

yes, apart from the missing error handling this is just what I meant; you might want to read up on Future.onException and Future.onTimeout. There is one thing missing, though: your SomeWorker will read from the data source and spawn Futures plus queued messages as fast as it can, which means that you might want to throttle it, e.g. your service might reply with a Permit once done, and the worker gets a certain number of permits upon startup.

Regards,

Roland


pvis

Sep 28, 2011, 9:26:56 PM
to Akka User List
Thanks, Viktor and Roland, for the heads-up on error handling,
exceptions, and timeouts. It was very helpful.

Having the worker wait for a Permit message from the service before
submitting an additional task is an excellent idea, thank you.
Pardon me, I couldn't understand the next suggestion: is there a reason
why we want the worker to get only a certain number of permits upon
startup? Is it to free up memory, or for performance reasons?

Regards
Prakash

Roland Kuhn

Oct 3, 2011, 11:45:29 AM
to akka...@googlegroups.com
Hi Prakash,

limiting the permits achieves a throttling of the data source’s production rate so that it does not run too far ahead of the builder; e.g. if your data set to be processed is bigger than your available memory then you want to properly stream the data through your application instead of loading it all up front and then moving it through the processing stages. Depending on your use case that might not be an issue, though, and I am not aware of any intrinsic performance penalties incurred by Akka because of large queues (apart from possible interactions with the garbage collector if it suddenly thinks that the queued messages are “long-lived”; never heard about that one, though).

Regards,

Roland
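
The permit idea described above can be sketched with a plain `java.util.concurrent.Semaphore`: the reader takes a permit before handing out a record, and the final stage gives it back once the record is done, so at most `permits` records are ever in flight. This is an assumption-laden illustration and not Akka code; in the actor version the semaphore would be replaced by Permit messages sent back to the worker. `PermitSketch`, `process`, and its parameters are hypothetical, and summing the records stands in for the build-and-write work.

```scala
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

object PermitSketch {
  /** Sums `source` on a worker pool with at most `permits` records in flight.
    * Returns (sum, highest number of records that were in flight at once). */
  def process(source: Iterator[Int], permits: Int, workers: Int): (Long, Int) = {
    val available = new Semaphore(permits)
    val inFlight  = new AtomicInteger(0)
    val sum       = new AtomicLong(0L)
    val pool      = Executors.newFixedThreadPool(workers)
    var peak      = 0 // only touched by the reading thread

    source.foreach { record =>
      available.acquire() // the reader stalls here once all permits are handed out
      peak = math.max(peak, inFlight.incrementAndGet())
      pool.execute(new Runnable {
        def run(): Unit =
          try { sum.addAndGet(record.toLong); () } // stand-in for build + write
          finally {
            inFlight.decrementAndGet()
            available.release() // the "Permit" reply: one more record may be read
          }
      })
    }

    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
    (sum.get, peak)
  }
}
```

The number of permits bounds memory use independently of the size of the data set, which is the streaming behaviour the message describes; choosing that number is a tuning decision.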
