I am trying to upgrade from Tranquility 0.5.1 (Scala 2.10) to Tranquility 0.7.4 (also tested with 0.8.2).
But when I deploy the topology to Storm, I get the following exception:
java.lang.RuntimeException: java.io.InvalidClassException: com.metamx.tranquility.storm.BeamBolt; local class incompatible: stream classdesc serialVersionUID = -1956694456247380936, local class serialVersionUID = -5294014198338257187
at backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56) ~[storm-core-0.9.6.jar:0.9.6]
at backtype.storm.utils.Utils.deserialize(Utils.java:89) ~[storm-core-0.9.6.jar:0.9.6]
at backtype.storm.utils.Utils.getSetComponentObject(Utils.java:228) ~[storm-core-0.9.6.jar:0.9.6]
at backtype.storm.daemon.task$get_task_object.invoke(task.clj:73) ~[storm-core-0.9.6.jar:0.9.6]
at backtype.storm.daemon.task$mk_task_data$fn__3129.invoke(task.clj:180) ~[storm-core-0.9.6.jar:0.9.6]
at backtype.storm.util$assoc_apply_self.invoke(util.clj:850) ~[storm-core-0.9.6.jar:0.9.6]
at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:173) ~[storm-core-0.9.6.jar:0.9.6]
at backtype.storm.daemon.task$mk_task.invoke(task.clj:184) ~[storm-core-0.9.6.jar:0.9.6]
at backtype.storm.daemon.executor$mk_executor$fn__3308.invoke(executor.clj:329) ~[storm-core-0.9.6.jar:0.9.6]
at clojure.core$map$fn__4207.invoke(core.clj:2485) ~[clojure-1.5.1.jar:na]
at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) ~[clojure-1.5.1.jar:na]
at clojure.core.protocols$fn__6026.invoke(protocols.clj:54) ~[clojure-1.5.1.jar:na]
at clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13) ~[clojure-1.5.1.jar:na]
at clojure.core$reduce.invoke(core.clj:6177) ~[clojure-1.5.1.jar:na]
at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na]
at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:329) ~[storm-core-0.9.6.jar:0.9.6]
at backtype.storm.daemon.worker$fn__4629$exec_fn__1104__auto____4630$iter__4635__4639$fn__4640.invoke(worker.clj:426) ~[storm-core-0.9.6.jar:0.9.6]
at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
at clojure.core$dorun.invoke(core.clj:2780) ~[clojure-1.5.1.jar:na]
at clojure.core$doall.invoke(core.clj:2796) ~[clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__4629$exec_fn__1104__auto____4630.invoke(worker.clj:426) ~[storm-core-0.9.6.jar:0.9.6]
at clojure.lang.AFn.applyToHelper(AFn.java:185) [clojure-1.5.1.jar:na]
at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__4629$mk_worker__4685.doInvoke(worker.clj:393) [storm-core-0.9.6.jar:0.9.6]
at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$_main.invoke(worker.clj:504) [storm-core-0.9.6.jar:0.9.6]
at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na]
at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker.main(Unknown Source) [storm-core-0.9.6.jar:0.9.6]
Caused by: java.io.InvalidClassException: com.metamx.tranquility.storm.BeamBolt; local class incompatible: stream classdesc serialVersionUID = -1956694456247380936, local class serialVersionUID = -5294014198338257187
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) ~[na:1.8.0_92]
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) ~[na:1.8.0_92]
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) ~[na:1.8.0_92]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) ~[na:1.8.0_92]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_92]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) ~[na:1.8.0_92]
at backtype.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:52) ~[storm-core-0.9.6.jar:0.9.6]
... 36 common frames omitted
I tried this with Storm 0.9.5 and 0.9.6; updating to 1.x is currently not possible for us.
Even a simple test topology fails with the same exception. The exception does not occur when I run it in local cluster test mode.
I already tried removing all Storm entries from ZooKeeper, clearing the Storm directory on disk, and starting again with a fresh, clean Storm 0.9.6.
I compared the dependency versions in my project with the jars in the Storm lib directory, and they appear to be the same versions.
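Comparing jar file names only shows which versions are declared. A more direct check is to print the serialVersionUID that each classpath actually yields for BeamBolt, together with the jar the class was loaded from. The following is a minimal sketch, not something taken from the Storm or Tranquility documentation: the class SerialUidCheck and its method names are hypothetical, and it relies only on java.io.ObjectStreamClass from the JDK.

```java
import java.io.ObjectStreamClass;
import java.security.CodeSource;

// Hypothetical helper (not part of Storm or Tranquility): reports the
// serialVersionUID the current classpath yields for a class and the
// jar or directory that class was loaded from.
class SerialUidCheck {

    // Returns the serialVersionUID Java serialization uses for the named class.
    static long serialVersionUidOf(String className) throws ClassNotFoundException {
        Class<?> clazz = Class.forName(className);
        // lookup() returns null when the class is not Serializable.
        ObjectStreamClass desc = ObjectStreamClass.lookup(clazz);
        if (desc == null) {
            throw new IllegalArgumentException(className + " is not Serializable");
        }
        return desc.getSerialVersionUID();
    }

    // Returns where the class was loaded from, or "(bootstrap)" for JDK classes
    // that have no code source.
    static String locationOf(String className) throws ClassNotFoundException {
        CodeSource source = Class.forName(className).getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(bootstrap)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) throws Exception {
        // Defaults to the class named in the exception above.
        String name = args.length > 0 ? args[0] : "com.metamx.tranquility.storm.BeamBolt";
        System.out.println(name + " serialVersionUID = " + serialVersionUidOf(name));
        System.out.println("loaded from " + locationOf(name));
    }
}
```

Running this once against the classpath used to submit the topology and once against the classpath the Storm worker uses should, if the assumption about mismatched jars holds, print the two values from the exception (-1956694456247380936 and -5294014198338257187) and show which jar supplies each.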
My pom.xml looks like this:
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.6</version>
        <!-- keep storm out of the jar-with-dependencies -->
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.druid</groupId>
        <artifactId>tranquility-core_2.10</artifactId>
        <version>0.7.4</version>
    </dependency>
    <dependency>
        <groupId>io.druid</groupId>
        <artifactId>tranquility-storm_2.10</artifactId>
        <version>0.7.4</version>
    </dependency>
</dependencies>
And my test topology:
builder.setSpout("TestSpout", new BaseRichSpout() {
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
    }

    @Override
    public void nextTuple() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(new ArrayList<String>() { {
            add("Test");
        }
        }));
    }
});
BeamBolt<Map<String, Object>> druidBolt = new BeamBolt<>(new DruidTranquilityFactory(), 16384);
builder.setBolt("DruidBolt", druidBolt, 1)
    .shuffleGrouping("TestSpout");
where DruidTranquilityFactory is:
public class DruidTranquilityFactory implements BeamFactory<Map<String, Object>>
{
    private static final long serialVersionUID = -1491596146457491987L;

    @Override
    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
        final CuratorFramework curator = CuratorFrameworkFactory.newClient(Configuration.ZooKeeperUriDruid, new RetryOneTime(1000));
        curator.start();
        final String dataSource = "TestDs";
        final DruidBeams.Builder<Map<String, Object>, Map<String, Object>> builder = DruidBeams
            .builder(
                new Timestamper<Map<String, Object>>()
                {
                    @Override
                    public DateTime timestamp(Map<String, Object> theMap)
                    {
                        return new DateTime(theMap.get("timestamp"), DateTimeZone.UTC);
                    }
                }
            )
            .curator(curator)
            .discoveryPath("/druid/discovery")
            .location(
                DruidLocation.create(
                    "druid:prod:overlord",
                    "druid:local:firehose:%s",
                    dataSource
                )
            )
            .timestampSpec(new TimestampSpec("timestamp", "millis", null))
            .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularity.NONE))
            .tuning(ClusteredBeamTuning.builder()
                .segmentGranularity(Granularity.HOUR)
                .windowPeriod(new Period("PT70M"))
                .build())
            .druidTuning(DruidTuning.create(100000, new Period("PT1H"), 0));
        final Service<List<Map<String, Object>>, Integer> service = builder.buildJavaService();
        final Beam<Map<String, Object>> beam = builder.buildBeam();
        return beam;
    }
}
I already tried excluding the Storm and ZooKeeper dependencies from the Tranquility pom entries, as well as using the builds for both Scala 2.10 and 2.11.
Any idea what is going wrong?