Issue with Camel and Idempotent Consumer pattern

900 views
Skip to first unread message

Ian Hummel

unread,
Jul 31, 2013, 11:12:30 AM7/31/13
to akka...@googlegroups.com
Hey everyone,

I'm using akka-camel 2.2.0 and trying to create a route with a FileIdempotentRepository so that I don't process the same files again upon restart.  I'm using http://camel.apache.org/idempotent-consumer.html as inspiration, but am running into some issues.  Whenever my actor system starts up I get the following log line and everything just hangs

2013-07-31 10:48:28,124 WARN  akka.actor.OneForOneStrategy audsaw-akka.actor.default-dispatcher-3 - Actor [Actor[akka://audsaw/user/manager#-1790397411]] failed to activate

Here's the relevant bits from my Actor.  If I do NOT override onRouteDefinition, everything works fine.

class Manager(override val endpointUri: String, database: DatabaseConfig, numWorkers: Int, batchSize: Int) extends CamelConsumer with Stash with ActorLogging {
override def onRouteDefinition = (rd) => rd.idempotentConsumer(Builder.header("CamelFilePath"), FileIdempotentRepository.fileIdempotentRepository(new File("/tmp/status"))).end
...


I can't find any clue in the log files as to what could be going on, even with TRACE enabled.  Anyone have any ideas?  Logs are below.


Thanks!


Starting Akka...
Running Akka 2.2.0
Deploying file:/Users/ianhummel/Juno/audsaw/target/audsaw-dist/deploy/audsaw-0.5.jar
2013-07-31 11:00:32,154 INFO  akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
2013-07-31 11:00:32,164 DEBUG akka.event.EventStream main - logger log1-Slf4jLogger started
2013-07-31 11:00:32,165 DEBUG a.a.LocalActorRefProvider$SystemGuardian audsaw-akka.actor.default-dispatcher-4 - now supervising Actor[akka://audsaw/system/UnhandledMessageForwarder#-908659979]
2013-07-31 11:00:32,165 DEBUG akka.event.EventStream main - subscribing Actor[akka://audsaw/system/UnhandledMessageForwarder#-908659979] to channel class akka.actor.UnhandledMessage
2013-07-31 11:00:32,165 DEBUG akka.event.EventStream main - Default Loggers started
2013-07-31 11:00:32,166 DEBUG a.e.LoggingBus$$anonfun$startDefaultLoggers$2$$anon$1 audsaw-akka.actor.default-dispatcher-2 - started (akka.event.LoggingBus$$anonfun$startDefaultLoggers$2$$anon$1@60c9630a)
2013-07-31 11:00:32,166 DEBUG akka.event.EventStream main - unsubscribing StandardOutLogger from all channels
2013-07-31 11:00:32,167 DEBUG a.a.LocalActorRefProvider$SystemGuardian audsaw-akka.actor.default-dispatcher-3 - now supervising Actor[akka://audsaw/system/deadLetterListener#801105455]
2013-07-31 11:00:32,168 DEBUG akka.event.EventStream audsaw-akka.actor.default-dispatcher-2 - subscribing Actor[akka://audsaw/system/deadLetterListener#801105455] to channel class akka.actor.DeadLetter
2013-07-31 11:00:32,168 DEBUG akka.event.DeadLetterListener audsaw-akka.actor.default-dispatcher-2 - started (akka.event.DeadLetterListener@62803d5)
Starting up com.mediamath.audstats.Kernel
Successfully started Akka
2013-07-31 11:00:32,206 DEBUG a.a.LocalActorRefProvider$Guardian audsaw-akka.actor.default-dispatcher-4 - now supervising Actor[akka://audsaw/user/manager#390636964]
2013-07-31 11:00:32,210 DEBUG a.a.LocalActorRefProvider$Guardian audsaw-akka.actor.default-dispatcher-4 - now supervising Actor[akka://audsaw/user/camel-supervisor#1922673270]
2013-07-31 11:00:32,382 DEBUG o.a.c.i.c.AnnotationTypeConverterLoader  - Found 3 packages with 15 @Converter classes to load
2013-07-31 11:00:32,431 DEBUG o.a.c.i.c.AnnotationTypeConverterLoader  - Loading file META-INF/services/org/apache/camel/TypeConverter to retrieve list of packages, from url: jar:file:/Users/ianhummel/Juno/audsaw/target/audsaw-dist/lib/camel-core-2.10.3.jar!/META-INF/services/org/apache/camel/TypeConverter
2013-07-31 11:00:32,433 DEBUG o.a.c.i.c.AnnotationTypeConverterLoader  - Loading file META-INF/services/org/apache/camel/TypeConverter to retrieve list of packages, from url: jar:file:/Users/ianhummel/Juno/audsaw/target/audsaw-dist/lib/camel-core-2.11.1.jar!/META-INF/services/org/apache/camel/TypeConverter
2013-07-31 11:00:32,435 DEBUG o.a.c.i.c.AnnotationTypeConverterLoader  - No additional package names found in classpath for annotated type converters.
2013-07-31 11:00:32,435 INFO  o.a.c.i.c.DefaultTypeConverter  - Loaded 172 type converters
2013-07-31 11:00:32,436 INFO  o.a.camel.impl.DefaultCamelContext  - Apache Camel 2.10.3 (CamelContext: audsaw) is starting
2013-07-31 11:00:32,438 INFO  o.a.c.m.ManagementStrategyFactory  - JMX is disabled.
2013-07-31 11:00:32,449 DEBUG o.a.c.impl.SharedProducerServicePool  - Starting service pool: org.apache.camel.impl.SharedProducerServicePool@77a9f87c
2013-07-31 11:00:32,453 INFO  o.a.camel.impl.DefaultCamelContext  - Total 0 routes, of which 0 is started.
2013-07-31 11:00:32,454 INFO  o.a.camel.impl.DefaultCamelContext  - Apache Camel 2.10.3 (CamelContext: audsaw) started in 0.017 seconds
2013-07-31 11:00:32,459 DEBUG Camel(akka://audsaw) audsaw-akka.actor.default-dispatcher-3 - Started CamelContext[audsaw] for ActorSystem[audsaw]
2013-07-31 11:00:32,464 DEBUG akka.camel.internal.CamelSupervisor audsaw-akka.actor.default-dispatcher-2 - started (akka.camel.internal.CamelSupervisor@121321f5)
2013-07-31 11:00:32,464 DEBUG akka.camel.internal.CamelSupervisor audsaw-akka.actor.default-dispatcher-2 - now supervising Actor[akka://audsaw/user/camel-supervisor/activationTracker#-1040545580]
2013-07-31 11:00:32,465 DEBUG akka.camel.internal.CamelSupervisor audsaw-akka.actor.default-dispatcher-2 - now supervising Actor[akka://audsaw/user/camel-supervisor/registry#1671635316]
2013-07-31 11:00:32,465 DEBUG a.camel.internal.ProducerRegistrar audsaw-akka.actor.default-dispatcher-6 - started (akka.camel.internal.ProducerRegistrar@693e4a5a)
2013-07-31 11:00:32,465 DEBUG a.camel.internal.ActivationTracker audsaw-akka.actor.default-dispatcher-4 - started (akka.camel.internal.ActivationTracker@459d3b3a)
2013-07-31 11:00:32,467 DEBUG a.camel.internal.ConsumerRegistrar audsaw-akka.actor.default-dispatcher-6 - started (akka.camel.internal.ConsumerRegistrar@6e82254d)
2013-07-31 11:00:32,468 DEBUG akka.camel.internal.Registry audsaw-akka.actor.default-dispatcher-5 - started (akka.camel.internal.Registry@11dfc8a0)
2013-07-31 11:00:32,469 DEBUG akka.camel.internal.Registry audsaw-akka.actor.default-dispatcher-5 - now supervising Actor[akka://audsaw/user/camel-supervisor/registry/producerRegistrar#-1542223685]
2013-07-31 11:00:32,471 DEBUG akka.camel.internal.Registry audsaw-akka.actor.default-dispatcher-5 - now supervising Actor[akka://audsaw/user/camel-supervisor/registry/consumerRegistrar#-544715843]
2013-07-31 11:00:32,472 DEBUG com.mediamath.audstats.Manager audsaw-akka.actor.default-dispatcher-3 - started (com.mediamath.audstats.Manager@3f56e5ed)
2013-07-31 11:00:32,472 DEBUG com.mediamath.audstats.Manager audsaw-akka.actor.default-dispatcher-3 - now supervising Actor[akka://audsaw/user/manager/writer#-1236340735]
2013-07-31 11:00:32,475 DEBUG com.mediamath.audstats.Writer audsaw-akka.actor.default-dispatcher-6 - started (com.mediamath.audstats.Writer@591bd9c9)
2013-07-31 11:00:32,476 DEBUG com.mediamath.audstats.Manager audsaw-akka.actor.default-dispatcher-8 - now monitoring Actor[akka://audsaw/user/camel-supervisor#1922673270]
2013-07-31 11:00:32,477 DEBUG o.a.camel.impl.DefaultCamelContext  - Adding routes from builder: Routes: []
2013-07-31 11:00:32,525 DEBUG o.a.c.impl.DefaultComponentResolver  - Found component: file in registry: null
2013-07-31 11:00:32,527 DEBUG o.a.c.impl.DefaultComponentResolver  - Found component: file via type: org.apache.camel.component.file.FileComponent via: META-INF/services/org/apache/camel/component/file
2013-07-31 11:00:32,538 DEBUG o.apache.camel.impl.DefaultComponent  - Creating endpoint uri=[file:///Users/ianhummel/Juno/audsaw/target/audsaw-dist/data/inbox], path=[/Users/ianhummel/Juno/audsaw/target/audsaw-dist/data/inbox], parameters=[{}]
2013-07-31 11:00:32,554 DEBUG o.a.camel.impl.DefaultCamelContext  - file:///Users/ianhummel/Juno/audsaw/target/audsaw-dist/data/inbox converted to endpoint: Endpoint[file:///Users/ianhummel/Juno/audsaw/target/audsaw-dist/data/inbox] by component: org.apache.camel.component.file.FileComponent@55c79dfc
2013-07-31 11:00:32,601 DEBUG o.a.c.p.interceptor.DefaultChannel  - Initialize channel for target: 'ConvertBodyTo[java.io.InputStream]'
2013-07-31 11:00:32,644 WARN  akka.actor.OneForOneStrategy audsaw-akka.actor.default-dispatcher-7 - Actor [Actor[akka://audsaw/user/manager#390636964]] failed to activate
2013-07-31 11:00:32,651 DEBUG com.mediamath.audstats.Manager audsaw-akka.actor.default-dispatcher-8 - stopping
2013-07-31 11:00:32,652 DEBUG com.mediamath.audstats.Writer audsaw-akka.actor.default-dispatcher-7 - stopped
2013-07-31 11:00:32,654 DEBUG com.mediamath.audstats.Manager audsaw-akka.actor.default-dispatcher-8 - stopped
2013-07-31 11:00:32,656 DEBUG akka.camel.internal.CamelSupervisor audsaw-akka.actor.default-dispatcher-5 - received AutoReceiveMessage Envelope(Terminated(Actor[akka://audsaw/user/manager#390636964]),Actor[akka://audsaw/user/manager#390636964])
^C
Shutting down Akka...
Shutting down com.mediamath.audstats.Kernel
Successfully shut down Akka

Ian Hummel

unread,
Jul 31, 2013, 3:23:41 PM7/31/13
to akka...@googlegroups.com
Hi again,

I put a simple example project online which demonstrates the issue.  It's @ https://github.com/themodernlife/akka-camel-example .

Cheers,

Akka Team

unread,
Aug 5, 2013, 12:25:09 PM8/5/13
to akka...@googlegroups.com
Hi Ian,

no idea, maybe a directory permissions problem at /tmp/status? Do we have any camel experts here?

Regards,

Roland

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://akka.io/faq/
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/groups/opt_out.
 
 


--
Akka Team
Typesafe - The software stack for applications that scale
Blog: letitcrash.com
Twitter: @akkateam

Ian Hummel

unread,
Aug 5, 2013, 2:23:36 PM8/5/13
to akka...@googlegroups.com, akka...@googlegroups.com
Hi Roland,

I checked permissions, etc. I believe the main concern is that the framework appears to be swallowing the exception that caused activation to fail, making this near impossible to debug. 

I poked around the source a bit, but haven't had time to investigate further. I think there could be something happening in CamelSupervisor. Will update the thread if I find anything of note. 

Cheers,

You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/X9XCgLaxZ_8/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.

Akka Team

unread,
Aug 8, 2013, 2:09:59 AM8/8/13
to akka...@googlegroups.com
Hi Ian,

unfortunately I am a one man army this week so I cannot promise that I’ll find the time to dig into this issue. Could you make sure to file a ticket with your findings?

Thanks,

Roland
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.

To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/X9XCgLaxZ_8/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://akka.io/faq/
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Ian Hummel

unread,
Aug 9, 2013, 3:40:31 PM8/9/13
to akka...@googlegroups.com
Hi Roland,


I found the place in the code where exceptions would be thrown/caught, but because of the way supervisor strategies interact with logging, the complete stack trace is not logged if there are any exceptions thrown during route creation or customization.  

This is a shame, because you might for instance fat-finger a parameter in the endpointUri and not get any insight into why your actor's not starting unless you register for an activation future something like

val a = system.actorOf(Props[ExampleActor])
CamelExtension(system).activationFutureFor(a) onComplete {
case Success(actor) => println("Actor started successfully")
case Failure(e) => e.printStackTrace
}

I'm pretty new to Camel, but it seems that those endpoint URIs can get pretty complex so I would guess this to be a somewhat common occurrence.

The other thing I'll add just in case Google turns up this thread for someone in the future is that to get an idempotent consumer working all you need is

import org.apache.camel.builder.Builder
import org.apache.camel.processor.idempotent.FileIdempotentRepository

override def onRouteDefinition = {
(rd) => rd.idempotentConsumer(
Builder.header("CamelFilePath"), 
FileIdempotentRepository.fileIdempotentRepository(new File("status"))
)
}

If you call ".end" on the route definition (copying the example in the docs) you will get

[WARN] [08/09/2013 15:37:44.322] [default-akka.actor.default-dispatcher-4] [akka://default/user/camel-supervisor/registry/consumerRegistrar] Actor [Actor[akka://default/user/$a#-1125851945]] failed to activate
org.apache.camel.FailedToCreateRouteException: Failed to create route akka://default/user/$a at: >>> IdempotentConsumer[{header(CamelFilePath)} -> []] <<< in route: Route[[From[file://inbox]] -> [ConvertBodyTo[java.io.InputSt... because of Definition has no children on IdempotentConsumer[{header(CamelFilePath)} -> []]
at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:873)
at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:171)
at org.apache.camel.impl.DefaultCamelContext.startRoute(DefaultCamelContext.java:722)
at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:1789)
at org.apache.camel.impl.DefaultCamelContext.addRouteDefinitions(DefaultCamelContext.java:666)
at org.apache.camel.builder.RouteBuilder.populateRoutes(RouteBuilder.java:337)
at org.apache.camel.builder.RouteBuilder.addRoutesToCamelContext(RouteBuilder.java:264)
at org.apache.camel.impl.DefaultCamelContext.addRoutes(DefaultCamelContext.java:628)
at akka.camel.internal.ConsumerRegistrar$$anonfun$receive$4.applyOrElse(CamelSupervisor.scala:187)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Definition has no children on IdempotentConsumer[{header(CamelFilePath)} -> []]
at org.apache.camel.model.ProcessorDefinition.createChildProcessor(ProcessorDefinition.java:153)
at org.apache.camel.model.IdempotentConsumerDefinition.createProcessor(IdempotentConsumerDefinition.java:199)
at org.apache.camel.model.ProcessorDefinition.makeProcessor(ProcessorDefinition.java:461)
at org.apache.camel.model.ProcessorDefinition.addRoutes(ProcessorDefinition.java:179)
at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:870)
... 17 more

Haven't gotten my head around that one yet, but figured I'd mention it in case anyone has some advice!

Thanks,

- Ian.


On Wednesday, July 31, 2013 11:12:30 AM UTC-4, Ian Hummel wrote:

Björn Antonsson

unread,
Aug 13, 2013, 2:22:58 AM8/13/13
to akka...@googlegroups.com
Hi Ian,

Thanks for creating the ticket and sharing your findings.

Regarding the call to ".end", I think that it is only needed to close a nested builder like in "onException", and not the normal top level builder that you get from "idempotentConsumer".

B/

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://akka.io/faq/
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/groups/opt_out.

-- 
Björn Antonsson
Typesafe – Reactive Apps on the JVM
twitter: @bantonsson

Reply all
Reply to author
Forward
0 new messages