java.lang.RuntimeException, when run the topology in cluster, storm 0.9.0

298 views
Skip to first unread message

Andres Gomez Ferrer

unread,
Aug 15, 2013, 4:33:29 PM8/15/13
to storm...@googlegroups.com
Hi, i have problem when i run the topology on the cluster. I use storm 0.9.0-wip21. 

My topology is spoutKafka-bolt1-bolt2

bolt1 like this:

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

package com.storm.redbordertopology.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.storm.redbordertopology.auxiliar.GetDirection;
import java.io.IOException;
import java.util.Map;

import clojure.lang.PersistentVector;

/**
 *
 * @author root
 */
public class GetDataEventBolt extends BaseRichBolt {

     OutputCollector _collector;
     String _topic;
     
     public GetDataEventBolt(String topic){
         this._topic = topic;
     }
    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("direction","events"));
    }

    @Override
    public void prepare(Map map, TopologyContext tc, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        
                GetDirection dir = new GetDirection();
                PersistentVector direccion = null;
                
         try {
             direccion = dir.process(tuple.getString(0), _topic);
         } catch (IOException ex) {
             System.out.println("Excepcion JACKSON: " + ex.toString());
         }
                
                
                _collector.emit(new Values(direccion,tuple));
                _collector.ack(tuple);
    }
    
}

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

This exception occurs when I run the topology in the cluster. What happens? 

Thanks!! 

Andres.

java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: backtype.storm.tuple.TupleImpl
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
	at backtype.storm.disruptor$consume_loop_STAR_$fn__2961.invoke(disruptor.clj:74)
	at backtype.storm.util$async_loop$fn__441.invoke(util.clj:396)
	at clojure.lang.AFn.run(AFn.java:24)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: backtype.storm.tuple.TupleImpl
	at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:24)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)
	at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)
	at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:27)
	at backtype.storm.daemon.worker$mk_transfer_fn$fn__5666$fn__5670.invoke(worker.clj:108)
	at backtype.storm.util$fast_list_map.invoke(util.clj:792)
	at backtype.storm.daemon.worker$mk_transfer_fn$fn__5666.invoke(worker.clj:108)
	at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3314.invoke(executor.clj:239)
	at backtype.storm.disruptor$clojure_handler$reify__2948.onEvent(disruptor.clj:43)
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
	... 6 more
Caused by: java.io.NotSerializableException: backtype.storm.tuple.TupleImpl
	at java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.io.ObjectOutputStream.writeObject(Unknown Source)
	at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)
	... 18 more

kende...@gmail.com

unread,
Aug 17, 2013, 12:55:34 AM8/17/13
to storm...@googlegroups.com
looks like class that you bolt depends on can not be serialized, thus the bolt can't not be send off to the supervisor by nimbus.

Andres Gomez Ferrer

unread,
Aug 17, 2013, 7:25:48 AM8/17/13
to storm...@googlegroups.com
ok! This should be working? Because it throws the same exception.

This is the class, which bolt depends.
--------------------------------------------------------------------------------------------------------------------------------
package com.storm.redbordertopology.auxiliar;


import java.io.IOException;
import java.util.Calendar;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;

import clojure.lang.PersistentVector;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;




public class GetDirection implements Serializable  {
    
    public static final int TOPIC = 0;
    public static final int YEAR = 1;
    public static final int MONTH = 2;
    public static final int DAY = 3;
    public static final int HOUR = 4;
    public static final int MINUTE = 5;
    
    
   public PersistentVector process (String tupleStr, String topic) throws IOException{
        
       
         
         
      /*
       * Mapeamos el evento en una clase usando Jackson (json)
       */   
       GetDirection getDir = new GetDirection();
       
        String eventoStr = getDir.choose(tupleStr, "{", "}"); 
        
        //System.out.println("\n" + "EVENTO:D:"+ eventoStr+  "\n");
        
      JsonFactory f = new JsonFactory();
      JsonParser jp = f.createJsonParser(eventoStr);

      //jp.nextToken();
      ObjectMapper mapper = new ObjectMapper();
      mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); //Solamente campos necesarios.
      ObjectReader reader = mapper.reader(SchemeEvent.class);
      SchemeEvent event = reader.readValue(jp);
       
        
       /*
        * Convertir el timestamp en un objeto del tipo Calendario, para 
        * obtener posteriormente AÑO,MES,DIA,HORA,MINUTO.
        */  
       Long timestampLong = new Long(event.timestamp);
       Calendar cal = Calendar.getInstance();
       cal.setTimeInMillis(timestampLong * 1000);
         
         /*
          * Creamos el Array de los datos necesarios para generar tanto el 
          * PATH, como el fichero de los eventos.
          */
       
         List<String> lista= new ArrayList<String>();
      
         lista.add(topic);
         lista.add(String.valueOf(cal.get(Calendar.YEAR)));
         lista.add(String.valueOf(cal.get(Calendar.MONTH)));
         lista.add(String.valueOf(cal.get(Calendar.DAY_OF_MONTH)));
         lista.add(String.valueOf(cal.get(Calendar.HOUR_OF_DAY)));
         lista.add(String.valueOf(cal.get(Calendar.MINUTE)));
         lista.add(event.sensor_name);
         lista.add(event.type);
         lista.add(event.sig_generator);
         lista.add(event.sig_id);
         lista.add(event.src_str);
         lista.add(event.dst_str);

        /* ImmutableList<String> direction = ImmutableList.of(
            topic,
            String.valueOf(cal.get(Calendar.YEAR)),
            String.valueOf(cal.get(Calendar.MONTH)),
            String.valueOf(cal.get(Calendar.DAY_OF_MONTH)),
            String.valueOf(cal.get(Calendar.HOUR_OF_DAY)),
            String.valueOf(cal.get(Calendar.MINUTE)),
            event.sensor_name,
            event.type,
            event.sig_generator,
            event.sig_id,
            event.src_str,
            event.dst_str
         );  */
         
         /*
          * Añadimos un '0' si la fecha es menor de 10 --| 9 -> 09 
          */
         
         if(cal.get(Calendar.DAY_OF_MONTH)<10)
            lista.set(DAY, "0"+lista.get(DAY));
         
         if(cal.get(Calendar.MINUTE)<10)
             lista.set(MINUTE, "0"+lista.get(MINUTE));
         if(cal.get(Calendar.HOUR_OF_DAY)<10)
             lista.set(HOUR, "0"+lista.get(HOUR));
         if(cal.get(Calendar.MONTH)<10)
             lista.set(MONTH, "0"+lista.get(MONTH));
                   
       PersistentVector direction = PersistentVector.create(lista);
         
        return direction; 
    }   
   
     private String choose (String tupleStr, String startString, String endString){
       
       StringBuilder event = new StringBuilder (tupleStr);
       
       int start=event.indexOf(startString);
       int end=event.indexOf(endString);       
       String result = event.substring(start, end+1);
       
       
       return result;
   }
    
}

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Reply all
Reply to author
Forward
0 new messages