we have a table with
100 M records ...we are finding difference in execution time ..Are we doing something wrong?
The question is why the last approach is so much faster. We expected a nested filter + aggregate to be just as or faster than doing a filter first and then an aggregate?.
The following code
takes 700 milli seconds to execute, here we create an operation with filter and
aggregation
scoped_ptr<Operation> named_columns(Project(ProjectRename(util::gtl::Container("F1",
"F2"),
ProjectAllAttributes()), table));
scoped_ptr<Operation> filter1(Filter(Less(AttributeAt(0),
ConstInt32(200000)), ProjectAllAttributes(), named_columns.release()));
scoped_ptr<Operation> filter2(Filter(Greater(AttributeAt(0),
ConstInt32(0)), ProjectAllAttributes(), filter1.release()));
OPeration* op = AggregateClusters(new CompoundSingleSourceProjector(),
(new AggregationSpecification)->AddAggregation(MAX,
"F1", "f1_max"),
filter2.release());
/* Iterate through cursor to copy the result records to a block */
FailureOrOwned<Cursor> cur = op->CreateCursor();
if(cur.is_success()){
ResultView rv = cur->Next(-1);
*result_space = new Block(cur->schema(), allocator);
ViewCopier copier(cur->schema(), true);
offset = 0;
while (!rv.is_done()) {
const View& view = rv.view();
rowcount_t view_row_count = view.row_count();
(*result_space)->Reallocate(offset + view_row_count);
rowcount_t rows_copied = copier.Copy(view_row_count, view,
offset, (*result_space));
offset += rows_copied;
rv = cur->Next(-1);
}
}
The below code takes
500 ms to execute .. here we are just doing an aggregation
OPeration* op = AggregateClusters(new
CompoundSingleSourceProjector(),
(new AggregationSpecification)->AddAggregation(MAX,
"F1", "f1_max"),
table);
FailureOrOwned<Cursor> cur = op->CreateCursor();
if(cur.is_success()){
ResultView rv = cur->Next(-1);
*result_space = new Block(cur->schema(), allocator);
ViewCopier copier(cur->schema(), true);
offset = 0;
while (!rv.is_done()) {
const View& view = rv.view();
rowcount_t view_row_count = view.row_count();
(*result_space)->Reallocate(offset + view_row_count);
rowcount_t rows_copied = copier.Copy(view_row_count, view,
offset, (*result_space));
offset += rows_copied;
rv = cur->Next(-1);
}
}
The below code takes
200 ms to execute .. here we are doing filter and materializing the result to a
block and passing the block to create aggregate operation
scoped_ptr<Operation> filter1(Filter(Less(AttributeAt(0),
ConstInt32(200000)), ProjectAllAttributes(), table));
OPeration* op= filter2(Filter(Greater(AttributeAt(0), ConstInt32(0)),
ProjectAllAttributes(), filter1.release()));
FailureOrOwned<Cursor> cur = op->CreateCursor();
if(cur.is_success()){
ResultView rv = cur->Next(-1);
*result_space = new Block(cur->schema(), allocator);
ViewCopier copier(cur->schema(), true);
offset = 0;
while (!rv.is_done()) {
const View& view = rv.view();
rowcount_t view_row_count = view.row_count();
(*result_space)->Reallocate(offset + view_row_count);
rowcount_t rows_copied = copier.Copy(view_row_count, view,
offset, (*result_space));
offset += rows_copied;
rv = cur->Next(-1);
}
}
/*
pass the block just created from filter operation to aggregate operation */
OPeration*
op1 = AggregateClusters(new CompoundSingleSourceProjector(),
(new AggregationSpecification)->AddAggregation(MAX,
"F1", "f1_max"),
new Table(*result_space));
FailureOrOwned<Cursor>
cur1 = op1->CreateCursor();
if(cu1r.is_success()){
ResultView rv = cu1r->Next(-1);
final_result_space = new Block(cur->schema(), allocator);
ViewCopier copier(cur1->schema(), true);
offset = 0;
while (!rv.is_done()) {
const View& view = rv.view();
rowcount_t view_row_count = view.row_count();
final_result_space ->Reallocate(offset + view_row_count);
rowcount_t rows_copied = copier.Copy(view_row_count, view,
offset,final_result_space );
offset += rows_copied;
rv = cur1->Next(-1);
}
}