(I see Ariel answered this while I was typing ... so apologies to
Ariel for re-answering. Perhaps two comments are better than one?)
Given an arbitrary number of rows, but only 10 unique values, the most
memory the distinct should require is 10 * (size of projected tuple).
The problem is that the distinct is on top of the receive. This plan:
RETURN RESULTS TO STORED PROCEDURE
DISTINCT
RECEIVE FROM ALL PARTITIONS
SEND PARTITION RESULTS TO COORDINATOR
SEQUENTIAL SCAN of "LOG"
Should really look like this:
RETURN RESULTS TO STORED PROCEDURE
DISTINCT
RECEIVE FROM ALL PARTITIONS
SEND PARTITION RESULTS TO COORDINATOR
DISTINCT
SEQUENTIAL SCAN of "LOG"
And the distinct should be inlined with the scan. We do this push down
for other aggregates but a simple distinct got missed...
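(To illustrate that bound: a distinct only has to remember each unique value it
has seen, so its footprint scales with the number of unique values, not the
number of rows. A quick standalone sketch -- simplified stand-ins, not the
actual DistinctExecutor code:)

// Sketch only: simplified stand-in for a per-partition DISTINCT.
// The real executor works on TableTuple/NValue; here a "projected tuple"
// is just an int64_t so the example is self-contained and runnable.
#include <cstdint>
#include <iostream>
#include <unordered_set>
#include <vector>

int main() {
    // Arbitrary number of input rows, but only 10 unique values.
    std::vector<int64_t> rows;
    for (int64_t i = 0; i < 1000000; ++i) {
        rows.push_back(i % 10);
    }

    // The distinct keeps one copy of each unique value it has seen,
    // so memory use is roughly (#unique values * size of projected tuple),
    // independent of the row count.
    std::unordered_set<int64_t> seen;
    for (int64_t v : rows) {
        seen.insert(v);   // no-op after the first occurrence
    }

    std::cout << "unique values retained: " << seen.size() << std::endl;
    return 0;
}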
Ryan.
If the temp table limit is hit, it means the table is being copied as
a result of the scan; that copy isn't necessary. The EE glues
executors together Source -> output to temp table -> next executor
... As an optimization, if the source doesn't alter tuples, it can
eliminate the temp table and create the chain source -> next executor.
See this code, for example, in seqscanexecutor.cpp:
//
// OPTIMIZATION: If there is no predicate for this SeqScan,
// then we want to just set our OutputTable pointer to be the
// pointer of our TargetTable. This prevents us from just
// reading through the entire TargetTable and copying all of
// the tuples. We are guaranteed that no Executor will ever
// modify an input table, so this operation is safe
//
if (!this->needsOutputTableClear())
{
node->setOutputTable(node->getTargetTable());
}
The temp table limit is configurable, btw. (See
https://community.voltdb.com/docs/MgtGuide/HostConfigDBOpts). Or,
with a little hacking, you can reduce the limit to something very
small and not need to deal with loading a large dataset.
On another note, John enabled the github wiki:
https://github.com/VoltDB/voltdb/wiki. We intend to fill this out with
some more developer focused docs.
Ryan.
>> Should really look like this:
>> RETURN RESULTS TO STORED PROCEDURE
>> DISTINCT
>> RECEIVE FROM ALL PARTITIONS
>> SEND PARTITION RESULTS TO COORDINATOR
>> DISTINCT
>> SEQUENTIAL SCAN of "LOG"
>> And the distinct should be inlined with the scan.
The DISTINCT above the RECEIVE (the one run at the coordinator) will re-distinct the results collected from the partitions.
>> RECEIVE FROM ALL PARTITIONS
>> SEND PARTITION RESULTS TO COORDINATOR
>> DISTINCT
This DISTINCT will produce a unique set from each partition.
> I modified assembler to push down distinct node but have a question about
> making it inline.
> Does it mean that there actually should be two distinct nodes: The first one
> is the parent of the scan node and another one (inlined) is part of the scan
> node? That seems to be the case with the projection node. Could you please
> clarify.
One distinct is distributed and run at each partition. The other is run on
the results collected from each partition in case the partitions
return duplicates.
(If they don't return duplicates, it will be equivalent to a UNION).
Ryan.
I understand what you are saying and will experiment with a
DISTINCT-with-GROUP-BY plan, but ... I am at a loss as to how it will solve
the original problem from issues 2503/2504.
The execution plan corresponding to the simple "select distinct C from T" (T is
replicated to simplify the plan) is:
Send
Projection
Distinct
Sequential Scan
inline Projection
The out-of-memory exception happens within the seqscan executor for a simple
reason: the optimization that chains the target and output tables for this node
is off. SeqScanPlanNode::needsOutputTableClear returns true because of the
inlined projection node. In that scenario SeqScanExecutor::p_execute iterates
over the whole target table, inserting a new tuple into the output table for
each row and eventually exceeding the memory limit. The distinct executor (and
I suspect the aggregate one as well) is invoked only after the sequential one
finishes, which is too late in this case.
One way to get around it is to change the plan to be
Send
Projection
Sequential Scan
inline Projection
inline Distinct
and call the distinct expression from within SeqScanExecutor::p_execute as one
more optimization, to reduce the number of tuples inserted into the scan's
output table. Does that seem reasonable?
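Roughly what I have in mind, as a standalone sketch with simplified stand-ins
for the tuple and expression types (not the real SeqScanExecutor code):

// Sketch only: how the scan loop could apply an inlined distinct before
// inserting into its output table. Real code would evaluate the distinct
// expression (an AbstractExpression) against a TableTuple; here the
// "distinct expression" just extracts one column of a vector<int64_t>.
#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

using Tuple = std::vector<int64_t>;

// Stand-in for evaluating the distinct expression against a tuple.
int64_t evalDistinctExpression(const Tuple& tuple) {
    return tuple[0]; // e.g. "select distinct C from T" with C in column 0
}

int main() {
    // Fake target table: many rows, few distinct values in column 0.
    std::vector<Tuple> targetTable;
    for (int64_t i = 0; i < 100000; ++i) {
        targetTable.push_back(Tuple{i % 10, i});
    }

    std::vector<Tuple> outputTable;   // stand-in for the scan's temp table
    std::set<int64_t> seenValues;     // what the inlined distinct tracks

    for (const Tuple& tuple : targetTable) {
        // (the inline projection would be applied here first)
        int64_t value = evalDistinctExpression(tuple);
        if (!seenValues.insert(value).second) {
            continue;                 // duplicate: skip the insert
        }
        outputTable.push_back(tuple); // unique so far: keep it
    }

    std::cout << "output rows: " << outputTable.size() << std::endl;
    return 0;
}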
Mike
Off the top of my head, I think the way to achieve this for Distinct, in broad strokes, MAY be to:
1. factor out the behavior of the (per input row) loop body in DistinctExecutor::p_execute -- call it DistinctExecutor::exec_one
2. annotate the SeqScanExecutor with an "inlined" DistinctExecutor object.
3. pass that annotation to the TempTable factory causing it to create a new kind of (output) TempTable -- call it DistinctTempTable.
4. implement DistinctTempTable::insertTuple to call DistinctExecutor::exec_one, populating a smaller underlying (output) TempTable.
5. implement DistinctTempTable::iterator to return an iterator over that underlying TempTable.
I'm not sure how best to draw it as a chain of nested Plan Nodes
-- the way you show the inline Projection and inline Distinct as same-level children of the Sequential Scan seems to get the idea across OK,
as long as it is understood that the mechanism by which the inline nodes feed and are fed tuples is a little different than the usual bottom-up protocol.
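To make steps 3-5 a bit more concrete, here is a very rough standalone sketch
of the idea (hypothetical class, plain stand-ins rather than the real
TempTable/TableTuple types):

// Sketch only: a "DistinctTempTable" whose insertTuple() filters
// duplicates before populating a smaller underlying table.
// Stand-ins: a tuple is an int64_t, a TempTable is a vector.
#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

class DistinctTempTable {
public:
    // Step 4: insertTuple() applies the per-row distinct check (the
    // factored-out "exec_one") and only keeps rows not seen before.
    bool insertTuple(int64_t tuple) {
        if (!m_seen.insert(tuple).second) {
            return false;              // duplicate, dropped
        }
        m_underlying.push_back(tuple); // unique, kept
        return true;
    }

    // Step 5: iteration is just iteration over the underlying table.
    std::vector<int64_t>::const_iterator begin() const { return m_underlying.begin(); }
    std::vector<int64_t>::const_iterator end() const { return m_underlying.end(); }

private:
    std::set<int64_t> m_seen;
    std::vector<int64_t> m_underlying;
};

int main() {
    DistinctTempTable out;
    for (int64_t i = 0; i < 1000; ++i) {
        out.insertTuple(i % 10); // the scan "pushes" every row it visits
    }
    int64_t count = 0;
    for (auto it = out.begin(); it != out.end(); ++it) { ++count; }
    std::cout << "rows in underlying table: " << count << std::endl;
    return 0;
}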
Paul,
Yes, it does.
> Off the top of my head, I think the way to achieve this for Distinct, in broad strokes, MAY be to:
> 1. factor out the behavior of the (per input row) loop body in DistinctExecutor::p_execute -- call it DistinctExecutor::exec_one
> 2. annotate the SeqScanExecutor with an "inlined" DistinctExecutor object.
Every node already has a map to hold its inline nodes. The key is the node type. I don't think anything special needs to be done there.
> 3. pass that annotation to the TempTable factory causing it to create a new kind of (output) TempTable -- call it DistinctTempTable.
> 4. implement DistinctTempTable::insertTuple to call DistinctExecutor::exec_one, populating a smaller underlying (output) TempTable.
> 5. implement DistinctTempTable::iterator to return an iterator over that underlying TempTable.
Why can't we simply reuse the existing output table of the scan node, or use the DistinctExecutor::p_init logic to set the output table for the scan node?
I was also thinking of calling the DistinctPlanNode::getDistinctExpression()->eval() chain directly from the SeqScanExecutor::p_execute loop over the table, similar to the projection node, and based on the outcome either insert the tuple or continue.
> I'm not sure how best to draw it as a chain of nested Plan Nodes
> -- the way you show the inline Projection and inline Distinct as same-level children of the Sequential Scan seems to get the idea across OK,
> as long as it is understood that the mechanism by which the inline nodes feed and are fed tuples is a little different than the usual bottom-up protocol.
Yes, ordering is a problem. So far, the only inline node is the projection one. Distinct and HashAggregate are mutually exclusive, so the execution order can simply be hard-coded: Projection first, then either Distinct or HashAggregate. Of course, if the variety of inline nodes grows, a more generic solution will be required.
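For the hard-coded ordering, I picture the scan loop doing something like this
standalone sketch (simplified stand-ins; the real code would pull the inline
nodes from the plan node's inline-node map, keyed by node type, and evaluate
real expressions):

// Sketch only: applying inline nodes in a fixed order inside the scan
// loop -- projection first, then (at most one of) distinct/hash aggregate.
// Stand-ins: a tuple is a vector<int64_t>; the "inline nodes" are
// optional function objects rather than real plan nodes.
#include <cstdint>
#include <functional>
#include <iostream>
#include <set>
#include <vector>

using Tuple = std::vector<int64_t>;

int main() {
    // Fake target table.
    std::vector<Tuple> targetTable;
    for (int64_t i = 0; i < 1000; ++i) {
        targetTable.push_back(Tuple{i % 10, i});
    }

    // Stand-in for the inline PROJECTION node: keep only column 0.
    std::function<Tuple(const Tuple&)> inlineProjection =
        [](const Tuple& t) { return Tuple{t[0]}; };

    // Stand-in for the inline DISTINCT node: true if not seen yet.
    std::set<Tuple> seen;
    std::function<bool(const Tuple&)> inlineDistinct =
        [&seen](const Tuple& t) { return seen.insert(t).second; };

    std::vector<Tuple> outputTable;
    for (const Tuple& raw : targetTable) {
        // Fixed order: projection first ...
        Tuple projected = inlineProjection ? inlineProjection(raw) : raw;
        // ... then distinct (or hash aggregate, but never both).
        if (inlineDistinct && !inlineDistinct(projected)) {
            continue;
        }
        outputTable.push_back(projected);
    }

    std::cout << "output rows: " << outputTable.size() << std::endl;
    return 0;
}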
Hi Paul,
The pragmatic approach should work -- by definition?
Select distinct t1.c1 from t1, t2 where t1.pk1 = t2.pk2;
The current access plan is
RETURN RESULTS TO STORED PROCEDURE
DISTINCT
RECEIVE FROM ALL PARTITIONS
SEND PARTITION RESULTS TO COORDINATOR
NESTLOOP INDEX JOIN
inline (INDEX SCAN of "T1" using "SYS_IDX_PK_T1_10026" (unique-scan covering))
SEQUENTIAL SCAN of "T2"
with the proposed changes the plan may look like
RETURN RESULTS TO STORED PROCEDURE
DISTINCT
RECEIVE FROM ALL PARTITIONS
SEND PARTITION RESULTS TO COORDINATOR
NESTLOOP INDEX JOIN
inline (INDEX SCAN of "T1" using "SYS_IDX_PK_T1_10026" (unique-scan covering)
inline (DISTINCT)
)
SEQUENTIAL SCAN of "T2"
With the added complexity (inline within inline), the straightforward approach will make the NestLoopIndexExecutor logic almost unmanageable.
As for the generic solution, I'm not sure why the reversed order would always make sense, but some simple rule like that seems in order. I suspect that reverse order may work short-term as an accidental effect of the order in which various inline optimizations are considered. In the long term, this aspect of the optimizer may need to be reworked to force it to follow a convention of inlining nodes in a reliable order to drive execution. Any complex logic along these lines seems better placed in the planner/optimizer, leaving the executor to iterate over a simple sequence of inlined nodes and apply them in fixed order.
If inlining is the right approach, then I have a few questions I would like to clarify.
1. Where should the inline node go? The first scan node operating on the same table as the Distinct/HashAggregate, encountered while working down the plan graph, may be the right choice.
2. When should the optimization be disabled? For example, if the distinct expression involves more than one table (right now that's not possible simply because only one column in the expression is supported). Are there any other limitations?
3. Is PlanAssembler::handleDistinct the right place to do the inlining, or is a new planners/microoptimizations/PushDownDistinctIntoScans.java the better choice, similar to the existing PushdownLimitsIntoScans.java?
4. The inline order. I totally agree with you that it should be established by the PlanAssembler/Optimizer and not by the executors, but I am not sure how.
Hey there. Sorry this issue has followed a bit of a rabbit hole. Can
you send some of your code for this change? I think I need to read
some code carefully to form an opinion on the next best steps.
Ryan.
Hi, I am confused -- the sequential scan executor supports distinct? There are a lot of plan graph configs that don't actually work as you would expect. The executor has to support the inlined node. If you look at https://github.com/VoltDB/voltdb/blob/master/src/ee/executors/seqscanexecutor.cpp it doesn't check for that kind of inlined node.
You would have to create a new plan node above the seqscan that is a distinct executor. If the seqscan node does the optimization where it presents itself as the output table, it should work as expected.
Somewhat orthogonal to what you are working on, but inlining is a crappy hack that should go away.
It exists because executors have ginormous input/output tables instead of being chained together and outputting a row at a time via iterators.
By inlining functionality (and duplicating the code, blech!) we reduce the number of input/output tables and the number of times a tuple has to be copied, as well as the amount of temp table space used during a query. That wouldn't be necessary if each executor only materialized a tuple at a time.
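Roughly, the row-at-a-time chaining I mean would look like this standalone
sketch (hypothetical interfaces, nothing like the current EE classes):

// Sketch only: executors chained as iterators, each producing one row at
// a time on demand, so no intermediate temp tables are materialized.
#include <cstdint>
#include <iostream>
#include <memory>
#include <set>
#include <vector>

// Hypothetical minimal executor interface: pull the next row, or report
// that the input is exhausted.
struct RowSource {
    virtual ~RowSource() = default;
    virtual bool next(int64_t& row) = 0;
};

// Leaf: scans an in-memory "table" one row at a time.
struct ScanSource : RowSource {
    explicit ScanSource(std::vector<int64_t> rows) : m_rows(std::move(rows)) {}
    bool next(int64_t& row) override {
        if (m_pos >= m_rows.size()) return false;
        row = m_rows[m_pos++];
        return true;
    }
    std::vector<int64_t> m_rows;
    size_t m_pos = 0;
};

// Distinct: wraps another source and suppresses duplicates as rows flow by.
struct DistinctSource : RowSource {
    explicit DistinctSource(std::unique_ptr<RowSource> child) : m_child(std::move(child)) {}
    bool next(int64_t& row) override {
        while (m_child->next(row)) {
            if (m_seen.insert(row).second) return true; // first time seen
        }
        return false;
    }
    std::unique_ptr<RowSource> m_child;
    std::set<int64_t> m_seen;
};

int main() {
    auto scan = std::make_unique<ScanSource>(std::vector<int64_t>{1, 2, 2, 3, 1, 3});
    DistinctSource distinct(std::move(scan));
    int64_t row;
    while (distinct.next(row)) {
        std::cout << row << std::endl;  // 1 2 3 -- no temp table in between
    }
    return 0;
}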
Thanks,
Ariel
Mike,
Congratulations on your success so far.
I agree with you that while inlining offers some general benefits, its widespread use has lots of implications and complications.
It amounts to a shift in the basic architecture of the executor from a (primarily) bottom-up push-based system to a (primarily) top-down pull-based system.
Actually, executor trees for pure push-based systems tend to look structurally identical to executor trees for pull-based (iterator-based) systems -- they're just processed differently. Much of the structural complication arises from compromises that blend elements of the two systems. Inlining in a push-based system and pre-batching results in a pull-based system are classic examples. This is where the duplication of effort comes in that Ariel mentioned, as each feature gets packaged in two different styles, one of which appears as a "wart" to the prevailing architecture.
In my opinion, the real architectural issue is too tight a coupling of "specific node functionality" with generic node flow control. In other words, by refactoring the executor node code to separate the various forms of tuple processing from the push-based (or pull-based) boilerplate code that manages where the tuples are coming from and where they are going, we should be able to develop components that work equally well in a push-based, pull-based, or hybrid system without a lot of duplication, and possibly with some reduction/reuse of that boiler-plate.
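As a rough standalone illustration of that separation (hypothetical names, not
a proposed interface): the same distinct "functionality" object can be driven
by push-style or pull-style boilerplate without duplication.

// Sketch only: the "specific node functionality" (distinct filtering) is
// a plain object with no opinion about flow control; the push-style and
// pull-style drivers below both reuse it unchanged.
#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

// The reusable functionality: decides whether a row passes the distinct.
class DistinctFilter {
public:
    bool accept(int64_t row) { return m_seen.insert(row).second; }
private:
    std::set<int64_t> m_seen;
};

// Push-style boilerplate: the producer drives, results land in an output.
std::vector<int64_t> runPush(const std::vector<int64_t>& input) {
    DistinctFilter filter;
    std::vector<int64_t> output;
    for (int64_t row : input) {
        if (filter.accept(row)) output.push_back(row);
    }
    return output;
}

// Pull-style boilerplate: the consumer drives, one row at a time.
class PullDistinct {
public:
    explicit PullDistinct(const std::vector<int64_t>& input) : m_input(input) {}
    bool next(int64_t& row) {
        while (m_pos < m_input.size()) {
            row = m_input[m_pos++];
            if (m_filter.accept(row)) return true;
        }
        return false;
    }
private:
    const std::vector<int64_t>& m_input;
    size_t m_pos = 0;
    DistinctFilter m_filter;
};

int main() {
    std::vector<int64_t> input{1, 2, 2, 3, 1};
    std::cout << "push: " << runPush(input).size() << " rows" << std::endl;

    PullDistinct pull(input);
    int64_t row = 0, count = 0;
    while (pull.next(row)) ++count;
    std::cout << "pull: " << count << " rows" << std::endl;
    return 0;
}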
In PlanAssembler.java:
The search below the distinct node for the scan where we wish to inline it appears to be a little too aggressive.
My concern is that pushing a distinct down through some nodes can lead to incorrect results.
Particular cases where this can cause problems are some joins and aggregates, though there may be others (?).
It should probably be kept simple, pushing down only through "known safe" cases and otherwise erring on the side of less performant but correct.
Initially, we could restrict ourselves to applying the push-down to an immediate child scan.
Other cases can be added as we may discover them as "missed easy cases" in testing.
In DistinctPlanNode.java:
The DistinctPlanNode::resolveIndexes code does not actually have to deal with the child-less (inlined) node case
since the function is only called on non-inlined nodes -- those that are participating in the plan tree.
Instead, for safety, this code could be refactored in the style of ProjectionPlanNode, with the normal (non-inline) entry point making the usual assertions and delegating to a resolveColumnIndexesUsingSchema which is
an alternative entry (that should be) called by SeqScanPlanNode for inlines (again, in the style used there for inline ProjectionPlanNodes).
In distinctplannode.cpp (vs distinctexecutor.cpp):
The isTupleValueDistinct function added to distinctplannode would be better kept in distinctexecutor,
factored there (rather than duplicated) into a function that could be called both from the SeqScanExecutor::p_execute loop in the new inlined case and from within the DistinctExecutor::p_execute loop for the original normal case.
I appreciate how your change follows the precedent set by the inline projection processing -- involving the plan node in the execution process rather than simply using the plan node to initialize a corresponding "inline executor node".
But I think that this approach to inlining projections may have been a mis-step and that it may be time to set a new precedent (and possibly plan to back-patch projection to follow it).
...and seqscanexecutor.cpp:
To "do the right thing" in this respect, we could construct a DistinctExecutor (possibly initializing a SeqScanExecutor pointer member initialized in SeqScanExecutor::p_init?) to do the SeqScanExecutor's dirty work.
The isTupleValueDistinct function could then be made non-static and its signature simplified so that it could operate on an INTERNAL std::set data member (migrated to be a member rather than a DistinctExecutor::p_execute local variable).
I expect that this change idiom -- migrating state from executor::p_execute locals into executor data members (sometimes initialized in p_init) -- will become a recurring theme as we migrate away from "push".
BTW, I don't see the rationale for adding the TableTuple* vs. using the existing TableTuple& in SeqScanExecutor::p_execute, so the change to this function could be very small.
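Very roughly, the shape I have in mind (a standalone sketch with stand-in
types, not the real DistinctExecutor/SeqScanExecutor classes):

// Sketch only: isTupleValueDistinct as a non-static member operating on an
// internal set, so the same object serves both the normal
// DistinctExecutor::p_execute loop and the inlined call from the scan loop.
#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

class DistinctExecutorSketch {
public:
    // The factored-out per-row check; state lives in the member below
    // instead of a p_execute local variable.
    bool isTupleValueDistinct(int64_t value) {
        return m_seenValues.insert(value).second;
    }
private:
    std::set<int64_t> m_seenValues;
};

int main() {
    // "Normal" case: a distinct executor's own loop over its input.
    {
        DistinctExecutorSketch distinct;
        std::vector<int64_t> input{5, 5, 6, 7, 6};
        int64_t kept = 0;
        for (int64_t v : input) {
            if (distinct.isTupleValueDistinct(v)) ++kept;
        }
        std::cout << "normal case kept " << kept << " rows" << std::endl;
    }

    // Inlined case: a scan loop calls the same member function on a
    // sketch object it owns (e.g. one set up in its p_init).
    {
        DistinctExecutorSketch inlinedDistinct;
        std::vector<int64_t> scannedColumn{1, 1, 1, 2};
        int64_t inserted = 0;
        for (int64_t v : scannedColumn) {
            if (inlinedDistinct.isTupleValueDistinct(v)) ++inserted;
        }
        std::cout << "inlined case inserted " << inserted << " rows" << std::endl;
    }
    return 0;
}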
Finally, the changeset introduced some whitespace issues (use of spaces on empty lines or at ends of lines)
that cause it to fail our "hard rule coding standard" check-in tests. You can run "ant licensecheck" to quickly see the gory details -- it is run as an early step in "ant check", our standard check-in tests.
Unfortunately, I haven't taken the time to write the sed script that would automatically fix such issues.
Let me know if you have any questions about this.
Also, I'm still digesting your pull execution pseudo code (even while coming up to speed on the existing executor code, and possibly capturing what I'm learning for the developer wiki) -- I expect I'll get back to you in a little while.