Akka Cluster Failing for Huge Data File


Kunal Ghosh

May 5, 2017, 10:44:42 AM
to Akka User List
Hi,
my application uses an Akka cluster with one master node and two child seed nodes. The master node reads data from an input file and sends it to both child nodes for evaluation (processing).
The application works fine for a smaller data file, e.g. one with 43 rows, but when the input file is huge, e.g. 2 million rows, the application fails. The exception and stack trace are given below.
I have also attached the configuration file and code examples to this mail; please check them and tell me where I am going wrong.
Thanks in advance.




WARN [18:48:19.013]{iCEDQApp-akka.actor.default-dispatcher-22}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using the default Java serializer for class [org.iceengine.compare.akka.RowData] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
WARN [18:48:21.768]{iCEDQApp-akka.actor.default-dispatcher-28}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using the default Java serializer for class [org.iceengine.compare.akka.Result] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
WARN [18:48:21.813]{iCEDQApp-akka.actor.default-dispatcher-4}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Using the default Java serializer for class [org.iceengine.compare.akka.Result] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
WARN [18:48:23.002]{iCEDQApp-akka.actor.default-dispatcher-3}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Cluster Node [akka.tcp://iCED...@192.168.100.199:2551] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://iCED...@192.168.100.199:62915, status = Up)]. Node roles [backend]
WARN [18:48:23.058]{iCEDQApp-akka.actor.default-dispatcher-17}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Cluster Node [akka.tcp://iCED...@192.168.100.199:62915] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://iCED...@192.168.100.199:2551, status = Up)]. Node roles []
 Kunal_ICE ERROR[18:48:23.473]{iCEDQApp-akka.actor.default-dispatcher-24}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$1.apply$mcV$sp:70)-AssociationError [akka.tcp://iCED...@192.168.100.199:2552] <- [akka.tcp://iCED...@192.168.100.199:62915]: Error [null] [
java.io.OptionalDataException
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1373)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
 at java.util.HashMap.readObject(HashMap.java:1402)
 at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
 at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:304)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
 at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:304)
 at akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:151)
 at akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:137)
 at scala.util.Try$.apply(Try.scala:192)
 at akka.serialization.Serialization.deserialize(Serialization.scala:131)
 at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:80)
 at akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:151)
 at akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:137)
 at scala.util.Try$.apply(Try.scala:192)
 at akka.serialization.Serialization.deserialize(Serialization.scala:131)
 at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:30)
 at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:64)
 at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:64)
 at akka.remote.DefaultMessageDispatcher.msgLog$1(Endpoint.scala:69)
 at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:81)
 at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:988)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:496)
 at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
 at akka.actor.ActorCell.invoke(ActorCell.scala:495)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
 at akka.dispatch.Mailbox.run(Mailbox.scala:224)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Front End Class

=======================
// Create the actor system and register a callback that runs once this node is Up in the cluster.
ActorSystem system = ActorSystem.create("iCEDQApp", ConfigFactory.load());

System.out.println("IceCompareEngine ============ >>>>>> " + context_._ruleType);
ClusterRegisterOnMemberUp registerUp =
        new ClusterRegisterOnMemberUp(actors, context_.getRiid(), context_, system, context_._ruleType);
FutureTask<ActorRef> futureTask = new FutureTask<ActorRef>(registerUp);

// ExecutorService executor = Executors.newFixedThreadPool(1);
// executor.execute(futureTask);
Cluster.get(system).registerOnMemberUp(futureTask);

// Busy-wait on the calling thread until the member-up callback has completed.
while (true) {
    try {
        if (futureTask.isDone()) {
            System.out.println(">>>>>>>>>>>>>>>>>> done >>>>>>>>>>>>>> ");
            break;
        }
    } catch (Exception e) {
        // TODO: handle exception
    }
}
ExpressionEvaluation.conf
application.conf
ClusterRegisterOnMemberUp.java
DataEvaluatorActor.java
Main.java
Master.java

Patrik Nordwall

May 6, 2017, 8:26:36 AM
to Akka User List
First, don't use Java serialization, for both performance and security reasons. Second, actor messages should be small (a few hundred kB at most); otherwise they will prevent other messages, such as cluster heartbeat messages, from getting through. Split the large message into smaller messages, or transfer the data on a side channel such as Akka HTTP or Streams TCP. I'd also recommend that you try the new remoting implementation; see Artery in the docs.
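For example, something along these lines (a rough sketch only; RowBatch is a made-up message class, and workerRouter/master are placeholders for your router and sender):

// Send the input file as many small batches instead of one huge message.
List<String> batch = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(inputFile))) {
    String line;
    while ((line = reader.readLine()) != null) {
        batch.add(line);
        if (batch.size() == 1000) {   // keep each message well below a few hundred kB
            workerRouter.tell(new RowBatch(new ArrayList<>(batch)), master);
            batch.clear();
        }
    }
}
if (!batch.isEmpty()) {
    workerRouter.tell(new RowBatch(batch), master);
}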

/Patrik

Kunal Ghosh

May 8, 2017, 8:36:57 AM
to Akka User List
Thanks @Patrik, your help is much appreciated!

Below is my configuration for Kryo serialization and the Artery remoting implementation in application.conf. Please go through it and tell me whether it is correct.
I also have a question: is changing the configuration enough, or will I have to make changes in the code as well?

application.conf

akka {

  loggers = ["akka.event.slf4j.Slf4jLogger"]
   loglevel = "DEBUG"
 
  stdout-loglevel = "DEBUG"
 
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  
  actor.provider = "akka.cluster.ClusterActorRefProvider"

# Artery remoting implementation

remote {
    log-remote-lifecycle-events = on
    log-sent-messages = on
    log-received-messages = on
    artery {
      enabled = on
      canonical.hostname = "192.168.100.199"
      canonical.port = 25520
    } 
  }
  
  # KryoSerializer configuration

  actor {
    kryo {
      type = "graph"
      idstrategy = "incremental"
      buffer-size = 4096
      max-buffer-size = -1
      use-manifest = false
      implicit-registration-logging = true
      kryo-trace = true

      mappings {
        "org.iceengine.compare.engine.ICEEngineContext" = 32
      "org.iceengine.compare.akka.RowData" = 33
      "org.iceengine.compare.akka.DataConsumerInspector" = 34
      "org.iceengine.compare.akka.Result" = 35
      }
    }

    serialize-messages = on
    serializers {
      #java = "akka.serialization.JavaSerializer"
      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
    }
    serialization-bindings {
      "org.iceengine.compare.engine.ICEEngineContext" = kryo
      "org.iceengine.compare.akka.RowData" = kryo
      "org.iceengine.compare.akka.DataConsumerInspector" = kryo
      "org.iceengine.compare.akka.Result" = kryo
    }
  }
    
  cluster {
    seed-nodes = [
      "akka://iCED...@192.168.100.199:2551",
      "akka://iCED...@192.168.100.199:2552"]

    #auto-down-unreachable-after = 10s
  }

akka.cluster.min-nr-of-members =3
# //#min-nr-of-members

# //#role-min-nr-of-members
akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}
actor.allow-java-serialization = off
actor.deployment {
 
  "/*/*" {
  
    # Router type provided by metrics extension. 
    #router = cluster-metrics-adaptive-group
    router = round-robin-group
    # Router parameter specific for metrics extension.
    # metrics-selector = heap
    # metrics-selector = load
    # metrics-selector = cpu
    metrics-selector = mix
    #
    routees.paths = ["/user/expEvaluationBackend"]
    nr-of-instances = 100
    cluster {
      enabled = on
      use-role = backend
      max-nr-of-instances-per-node = 3
      allow-local-routees = off
    }
  }
 
}
  # Disable legacy metrics in akka-cluster.
cluster.metrics.enabled=off

# Enable metrics extension in akka-cluster-metrics.
extensions=[
   "akka.cluster.metrics.ClusterMetricsExtension",
   "com.romix.akka.serialization.kryo.KryoSerializationExtension$"
   ]

  
}

Patrik Nordwall

May 8, 2017, 9:07:17 AM
to akka...@googlegroups.com
The port number in the seed-nodes does not match canonical.port = 25520.

Replacing akka.tcp with akka is correct, and if you have akka.tcp in the code somewhere, it must be changed there as well.
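For example (a sketch; <system-name> is a placeholder for your actual actor system name, and each node running on the same host needs its own canonical.port):

# node 1
akka.remote.artery.canonical.port = 2551
# node 2
akka.remote.artery.canonical.port = 2552

# on both nodes: the seed-node ports must be the canonical Artery ports configured above
akka.cluster.seed-nodes = [
  "akka://<system-name>@192.168.100.199:2551",
  "akka://<system-name>@192.168.100.199:2552"
]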




--

Patrik Nordwall
Akka Tech Lead
Lightbend -  Reactive apps on the JVM
Twitter: @patriknw

Kunal Ghosh

May 12, 2017, 8:46:57 AM
to Akka User List
Hi,
How do I set generics for ObjectArraySerializer in Kryo?

public class ICEUniqueSource {
    private final ICEColSource[] _columns;
}

public class ICEColSource {
}

The following error occurs:
00:11 TRACE: [kryo] Write field: _columns (org.iceengine.compare.engine.ICEUniqueSource) pos=788
00:11 TRACE: [kryo] Write class 910779913: org.iceengine.compare.engine.ICEColSource[]
00:11 TRACE: [kryo] setting generics for ObjectArraySerializer
00:11 TRACE: [kryo] Write initial object reference 91: org.iceengine.compare.engine.ICEColSource[]
00:11 DEBUG: [kryo] Write: org.iceengine.compare.engine.ICEColSource[]

WARN [17:57:38.455]{iCEDQApp-akka.actor.default-dispatcher-11}(Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2.apply$mcV$sp:78)-Failed to deserialize message with serializer id [6] and manifest []. Encountered unregistered class ID: 910779913
Serialization trace:
_columns (org.iceengine.compare.engine.ICEUniqueSource)
_model (org.iceengine.compare.sources.ICESourceFlatFile)
_srcInput (org.iceengine.compare.conf.DefaultDataComparison)
_dataComparison (org.iceengine.compare.engine.ICEEngineContext)
_context (org.iceengine.compare.akka.RowData)

application.conf (only the KryoSerializer configuration)

akka {

actor {
    kryo {
      type = "graph"
      idstrategy = "incremental"
      buffer-size = 4096
      max-buffer-size = -1
      use-manifest = false
      implicit-registration-logging = true
      kryo-trace = true

      mappings {
        "org.iceengine.compare.engine.ICEUniqueSource" = 72
        "org.iceengine.compare.engine.ICEColSource" = 63
      }
    }

    serialize-messages = on
    serializers {
      #java = "akka.serialization.JavaSerializer"
      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
    }
    serialization-bindings {
      "org.iceengine.compare.engine.ICEUniqueSource" = kryo 
      "org.iceengine.compare.engine.ICEColSource" = kryo 
    }
  }
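Do I also need to register the array class itself for the _columns field? Something like this (just my guess):

      mappings {
        "org.iceengine.compare.engine.ICEUniqueSource" = 72
        "org.iceengine.compare.engine.ICEColSource" = 63
        # array type reported in the error (any unused id)
        "[Lorg.iceengine.compare.engine.ICEColSource;" = 73
      }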


Justin du coeur

May 12, 2017, 12:01:37 PM
to akka...@googlegroups.com
My rule of thumb is that you should never, ever use

idstrategy = "incremental"

What that is saying is that, at serialization time, if it encounters a type that isn't registered yet, it makes up a registration out of thin air.  This is essentially guaranteed to fail unless the order of serialization and deserialization is absolutely deterministic.  (Really, I don't think that idstrategy should even exist, since it is nothing but a trap for the unwary.)

That's likely why it's failing: you are trying to serialize some type (probably in a subfield) that isn't registered, so it's just making up a serialization ID.  It gets to the other end, which has never heard of that ID, and crashes.

Choose a different idstrategy.  For Akka Persistence I consider "explicit" to be the only sane option, but that's a lot of work; for other circumstances, "automatic" should usually work decently well...
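Concretely, something like this (a sketch; the ids are arbitrary, but with "explicit" every class that can appear in a message, including array types, has to be listed, and the ids must be identical on every node):

    kryo {
      idstrategy = "explicit"     # or "automatic"; avoid "incremental"
      mappings {
        "org.iceengine.compare.akka.RowData" = 33
        "org.iceengine.compare.akka.Result" = 35
        # ... every other class that can appear inside a message
      }
    }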


Kunal Ghosh

May 15, 2017, 11:52:28 PM
to Akka User List
Hey Patrik, thanks for the help!
Your solution worked!
Now, when I run my application in a non-clustered environment with a round-robin pool (number of instances = 8), it takes 23 seconds to process 2 million rows of data.
But when I run the same application in a clustered environment, it takes 23 minutes! The master node reads data from the input file and sends it to both seed nodes (each node has number of instances = 4) for evaluation (processing). I cannot figure out the reason behind this.
Machine information:
RAM: 16 GB
Logical processors: 8
Cores: 4
CPU: Intel(R) Core(TM) i7-4700MQ CPU @ 2.40GHz

Can you help?
I have also attached the configuration file and code examples to this mail; please do check them out.

Application configuration file (without cluster)

iCEDQDispatcher{

 worker-dispatcher {
      type = Dispatcher
     executor = "fork-join-executor"
     fork-join-executor {
         parallelism-min = 2
         parallelism-factor = 2.0
      parallelism-max = 64
      }
      throughput = 5
   }
 
   CallingThreadDispatcher {
         type = akka.testkit.CallingThreadDispatcherConfigurator
    }
}
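(For the non-clustered run, the round-robin pool of 8 evaluator actors is created in code, roughly like this; a simplified sketch, assuming a no-argument Props for DataEvaluatorActor:)

// Local round-robin pool of 8 DataEvaluatorActor instances (non-clustered run).
ActorRef workers = system.actorOf(
        new RoundRobinPool(8).props(Props.create(DataEvaluatorActor.class)),
        "expEvaluationBackend");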


Application configuration file (with cluster)

akka {


  loggers = ["akka.event.slf4j.Slf4jLogger"]
 
  
  loglevel = "INFO"
 
  stdout-loglevel = "INFO"
 

  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  
  actor.provider = "akka.cluster.ClusterActorRefProvider"

 remote {
    log-remote-lifecycle-events = on
    log-sent-messages = on
    log-received-messages = on
    artery {
      enabled = on
      canonical.hostname = "192.168.100.199"
      canonical.port = 25520
    } 
  }

actor {
    kryo {
      type = "graph"
      idstrategy = "incremental"
      #"explicit"
      buffer-size = 4096
      max-buffer-size = -1
      use-manifest = false
      implicit-registration-logging = true
      kryo-trace = false
      mappings {
      "[Ljava.lang.String;" = 20
      "[Lorg.iceengine.compare.engine.ICEColSource;" = 21
      "org.iceengine.compare.engine.ICEColSource$Type" = 22
      "java.text.DecimalFormat" = 23
      "java.math.RoundingMode" = 24
      "java.text.DecimalFormatSymbols" = 25
      "java.util.Locale" = 26
      "[Ljava.lang.Integer;" = 27
      "[Ljava.lang.Object;" = 28
      "[Lorg.iceengine.compare.engine.ICECompareColumnData;" = 29
      "[Lorg.iceengine.common.CompareChain;" = 30
      "[Lorg.apache.commons.collections.comparators.ComparatorChain;" = 31
      
        "org.iceengine.compare.engine.ICEEngineContext" = 32
      "org.iceengine.compare.akka.RowData" = 33
      "org.iceengine.compare.akka.DataConsumerInspector" = 34
      "org.iceengine.compare.akka.Result" = 35
      "org.iceengine.common.pairs.IceEnginePairs" = 36
"org.iceengine.common.pairs.IcePairKey" = 37
"org.iceengine.common.pairs.IceEnginePairsImpl" = 38
"org.iceengine.common.DataComparator" = 39
"org.iceengine.common.pairs.IcePairValue" = 40
"org.iceengine.common.MapCont" = 41
"org.iceengine.common.ObjectComparator" = 42
"org.iceengine.compare.akka.AtomicObject" = 43
"org.iceengine.compare.akka.SimpleBindings" = 44
"org.iceengine.compare.comparator.DKChainDiffor" = 46
"org.iceengine.compare.comparator.DKConvertingDiffor" = 47
"org.iceengine.compare.comparator.DKDateDiffor" = 48
"org.iceengine.compare.comparator.DKEqualsDiffor" = 49
"org.iceengine.compare.comparator.DKIdentityDiffor" = 50
"org.iceengine.compare.comparator.DKNumberDiffor" = 51
"org.iceengine.compare.comparator.DKTextDiffor" = 52
"org.iceengine.compare.conf.DefaultDataComparison" = 53
"org.iceengine.compare.conf.ICEBuildRule" = 54
"org.iceengine.compare.conf.ICEDepRule" = 55
"org.iceengine.compare.conf.ICEEngine" = 56
"org.iceengine.compare.conf.ICEExpression" = 57
"org.iceengine.compare.conf.ICERule" = 58
"org.iceengine.compare.conf.ICERulePT" = 59
"org.iceengine.compare.conf.ICERuleRuleContainer" = 60
"org.iceengine.compare.engine.CommonRowObj" = 61
"org.iceengine.compare.engine.DataComparison" = 62
"org.iceengine.compare.engine.ICEColSource" = 63
"org.iceengine.compare.engine.ICECompareColInRow" = 64
"org.iceengine.compare.engine.ICECompareColumn" = 65
"org.iceengine.compare.engine.ICECompareColumnData" = 66
"org.iceengine.compare.engine.ICECompareEngine" = 67
"org.iceengine.compare.engine.ICECompareRow" = 68
"org.iceengine.compare.engine.ICEExprCompare" = 69
"org.iceengine.compare.engine.ICEFinalExpressionDiff" = 70
"org.iceengine.compare.engine.ICESourceUtil" = 71
"org.iceengine.compare.engine.ICEUniqueSource" = 72
"org.iceengine.compare.sources.EngineWritterFormat" = 73
"org.iceengine.compare.sources.ExternalSortingForUserDefiendColumn" = 74
"org.iceengine.compare.sources.FileSort" = 75
"org.iceengine.compare.sources.FinalWritter" = 76
"org.iceengine.compare.sources.ICEDBWritter" = 77
"org.iceengine.compare.sources.IceFileWritter" = 78
"org.iceengine.compare.sources.ICESourceDataBase" = 79
"org.iceengine.compare.sources.IceSourceFixedFile" = 80
"org.iceengine.compare.sources.ICESourceFlatFile" = 81
"org.iceengine.compare.sources.ICESourceList" = 82
"org.iceengine.compare.sources.ICESourceSS" = 83
"org.iceengine.compare.sources.ICESourceXMLFile" = 84
"org.iceengine.compare.sources.IceUtilsDifference" = 85
"org.iceengine.compare.sources.ListWritter" = 86
"org.iceengine.compare.sources.LogWriter" = 87
"org.iceengine.compare.sources.SqlWritter" = 88
"org.iceengine.compare.sources.SSSheet" = 89
"org.iceengine.compare.sources.SSSheetImpl" = 90
"org.iceengine.db.ColumnMetaData" = 91
"org.iceengine.db.DatabaseKeyImpl" = 92
"org.iceengine.db.DatabaseTab" = 93
"org.iceengine.db.DatabaseTabInfoDataPer" = 94
"org.iceengine.db.DatabaseTabPer" = 95
"org.iceengine.db.EngineConnectionInfo" = 96
"org.iceengine.db.ICEDB" = 97
"org.iceengine.db.ICEInfoDataType" = 98
"org.iceengine.db.QueryBuilder" = 99
"org.iceengine.utils.ArrayUtil" = 100
"org.iceengine.utils.BooleanUtil" = 101
"org.iceengine.utils.ClassUtil" = 102
"org.iceengine.utils.DataBaseUtilUtil" = 103
"org.iceengine.utils.EngineXMLUtils" = 104
"org.iceengine.utils.FileUtil" = 105
"org.iceengine.utils.ICEDataUtil" = 106
"org.iceengine.utils.ICEEngineConstants" = 107
"org.iceengine.utils.ICEUtilsObject" = 108
"org.iceengine.utils.IceUtilsRes" = 109
"org.iceengine.utils.ICEUtilsString" = 110
"org.iceengine.utils.SpringUtils" = 111
"org.apache.commons.collections.OrderedMap" = 112
"org.apache.commons.collections.map.LinkedMap" = 113
"java.io.File" = 114
"java.util.HashMap" = 115
"org.iceengine.compare.engine.ICECompare$CompareType" = 116
"org.iceengine.common.CompareChain" = 117
      "org.apache.commons.collections.comparators.ComparatorChain" = 118
      "java.util.ArrayList" = 119
      "java.util.BitSet" = 120
      "[Lorg.iceengine.compare.engine.ICEUniqueSource;" = 121
      "[Lorg.iceengine.common.MapCont;" = 122
      "org.iceengine.compare.engine.ICEEngineContext$RuleUKey" = 123
      "java.util.concurrent.atomic.AtomicLong" = 124
      "java.util.concurrent.ConcurrentHashMap" = 125
      "java.util.concurrent.atomic.AtomicLongArray" = 126
      "java.util.concurrent.atomic.AtomicBoolean" = 127
      "java.text.SimpleDateFormat" = 128
      "java.util.GregorianCalendar" = 129
      "java.util.Date" = 130
      "java.text.DateFormatSymbols" = 131
      "javax.script.ScriptEngineManager" = 132
      }
      
    }
    serialize-messages = on
    serializers {
      #java = "akka.serialization.JavaSerializer"
      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
    }
    serialization-bindings {
    
     
    
      "[Ljava.lang.String;" = kryo
      "[Lorg.iceengine.compare.engine.ICEColSource;" = kryo
      "org.iceengine.compare.engine.ICEColSource$Type" = kryo
      "java.text.DecimalFormat" = kryo
      "java.math.RoundingMode" = kryo
      "java.text.DecimalFormatSymbols" = kryo
      "java.util.Locale" = kryo
      "[Ljava.lang.Integer;" = kryo
      "[Ljava.lang.Object;" = kryo
      "[Lorg.iceengine.compare.engine.ICECompareColumnData;" = kryo
      "[Ljava.util.Comparator;" = kryo
      "[Lorg.iceengine.common.CompareChain;" = kryo
      
      "org.iceengine.compare.engine.ICEEngineContext" = kryo
      "org.iceengine.compare.akka.RowData" = kryo
      "org.iceengine.compare.akka.DataConsumerInspector" = kryo
      "org.iceengine.compare.akka.Result" = kryo
      "org.iceengine.common.pairs.IceEnginePairs" = kryo
"org.iceengine.common.pairs.IcePairKey" = kryo
"org.iceengine.common.pairs.IceEnginePairsImpl" = kryo
"org.iceengine.common.DataComparator" = kryo
"org.iceengine.common.pairs.IcePairValue" = kryo
"org.iceengine.common.MapCont" = kryo
"org.iceengine.common.ObjectComparator" = kryo
"org.iceengine.compare.akka.AtomicObject" = kryo
"org.iceengine.compare.akka.SimpleBindings" = kryo
"org.iceengine.compare.comparator.DKChainDiffor" = kryo
"org.iceengine.compare.comparator.DKConvertingDiffor" = kryo
"org.iceengine.compare.comparator.DKDateDiffor" = kryo
"org.iceengine.compare.comparator.DKEqualsDiffor" = kryo
"org.iceengine.compare.comparator.DKIdentityDiffor" = kryo
"org.iceengine.compare.comparator.DKNumberDiffor" = kryo
"org.iceengine.compare.comparator.DKTextDiffor" = kryo
"org.iceengine.compare.conf.DefaultDataComparison" = kryo
"org.iceengine.compare.conf.ICEBuildRule" = kryo
"org.iceengine.compare.conf.ICEDepRule" = kryo
"org.iceengine.compare.conf.ICEEngine" = kryo
"org.iceengine.compare.conf.ICEExpression" = kryo
"org.iceengine.compare.conf.ICERule" = kryo
"org.iceengine.compare.conf.ICERulePT" = kryo
"org.iceengine.compare.conf.ICERuleRuleContainer" = kryo
"org.iceengine.compare.engine.CommonRowObj" = kryo
"org.iceengine.compare.engine.DataComparison" = kryo
"org.iceengine.compare.engine.ICEColSource" = kryo
"org.iceengine.compare.engine.ICECompareColInRow" = kryo
"org.iceengine.compare.engine.ICECompareColumn" = kryo
"org.iceengine.compare.engine.ICECompareColumnData" = kryo
"org.iceengine.compare.engine.ICECompareEngine" = kryo
"org.iceengine.compare.engine.ICECompareRow" = kryo
"org.iceengine.compare.engine.ICEExprCompare" = kryo
"org.iceengine.compare.engine.ICEFinalExpressionDiff" = kryo
"org.iceengine.compare.engine.ICESourceUtil" = kryo
"org.iceengine.compare.engine.ICEUniqueSource" = kryo
"org.iceengine.compare.sources.EngineWritterFormat" = kryo
"org.iceengine.compare.sources.ExternalSortingForUserDefiendColumn" = kryo
"org.iceengine.compare.sources.FileSort" = kryo
"org.iceengine.compare.sources.FinalWritter" = kryo
"org.iceengine.compare.sources.ICEDBWritter" = kryo
"org.iceengine.compare.sources.IceFileWritter" = kryo
"org.iceengine.compare.sources.ICESourceDataBase" = kryo
"org.iceengine.compare.sources.IceSourceFixedFile" = kryo
"org.iceengine.compare.sources.ICESourceFlatFile" = kryo
"org.iceengine.compare.sources.ICESourceList" = kryo
"org.iceengine.compare.sources.ICESourceSS" = kryo
"org.iceengine.compare.sources.ICESourceXMLFile" = kryo
"org.iceengine.compare.sources.IceUtilsDifference" = kryo
"org.iceengine.compare.sources.ListWritter" = kryo
"org.iceengine.compare.sources.LogWriter" = kryo
"org.iceengine.compare.sources.SqlWritter" = kryo
"org.iceengine.compare.sources.SSSheet" = kryo
"org.iceengine.compare.sources.SSSheetImpl" = kryo
"org.iceengine.db.ColumnMetaData" = kryo
"org.iceengine.db.DatabaseKeyImpl" = kryo
"org.iceengine.db.DatabaseTab" = kryo
"org.iceengine.db.DatabaseTabInfoDataPer" = kryo
"org.iceengine.db.DatabaseTabPer" = kryo
"org.iceengine.db.EngineConnectionInfo" = kryo
"org.iceengine.db.ICEDB" = kryo
"org.iceengine.db.ICEInfoDataType" = kryo
"org.iceengine.db.QueryBuilder" = kryo
"org.iceengine.utils.ArrayUtil" = kryo
"org.iceengine.utils.BooleanUtil" = kryo
"org.iceengine.utils.ClassUtil" = kryo
"org.iceengine.utils.DataBaseUtilUtil" = kryo
"org.iceengine.utils.EngineXMLUtils" = kryo
"org.iceengine.utils.FileUtil" = kryo
"org.iceengine.utils.ICEDataUtil" = kryo
"org.iceengine.utils.ICEEngineConstants" = kryo
"org.iceengine.utils.ICEUtilsObject" = kryo
"org.iceengine.utils.IceUtilsRes" = kryo
"org.iceengine.utils.ICEUtilsString" = kryo
"org.apache.commons.collections.OrderedMap" = kryo
"org.apache.commons.collections.map.LinkedMap" = kryo
"java.io.File" = kryo
"java.util.HashMap" = kryo
"org.iceengine.compare.engine.ICECompare$CompareType" = kryo
"org.iceengine.common.CompareChain" = kryo
      "org.apache.commons.collections.comparators.ComparatorChain" = kryo
      "[Lorg.apache.commons.collections.comparators.ComparatorChain;" = kryo
      "java.util.ArrayList" = kryo
      "java.util.BitSet" = kryo
      "[Lorg.iceengine.compare.engine.ICEUniqueSource;" = kryo
      "[Lorg.iceengine.common.MapCont;" = kryo
      "org.iceengine.compare.engine.ICEEngineContext$RuleUKey" = kryo
      "java.util.concurrent.atomic.AtomicLong" = kryo
       "java.util.concurrent.ConcurrentHashMap" = kryo
       "java.util.concurrent.atomic.AtomicLongArray" = kryo
       "java.util.concurrent.atomic.AtomicBoolean" = kryo
       "java.text.SimpleDateFormat" = kryo
       "java.util.GregorianCalendar" = kryo
       "java.util.Date" = kryo
       "java.text.DateFormatSymbols" = kryo
       "javax.script.ScriptEngineManager" = kryo
    }
  }
  cluster {
    seed-nodes = [
      "akka://iCED...@192.168.100.199:2551",
      "akka://iCED...@192.168.100.199:2552"]

    #auto-down-unreachable-after = 10s
  }

actor.allow-java-serialization = off
actor.deployment {
  "/*/*" {
 
    # Router type provided by metrics extension. 
    #router = cluster-metrics-adaptive-group
    router = round-robin-group
    # Router parameter specific for metrics extension.
    # metrics-selector = heap
    # metrics-selector = load
    # metrics-selector = cpu
    metrics-selector = mix
    #
    routees.paths = ["/user/expEvaluationBackend"]
    nr-of-instances = 100
    cluster {
      enabled = on
      use-role = backend
      max-nr-of-instances-per-node = 3
      allow-local-routees = off
    }
  }
  
  
 
}
worker-dispatcher {
   type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
        parallelism-min = 2
       parallelism-factor = 2.0
   parallelism-max = 64
   }
   throughput = 5
   }
  # Disable legacy metrics in akka-cluster.
cluster.metrics.enabled=off


extensions=[
"akka.cluster.metrics.ClusterMetricsExtension",
"com.romix.akka.serialization.kryo.KryoSerializationExtension$"
]

  
}


application.conf
ExpressionEvaluation.conf
ClusterRegisterOnMemberUp.java
DataEvaluatorActor.java
Main.java
Master.java

Kunal Ghosh

May 17, 2017, 12:51:21 AM
to Akka User List
Is the application taking more time to process because I am running it on a single physical machine?

Patrik Nordwall

May 17, 2017, 3:16:54 AM
to akka...@googlegroups.com
Performance debugging/tuning is not something I can help with in free OSS support. We would be able to do that in Lightbend's commercial support.

Regards,
Patrik


Kunal Ghosh

May 17, 2017, 6:42:54 AM
to Akka User List
I just had to delete approximately 300 GB of Aeron temp files in c:\Users\admin\AppData\Local\Temp.
How do I clean up Aeron files?

Patrik Nordwall

May 17, 2017, 7:22:12 AM
to akka...@googlegroups.com
When you use the embedded media driver (which is the default), the files should be deleted when the actor system is terminated, but not if the process is killed abruptly.
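So make sure the node shuts down by terminating the actor system rather than by killing the JVM, e.g. (a sketch):

// Terminate the ActorSystem and wait for termination to complete, so the
// embedded Aeron media driver can clean up its files under the temp directory.
system.terminate();
system.getWhenTerminated().toCompletableFuture().join();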


Kunal Ghosh

May 17, 2017, 7:50:16 AM
to Akka User List
Thanks, I really appreciate your help!
When I run my application with a 43-row (2 KB) input file, it creates a 1.14 GB file in c:\Users\admin\AppData\Local\Temp. When I open that file in an editor, among other things I can see my data in it. I think the performance hit (when processing a big file) is due to this writing of data to the file. Is there any way to configure Aeron in the application so that it won't create that temporary file?

Patrik Nordwall

May 18, 2017, 4:42:46 AM
to akka...@googlegroups.com
I don't think the bottleneck is in Artery/Aeron. We have measured very good throughput, such as:
  • 100 bytes message: 689,655 msg/s, 68,965,517 bytes/s
  • 10,0000 bytes message: 12,392 msg/s, 123,924,183 bytes/s



Patrik Nordwall

May 18, 2017, 4:43:28 AM
to akka...@googlegroups.com
and I mean 10,000 bytes message, ofc

Kunal Ghosh

May 18, 2017, 11:26:20 AM
to Akka User List
Hi Patrik,
One request: could you please tell me whether the slow performance is because I am serializing about 150 classes, or whether there is a problem with my configuration?
I am stuck, please help!

Kunal Ghosh

May 22, 2017, 6:48:22 AM
to Akka User List
Hi Patrik,
Thanks for all your help. I found a solution to the issue I was facing last time:
akka.actor.serialize-messages was 'on'; I switched it to 'off'.
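i.e. in application.conf:

# serialize-messages = on is a verification/test setting that serializes and
# deserializes every message, even locally sent ones, which is very expensive.
akka.actor.serialize-messages = off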
Now the program gets stuck after processing 10,000 to 15,000 rows; it never throws any error or exception. The processing is done with two nodes. What could be the reason behind this?
Can you help?

Kunal Ghosh

May 26, 2017, 2:37:36 AM
to Akka User List
Following is the console output of my application. How do I solve this (are any changes required in the Aeron configuration)?

[INFO] [05/26/2017 11:46:46.619] [iCEDQApp-akka.actor.default-dispatcher-23] [akka://iCEDQApp/deadLetters] Message [org.iceengine.compare.akka.RowData] from Actor[akka://iCEDQApp/user/master_27256#-976263104] to Actor[akka://iCEDQApp/deadLetters] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [05/26/2017 11:46:46.619] [iCEDQApp-akka.actor.default-dispatcher-23] [akka://iCEDQApp/deadLetters] Message [org.iceengine.compare.akka.RowData] from Actor[akka://iCEDQApp/user/master_27256#-976263104] to Actor[akka://iCEDQApp/deadLetters] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [05/26/2017 11:46:46.619] [iCEDQApp-akka.actor.default-dispatcher-23] [akka://iCEDQApp/deadLetters] Message [org.iceengine.compare.akka.RowData] from Actor[akka://iCEDQApp/user/master_27256#-976263104] to Actor[akka://iCEDQApp/deadLetters] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
ReadData_10000(3825 ms) ReadData_100000(2143 ms) ReadData_100000(1556 ms) ReadData_100000(1316 ms) ReadData_100000(1759 ms) ReadData_100000(1891 ms) ReadData_100000(1445 ms) ReadData_100000(1368 ms) ReadData_100000(1880 ms) ReadData_100000(1210 ms) ReadData_100000(1421 ms) ReadData_100000(1336 ms) ReadData_100000(1756 ms) ReadData_100000(1380 ms) ReadData_100000(1245 ms) ReadData_100000(1389 ms) ReadData_100000(1836 ms) ReadData_100000(1340 ms) 
Total time >>> 31
 >>>>>>>>>>>>>>>> Total rows = 1953230 , actorSentCounterToDataEvaluatorActor >> 1886130 , actorCompletedCounter >> 57798
 ReadData_100000(1565 ms)
[DEBUG] [05/26/2017 11:47:38.900] [iCEDQApp-akka.remote.default-remote-dispatcher-8] [akka.actor.LocalActorRefProvider(akka://iCEDQApp)] resolve of path sequence [/system/cluster/core/daemon/firstSeedNodeProcess-1#-513221115] failed
[DEBUG] [05/26/2017 11:47:38.900] [iCEDQApp-akka.remote.default-remote-dispatcher-19] [akka.actor.LocalActorRefProvider(akka://iCEDQApp)] resolve of path sequence [/system/cluster/core/daemon/joinSeedNodeProcess-1#2147149776] failed
[DEBUG] [05/26/2017 11:47:39.875] [iCEDQApp-akka.remote.default-remote-dispatcher-18] [akka.actor.LocalActorRefProvider(akka://iCEDQApp)] resolve of path sequence [/system/cluster/core/daemon/joinSeedNodeProcess-1#1226966857] failed
[DEBUG] [05/26/2017 11:47:39.979] [iCEDQApp-akka.remote.default-remote-dispatcher-17] [akka.actor.LocalActorRefProvider(akka://iCEDQApp)] resolve of path sequence [/system/cluster/core/daemon/joinSeedNodeProcess-1#1226966857] failed