String[] columnNames = { "event_id", "event_time", "vehicle_id", "seg",
"speed" };
String[] columnTypes = { "bigint not null", "timestamp", "char(12)",
"int", "decimal" };
RowSpec rowSpec = new RowSpec(columnNames, columnTypes, 0, 1);
IndexSpec indexSpec = this.m_StreamCruncher.createIndexSpec(null,
"test_idx", "test", true, "event_id",
true);
TableSpec tableSpec = this.m_StreamCruncher.createTableSpec(null,
"test", rowSpec,
new IndexSpec[] { indexSpec }, new MiscSpec[] {});
this.m_StreamCruncher.registerInStream(tableSpec, true);
//sliding window - last 10
String rql = "select event_id, vehicle_id, speed, test_value from test
(" +
"partition by store last 10 seconds " +
"with custom(test_custom_function, speed) as test_value) as testStr;";
//la result columns - add the custom
columnNames = new String[] {"event_id", "vehicle_id", "speed",
"test_value" };
columnTypes = new String[] {"bigint", "char(12)", "decimal",
"varchar(20)" };
TableFQN tableFQN = new TableFQN("test_res");
String queryName = "test_res_rql";
ParserParameters parameters = new ParserParameters();
parameters.setQuery(rql);
parameters.setQueryName (queryName);
parameters.setResultColumnNames(columnNames);
parameters.setResultColumnTypes(columnTypes);
parameters.setResultTableFQN(tableFQN);
//register the custom aggregate function
//OBS: the custom functions must be registered before any query that
uses them is parsed
this.m_StreamCruncher.registerAggregator(new MyCustomFunctionHelper());
ParsedQuery parsedQuery = this.m_StreamCruncher.parseQuery(parameters);
QueryConfig config = parsedQuery.getQueryConfig();
config.setQuerySchedulePolicy(new QueryConfig.QuerySchedulePolicyValue(
QuerySchedulePolicy.ATLEAST_OR_SOONER, Integer.MAX_VALUE));
//register the running query
this.m_StreamCruncher.registerQuery (parsedQuery);
Here are the Aggregator classes. All I want to do is to add value "1"
at the speed and then return it in a new column....
What is the overriden init() method used for (in the example you have
on the site)? How can I use it in my example?
I didn't know how to add 1 to the speed value so I used the
AddedValues..
public class MyCustomFunction extends AbstractAggregator
{
private static final long serialVersionUID = 1L;
private long retValue;
@Override
public String aggregate(List<Object[]> removedValues, List<Object[]>
addedValues)
{
if (addedValues != null)
{
for (Object[] objects : addedValues)
{
Object object = objects[0];
// Consider only non-nulls.
if (object != null && object instanceof Long)
{
Long l = (Long) object;
retValue = l.longValue() + 1;
}
}
}
return retValue+"";
}
}
Oviously I get an error - when registering the running query - error
from the MySQL parser -
An error occurred while executing the Query: test_res_rql
com.mysql.jdbc.exceptions.MySQLSyntaxErrorException: Unknown column
'event_id' in 'field list'
What do I do wrong? :((((( please help me with some dummy examples..
Thanks for your time
--
Amalia Pirvanescu
In the "StreamCruncher - Basics", page you'll see this:
"For an Input Event defined as {a, b, c, d, e}, each of the constructs
will produce results with different structures:
1) Simple Window - (store last 5). Structure will be {a, b, c, d, e}
2) Anonymous Partition - (partition by store last 10). Structure will
be {a, b, c, d, e}
3) Partition - (partition by c, d, e store last 10). Structure will
still be {a, b, c, d, e}, but Windows will be maintained at "c -> d->
e" level.
4) Aggregated Partition - (partition by c, d store last 8 with avg(a)
as avg_a). Structure will still be {c, d, avg_a}, and Windows
maintained at "c -> d" level. "
In your case, it's an Aggregate defined in an Anonymous Partition
("partition by store last 10 seconds with custom(test_custom_function,
speed) as test_value"). So, the result will only contain one column
"test_value". You would've lost all other columns because the Aggregate
column works somewhat like the SQL Group By clause. The individual
values are lost, if you don't partition them explicitly. Consequently,
you get an error because in your Select clause you are trying to
retrieve all the columns - even the ones that do not get stored because
of the Aggregate + Anonymous Partition - "select event_id, vehicle_id,
speed, test_value from test". Here, event_id, vehicle_id, speed are
lost and that's why you see those errors.
Just like in Group By, here you can't get both the aggregated value and
the individual row values at the same time.
And, in the Aggregate function you have implemented -
"if (object != null && object instanceof Long)
{
Long l = (Long) object;
..
retValue = l.longValue() + 1;
.. ..
return retValue+"";
"
Your function must return the correct return type - here Long. In your
"else" block you are returning a String and in the "if" block you are
returning a long value?? Java is a strongly typed language.