Tranquility Storm - InvalidClassException, local class incompatible

77 views
Skip to first unread message

Matthias Kwiedor

unread,
Jul 28, 2016, 6:31:11 AM7/28/16
to Druid User
I try to update from Tranquility 0.5.1 (Scala 2.10) to Tranquility 0.7.4 (also testes with 0.8.2).

But when i deploy the Topology to storm, i get following exception:

java.lang.RuntimeException: java.io.InvalidClassException: com.metamx.tranquility.storm.BeamBolt; local class incompatible: stream classdesc serialVersionUID = -1956694456247380936, local class serialVersionUID = -5294014198338257187
        at backtype
.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:56) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype
.storm.utils.Utils.deserialize(Utils.java:89) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype
.storm.utils.Utils.getSetComponentObject(Utils.java:228) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype
.storm.daemon.task$get_task_object.invoke(task.clj:73) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype
.storm.daemon.task$mk_task_data$fn__3129.invoke(task.clj:180) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype
.storm.util$assoc_apply_self.invoke(util.clj:850) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype
.storm.daemon.task$mk_task_data.invoke(task.clj:173) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype
.storm.daemon.task$mk_task.invoke(task.clj:184) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype
.storm.daemon.executor$mk_executor$fn__3308.invoke(executor.clj:329) ~[storm-core-0.9.6.jar:0.9.6]
        at clojure
.core$map$fn__4207.invoke(core.clj:2485) ~[clojure-1.5.1.jar:na]
        at clojure
.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
        at clojure
.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
        at clojure
.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
        at clojure
.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
        at clojure
.core.protocols$seq_reduce.invoke(protocols.clj:30) ~[clojure-1.5.1.jar:na]
        at clojure
.core.protocols$fn__6026.invoke(protocols.clj:54) ~[clojure-1.5.1.jar:na]
        at clojure
.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13) ~[clojure-1.5.1.jar:na]
        at clojure
.core$reduce.invoke(core.clj:6177) ~[clojure-1.5.1.jar:na]
        at clojure
.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na]
        at backtype
.storm.daemon.executor$mk_executor.invoke(executor.clj:329) ~[storm-core-0.9.6.jar:0.9.6]
        at backtype
.storm.daemon.worker$fn__4629$exec_fn__1104__auto____4630$iter__4635__4639$fn__4640.invoke(worker.clj:426) ~[storm-core-0.9.6.jar:0.9.6]
        at clojure
.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
        at clojure
.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
        at clojure
.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
        at clojure
.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
        at clojure
.core$dorun.invoke(core.clj:2780) ~[clojure-1.5.1.jar:na]
        at clojure
.core$doall.invoke(core.clj:2796) ~[clojure-1.5.1.jar:na]
        at backtype
.storm.daemon.worker$fn__4629$exec_fn__1104__auto____4630.invoke(worker.clj:426) ~[storm-core-0.9.6.jar:0.9.6]
        at clojure
.lang.AFn.applyToHelper(AFn.java:185) [clojure-1.5.1.jar:na]
        at clojure
.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
        at clojure
.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]
        at backtype
.storm.daemon.worker$fn__4629$mk_worker__4685.doInvoke(worker.clj:393) [storm-core-0.9.6.jar:0.9.6]
        at clojure
.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na]
        at backtype
.storm.daemon.worker$_main.invoke(worker.clj:504) [storm-core-0.9.6.jar:0.9.6]
        at clojure
.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na]
        at clojure
.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
        at backtype
.storm.daemon.worker.main(Unknown Source) [storm-core-0.9.6.jar:0.9.6]
Caused by: java.io.InvalidClassException: com.metamx.tranquility.storm.BeamBolt; local class incompatible: stream classdesc serialVersionUID = -1956694456247380936, local class serialVersionUID = -5294014198338257187
        at java
.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) ~[na:1.8.0_92]
        at java
.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) ~[na:1.8.0_92]
        at java
.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) ~[na:1.8.0_92]
        at java
.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) ~[na:1.8.0_92]
        at java
.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_92]
        at java
.io.ObjectInputStream.readObject(ObjectInputStream.java:373) ~[na:1.8.0_92]
        at backtype
.storm.serialization.DefaultSerializationDelegate.deserialize(DefaultSerializationDelegate.java:52) ~[storm-core-0.9.6.jar:0.9.6]
       
... 36 common frames omitted

I tried this with Storm 0.9.5 and 0.9.6 - update to 1.x is currently not possible for us.

Even a simple Test won't work and returns with the same exception. This exception won't occure, when i run them with the local cluster test mode.
I allready tried to remove all Storm entries from ZooKeeper, clearing the Storm directory on disk and try it with a new and clean Storm 0.9.6
I checked the dependency versions from my project with the Storm Lib directory, which seem to have the same versions.

My pom.xml looks like

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.6</version>
        <!-- keep storm out of the jar-with-dependencies -->
        <scope>provided</scope>
    </dependency>
    <dependency>
<groupId>io.druid</groupId>
<artifactId>tranquility-core_2.10</artifactId>
<version>0.7.4</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>tranquility-storm_2.10</artifactId>
<version>0.7.4</version>
</dependency>
</dependencies>

And my Test-Topology:

builder.setSpout("TestSpout", new BaseRichSpout() {
   
@Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {

    }

   
@Override
    public void nextTuple() {

    }

   
@Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(
new Fields(new ArrayList<String>() { {
            add(
"Test");
        }
        }))
;
    }
})
;

BeamBolt<Map<String, Object>> druidBolt = new BeamBolt<>(new DruidTranquilityFactory(), 16384);
builder.setBolt("DruidBolt", druidBolt, 1)
        .shuffleGrouping(
"TestSpout");

Where the DruidTranquilityFactory

public class DruidTranquilityFactory implements BeamFactory<Map<String, Object>>
{
    private static final long serialVersionUID = -1491596146457491987L;

    @Override
public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
        final CuratorFramework curator = CuratorFrameworkFactory.newClient(Configuration.ZooKeeperUriDruid, new RetryOneTime(1000));
curator.start();

final String dataSource = "TestDs";

final DruidBeams.Builder<Map<String, Object>, Map<String, Object>> builder = DruidBeams
.builder(
                        new Timestamper<Map<String, Object>>()
{
                            @Override
                            public DateTime timestamp(Map<String, Object> theMap)
{
                                return new DateTime(theMap.get("timestamp"), DateTimeZone.UTC);
                            }
}
                )
.curator(curator)
                .discoveryPath("/druid/discovery")
.location(
                        DruidLocation.create(
"druid:prod:overlord",
"druid:local:firehose:%s",
                                dataSource
                        )
)
                .timestampSpec(new TimestampSpec("timestamp", "millis", null))
.rollup(DruidRollup.create(dimensions, aggregators, QueryGranularity.NONE))
.tuning(ClusteredBeamTuning.builder()
                        .segmentGranularity(Granularity.HOUR)
.windowPeriod(new Period("PT70M"))
                        .build())
                .druidTuning(DruidTuning.create(100000, new Period("PT1H"), 0));

final Service<List<Map<String, Object>>, Integer> service = builder.buildJavaService();
final Beam<Map<String, Object>> beam = builder.buildBeam();

return beam;
    }
}

I allready tried to exlude the storm and zookeeper dependency from the tranquility pom entry, as well as using the version for Scala 2.10 and 2.11!
Any idea what goes wrong? 

Reply all
Reply to author
Forward
0 new messages