Issue using reflection to get the field executorServiceDelegate of the default dispatcher


bear2...@gmail.com

Nov 16, 2015, 2:02:44 PM
to kamon-user
The error occurred when we upgraded to kamon-0.5.2, at line 50 of DispatcherInstrumentation.scala (https://github.com/kamon-io/Kamon/blob/master/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala).

The reason is that we have our own Dispatcher, which extends akka.dispatch.Dispatcher. The reflection call 'getDeclaredField' throws the error because it only sees fields declared directly on the runtime class and does not find fields inherited from the superclass.
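
For illustration, the behaviour can be reproduced without Akka. The sketch below uses hypothetical stand-in classes (BaseDispatcher, CustomDispatcher) and a hypothetical helper, findField, that walks up the superclass chain until some class declares the field. It is one possible workaround under those assumptions, not necessarily how Kamon fixed it.

```scala
import java.lang.reflect.Field
import scala.annotation.tailrec
import scala.util.Try

// Hypothetical stand-ins for akka.dispatch.Dispatcher and a custom subclass.
class BaseDispatcher { private val executorServiceDelegate: String = "delegate" }
class CustomDispatcher extends BaseDispatcher

object FieldLookup {
  // Searches clazz and then each superclass for a declared field with the given name.
  @tailrec
  def findField(clazz: Class[_], name: String): Option[Field] =
    if (clazz == null) None
    else {
      val declared = clazz.getDeclaredFields.find(_.getName == name)
      if (declared.isDefined) declared
      else findField(clazz.getSuperclass, name)
    }
}

object Demo extends App {
  val custom = new CustomDispatcher

  // getDeclaredField only inspects CustomDispatcher itself, so this lookup fails.
  val direct = Try(custom.getClass.getDeclaredField("executorServiceDelegate"))
  println(direct.isFailure) // prints "true"

  // Walking the hierarchy finds the field declared on BaseDispatcher.
  FieldLookup.findField(custom.getClass, "executorServiceDelegate").foreach { field =>
    field.setAccessible(true)
    println(field.get(custom)) // prints "delegate"
  }
}
```

Returning an Option keeps the "field not found anywhere" case explicit instead of surfacing as a NoSuchFieldException.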

So we cannot upgrade. Are there any possible fixes?

Thanks

Diego Parra

Nov 16, 2015, 2:33:15 PM
to kamon...@googlegroups.com
Hi,

Can you give us the code of the custom Dispatcher, or some code we can use to write a test and try to fix this issue?

cheers,

Diego.



bear2...@gmail.com

Nov 16, 2015, 4:41:33 PM
to kamon-user
Thank you, Diego. Here are the steps to replicate:

1) Create a custom dispatcher (just copy the code)
-----------

package my.custom.actor.system

import java.util.concurrent.TimeUnit

import akka.dispatch._
import com.typesafe.config.Config
import play.api.Logger

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{Duration, FiniteDuration}

/**
 * Configurator for a context propagating dispatcher.
 */
class ContextPropagatingDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
  extends MessageDispatcherConfigurator(config, prerequisites) {

  private val instance = new ContextPropagatingDisptacher(
    this,
    config.getString("id"),
    config.getInt("throughput"),
    FiniteDuration(config.getDuration("throughput-deadline-time", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS),
    configureExecutor(),
    FiniteDuration(config.getDuration("shutdown-timeout", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS))

  override def dispatcher(): MessageDispatcher = instance
}

/**
 * A context propagating dispatcher.
 *
 * This dispatcher propagates the current request context if it's set when it's executed.
 */
class ContextPropagatingDisptacher(_configurator: MessageDispatcherConfigurator,
                                   id: String,
                                   throughput: Int,
                                   throughputDeadlineTime: Duration,
                                   executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
                                   shutdownTimeout: FiniteDuration)
  extends Dispatcher(_configurator, id, throughput, throughputDeadlineTime, executorServiceFactoryProvider, shutdownTimeout) {
  self =>

  override def prepare(): ExecutionContext = new ExecutionContext {
    // capture the context
    val context = RequestContext.capture()

    def execute(r: Runnable) = self.execute(new Runnable {
      // Run the runnable with the captured context
      def run() = context.withContext(r.run())
    })
    def reportFailure(t: Throwable) = self.reportFailure(t)
  }
}

/**
 * The current request context.
 */
object RequestContext {

  /**
   * Capture the current request context.
   */
  def capture(): CapturedRequestContext = new CapturedRequestContext {
    val requestInfoOpt: Option[RequestInfo] = Some(RequestInfo(
      Some("Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2224.3 Safari/537.36"),
      None,
      Some("127.0.0.1"),
      Some("http://localhost/test")
    ))

    // Explicit Unit result type: CapturedRequestContext declares withContext
    // without a result type, i.e. as returning Unit.
    def withContext[T](block: => T): Unit = {
      requestInfoOpt map { requestInfo =>
        try {
          block
        } catch {
          case e: Throwable =>
            Logger.error("Uncaught exception | " + requestInfoOpt, e)
            throw e
        }
      }
    }
  }
}

/**
 * A captured request context
 */
trait CapturedRequestContext {

  /**
   * Execute the given block with the captured request context.
   */
  def withContext[T](block: => T): Unit
}


case class RequestInfo(
  userAgentHeaderOpt: Option[String],
  forwardedForHeaderOpt: Option[String],
  clientIpOpt: Option[String],
  requestUriOpt: Option[String]) {

  val uuid = new com.eaio.uuid.UUID().toString

}

-----------
2) Add the following to the config file (application.conf)

----------
akka {
  actor {
    default-dispatcher = {
      type = "my.custom.actor.system.ContextPropagatingDispatcherConfigurator"
    }
  }
}

---------------
3) I copied your code and put it in the Play application controller; it throws the error:

-----------
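import akka.actor.ActorSystem
import play.api.mvc._

object Application extends Controller {

  def index = Action { request =>

    implicit lazy val system = ActorSystem("test-custom-dispatcher")

    val defaultDispatcher = system.dispatcher
    // Throws NoSuchFieldException: the field is declared on akka.dispatch.Dispatcher,
    // not on the custom subclass returned by getClass.
    val executorServiceDelegateField = defaultDispatcher.getClass.getDeclaredField("executorServiceDelegate")
    executorServiceDelegateField.setAccessible(true)

    Ok("it does work")
  }
}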

----------

Thanks

Hui


Diego Parra

Nov 25, 2015, 12:09:03 PM
to kamon-user, bear2...@gmail.com
Hui,


But I think that what you are doing is possible with Kamon using the TraceLocalStorage. Here is an example, and we plan to include it in our documentation in the next release.

Cheers, 

Diego.

bear2...@gmail.com

Dec 7, 2015, 4:52:45 PM
to kamon-user, bear2...@gmail.com
Thank you, Diego. Sorry for the late reply, I was out for a couple of weeks. I will take a look at your suggestions.

anantha...@gmail.com

Mar 2, 2016, 1:48:23 AM
to kamon-user
I am getting the same issue when I try to use 0.5.1/0.5.2.
Is there any quick fix?

Diego Parra

Mar 2, 2016, 8:40:54 AM
to kamon-user, anantha...@gmail.com
Please try with this snapshot version: 0.6.0-2961cc2fff3f408dbce964ed8ec82998bc864186

Cheers,
Diego.

anantha murthy

Mar 2, 2016, 8:44:07 AM
to Diego Parra, kamon-user
Sure, thanks, I will update.

anantha murthy

Mar 2, 2016, 8:51:48 AM
to Diego Parra, kamon-user
Error:Error while importing SBT project:
...
at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1275)
at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
at sbt.std.Transform$$anon$4.work(System.scala:63)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
at sbt.Execute.work(Execute.scala:235)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[error] sbt.ResolveException: unresolved dependency: io.kamon#kamon-akka_2.11;0.6.0-2961cc2fff3f408dbce964ed8ec82998bc864186: not found
[error] Use 'last' for the full log.

See complete log in /Users/ahandral/Library/Logs/IntelliJIdea14/sbt.last.log

sbt is not able to resolve the dependency for the snapshot version.

Diego Parra

Mar 2, 2016, 8:57:39 AM
to kamon-user, anantha...@gmail.com
You need to configure the snapshot repository: resolvers += "Kamon Repository Snapshots" at "http://snapshots.kamon.io"
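
In a build.sbt this might look roughly like the sketch below. The kamon-akka coordinates are taken from the unresolved-dependency error above; the kamonVersion value name is only an illustrative assumption, and any other Kamon modules in the build would presumably need the same version.

```scala
// build.sbt (sketch; kamonVersion is a hypothetical helper value)
val kamonVersion = "0.6.0-2961cc2fff3f408dbce964ed8ec82998bc864186"

// Snapshot builds are not published to the default repositories.
resolvers += "Kamon Repository Snapshots" at "http://snapshots.kamon.io"

libraryDependencies += "io.kamon" %% "kamon-akka" % kamonVersion
```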

Cheers.

anantha murthy

Mar 2, 2016, 10:38:54 AM
to Diego Parra, kamon-user
Hi,
I started getting exceptions with the snapshot version:

 Exception in thread "main" java.lang.NoSuchMethodError: kamon.Kamon$.metrics()Lkamon/metric/MetricsModule;
[error]         at kamon.statsd.StatsDExtension.<init>(StatsD.scala:43)
[error]         at kamon.statsd.StatsD$.createExtension(StatsD.scala:32)
[error]         at kamon.statsd.StatsD$.createExtension(StatsD.scala:30)
[error]         at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:713)
[error]         at akka.actor.ActorSystemImpl$$anonfun$loadExtensions$1.apply(ActorSystem.scala:742)
[error]         at ak

akka.actor.ActorSystemImpl - While trying to load extension [kamon.akka.Akka], skipping...
[info] java.lang.ClassNotFoundException: kamon.akka.Akka
[info]  at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_60]
[info]  at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_60]
[info]  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[na:1.8.0_60]
[info]  at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_60]
[info]  at java.lang.Class.forName0(Native Method) ~[na:1.8.0_60]
[info]  at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_60]
[info]  at akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(DynamicAccess.scala:67) ~[akka-actor_2.11-2.3.14.jar:na]


anantha murthy

Mar 2, 2016, 11:24:30 AM
to Diego Parra, kamon-user
Resolved, thanks.

anantha murthy

Mar 2, 2016, 11:35:48 AM
to Diego Parra, kamon-user
But I keep getting this in the logs:
 [ERROR] [03/02/2016 22:01:02.029] [kamon-akka.actor.default-dispatcher-8] [akka://kamon/user/kamon-system-metrics/sigar-metrics-recorder] This method has not been implemented on this platform
[info] org.hyperic.sigar.SigarNotImplementedException: This method has not been implemented on this platform
[info]  at org.hyperic.sigar.SigarNotImplementedException.<clinit>(SigarNotImplementedException.java:28)
[info]  at org.hyperic.sigar.ProcFd.gather(Native Method)
[info]  at org.hyperic.sigar.ProcFd.fetch(ProcFd.java:30)
[info]  at org.hyperic.sigar.Sigar.getProcFd(Sigar.java:531)
[info]  at kamon.system.sigar.ULimitMetrics.update(ULimitMetrics.scala:13)
[info]  at kamon.system.sigar.SigarMetricsUpdater.updateMetrics(SigarMetricsUpdater.scala:50)
[info]  at kamon.system.sigar.SigarMetricsUpdater$$anonfun$receive$1.applyOrElse(SigarMetricsUpdater.scala:45)
[info]  at akka.actor.Actor$class.aroundReceive(Actor.scala:467)

