package com.storm.redbordertopology.auxiliar;
import java.io.IOException;
import java.util.Calendar;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import clojure.lang.PersistentVector;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class GetDirection implements Serializable {
public static final int TOPIC = 0;
public static final int YEAR = 1;
public static final int MONTH = 2;
public static final int DAY = 3;
public static final int HOUR = 4;
public static final int MINUTE = 5;
public PersistentVector process(String tupleStr, String topic) throws IOException {
/*
 * Map the event onto a class using Jackson (JSON).
 */
String eventoStr = choose(tupleStr, "{", "}");
//System.out.println("\n" + "EVENT: " + eventoStr + "\n");
JsonFactory f = new JsonFactory();
JsonParser jp = f.createJsonParser(eventoStr);
//jp.nextToken();
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); // only map the fields we need
ObjectReader reader = mapper.reader(SchemeEvent.class);
SchemeEvent event = reader.readValue(jp);
/*
 * Convert the timestamp into a Calendar object so we can later extract
 * YEAR, MONTH, DAY, HOUR and MINUTE.
 */
Long timestampLong = Long.valueOf(event.timestamp);
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(timestampLong * 1000); // the event timestamp is in seconds
/*
 * Build the list of values needed to generate both the PATH and the
 * events file name. Month, day, hour and minute are zero-padded to two
 * digits; note that Calendar.MONTH is zero-based (January = 0), so 1 is added.
 */
List<String> lista = new ArrayList<String>();
lista.add(topic);
lista.add(String.valueOf(cal.get(Calendar.YEAR)));
lista.add(String.format("%02d", cal.get(Calendar.MONTH) + 1));
lista.add(String.format("%02d", cal.get(Calendar.DAY_OF_MONTH)));
lista.add(String.format("%02d", cal.get(Calendar.HOUR_OF_DAY)));
lista.add(String.format("%02d", cal.get(Calendar.MINUTE)));
lista.add(event.sensor_name);
lista.add(event.type);
lista.add(event.sig_generator);
lista.add(event.sig_id);
lista.add(event.src_str);
lista.add(event.dst_str);
PersistentVector direction = PersistentVector.create(lista);
return direction;
}
/*
 * Extract the substring between the first occurrence of startString and the
 * first occurrence of endString (inclusive). Note: for events containing
 * nested braces this only captures up to the first '}'.
 */
private String choose(String tupleStr, String startString, String endString) {
int start = tupleStr.indexOf(startString);
int end = tupleStr.indexOf(endString);
return tupleStr.substring(start, end + 1);
}
}
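For reference, below is a minimal sketch (not part of the original files) of how process() could be exercised on its own. The JSON field names follow those read from SchemeEvent above; the sample values, and the assumption about how SchemeEvent exposes those fields, are hypothetical since that class is not included in the post.

import clojure.lang.PersistentVector;
import com.storm.redbordertopology.auxiliar.GetDirection;

public class GetDirectionExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical raw tuple: some framing text around a single JSON event.
        String tupleStr = "offset=42 {\"timestamp\":\"1367316000\",\"sensor_name\":\"sensor1\","
                + "\"type\":\"ids\",\"sig_generator\":\"1\",\"sig_id\":\"2000345\","
                + "\"src_str\":\"10.0.0.1\",\"dst_str\":\"10.0.0.2\"}";
        PersistentVector direction = new GetDirection().process(tupleStr, "rb_event");
        // Expected layout: [topic, year, month, day, hour, minute,
        //                   sensor_name, type, sig_generator, sig_id, src_str, dst_str]
        System.out.println(direction);
    }
}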
package com.storm.redbordertopology.bolts;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import clojure.lang.PersistentVector;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
/**
 * Bolt that builds the HDFS path for each event, creates the directory and
 * events file when they do not exist, and appends the event otherwise.
 *
 * @author root
 */
public class CheckDirBolt extends BaseRichBolt {
OutputCollector _collector;
public static final int TOPIC = 0;
public static final int YEAR = 1;
public static final int MONTH = 2;
public static final int DAY = 3;
public static final int HOUR = 4;
public static final int MINUTE = 5;
public static final int SENSOR_NAME = 6;
public static final int TYPE = 7;
public static final int SIG_GENERATOR = 8;
public static final int SIG_ID = 9;
public static final int SRC_STR = 10;
public static final int DST_STR = 11;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// This bolt is a sink: it only writes to HDFS and emits no tuples.
}
@Override
public void prepare(Map map, TopologyContext tc, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
PersistentVector tupleDirection = (PersistentVector) tuple.getValueByField("direction");
String eventsStr = tuple.getValueByField("events").toString();
String evento = eventsStr.substring(eventsStr.indexOf("[") + 1, eventsStr.indexOf("]"));
boolean isExistsDir = false;
boolean isExistsFile = false;
FileSystem hdfs = null;
FSDataInputStream in = null;
FSDataOutputStream out = null;
FSDataInputStream backup = null;
/*
 * Build the directory path for this event, and the path+name of the file
 * that stores the events.
 */
String basePath = "/data" + "/" + "group" + "/" + tupleDirection.get(TOPIC) + "/"
+ tupleDirection.get(YEAR) + "/" + tupleDirection.get(MONTH)
+ "/" + tupleDirection.get(DAY) + "/" + tupleDirection.get(HOUR) + "/" + tupleDirection.get(MINUTE)
+ "/" + tupleDirection.get(SIG_GENERATOR) + "/" + tupleDirection.get(SIG_ID)
+ "/" + tupleDirection.get(SRC_STR)
+ "/" + tupleDirection.get(DST_STR);
String fileName = tupleDirection.get(SENSOR_NAME) + "." + tupleDirection.get(TYPE);
Path pathDir = new Path(basePath);
Path pathFile = new Path(basePath + "/" + fileName + ".json");
Path pathFileBackup = new Path(basePath + "/" + fileName + ".tmp");
/*
 * Hadoop HDFS file system configuration.
 */
Configuration config = new Configuration();
config.addResource(new Path("/opt/rb/etc/hadoop/core-site.xml"));
config.addResource(new Path("/opt/rb/etc/hadoop/hdfs-site.xml"));
config.addResource(new Path("/opt/rb/etc/hadoop/mapred-site.xml"));
/*
 * Get a FileSystem handle using our Hadoop configuration.
 */
try {
hdfs = FileSystem.get(config);
} catch (IOException ex) {
System.out.println("EXCEPCION CREAR CONF: " + ex.toString());
//System.exit(1);
}
/*
 * Check whether the directory for this event exists.
 * DOES NOT EXIST:
 * - Create the directory, create the corresponding .json file and store
 *   the event inside it.
 * EXISTS:
 * - Check whether the events .json file exists inside it.
 *   EXISTS: append the event.
 *   DOES NOT EXIST: create the file and add the event.
 */
/*
 * Check whether the directory exists.
 */
try {
isExistsDir = hdfs.exists(pathDir);
} catch (IOException ex) {
System.out.println("EXCEPCION EXISTE: " + ex.toString());
}
if (!isExistsDir) {
/*
 * Create the directory.
 */
try {
hdfs.mkdirs(pathDir);
} catch (IOException ex) {
System.out.println("EXCEPCION CREANDO DIRECTORIO: " + ex.toString());
}
/*
 * Create the events file and store the event in it.
 */
try {
FSDataOutputStream file = hdfs.create(pathFile);
file.writeBytes(evento + "\n");
file.close();
} catch (IOException ex) {
System.out.println("CREAR FICHERO - Despues mkdir: " + ex.toString());
}
} else {
try {
/*
 * Check whether an events file already exists.
 */
isExistsFile = hdfs.exists(pathFile);
} catch (IOException ex) {
System.out.println("ERROR FICHERO NO EXIS: " + ex.toString());
}
if (isExistsFile) {
/*
 * Append the new event to the file.
 */
/*String file = "";
try {
FSDataInputStream in = hdfs.open(pathFile);
file = in.readUTF();
in.close();
} catch (IOException ex) {
System.out.println("ERROR LEER FICHERO: " + ex.toString());
}
*/
try {
in = hdfs.open(pathFile);
} catch (IOException ex) {
System.out.println("ERROR ABRIR FICHERO PARA BACKUP: " + ex.toString());
}
try {
backup(config, hdfs, in, pathFileBackup);
in.close();
} catch (IOException ex) {
System.out.println("ERROR AL REALIZAR BACKUP: " + ex.toString());
}
try {
out = hdfs.create(pathFile, true);
backup = hdfs.open(pathFileBackup);
// Copy the previous contents back from the backup file, then append the new event.
// (IOUtils handles short reads, unlike the previous manual positional-read loop.)
IOUtils.copyBytes(backup, out, 4096, false);
backup.close();
out.writeBytes(evento + "\n");
out.close();
} catch (IOException ex) {
System.out.println("ERROR APPENDING EVENT TO FILE: " + ex.toString());
}
try {
hdfs.delete(pathFileBackup, true);
} catch (IOException ex) {
System.out.println("ERROR AL BORRAR BACKUP:" + ex.toString());
}
System.out.println("/data" + "/" + "group" + "/" + tupleDirection.get(TOPIC) + "/"
+ tupleDirection.get(YEAR) + "/" + tupleDirection.get(MONTH)
+ "/" + tupleDirection.get(DAY) + "/" + tupleDirection.get(HOUR) + "/" + tupleDirection.get(MINUTE)
+ "/" + tupleDirection.get(SIG_GENERATOR) + "/" + tupleDirection.get(SIG_ID)
+ "/" + tupleDirection.get(SRC_STR)
+ "/" + tupleDirection.get(DST_STR)
+ "/" + tupleDirection.get(SENSOR_NAME) + "."
+ tupleDirection.get(TYPE) + "." + "json");
} else {
/*
 * The file does not exist yet: create it and add the event.
 */
try {
FSDataOutputStream file = hdfs.create(pathFile);
file.writeBytes(evento + "\n");
file.close();
} catch (IOException ex) {
System.out.println("EXCEPTION CREATING FILE (did not exist): " + ex.toString());
}
}
}
try {
/*
 * Close the HDFS file system handle.
 */
hdfs.close();
} catch (IOException ex) {
System.out.println("COULD NOT CLOSE FILESYSTEM: " + ex.toString());
}
/*
 * Ack the tuple so the upstream component knows it was processed.
 */
_collector.ack(tuple);
}
/*
 * Copy the current contents of the events file to a temporary backup file so
 * that the original can be recreated with the new event appended.
 */
public static void backup(Configuration conf, FileSystem fs,
FSDataInputStream sourceContent, Path pathFileBackup) throws IOException {
FSDataOutputStream out = fs.create(pathFileBackup, true);
IOUtils.copyBytes(sourceContent, out, 4096, false);
out.close();
}
}
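A note on how the append is done: the backup/copy/rewrite cycle in execute() reads the whole events file back and rewrites it for every new event. If the HDFS deployment in use supports appends (this depends on the version and on configuration such as dfs.support.append), the same result can be obtained with FileSystem.append(). The helper below is only a sketch under that assumption, written as a method that could live inside CheckDirBolt (same imports), not a tested drop-in replacement.

// Sketch only: assumes the HDFS deployment allows appends.
// Creates the events file on the first event and appends on later ones.
private void writeEvent(FileSystem hdfs, Path pathFile, String evento) throws IOException {
    FSDataOutputStream out = hdfs.exists(pathFile)
            ? hdfs.append(pathFile)         // existing file: reopen for append
            : hdfs.create(pathFile, false); // first event: create the file
    out.writeBytes(evento + "\n");
    out.close();
}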
package com.storm.redbordertopology;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;
import com.storm.redbordertopology.bolts.CheckDirBolt;
import com.storm.redbordertopology.bolts.GetDataEventBolt;
import java.io.File;
import java.io.FileNotFoundException;
import org.ho.yaml.Yaml;
import storm.kafka.*;
public class RedborderTopology {
public static void main(String[] args) throws InterruptedException, FileNotFoundException, AlreadyAliveException, InvalidTopologyException {
// Load the YAML settings file.
/* try{
Object kafka_settings = Yaml.load(new File("/opt/rb/var/www/rb-rails/config/rbdruid_config.yml"));
}catch(FileNotFoundException e){
System.out.println("El fichero no se ha encontrado \n");
}*/
// Topology definition.
TopologyBuilder builder = new TopologyBuilder();
// Kafka spout configuration.
SpoutConfig spoutConfig = new SpoutConfig(
new KafkaConfig.ZkHosts("dev-andres:2181", "/brokers"), // ZooKeeper connect string and broker path, used to discover the Kafka brokers
"rb_event", // topic to read from
"/kafkastorm/event", // the root path in Zookeeper for the spout to store the consumer offsets
"discovery"); // an id for this consumer for storing the consumer offsets in Zookeeper
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.forceStartOffsetTime(-2); // -2 = start from the earliest available offset
// Topology structure.
builder.setSpout("spoutKafka", new KafkaSpout(spoutConfig), 3);
builder.setBolt("boltGetData", new GetDataEventBolt("rb_event"), 4).shuffleGrouping("spoutKafka");
builder.setBolt("boltCheckDir", new CheckDirBolt(), 4).shuffleGrouping("boltGetData");
// Creation and configuration of a local cluster (left here for testing).
/*Config conf = new Config();
conf.setMaxTaskParallelism(1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Redborder-Topology", conf, builder.createTopology());
Utils.sleep(150000);
cluster.killTopology("Redborder-Topology");
cluster.shutdown();*/
// Creation and configuration for the Storm cluster.
Config conf = new Config();
conf.setNumWorkers(20);
conf.registerSerialization(com.storm.redbordertopology.auxiliar.GetDirection.class);
conf.registerSerialization(com.storm.redbordertopology.auxiliar.SchemeEvent.class);
conf.registerSerialization(com.storm.redbordertopology.auxiliar.LibraryEvents.class);
//conf.setMaxTaskParallelism(1);
conf.setMaxSpoutPending(10000);
StormSubmitter.submitTopology("Redborder-Topology", conf, builder.createTopology());
}
}
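One detail that is easy to miss because GetDataEventBolt is not included in the post: CheckDirBolt reads the tuple fields "direction" and "events", so the upstream bolt has to declare and emit exactly those field names. A hypothetical sketch of that contract (the real GetDataEventBolt may differ):

// Hypothetical sketch of GetDataEventBolt's output declaration: a "direction"
// field holding the PersistentVector built by GetDirection.process(), and an
// "events" field holding the raw event string read by CheckDirBolt.
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("direction", "events"));
}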
Thanks!! Andres.