Problem creating a balancing router for a stream sink


Nicolau Werneck

Jul 14, 2015, 2:44:32 PM
to akka...@googlegroups.com
I am trying to write an ActorSubscriber to use as a sink for an Akka stream. This actor has a router that distributes the processing of the incoming data.

My code is working just fine right now, and I have it up on GitHub (https://github.com/projetoeureka/akka-mapreduce/commit/94a7896ffe5ab5b66e72a04b8ccd6c2f0b8ef390).

The problem is that I only managed to make it work with a SmallestMailbox routing logic. I wanted to use a BalancingPool, but when I replace the kind of router in my code, I cannot even create the router actor. Instead I get the error below. Any thoughts?  ++nic

[ERROR] [07/14/2015 15:43:21.994] [Main-akka.actor.default-dispatcher-3] [akka://Main/user/$a/flow-1-3-actorSubscriberSink] received Supervise from unregistered child Actor[akka://Main/user/$a/flow-1-3-actorSubscriberSink/mapred-router#-332511953], this will not end well
[ERROR] [07/14/2015 15:43:21.997] [Main-akka.actor.default-dispatcher-5] [akka://Main/user/$a/flow-1-3-actorSubscriberSink] configuration problem while creating [akka://Main/user/$a/flow-1-3-actorSubscriberSink/mapred-router] with router dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] and routee dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:166)
at akka.actor.ActorCell.create(ActorCell.scala:596)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating [akka://Main/user/$a/flow-1-3-actorSubscriberSink/mapred-router] with router dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] and routee dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:753)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:206)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:37)
at akka.actor.ActorCell.actorOf(ActorCell.scala:369)
at geekie.mapred.Mapred.<init>(Mapred.scala:27)
at geekie.mapred.Mapred$$anonfun$props$1.apply(Mapred.scala:58)
at geekie.mapred.Mapred$$anonfun$props$1.apply(Mapred.scala:58)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:343)
at akka.actor.Props.newActor(Props.scala:252)
at akka.actor.ActorCell.newActor(ActorCell.scala:552)
at akka.actor.ActorCell.create(ActorCell.scala:578)
... 9 more
Caused by: com.typesafe.config.ConfigException$BadPath: path parameter: Invalid path 'BalancingPool-/$a/flow-1-3-actorSubscriberSink/mapred-router': Token not allowed in path expression: 'a' ('$' not followed by {, 'a' not allowed after '$') (you can double-quote this token if you really want it here)
at com.typesafe.config.impl.Parser.parsePathExpression(Parser.java:1095)
at com.typesafe.config.impl.Parser.parsePath(Parser.java:1135)
at com.typesafe.config.impl.Path.newPath(Path.java:224)
at com.typesafe.config.impl.SimpleConfig.hasPath(SimpleConfig.java:80)
at akka.dispatch.CachingConfig.hasPath(CachingConfig.scala:97)
at akka.dispatch.Dispatchers.hasDispatcher(Dispatchers.scala:89)
at akka.routing.BalancingPool.newRoutee(Balancing.scala:106)
at akka.routing.RoutedActorCell$$anonfun$start$3.apply(RoutedActorCell.scala:116)
at akka.routing.RoutedActorCell$$anonfun$start$3.apply(RoutedActorCell.scala:116)
at scala.collection.generic.GenTraversableFactory.fill(GenTraversableFactory.scala:90)
at akka.routing.RoutedActorCell.start(RoutedActorCell.scala:116)
at akka.routing.RoutedActorCell.start(RoutedActorCell.scala:41)
at akka.actor.RepointableActorRef.point(RepointableActorRef.scala:105)
at akka.actor.RepointableActorRef.initialize(RepointableActorRef.scala:82)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:751)
... 19 more
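
The innermost "Caused by" above comes from the config library, not from the router itself: the string 'BalancingPool-/$a/flow-1-3-actorSubscriberSink/mapred-router' is passed to hasPath, and the parser rejects the '$' in the auto-generated actor name. Below is a minimal sketch that tries to reproduce just that step with Typesafe Config alone. The first path is copied from the error message; the second path, with a parent called "mapred", is a hypothetical name made up for comparison, and the object name BadPathSketch is also only illustrative.

```scala
import com.typesafe.config.ConfigFactory
import scala.util.Try

object BadPathSketch extends App {
  // An empty config is enough: the failure happens while parsing the path,
  // before any lookup is done.
  val config = ConfigFactory.empty()

  // Path taken verbatim from the BadPath message in the trace above.
  val anonymousParent = "BalancingPool-/$a/flow-1-3-actorSubscriberSink/mapred-router"

  // Hypothetical path with no '$' in it, for comparison only.
  val namedParent = "BalancingPool-/mapred/mapred-router"

  // Expected to be a Failure wrapping ConfigException.BadPath.
  println(Try(config.hasPath(anonymousParent)))

  // Expected to parse; the path is simply absent from the empty config.
  println(Try(config.hasPath(namedParent)))
}
```

If this reading is right, the failure depends on where the router sits in the actor hierarchy (under the anonymous "$a" parent) rather than on the routees or their dispatcher settings.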


Yusuke Yasuda

Sep 15, 2015, 5:14:36 AM
to Akka User List
I got the same error. My situation is the same as @Nicolau's: I am using a BalancingPool with an ActorSubscriber.

[ERROR] [09/15/2015 17:57:24.624] [TcpStreamActorITest-akka.actor.default-dispatcher-3] [akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink] received Supervise from unregistered child Actor[akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink/tcp-client-router#3824265], this will not end well
[ERROR] [09/15/2015 17:57:24.630] [TcpStreamActorITest-akka.actor.default-dispatcher-2] [akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink] configuration problem while creating [akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink/tcp-client-router] with router dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] and routee dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:166)
at akka.actor.ActorCell.create(ActorCell.scala:596)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating [akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink/tcp-client-router] with router dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] and routee dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:753)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:206)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:37)
at akka.actor.ActorCell.actorOf(ActorCell.scala:369)
at skarn.apns.TcpStreamActor.<init>(TcpStreamActor.scala:138)
at skarn.apns.TcpStreamActor$$anonfun$props$4.apply(TcpStreamActor.scala:128)
at skarn.apns.TcpStreamActor$$anonfun$props$4.apply(TcpStreamActor.scala:128)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:346)
at akka.actor.Props.newActor(Props.scala:255)
at akka.actor.ActorCell.newActor(ActorCell.scala:552)
at akka.actor.ActorCell.create(ActorCell.scala:578)
... 9 more
Caused by: com.typesafe.config.ConfigException$BadPath: path parameter: Invalid path 'BalancingPool-/$b/flow-1-1-actorSubscriberSink/tcp-client-router': Token not allowed in path expression: 'b' ('$' not followed by {, 'b' not allowed after '$') (you can double-quote this token if you really want it here)
at com.typesafe.config.impl.Parser.parsePathExpression(Parser.java:1095)
at com.typesafe.config.impl.Parser.parsePath(Parser.java:1135)
at com.typesafe.config.impl.Path.newPath(Path.java:224)
at com.typesafe.config.impl.SimpleConfig.hasPath(SimpleConfig.java:80)
at akka.dispatch.CachingConfig.hasPath(CachingConfig.scala:97)
at akka.dispatch.Dispatchers.hasDispatcher(Dispatchers.scala:89)
at akka.routing.BalancingPool.newRoutee(Balancing.scala:106)
at akka.routing.RoutedActorCell$$anonfun$start$3.apply(RoutedActorCell.scala:116)
at akka.routing.RoutedActorCell$$anonfun$start$3.apply(RoutedActorCell.scala:116)
at scala.collection.generic.GenTraversableFactory.fill(GenTraversableFactory.scala:90)
at akka.routing.RoutedActorCell.start(RoutedActorCell.scala:116)
at akka.routing.RoutedActorCell.start(RoutedActorCell.scala:41)
at akka.actor.RepointableActorRef.point(RepointableActorRef.scala:105)
at akka.actor.RepointableActorRef.initialize(RepointableActorRef.scala:82)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:751)
... 19 more


You can find my code here.




On Wednesday, July 15, 2015 at 3:44:32 AM UTC+9, Nicolau Werneck wrote:

Patrik Nordwall

Sep 22, 2015, 5:31:12 AM
to akka...@googlegroups.com
I think there have been some issues around balancing pool configuration that are fixed in 2.4. You can try with Akka version 2.4.0-RC3.
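
For anyone who wants to try that, here is a minimal sketch of the dependency change, assuming the project is built with sbt and pulls in akka-actor directly (neither is stated in this thread). Only the version string comes from the suggestion above.

```scala
// build.sbt fragment (assumes an sbt build; adjust for Maven or Gradle).
// Points akka-actor at the release candidate mentioned above.
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.0-RC3"
```

Note that Akka 2.4 requires Java 8, and any other Akka modules in the build would presumably need versions that work with it.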


--

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw
