Questions/Problems/Bugs Using Akka?

232 views
Skip to first unread message

Steve Ramage

unread,
May 21, 2014, 3:09:12 PM5/21/14
to akka...@googlegroups.com
Hello,

I'm new to Akka, and to provide some context to my question I will provide some background to the problem I'm actually trying to solve. I have only read a few chapters of the Akka manual, but have started implementing anyway since I need to get my hands dirty, and so no doubt do not a firm understanding of Akka principles and concepts. Essentially I have written a library for use in my narrow field of scientific research, and central to it is the following interface (everything is in Java 7, and this is slightly simplified):

interface TargetAlgorithmEvaluator
{
   /** 
   * Do the task descriptions, and when complete notify the onSuccess() or onFailure() method of the callback
   */
   public void evaluateTaskAsync(List<TaskDescription> t, Callback c);
   /**
   * Shutdown any resources associated with this TargetAlgorithmEvaluator
   */
   public void notifyShutdown();
}

A task in this case is roughly running a program and getting some results, and there are a bunch of ways you can do it. The tasks come from a very specific domain, and aren't general at all. The first and default way is locally, via the command line. So of course there is an implementation of this that just executes the tasks on the command line, and then gets the results. Sometimes we want to do 'lots' of these executions and so it makes sense to distribute them, and so another one exists that actually uses MySQL and has workers poll from the SQL database, and this works very well. Unfortunately for releasing our tools built with this library, the requirement that users have a tuned MySQL server around is limiting, so we would like some other distributed mechanism and hence Akka. Unlike what I imagine are standard Akka use cases, these distributions are incredibly transient, they just are scheduled on some shared cluster, work together for a while and are terminated. There is no stable or perpetual deployment. It's just a master / slave architecture.  The way this currently works in Akka is that the master job spins up, creates an actor system that is listening for other actors. When other actors on other systems come online, it will dispatch the tasks to them. The workers will then use the Command line implementation locally before giving the result back via Akka to the master.

One problem I'm currently having is implementing the notifyShutdown() method. This method needs to shutdown all the thread pools and the actor system etc. One thread in this listens to a specific inbox for completion results, and then dispatches calls to the callback in another thread. 

The code looks like the following:

Runnable run = new Runnable()
{
  @Override
  public void run() {
	
	while(!Thread.interrupted() && !stopProcessingInbox.get())
	{
  	  Object o = null;
			
	  try {
		String threadName = "My Thread " + Math.random(); 
	        System.out.println(threadName);
		Thread.currentThread().setName(threadName);	
		try {
		  o = inbox.receive(new FiniteDuration(1, TimeUnit.SECONDS));
		} finally
		{
 		  System.err.println("Done recieve");
		}
	  } catch(Throwable e)
	  {
		System.out.println("Error: " + e);
		throw e;
	  }

           if (o == null) continue;
          //Rest of while loop here
         }
                       
}
}
}

Essentially the problem is that I would like to make the inbox.receive sensitive to interruption, that might occur when notifyShutdown() is called. The above code however has some very weird output that I just don't understand:

My Thread 0.7085946542233289
Done recieve
Error: java.util.concurrent.TimeoutException: deadline passed
Exception in thread "My Thread 0.7085946542233289" java.lang.Error: java.util.concurrent.TimeoutException: deadline passed
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.concurrent.TimeoutException: deadline passed
	at akka.actor.dsl.Inbox$InboxActor$$anonfun$receive$1.applyOrElse(Inbox.scala:117)
	at scala.PartialFunction$AndThen.applyOrElse(PartialFunction.scala:184)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at akka.actor.dsl.Inbox$InboxActor.aroundReceive(Inbox.scala:62)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


So my questions or concerns are as follows:

1) Why does the stack trace which as far as I can tell is generated when the timeout expires NOT show my code at all. 
2) I can't seem to catch this exception at all, it looks like the code is throwing a checked exception j.u.c.TimeoutException, even though the method declaration for inbox.receive doesn't declare it.
3) What is the best way to implement a shutdown procedure, essentially I will have a thread blocking on an inbox and I would like to have that thread terminate _GRACEFULLY_.


Other things I should mention is that I'm running Akka 2.3.2, and I am unfamiliar with Maven so I literally just added every jar in the download to my class path. 

Thanks,
Steve Ramage




Martynas Mickevičius

unread,
May 23, 2014, 9:29:17 AM5/23/14
to akka...@googlegroups.com
Hello Steve,

it seems that you are using ActorDsl. It is meant to be used mostly for trying things out in the REPL. I would suggest re-factoring your application to separate actors and letting Akka do the concurrency management. In that case you should get rid of Runnable I see in your example and move it to actor.

Have you seen this article, which discuses some of the shutdown patterns?


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Martynas Mickevičius
TypesafeReactive Apps on the JVM

Steve Ramage

unread,
May 23, 2014, 1:47:21 PM5/23/14
to akka...@googlegroups.com
Thank you for your response.

AKKAs place in this application is to manage the concurrency involved in distributing tasks to workers, and processing them, etc... The implementation of the interface requires multi-threaded operation, and refactoring the interface introduces a huge burden on basically ever client of the library. It's also unlikely that they would be willing to adopt the actor model whole heartedly. Consequently I do have a general problem of needing to read inboxes from threads, although in this particular instance I could probably move it back into the Actor that is populating the inbox. I will look at the article you presented, but I guess the one question that is still open. Shouldn't inbox.receive() be declared to throw TimeoutException given that it is in fact throwing it.

Steve Ramage

Martynas Mickevičius

unread,
May 23, 2014, 2:12:42 PM5/23/14
to akka...@googlegroups.com
Scala does not have checked exceptions. That is why Java compiler cannot help you when calling a Scala method which throws a checked exception in Java.

Patrik Nordwall

unread,
May 23, 2014, 2:54:01 PM5/23/14
to akka...@googlegroups.com
Ah, that is interesting, I think we have missed an annotation here, please open a ticket.

/Patrik

Steve Ramage

unread,
May 23, 2014, 5:17:35 PM5/23/14
to akka...@googlegroups.com
Reply all
Reply to author
Forward
0 new messages