Some questions on iterations and aggregators

Vasia Kalavri

May 21, 2014, 1:50:43 PM
to stratosp...@googlegroups.com
Hi,

I am trying to implement an iterative application which, at a high level, looks like this:

initialState = readSomeInput
otherInput = readSomeOtherInput

state = initialState
for 1 to max_iterations do:
  filteredInput = otherInput.filter(predicate_on_aggregator_value)
  updatedState = state.join(filteredInput)....other_operators_with_aggregator
  closeWith(updatedState)

A computation inside the step function updates an aggregator value, which I want to use in the next iteration to filter out tuples from otherInput.
When I tried this, I got the error "This stub is not part of an iteration step function" as soon as the filter operation attempted to read the iteration aggregator value.
Is there a way to include an operator inside the iteration if it doesn't have the IterativeDataSet as a predecessor? Or do you see any way around this?
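
To make the intended semantics concrete, here is a minimal plain-Java sketch of the loop described above. It deliberately does not use any Stratosphere API; the class name, the Edge type, the "weight greater than the previous aggregate" predicate, and the "count of joined records" aggregate are all assumptions chosen for illustration only. It just shows the aggregate computed in one pass being used to filter otherInput in the next pass.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Stratosphere classes are involved.
final class AggregatorFilterSketch {

    // Hypothetical element of otherInput: a join key plus a weight.
    static final class Edge {
        final int key;
        final long weight;

        Edge(int key, long weight) {
            this.key = key;
            this.weight = weight;
        }
    }

    static Map<Integer, Long> run(Map<Integer, Long> initialState,
                                  List<Edge> otherInput,
                                  int maxIterations) {
        Map<Integer, Long> state = new HashMap<>(initialState);
        long previousAggregate = 0L; // aggregate produced by the previous pass

        for (int i = 1; i <= maxIterations; i++) {
            // Filter otherInput using the aggregate of the previous pass
            // (assumed predicate: keep edges whose weight exceeds it).
            List<Edge> filteredInput = new ArrayList<>();
            for (Edge e : otherInput) {
                if (e.weight > previousAggregate) {
                    filteredInput.add(e);
                }
            }

            // Join state with the filtered input on the key, updating the
            // state and the aggregate (assumed: number of joined records).
            long currentAggregate = 0L;
            Map<Integer, Long> updatedState = new HashMap<>(state);
            for (Edge e : filteredInput) {
                if (state.containsKey(e.key)) {
                    updatedState.merge(e.key, e.weight, Long::sum);
                    currentAggregate++;
                }
            }

            // Equivalent of closing the iteration: feed the result back.
            state = updatedState;
            previousAggregate = currentAggregate;
        }
        return state;
    }

    public static void main(String[] args) {
        Map<Integer, Long> initialState = new HashMap<>();
        initialState.put(1, 10L);
        initialState.put(2, 20L);
        List<Edge> otherInput = Arrays.asList(
                new Edge(1, 1L), new Edge(1, 5L), new Edge(2, 3L), new Edge(3, 7L));
        System.out.println(run(initialState, otherInput, 2));
    }
}
```

Note that in this sketch the filter over otherInput sits inside the loop even though otherInput itself never changes between passes, which is exactly the operator placement the question is about.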

I also tried implementing this using a delta iteration and setting otherInput as the workset, but then my program fails with the compiler exception "No plan meeting the requirements could be created @ Workset Iteration (1:null)(2:null). Most likely reason: Too restrictive plan hints." What does this error mean?

My second question has to do with computing a dataset that is not "consumed" by any operator. I want to perform a join so that I can compute some aggregate value, but I don't need the output of the join itself.
Is it possible to do something like that?

Thanks a lot!

Cheers,
V.

Stephan Ewen

Jun 4, 2014, 5:46:30 PM
to stratosp...@googlegroups.com
Hey Vasia!

Thanks for always pushing the boundaries of the use cases ;-)

  - I don't think I fully understand the program you have. Can you paste a bit of the rough program structure? In general, it is true that Aggregators only work inside iterations, which means in operators that are successors of the IterativeDataSet or the Workset. We have started unifying the Aggregators and Accumulators, which will make them more universally usable (https://groups.google.com/forum/#!topic/stratosphere-dev/YqypzNVVqas).

  - What you would need in order to run the aggregators is the ability to root a data sink in the iteration (here the aggregator-updating function would never return data, so the sink would always be empty). I am trying to think of a better way to "get rid" of a data set, but that is the first thing that comes to my mind. We'd need to add support for "iteration rooted" data sinks, though.

  - The "No plan meeting the requirements" message is clearly an optimizer bug. I reworked the branch and join tracking logic there at the beginning of this week. Chances are good that it is fixed now. Could you try again with the latest version?

Stephan
