How do I serialize a class used by a bolt? Exception: java.io.NotSerializableException: backtype.storm.tuple.TupleImpl


Andres Gomez Ferrer

Aug 19, 2013, 4:55:21 AM8/19/13
to storm...@googlegroups.com
How do I serialize a class used by a bolt? I'm a newbie, and when I try to run the topology I see this:

java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: backtype.storm.tuple.TupleImpl
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
	at backtype.storm.disruptor$consume_loop_STAR_$fn__2961.invoke(disruptor.clj:74)
	at backtype.storm.util$async_loop$fn__441.invoke(util.clj:396)
	at clojure.lang.AFn.run(AFn.java:24)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: backtype.storm.tuple.TupleImpl
	at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:24)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)
	at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)
	at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:27)
	at backtype.storm.daemon.worker$mk_transfer_fn$fn__5666$fn__5670.invoke(worker.clj:108)
	at backtype.storm.util$fast_list_map.invoke(util.clj:792)
	at backtype.storm.daemon.worker$mk_transfer_fn$fn__5666.invoke(worker.clj:108)
	at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3314.invoke(executor.clj:239)
	at backtype.storm.disruptor$clojure_handler$reify__2948.onEvent(disruptor.clj:43)
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
	... 6 more
Caused by: java.io.NotSerializableException: backtype.storm.tuple.TupleImpl
	at java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.io.ObjectOutputStream.writeObject(Unknown Source)
	at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)
	... 18 more
---------------My topology--------------------------------
spoutKafka: the Kafka spout from storm-contrib
bolt1:
--------------------------------------------------------------------------------------------------------------------------------

package com.storm.redbordertopology.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.storm.redbordertopology.auxiliar.GetDirection;
import java.io.IOException;
import java.util.Map;

import clojure.lang.PersistentVector;

/**
 *
 * @author root
 */
public class GetDataEventBolt extends BaseRichBolt {

     OutputCollector _collector;
     String _topic;
     
     public GetDataEventBolt(String topic){
         this._topic = topic;
     }
    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("direction","events"));
    }

    @Override
    public void prepare(Map map, TopologyContext tc, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {

        GetDirection dir = new GetDirection();
        PersistentVector direccion = null;

        try {
            direccion = dir.process(tuple.getString(0), _topic);
        } catch (IOException ex) {
            System.out.println("Jackson exception: " + ex.toString());
        }

        // The raw input Tuple is emitted here as a value alongside the vector.
        _collector.emit(new Values(direccion, tuple));
        _collector.ack(tuple);
    }
    
}


--------------------------------------------------------------------------------------------------------------------------------

class GetDirection:
--------------------------------------------------------------------------------------------------------------------------------


package com.storm.redbordertopology.auxiliar;


import java.io.IOException;
import java.util.Calendar;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;

import clojure.lang.PersistentVector;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;




public class GetDirection implements Serializable  {
    
    public static final int TOPIC = 0;
    public static final int YEAR = 1;
    public static final int MONTH = 2;
    public static final int DAY = 3;
    public static final int HOUR = 4;
    public static final int MINUTE = 5;
    
    
    public PersistentVector process(String tupleStr, String topic) throws IOException {

        /*
         * Map the event onto a class using Jackson (JSON).
         */
        String eventoStr = choose(tupleStr, "{", "}");

        //System.out.println("\n" + "EVENTO:D:" + eventoStr + "\n");

        JsonFactory f = new JsonFactory();
        JsonParser jp = f.createJsonParser(eventoStr);

      //jp.nextToken();
      ObjectMapper mapper = new ObjectMapper();
      mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); // Only the fields we need.
      ObjectReader reader = mapper.reader(SchemeEvent.class);
      SchemeEvent event = reader.readValue(jp);
       
        
       /*
        * Convert the timestamp into a Calendar object so that we can
        * later extract YEAR, MONTH, DAY, HOUR, MINUTE.
        */
       Long timestampLong = Long.valueOf(event.timestamp);
       Calendar cal = Calendar.getInstance();
       cal.setTimeInMillis(timestampLong * 1000);
         
         /*
          * Build the array of the data needed to generate both the
          * PATH and the events file.
          */
         List<String> lista = new ArrayList<String>();
      
         lista.add(topic);
         lista.add(String.valueOf(cal.get(Calendar.YEAR)));
         lista.add(String.valueOf(cal.get(Calendar.MONTH)));
         lista.add(String.valueOf(cal.get(Calendar.DAY_OF_MONTH)));
         lista.add(String.valueOf(cal.get(Calendar.HOUR_OF_DAY)));
         lista.add(String.valueOf(cal.get(Calendar.MINUTE)));
         lista.add(event.sensor_name);
         lista.add(event.type);
         lista.add(event.sig_generator);
         lista.add(event.sig_id);
         lista.add(event.src_str);
         lista.add(event.dst_str);
         
         /*
          * Prepend a '0' if the date value is below 10 --| 9 -> 09
          */
         
         if(cal.get(Calendar.DAY_OF_MONTH)<10)
            lista.set(DAY, "0"+lista.get(DAY));
         
         if(cal.get(Calendar.MINUTE)<10)
             lista.set(MINUTE, "0"+lista.get(MINUTE));
         if(cal.get(Calendar.HOUR_OF_DAY)<10)
             lista.set(HOUR, "0"+lista.get(HOUR));
         if(cal.get(Calendar.MONTH)<10)
             lista.set(MONTH, "0"+lista.get(MONTH));
                   
       PersistentVector direction = PersistentVector.create(lista);
         
        return direction; 
    }   
   
     private String choose (String tupleStr, String startString, String endString){
       
       StringBuilder event = new StringBuilder (tupleStr);
       
       int start=event.indexOf(startString);
       int end=event.indexOf(endString);       
       String result = event.substring(start, end+1);
       
       
       return result;
   }
    
}
--------------------------------------------------------------------------------------------------------------------------------


bolt2:
--------------------------------------------------------------------------------------------------------------------------------

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package com.storm.redbordertopology.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import clojure.lang.PersistentVector;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 *
 * @author root
 */
public class CheckDirBolt extends BaseRichBolt {

    OutputCollector _collector;
    public static final int TOPIC = 0;
    public static final int YEAR = 1;
    public static final int MONTH = 2;
    public static final int DAY = 3;
    public static final int HOUR = 4;
    public static final int MINUTE = 5;
    public static final int SENSOR_NAME = 6;
    public static final int TYPE = 7;
    public static final int SIG_GENERATOR = 8;
    public static final int SIG_ID = 9;
    public static final int SRC_STR = 10;
    public static final int DST_STR = 11;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public void prepare(Map map, TopologyContext tc, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {

        PersistentVector tupleDirection = (PersistentVector) tuple.getValueByField("direction");
        String eventsStr = tuple.getValueByField("events").toString();
        String evento = eventsStr.substring(eventsStr.indexOf("[") + 1, eventsStr.indexOf("]"));
        boolean isExistsDir = false;
        boolean isExistsFile = false;
        FileSystem hdfs = null;
        FSDataInputStream in = null;
        FSDataOutputStream out = null;
        FSDataInputStream backup = null;

        /*
         * Build the Path corresponding to each event, and the path+name of the
         * file that stores the events.
         */
        Path pathDir = new Path("/data" + "/" + "group" + "/" + tupleDirection.get(TOPIC) + "/"
                + tupleDirection.get(YEAR) + "/" + tupleDirection.get(MONTH)
                + "/" + tupleDirection.get(DAY) + "/" + tupleDirection.get(HOUR) + "/" + tupleDirection.get(MINUTE)
                + "/" + tupleDirection.get(SIG_GENERATOR) + "/" + tupleDirection.get(SIG_ID)
                + "/" + tupleDirection.get(SRC_STR)
                + "/" + tupleDirection.get(DST_STR));

        Path pathFile = new Path("/data" + "/" + "group" + "/" + tupleDirection.get(TOPIC) + "/"
                + tupleDirection.get(YEAR) + "/" + tupleDirection.get(MONTH)
                + "/" + tupleDirection.get(DAY) + "/" + tupleDirection.get(HOUR) + "/" + tupleDirection.get(MINUTE)
                + "/" + tupleDirection.get(SIG_GENERATOR) + "/" + tupleDirection.get(SIG_ID)
                + "/" + tupleDirection.get(SRC_STR)
                + "/" + tupleDirection.get(DST_STR)
                + "/" + tupleDirection.get(SENSOR_NAME) + "."
                + tupleDirection.get(TYPE) + "." + "json");

        Path pathFileBackup = new Path("/data" + "/" + "group" + "/" + tupleDirection.get(TOPIC) + "/"
                + tupleDirection.get(YEAR) + "/" + tupleDirection.get(MONTH)
                + "/" + tupleDirection.get(DAY) + "/" + tupleDirection.get(HOUR) + "/" + tupleDirection.get(MINUTE)
                + "/" + tupleDirection.get(SIG_GENERATOR) + "/" + tupleDirection.get(SIG_ID)
                + "/" + tupleDirection.get(SRC_STR)
                + "/" + tupleDirection.get(DST_STR)
                + "/" + tupleDirection.get(SENSOR_NAME) + "."
                + tupleDirection.get(TYPE) + "." + "tmp");


        /*
         * Configuration of the Hadoop HDFS file system.
         */
        Configuration config = new Configuration();

        config.addResource(new Path("/opt/rb/etc/hadoop/core-site.xml"));
        config.addResource(new Path("/opt/rb/etc/hadoop/hdfs-site.xml"));
        config.addResource(new Path("/opt/rb/etc/hadoop/mapred-site.xml"));




        /*
         * Configure the FileSystem with the configuration of our
         * Hadoop system.
         */

        try {
            hdfs = FileSystem.get(config);
        } catch (IOException ex) {
            System.out.println("EXCEPTION CREATING CONF: " + ex.toString());
            //System.exit(1);
        }


        /*
         * Check whether the directory corresponding to the event exists.
         * DOES NOT EXIST:
         *         - Create the corresponding directory. Create the matching
         *          json file and store the event inside it.
         * EXISTS:
         *         - Check whether the events .json file exists inside it.
         *                    EXISTS: append the event.
         *                    DOES NOT EXIST: create the file and append the event.
         */

        /*
         * Check whether the directory exists.
         */

        try {
            isExistsDir = hdfs.exists(pathDir);
        } catch (IOException ex) {
            System.out.println("EXCEPTION ON EXISTS: " + ex.toString());
        }

        if (!isExistsDir) {

            /*
             * Create the directory.
             */
            try {
                hdfs.mkdirs(pathDir);
            } catch (IOException ex) {
                System.out.println("EXCEPTION CREATING DIRECTORY: " + ex.toString());
            }

            /*
             * Create the file and store the corresponding event.
             */
            try {
                FSDataOutputStream file = hdfs.create(pathFile);
                file.writeBytes(evento + "\n");
                file.close();
            } catch (IOException ex) {
                System.out.println("CREATE FILE - after mkdir: " + ex.toString());
            }

        } else {
            try {
                /*
                 * Check whether an events file already exists.
                 */
                isExistsFile = hdfs.exists(pathFile);
            } catch (IOException ex) {
                System.out.println("ERROR FILE DOES NOT EXIST: " + ex.toString());
            }

            if (isExistsFile) {

                /*
                 * Append the new event to the file.
                 */

                /*String file = "";
                 try {
                 FSDataInputStream in = hdfs.open(pathFile);
                 file = in.readUTF();
                 in.close();

                 } catch (IOException ex) {
                 System.out.println("ERROR READING FILE: " + ex.toString());
                 }
                 */

                try {
                    in = hdfs.open(pathFile);
                } catch (IOException ex) {
                    System.out.println("ERROR OPENING FILE FOR BACKUP: " + ex.toString());
                }
                try {
                    backup(config, hdfs, in, pathFileBackup);
                    in.close();
                } catch (IOException ex) {
                    System.out.println("ERROR PERFORMING BACKUP: " + ex.toString());
                }

                try {
                    out = hdfs.create((pathFile), true);
                    backup = hdfs.open(pathFileBackup);



                    int offset = 0;
                    int bufferSize = 4096;

                    int result = 0;

                    byte[] buffer = new byte[bufferSize];
                    // pre read a part of content from input stream
                    result = backup.read(offset, buffer, 0, bufferSize);
                    // loop read input stream until it does not fill whole size of buffer
                    while (result == bufferSize) {
                        out.write(buffer);
                        // read next segment from input stream by moving the offset pointer
                        offset += bufferSize;
                        result = backup.read(offset, buffer, 0, bufferSize);
                    }

                    if (result > 0 && result < bufferSize) {
                        for (int i = 0; i < result; i++) {
                            out.write(buffer[i]);
                        }

                    }
                    out.writeBytes(evento + "\n");
                    out.close();
                } catch (IOException ex) {
                    System.out.println("ERROR APPENDING EVENT TO FILE:" + ex.toString());
                }
                try {
                    hdfs.delete(pathFileBackup, true);
                } catch (IOException ex) {
                    System.out.println("ERROR DELETING BACKUP:" + ex.toString());
                }


                System.out.println("/data" + "/" + "group" + "/" + tupleDirection.get(TOPIC) + "/"
                        + tupleDirection.get(YEAR) + "/" + tupleDirection.get(MONTH)
                        + "/" + tupleDirection.get(DAY) + "/" + tupleDirection.get(HOUR) + "/" + tupleDirection.get(MINUTE)
                        + "/" + tupleDirection.get(SIG_GENERATOR) + "/" + tupleDirection.get(SIG_ID)
                        + "/" + tupleDirection.get(SRC_STR)
                        + "/" + tupleDirection.get(DST_STR)
                        + "/" + tupleDirection.get(SENSOR_NAME) + "."
                        + tupleDirection.get(TYPE) + "." + "json");

            } else {
                /*
                 * Create the file if it does not exist and append the event.
                 *
                 try {
                 FSDataOutputStream out = hdfs.create(pathFile);
                 out.writeChars(evento+"\n");
                 out.close();
                 } catch (IOException ex) {
                 System.out.println("CREATE FILE - did not exist: " + ex.toString());
                 } */
            }
        }


        try {
            /*
             * Close the HDFS file system.
             */
            hdfs.close();
        } catch (IOException ex) {
            System.out.println("CANNOT CLOSE: " + ex.toString());
        }
        /*
         * Ack the tuple to the previous bolt.
         */
        _collector.ack(tuple);
    }

    public static void backup(Configuration conf, FileSystem fs,
            FSDataInputStream sourceContent, Path pathFileBackup) throws IOException {

        FSDataOutputStream out = fs.create(pathFileBackup, true);
        IOUtils.copyBytes(sourceContent, out, 4096, false);
        out.close();

    }
}

--------------------------------------------------------------------------------------------------------------------------------
main
--------------------------------------------------------------------------------------------------------------------------------
package com.storm.redbordertopology;

import backtype.storm.LocalCluster;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;
import com.storm.redbordertopology.bolts.CheckDirBolt;
import com.storm.redbordertopology.bolts.GetDataEventBolt;
import java.io.File;
import java.io.FileNotFoundException;
import org.ho.yaml.Yaml;
import storm.kafka.*;

public class RedborderTopology {

    public static void main(String[] args) throws InterruptedException, FileNotFoundException, AlreadyAliveException, InvalidTopologyException {

        //LOAD THE YAML FILE
        /*   try{
         Object kafka_settings = Yaml.load(new File("/opt/rb/var/www/rb-rails/config/rbdruid_config.yml"));
         }catch(FileNotFoundException e){
         System.out.println("The file was not found \n");
         }*/

        //Topology definition.

        TopologyBuilder builder = new TopologyBuilder();


        //Kafka spout configuration.
        SpoutConfig spoutConfig = new SpoutConfig(
                new KafkaConfig.ZkHosts("dev-andres:2181", "/brokers"), // list of Kafka brokers
                "rb_event", // topic to read from
                "/kafkastorm/event", // the root path in Zookeeper for the spout to store the consumer offsets
                "discovery"); // an id for this consumer for storing the consumer offsets in Zookeeper
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.forceStartOffsetTime(-2);

        //Topology structure.
        builder.setSpout("spoutKafka", new KafkaSpout(spoutConfig), 3);
        builder.setBolt("boltGetData", new GetDataEventBolt("rb_event"), 4).shuffleGrouping("spoutKafka");
        builder.setBolt("boltCheckDir", new CheckDirBolt(), 4).shuffleGrouping("boltGetData");

        //Creation and configuration of the local cluster.
        /*Config conf = new Config();
        conf.setMaxTaskParallelism(1);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Redborder-Topology", conf, builder.createTopology());

        Utils.sleep(150000);
        cluster.killTopology("Redborder-Topology");
        cluster.shutdown();*/
       
        //Creation and configuration of the Storm cluster.
         Config conf = new Config();
         conf.setNumWorkers(20);
         conf.registerSerialization(com.storm.redbordertopology.auxiliar.GetDirection.class);
         conf.registerSerialization(com.storm.redbordertopology.auxiliar.SchemeEvent.class);
         conf.registerSerialization(com.storm.redbordertopology.auxiliar.LibraryEvents.class);
         
          
         //conf.setMaxTaskParallelism(1);
         conf.setMaxSpoutPending(10000);
         StormSubmitter.submitTopology("Redborder-Topology", conf, builder.createTopology());
    }
}


Please, I need help!

Thanks!! Andres.

Sumant Sankaran

Aug 19, 2013, 5:05:03 AM8/19/13
to storm...@googlegroups.com
Is PersistentVector serializable?

You seem to be emitting a tuple that contains an object of type PersistentVector.
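A minimal sketch of that fix (illustrative, not code from the original post): the stack trace shows Kryo falling back to SerializableSerializer on the raw input Tuple that GetDataEventBolt puts inside Values, and backtype.storm.tuple.TupleImpl is not java.io.Serializable. Emitting the String payload instead avoids that fallback. This drops into the GetDataEventBolt class posted above:

    // Sketch of a revised GetDataEventBolt.execute() -- an assumed fix,
    // not the thread author's code.
    @Override
    public void execute(Tuple tuple) {
        GetDirection dir = new GetDirection();
        PersistentVector direccion = null;
        try {
            direccion = dir.process(tuple.getString(0), _topic);
        } catch (IOException ex) {
            System.out.println("Jackson exception: " + ex.toString());
        }
        // "events" now carries a plain String; the downstream bolt can read
        // it with tuple.getStringByField("events") instead of parsing the
        // Tuple's toString() output.
        _collector.emit(new Values(direccion, tuple.getString(0)));
        _collector.ack(tuple);
    }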





Andres Gomez Ferrer

Aug 19, 2013, 5:14:48 AM8/19/13
to storm...@googlegroups.com
I want to send an array of strings. What type do you recommend so that it is serializable?

JF Picard

Sep 6, 2013, 8:19:49 AM9/6/13
to storm...@googlegroups.com
You can send the array as a single String separated by a delimiter and split it when you process it:

'String1;String2;String3'

When you want the array back, you just use myString.split(";");
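
A quick illustrative sketch of that approach (class name and values are made up, not from the thread). Note the delimiter must be a character that can never appear inside the values themselves, so check fields like src_str/dst_str before picking ';':

    import java.util.Arrays;
    import java.util.List;

    public class DelimiterDemo {
        public static void main(String[] args) {
            // Upstream bolt: flatten the list into one delimited String; a
            // plain String always passes cleanly through Storm's Kryo
            // serialization.
            List<String> parts = Arrays.asList("rb_event", "2013", "08", "19");
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < parts.size(); i++) {
                if (i > 0) sb.append(';');
                sb.append(parts.get(i));
            }
            String joined = sb.toString();          // "rb_event;2013;08;19"

            // Downstream bolt: recover the array with split.
            String[] recovered = joined.split(";");
            System.out.println(Arrays.toString(recovered));  // [rb_event, 2013, 08, 19]
        }
    }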