Aggregate function (Posted on behalf of AmaliaP)

Ashwin

Jan 16, 2007, 7:48:26 AM
to streamcruncher
Hi,
I registered in the SC group on Gmail. I get the message "your message
will be posted momentarily" and that's it; no post appears.
Here are some new difficulties I encountered. I am trying to use a custom
aggregate function, and it is not clear to me what the addedRows and
removedRows arguments of the aggregate method (in AbstractAggregator) do.
Can you please help me with a small example?

String[] columnNames = { "event_id", "event_time", "vehicle_id", "seg", "speed" };
String[] columnTypes = { "bigint not null", "timestamp", "char(12)", "int", "decimal" };
RowSpec rowSpec = new RowSpec(columnNames, columnTypes, 0, 1);

IndexSpec indexSpec = this.m_StreamCruncher.createIndexSpec(null,
        "test_idx", "test", true, "event_id", true);

TableSpec tableSpec = this.m_StreamCruncher.createTableSpec(null,
        "test", rowSpec, new IndexSpec[] { indexSpec }, new MiscSpec[] {});
this.m_StreamCruncher.registerInStream(tableSpec, true);

// sliding window - last 10 seconds
String rql = "select event_id, vehicle_id, speed, test_value from test (" +
        "partition by store last 10 seconds " +
        "with custom(test_custom_function, speed) as test_value) as testStr;";

// the result columns - add the custom column
columnNames = new String[] { "event_id", "vehicle_id", "speed", "test_value" };
columnTypes = new String[] { "bigint", "char(12)", "decimal", "varchar(20)" };

TableFQN tableFQN = new TableFQN("test_res");
String queryName = "test_res_rql";

ParserParameters parameters = new ParserParameters();
parameters.setQuery(rql);
parameters.setQueryName(queryName);
parameters.setResultColumnNames(columnNames);
parameters.setResultColumnTypes(columnTypes);
parameters.setResultTableFQN(tableFQN);

// register the custom aggregate function
// NOTE: custom functions must be registered before any query that uses them is parsed
this.m_StreamCruncher.registerAggregator(new MyCustomFunctionHelper());
ParsedQuery parsedQuery = this.m_StreamCruncher.parseQuery(parameters);

QueryConfig config = parsedQuery.getQueryConfig();
config.setQuerySchedulePolicy(new QueryConfig.QuerySchedulePolicyValue(
        QuerySchedulePolicy.ATLEAST_OR_SOONER, Integer.MAX_VALUE));

// register the running query
this.m_StreamCruncher.registerQuery(parsedQuery);


Here are the Aggregator classes. All I want to do is add 1 to the speed
value and then return it in a new column.
What is the overridden init() method used for (in the example you have
on the site)? How can I use it in my example?
I didn't know how to add 1 to the speed value, so I used addedValues.

public class MyCustomFunction extends AbstractAggregator
{
    private static final long serialVersionUID = 1L;

    private long retValue;

    @Override
    public String aggregate(List<Object[]> removedValues, List<Object[]> addedValues)
    {
        if (addedValues != null)
        {
            for (Object[] objects : addedValues)
            {
                Object object = objects[0];

                // Consider only non-nulls.
                if (object != null && object instanceof Long)
                {
                    Long l = (Long) object;

                    retValue = l.longValue() + 1;
                }
            }
        }

        return retValue + "";
    }
}

Obviously I get an error when registering the running query, an error
from the MySQL parser:

An error occurred while executing the Query: test_res_rql
com.mysql.jdbc.exceptions.MySQLSyntaxErrorException: Unknown column 'event_id' in 'field list'

What am I doing wrong? :( Please help me with some simple examples.

Thanks for your time

--
Amalia Pirvanescu

Ashwin

Jan 16, 2007, 8:08:35 AM
to streamcruncher
Hello. To start with, you have to understand how Aggregates affect the
columns in the Partition result.

On the "StreamCruncher - Basics" page you'll see this:

"For an Input Event defined as {a, b, c, d, e}, each of the constructs
will produce results with different structures:

1) Simple Window - (store last 5). Structure will be {a, b, c, d, e}

2) Anonymous Partition - (partition by store last 10). Structure will
be {a, b, c, d, e}

3) Partition - (partition by c, d, e store last 10). Structure will
still be {a, b, c, d, e}, but Windows will be maintained at "c -> d ->
e" level.

4) Aggregated Partition - (partition by c, d store last 8 with avg(a)
as avg_a). Structure will be {c, d, avg_a}, and Windows will be
maintained at "c -> d" level."

In your case, it's an Aggregate defined in an Anonymous Partition
("partition by store last 10 seconds with custom(test_custom_function,
speed) as test_value"), so the result will contain only one column,
"test_value". All the other columns are lost, because the Aggregate
column works somewhat like the SQL Group By clause: the individual
values are lost unless you partition by them explicitly. You get the
error because your Select clause ("select event_id, vehicle_id, speed,
test_value from test") tries to retrieve columns that are not stored
when an Aggregate is combined with an Anonymous Partition. Here,
event_id, vehicle_id and speed are lost, and that's why you see the
"Unknown column 'event_id'" error.

Just like in Group By, here you can't get both the aggregated value and
the individual row values at the same time.
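Applied to the query in the question, that suggests two possible fixes, sketched here as Java string constants. Both RQL strings are assumptions assembled from the syntax quoted in this thread (the "partition by vehicle_id" variant follows pattern 4 above) and have not been run against StreamCruncher; FixedQueries and selectedColumns are hypothetical names. The naive selectedColumns helper only checks that each select list names columns the partition result still has.

```java
import java.util.Arrays;

// Hypothetical sketch of two ways to fix the query. The RQL strings are
// assumptions built from the syntax quoted in this thread and have not been
// run against StreamCruncher.
public class FixedQueries {

    // Option 1: keep the anonymous partition; select only the aggregate column.
    static final String RQL_AGGREGATE_ONLY =
            "select test_value from test ("
            + "partition by store last 10 seconds "
            + "with custom(test_custom_function, speed) as test_value) as testStr;";
    static final String[] COLUMNS_AGGREGATE_ONLY = { "test_value" };

    // Option 2 (assumption): partition explicitly by vehicle_id, following
    // pattern 4 ("partition by c, d ... with avg(a) as avg_a"), so that
    // vehicle_id survives next to the aggregate: {vehicle_id, test_value}.
    static final String RQL_PER_VEHICLE =
            "select vehicle_id, test_value from test ("
            + "partition by vehicle_id store last 10 seconds "
            + "with custom(test_custom_function, speed) as test_value) as testStr;";
    static final String[] COLUMNS_PER_VEHICLE = { "vehicle_id", "test_value" };

    // Naive helper: the column names between the first "select " and " from ".
    static String[] selectedColumns(String rql) {
        int start = rql.indexOf("select ") + "select ".length();
        int end = rql.indexOf(" from ");
        String[] columns = rql.substring(start, end).split(",");
        for (int i = 0; i < columns.length; i++) {
            columns[i] = columns[i].trim();
        }
        return columns;
    }

    public static void main(String[] args) {
        // The select list must name only columns the partition result has.
        System.out.println(Arrays.equals(selectedColumns(RQL_AGGREGATE_ONLY), COLUMNS_AGGREGATE_ONLY));
        System.out.println(Arrays.equals(selectedColumns(RQL_PER_VEHICLE), COLUMNS_PER_VEHICLE));
    }
}
```

Whichever option is used, the names passed to setResultColumnNames should list the same columns, and the type given for test_value in setResultColumnTypes should match what the aggregator actually returns.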

And in the Aggregate function you have implemented:


"if (object != null && object instanceof Long)
{
Long l = (Long) object;

..


retValue = l.longValue() + 1;

.. ..
return retValue+"";
"

Your function must return the correct type, here Long. In your "else"
block you are returning a String, and in the "if" block you are
returning a long value. Java is a strongly typed language.
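The original question also asked what the removed and added rows passed to aggregate are for. The replies don't answer that directly, so this is only a sketch under assumptions: it treats removedValues as the rows that have just expired from the window and addedValues as the rows that have just entered it, and keeps a running sum by subtracting the former and adding the latter. RunningSumSketch and rows are hypothetical names, and the class deliberately does not extend the real AbstractAggregator, whose exact contract (including init()) is not shown in this thread. It also returns the same type, Long, on every path.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch, NOT the real StreamCruncher AbstractAggregator. It only
// mirrors the aggregate(removedValues, addedValues) shape from the question.
// Assumption: removedValues = rows that just left the window,
//             addedValues   = rows that just entered it.
public class RunningSumSketch {

    // Running total of column 0 over the rows currently in the window.
    private long sum;

    public Long aggregate(List<Object[]> removedValues, List<Object[]> addedValues) {
        // Rows that slid out of the window: take their values back out.
        if (removedValues != null) {
            for (Object[] row : removedValues) {
                sum -= valueOf(row);
            }
        }
        // Rows that entered the window: add their values in.
        if (addedValues != null) {
            for (Object[] row : addedValues) {
                sum += valueOf(row);
            }
        }
        // Same return type (Long) on every path.
        return sum;
    }

    // Assumes column 0 holds the aggregated value, as objects[0] does in the
    // question. Nulls and non-numbers count as 0; longValue() drops decimals.
    private static long valueOf(Object[] row) {
        Object value = row[0];
        return (value instanceof Number) ? ((Number) value).longValue() : 0L;
    }

    // Hypothetical helper: wraps each value in a one-column row.
    static List<Object[]> rows(Object... values) {
        List<Object[]> result = new ArrayList<>();
        for (Object value : values) {
            result.add(new Object[] { value });
        }
        return result;
    }

    public static void main(String[] args) {
        RunningSumSketch agg = new RunningSumSketch();
        // Speeds 50 and 60 enter the window: sum is 110.
        System.out.println(agg.aggregate(null, rows(50L, 60L)));
        // 50 expires and 70 arrives: sum is 60 + 70 = 130.
        System.out.println(agg.aggregate(rows(50L), rows(70L)));
    }
}
```

Updating the state incrementally like this avoids re-scanning the whole window on every call; an aggregate that cannot be undone by subtraction (a maximum, for example) would have to keep the window contents instead.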

AmaliaP

Jan 18, 2007, 3:16:00 AM
to streamcruncher
Thanks for the explanations. I got that custom aggregate function to
work :)