typed actor on a single thread?


Kostas kougios

Apr 8, 2014, 7:01:48 AM
to akka...@googlegroups.com
Hi, I've created a typed actor, but it seems to run on multiple threads. How can I run it on a single thread so that I can use collections within the typed actor and avoid synchronization?

√iktor Ҡlang

Apr 8, 2014, 7:25:42 AM
to Akka User List

Hi,
An actor never runs on multiple threads at the same time; it can, however, run on a different thread for each invocation.
There is no need to synchronize anything inside the TypedActor.
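To make the "one invocation at a time, but not always the same thread" idea concrete, here is a minimal sketch using only the Scala standard library. It is an analogy, not how Akka is implemented: the object name, the pool size of 4, and the chaining via Future are assumptions made for illustration. Each step runs only after the previous one has completed, so the unsynchronized counter and set are never touched by two threads at once, even though the steps may hop between pool threads.

```scala
import java.util.concurrent.Executors
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical illustration (not Akka code): steps run strictly one after
// another, but each step may be executed by a different pool thread.
object SerializedExecutionSketch {
  /** Runs n sequential steps; returns (final counter, number of distinct threads used). */
  def run(n: Int): (Int, Int) = {
    val pool = Executors.newFixedThreadPool(4) // pool size is an arbitrary choice
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    // Deliberately unsynchronized state, like the mutable Map in a typed actor.
    var counter = 0
    val threadsSeen = mutable.Set.empty[String]

    try {
      // Chain the steps: step k is only scheduled once step k-1 has completed.
      val done: Future[Unit] = (1 to n).foldLeft(Future.successful(())) { (prev, _) =>
        prev.map { _ =>
          counter += 1
          threadsSeen += Thread.currentThread().getName
          ()
        }
      }
      Await.result(done, 30.seconds)
      (counter, threadsSeen.size)
    } finally pool.shutdown()
  }
}
```

Calling `SerializedExecutionSketch.run(10000)` always yields a counter of 10000, with no lost updates; the number of distinct threads observed can be anywhere from 1 to 4 depending on scheduling.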

Cheers,
V

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Martynas Mickevičius

Apr 8, 2014, 7:46:26 AM
to akka...@googlegroups.com
Just for general reference, there is a blog post that talks about TypedActor usage: http://letitcrash.com/post/19074284309/when-to-use-typedactors


Kostas kougios

Apr 8, 2014, 8:23:56 AM
to akka...@googlegroups.com
Thanks for the replies. So I suppose this typed actor will work just fine: the two methods will only ever execute on one thread at a time, and state changes within the actor are safely published between invocations. I ran it and it looks good.

import scala.concurrent.Future

trait MapService[K, V]
{
    def update(k: K, v: V): Unit
    def apply(k: K): Future[V]
}

class MapServiceImpl[K, V] extends MapService[K, V]
{
    private val m = collection.mutable.Map.empty[K, V]

    override def apply(k: K) = Future.successful(m(k))

    override def update(k: K, v: V) = m(k) = v
}

√iktor Ҡlang

Apr 8, 2014, 8:26:34 AM
to Akka User List
yep



Konstantinos Kougios

Apr 8, 2014, 9:11:36 AM
to akka...@googlegroups.com
I wrote a small program to verify that it is all OK. Sometimes it runs to completion and succeeds, but most of the time it gets stuck because it doesn't get back all the items that it "put" into the map.

What it does is add 1 million items to this typed actor map. It then waits until the map's actual size is 1 million (to make sure all messages were delivered to the map actor). It then gets all items from the map and checks that each key maps to the correct value (this always works correctly). Finally it compares the number of gets/checks it did against the number of puts, and this is where it fails: many runs get fewer items than were put into the map, along with some dead-letter logging:

package actors

import akka.actor.{TypedProps, ActorSystem, TypedActor}
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.atomic.AtomicInteger

/**
 * @author kkougios
 *         Date: 2014/04/08 10:36 AM
 */
object Main extends App
{
    println("Starting up")

    val system = ActorSystem("test")
    val typed = TypedActor(system)
    val service: MapService[Int, String] = typed.typedActorOf(TypedProps[MapServiceImpl[Int, String]])

    val Iterations = 1000000
    try {
        // populate the map using many threads
        (1 to Iterations).par.foreach {
            i =>
                service(i) = "x" + i
        }

        // wait till the map is fully populated
        while (service.size.get != Iterations) {
            println(service.size.get)
            Thread.sleep(1000)
        }


        // now get all items from the map, again using many threads
        val l = (1 to Iterations).par.map {
            i =>
                service(i).map {
                    s => (s, "x" + i)
                }
        }.toList

        // and verify key<->value pairs match
        println("looking for issues")
        val checked = new AtomicInteger
        l.foreach {
            f =>
                f.foreach {
                    case (actual, expected) =>
                        checked.incrementAndGet()
                        if (actual != expected) println("FOUND ISSUE")
                }
        }

        // also verify we got/checked "Iterations" number of items
        println("waiting for checked")
        while (checked.get != Iterations) {
            println(checked.get)
            Thread.sleep(1000)
        }
    } finally {
        typed.poisonPill(service)
        println("shutting down")
        system.shutdown()
    }
}


Running this (usually) produces something like this:

Starting up
[INFO] [04/08/2014 14:04:16.919] [test-akka.actor.default-dispatcher-11] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#1286986617] to Actor[akka://test/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.919] [test-akka.actor.default-dispatcher-12] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#1286986617] to Actor[akka://test/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920] [test-akka.actor.default-dispatcher-12] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#1286986617] to Actor[akka://test/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920] [test-akka.actor.default-dispatcher-12] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#1286986617] to Actor[akka://test/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920] [test-akka.actor.default-dispatcher-12] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#1286986617] to Actor[akka://test/deadLetters] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920] [test-akka.actor.default-dispatcher-12] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#1286986617] to Actor[akka://test/deadLetters] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920] [test-akka.actor.default-dispatcher-10] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#1286986617] to Actor[akka://test/deadLetters] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920] [test-akka.actor.default-dispatcher-10] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#1286986617] to Actor[akka://test/deadLetters] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920] [test-akka.actor.default-dispatcher-10] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#1286986617] to Actor[akka://test/deadLetters] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920] [test-akka.actor.default-dispatcher-10] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#1286986617] to Actor[akka://test/deadLetters] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
looking for issues
waiting for checked
970038
977033 <- this should be 1 million which would allow the program to exit
977033
977033
977033
977033
... goes on forever


Any ideas what might be wrong? Here is the typed actor code:

package actors

import scala.concurrent.Future

/**
 * @author kkougios
 *         Date: 2014/04/08 10:39 AM
 */

trait MapService[K, V]
{
    def update(k: K, v: V): Unit
    def apply(k:K):Future[V]

    def size:Option[Int]

}

class MapServiceImpl[K, V] extends MapService[K, V]
{
    private val m = collection.mutable.Map.empty[K, V]

    override def apply(k: K) = Future.successful(m(k))

    override def update(k: K, v: V) = m(k) = v

    override def size = Some(m.size)
}

Kostas kougios

Apr 8, 2014, 9:17:30 AM
to akka...@googlegroups.com
attached the test program.
experiments.tar.gz

√iktor Ҡlang

Apr 8, 2014, 9:19:55 AM
to Akka User List
I don't see anywhere that you verify this:

  val l = (1 to Iterations).par.map {
            i =>
                service(i).map {
                    s => (s, "x" + i)
                }
        }.toList

if (l.size != Iterations) throw new Exception("Comparing apples to oranges")

Konstantinos Kougios

Apr 8, 2014, 9:24:07 AM
to akka...@googlegroups.com
I used to have a println to verify the number of futures in the list, and I have now added the check too. I am still getting the issue roughly once every 2 runs.

When it runs correctly, it runs without any deadletters. But when it doesn't run correctly, it logs those deadletters too and gets stuck at the final while loop.

I've attached the whole experiment in my previous email, it can be run with sbt run

Cheers

√iktor Ҡlang

Apr 8, 2014, 9:27:27 AM
to Akka User List
If there are dead letters, it means that the actor somehow terminates. Turn on more detailed logging.
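For reference, one way to turn on more detailed logging is to pass extra configuration when creating the ActorSystem. The sketch below is a hedged example rather than the setup used in this thread: the object name and the chosen values are assumptions, while the keys (`akka.loglevel`, `akka.actor.debug.lifecycle`, `akka.log-dead-letters`, `akka.log-dead-letters-during-shutdown`) are standard Akka settings, the last two being the ones named in the dead-letter log lines themselves.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Hypothetical sketch: values are illustrative, not taken from the thread.
object DebugLoggingSketch extends App {
  val config = ConfigFactory.parseString(
    """
      |akka.loglevel = "DEBUG"
      |# log actor lifecycle changes (started, stopped, restarted)
      |akka.actor.debug.lifecycle = on
      |# log up to 100 dead letters instead of the default 10
      |akka.log-dead-letters = 100
      |akka.log-dead-letters-during-shutdown = on
    """.stripMargin).withFallback(ConfigFactory.load())

  val system = ActorSystem("test", config)

  // ... create the typed actor and run the experiment here ...

  system.shutdown() // Akka 2.3-era API, as used elsewhere in this thread
}
```

The same keys can instead be placed in `application.conf`, which avoids touching the code.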

Konstantinos Kougios

Apr 8, 2014, 9:41:58 AM
to akka...@googlegroups.com
I activated logging, not much is logged. It seems it gets the dead letter log messages when it gets the items from the typed actor map:

Starting up
[DEBUG] [04/08/2014 14:39:30.238] [main] [EventStream(akka://test)] logger log1-Logging$DefaultLogger started
[DEBUG] [04/08/2014 14:39:30.239] [main] [EventStream(akka://test)] Default Loggers started
populate the map

wait till the map is fully populated
now get all items from the map
[INFO] [04/08/2014 14:39:46.578] [test-akka.actor.default-dispatcher-19] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#-128635841] to Actor[akka://test/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:39:46.586] [test-akka.actor.default-dispatcher-16] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#-128635841] to Actor[akka://test/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:39:46.599] [test-akka.actor.default-dispatcher-2] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#-128635841] to Actor[akka://test/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:39:46.599] [test-akka.actor.default-dispatcher-2] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#-128635841] to Actor[akka://test/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:39:46.599] [test-akka.actor.default-dispatcher-2] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#-128635841] to Actor[akka://test/deadLetters] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:39:46.599] [test-akka.actor.default-dispatcher-2] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#-128635841] to Actor[akka://test/deadLetters] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:39:46.599] [test-akka.actor.default-dispatcher-2] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#-128635841] to Actor[akka://test/deadLetters] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:39:46.599] [test-akka.actor.default-dispatcher-11] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#-128635841] to Actor[akka://test/deadLetters] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:39:46.599] [test-akka.actor.default-dispatcher-11] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#-128635841] to Actor[akka://test/deadLetters] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:39:46.599] [test-akka.actor.default-dispatcher-11] [akka://test/deadLetters] Message [java.lang.String] from Actor[akka://test/user/$a#-128635841] to Actor[akka://test/deadLetters] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

and verify key<->value pairs match
also verify we got/checked 1000000 number of items
795729
925174
952431
952431
952431
952431

...

√iktor Ҡlang

Apr 8, 2014, 9:44:04 AM
to Akka User List
Does the actor die?

Konstantinos Kougios

Apr 8, 2014, 9:57:37 AM
to akka...@googlegroups.com
doesn't seem so. I do

        println(s"is actor alive? ${service.size.isDefined} with size ${service.size}")

within the final loop:


        while (checked.get != Iterations) {
            println(checked.get)
            println(s"is actor alive? ${service.size.isDefined} with size ${service.size}")
            Thread.sleep(1000)
        }

 and I get:

733210
is actor alive? true with size Some(1000000)
976138
is actor alive? true with size Some(1000000)
976138
is actor alive? true with size Some(1000000)
976138
is actor alive? true with size Some(1000000)
976138
is actor alive? true with size Some(1000000)
976138
is actor alive? true with size Some(1000000)
976138
is actor alive? true with size Some(1000000)

So it seems still alive (is there a better way to get a true/false if the actor is alive?)

So far I get 1 million Futures, but when I iterate and increment the "checked" counter, for many runs it doesn't add up to 1 million.

Cheers

√iktor Ҡlang

Apr 8, 2014, 9:59:36 AM
to Akka User List
Can you create a minimized reproducible test case?

Konstantinos Kougios

Apr 8, 2014, 12:21:02 PM
to akka...@googlegroups.com
I created one, but it seems the problem is due to some timeouts, which occur in about 50% of my runs:

org.scalatest.exceptions.TestFailedException: List(Failure(akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://test/user/$a#-1771127837]] after [5000 ms]), ....

So probably the last of the service(i) calls wait for more than 5 seconds and time out.
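This also explains why the "checked" counter in the original program stalls below 1 million: `Future.foreach` only runs its callback for successful futures, so a future that failed with a timeout is silently skipped. The sketch below shows that behaviour with the standard library only. The object name and the simulated failures are assumptions for illustration; no Akka or real ask timeout is involved.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Hypothetical illustration: every failEvery-th future fails with a simulated timeout.
object FailedFutureSketch {
  /** Returns (succeeded, failed) for `total` futures. */
  def count(total: Int, failEvery: Int): (Int, Int) = {
    val futures: Seq[Future[String]] = (1 to total).map { i =>
      if (i % failEvery == 0)
        Future.failed[String](new TimeoutException(s"simulated timeout for item $i"))
      else
        Future.successful("x" + i)
    }

    val succeeded = new AtomicInteger
    val failed = new AtomicInteger
    val done = new CountDownLatch(total)

    futures.foreach { f =>
      // onComplete sees both outcomes; f.foreach would only see the Success case,
      // which is why a counter incremented inside f.foreach never reaches `total`.
      f.onComplete {
        case Success(_) => succeeded.incrementAndGet(); done.countDown()
        case Failure(_) => failed.incrementAndGet(); done.countDown()
      }
    }

    done.await(30, TimeUnit.SECONDS)
    (succeeded.get, failed.get)
  }
}
```

With `FailedFutureSketch.count(100, 10)` the result is 90 successes and 10 failures. A verification loop that counts failures as well would terminate and report the timeouts instead of spinning forever.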

√iktor Ҡlang

Apr 8, 2014, 12:23:52 PM
to Akka User List
So if you try to bump the timeout it should be fine?
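The thread does not show how the timeout was raised, so the following is only a sketch of one way it could be done. It assumes the Akka 2.3-era typed actor API used in this thread; `TypedProps.withTimeout` sets the timeout for methods with a non-Unit return type, and the default of 5 seconds comes from the `akka.actor.typed.timeout` setting. The object name and the 60-second value are arbitrary choices for illustration.

```scala
import akka.actor.{ActorSystem, TypedActor, TypedProps}
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

// Same shape as the MapService from this thread, repeated so the sketch is self-contained.
trait MapService[K, V] {
  def update(k: K, v: V): Unit
  def apply(k: K): Future[V]
}

class MapServiceImpl[K, V] extends MapService[K, V] {
  private val m = collection.mutable.Map.empty[K, V]
  override def apply(k: K) = Future.successful(m(k))
  override def update(k: K, v: V) = m(k) = v
}

// Hypothetical sketch: 60 seconds is an assumed value, not the one used in the thread.
object LongerTimeoutSketch extends App {
  val system = ActorSystem("test")

  // Alternative: set akka.actor.typed.timeout in the configuration instead.
  val props = TypedProps[MapServiceImpl[Int, String]]().withTimeout(Timeout(60.seconds))
  val service: MapService[Int, String] = TypedActor(system).typedActorOf(props)

  service(1) = "x1"
  // service(1) now returns a Future that only fails with a timeout after 60 seconds.

  TypedActor(system).poisonPill(service)
  system.shutdown()
}
```

A longer timeout hides the symptom rather than the backlog, so it is mostly useful for a stress test like this one.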

Konstantinos Kougios

Apr 8, 2014, 12:30:09 PM
to akka...@googlegroups.com
yes, seems ok after 5 runs

attached the whole experiment, just for reference
experiments.tar.gz

√iktor Ҡlang

Apr 8, 2014, 12:33:29 PM
to Akka User List
Alright!

So the moral of the story is: if you are sending a million messages from multiple other threads, the single thread that processes them won't get much of a chance to do its work, hence the timeouts :)
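A back-of-the-envelope model of that backlog, as a rough sketch only: assume all requests are enqueued at the same instant and each one takes a fixed time to process, so the request at position k in the queue is answered after roughly k times the per-message cost. Both the per-message cost and the object name below are hypothetical; nothing in this thread measured the actual cost.

```scala
// Hypothetical toy model: all requests enqueued at t = 0, fixed cost per message.
object BacklogSketch {
  /** How many of `total` queued requests are answered before `timeoutMillis` elapses. */
  def answeredInTime(total: Int, serviceMicros: Long, timeoutMillis: Long): Int = {
    require(serviceMicros > 0, "serviceMicros must be positive")
    // The request at position k completes after about k * serviceMicros microseconds.
    val fitInTimeout = (timeoutMillis * 1000L) / serviceMicros
    math.min(total.toLong, fitInTimeout).toInt
  }
}
```

Under these assumptions, with 1,000,000 requests, an assumed cost of 6 microseconds per message and the default 5000 ms timeout, only the first 833,333 requests are answered in time; at 4 microseconds per message all of them make it. This is why small changes in load or scheduling can flip a run between success and timeouts.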

Konstantinos Kougios

Apr 8, 2014, 12:34:16 PM
to akka...@googlegroups.com
I suppose so.

Where do you think the overhead is? In the proxying of the trait, or in the dispatching of messages?

√iktor Ҡlang

Apr 8, 2014, 12:39:10 PM
to Akka User List
Guesses:

1) you run ".par" which will create number-of-cores new Threads, which now means that there are more threads than cores, i.e. competition for cpu time begins
2) overhead for dynamic proxying
3) overhead for creating Futures
4) overhead for scheduling timeouts
5) overhead for performing the work

Konstantinos Kougios

Apr 9, 2014, 5:17:57 AM
to akka...@googlegroups.com
Did some profiling,

43% of the time is spent in scala.concurrent.forkjoin.ForkJoinPool.scan
6% in the concurrent linked queue

and the rest of it in my code.

ForkJoinPool.scan stays at 40+% even if I remove the .par from my code, so this must be the Akka thread pool. I suppose my code doesn't do much, and hence most of the time is spent on the plumbing of the actors. A more useful stress test would have the actor actually do something useful.

Anyway, thanks for the help

√iktor Ҡlang

Apr 9, 2014, 5:20:29 AM
to Akka User List
Yes, designing benchmarks in concurrent systems is hard.

Happy hAkking