import java.util.HashMap;
import java.util.Map;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.Callback;
import org.wso2.siddhi.query.compiler.exception.SiddhiPraserException;
/**
*
* TEST SIDDHI CEP <br>
* <br>
* Please check the getMap() method: I would like to obtain a HashMap<String,Object>,<br>
* so that map.get("avgPrice") works, instead of a HashMap<Integer,Object>.<br>
* <br>
* <b>History:</b><br>
* - [18/mar/2013] Created. (Alberto Sfolcini)<br>
*
*/
public class TestSIDDHI {

    /** Number of sample events pushed by {@link #start()}. */
    private static final int EVENT_COUNT = 10;

    /** Pause, in milliseconds, before each sample event is sent. */
    private static final long SEND_INTERVAL_MS = 500L;

    /** Handler for the "tickStream" input stream; stays {@code null} if {@link #init()} failed. */
    InputHandler inputHandler;

    /**
     * Prints a banner and runs {@link #init()}.
     * Exceptions thrown by {@code init()} are printed and not rethrown, so
     * {@link #inputHandler} may still be {@code null} after construction.
     */
    public TestSIDDHI() {
        System.out.println("TESTING SIDDHI CEP");
        try {
            init();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (SiddhiPraserException e) {
            e.printStackTrace();
        }
    }

    /**
     * Converts one event payload into a map keyed by the 1-based position of each value.
     *
     * @param obj the event payload; expected to be an {@code Object[]}
     * @return a map from position (starting at 1) to the value at that position;
     *         empty when {@code obj} is {@code null}
     */
    private Map<Integer, Object> getMap(Object obj) {
        Map<Integer, Object> map = new HashMap<Integer, Object>();
        if (obj == null) {
            return map;
        }
        Object[] data = (Object[]) obj;
        for (int j = 0; j < data.length; j++) {
            map.put(j + 1, data[j]);
        }
        return map;
    }

    /**
     * Creates the Siddhi manager, defines the "tickStream" stream, registers the
     * average-price-per-symbol query, attaches a callback on the "tick" output stream
     * that prints every new event as a map, and stores the input handler for "tickStream".
     *
     * @throws InterruptedException   propagated from the Siddhi calls
     * @throws SiddhiPraserException  propagated from the Siddhi calls
     */
    public void init() throws InterruptedException, SiddhiPraserException {
        // Create Siddhi Manager
        SiddhiManager siddhiManager = new SiddhiManager();
        siddhiManager.defineStream("define stream tickStream ( symbol string, price double, volume int)");

        // Every fragment ends with a space: without it the concatenation produced
        // "... as avgPricegroup by symbol", which is not the intended query.
        String query = "from tickStream [win.time(1000)] "
                + "insert into tick symbol, avg(price) as avgPrice "
                + "group by symbol";
        siddhiManager.addQuery(query);
        System.out.println(query);

        siddhiManager.addCallback("tick", new Callback() {
            public void receive(long timeStamp, Object[] newEventData, Object[] removeEventData,
                                Object[] faultEventData) {
                if (newEventData == null) {
                    return;
                }
                for (int i = 0; i < newEventData.length; i++) {
                    Map<Integer, Object> map = getMap(newEventData[i]);
                    System.out.println("MAP: " + map.toString());
                }
            }
        });

        inputHandler = siddhiManager.getInputHandler("tickStream");
    }

    /**
     * Sends {@value #EVENT_COUNT} "GOOG" events with a random price in [600, 610)
     * to the input handler, sleeping {@value #SEND_INTERVAL_MS} ms before each one.
     * Does nothing (apart from reporting the problem) when initialisation failed.
     */
    public void start() {
        if (inputHandler == null) {
            // init() failed in the constructor; avoid a NullPointerException here.
            System.err.println("Input handler is not initialised; no events sent.");
            return;
        }
        try {
            for (int i = 0; i < EVENT_COUNT; i++) {
                double d = 600 + Math.random() * 10;
                Thread.sleep(SEND_INTERVAL_MS);
                inputHandler.send(new Object[]{"GOOG", d, 900123});
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Runs the demo: builds the test object, sends the sample events and exits.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new TestSIDDHI().start();
        System.out.println("Done.");
        // NOTE(review): presumably forces termination in case Siddhi keeps
        // background threads alive; confirm whether a manager shutdown is preferable.
        System.exit(0);
    }
}