Hi,
First of all thank you Fabian for the idea. I've been trying to make it work with Double[] arrays. But I ran into some difficulties.
If my code is simple enough then it works fine, but when I want to use multiple operators working with DataSets containing Tuples which has array fields then the Plan cannot be created. I give the exact details of the error below.
The code that I pasted at the end of this post is part of the CustomALS implementation where my goal is to parallelize the ALS algorithm by machine IDs.
Steps:
1. Create a dataset for the matrix
2. Partition this matrix by rows and columns with a FlatMap
3. Generate a random matrix by columns and put it into the same dataset with the partitions (It looks a bit artificial but I ran into some errors as well when I tried to make it work with the Union of more DataSets)
4 Output the random matrix
5. Output the partitions
The partitions and the random matrix are indentified through Integer IDs.
My problem is that if I just want to output the partitions (5.) then my program runs succesfully. But if I want to output the random matrix (4.) then the program Plan cannot be created and I get the following error:
"Caused by: eu.stratosphere.compiler.CompilerException: No plan meeting the requirements could be created @ GroupReduce (Create q as a random matrix) (1:null). Most likely reason: Too restrictive plan hints."
I tried different implementations but I encountered this error several times.
Please if you have any idea why the Plan cannot be created in my case share it with me because it would be crucial for me to have a working ALS algorithm as soon as possible.
Thank you,
Ferenc
public class CustomALS {
public static final String K = "k";
public static final String INDEX = "index";
public static final String NUMBER_OF_PARALLEL_TASKS = "numOfTasks";
public static final int ZERO = 0;
public static final int ONE = 1; //ID for row partitioned A
public static final int TWO = 2; //ID for column partitioned A
public static final int THREE = 3; //ID for random Q matrix
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//parse parameters
int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
String matrixInput = (args.length > 1 ? args[1] : "");
String output = (args.length > 2 ? args[2] : "");
int k = 3; //now k is fixed
//create datasets for broadcasting
DataSet<Integer> k_ = env.fromElements(k);
DataSet<Integer> numSubTasks_ = env.fromElements(numSubTasks);
//Let A be a 3x3 full matrix
Tuple3<Integer,Integer,Double> m1 = new Tuple3(0,0,1.0);
Tuple3<Integer,Integer,Double> m2 = new Tuple3(0,1,2.0);
Tuple3<Integer,Integer,Double> m3 = new Tuple3(0,2,1.0);
Tuple3<Integer,Integer,Double> m4 = new Tuple3(1,0,2.0);
Tuple3<Integer,Integer,Double> m5 = new Tuple3(1,1,2.0);
Tuple3<Integer,Integer,Double> m6 = new Tuple3(1,2,3.0);
Tuple3<Integer,Integer,Double> m7 = new Tuple3(2,0,1.0);
Tuple3<Integer,Integer,Double> m8 = new Tuple3(2,1,1.0);
Tuple3<Integer,Integer,Double> m9 = new Tuple3(2,2,3.0);
DataSet<Tuple3<Integer,Integer,Double>> matrixSource = env.fromElements(m1,m2,m3,m4,m5,m6,m7,m8,m9);
/*
//read input from file
DataSet<Tuple3<Integer,Integer,Double>> matrixSource = env.readCsvFile(matrixInput)
.fieldDelimiter('|')
.lineDelimiter("|\n")
.includeFields(true, true, true)
.types(Integer.class,Integer.class,Double.class);
*/
//create the partitions of A for machines
DataSet<Tuple5<Integer,Integer,Integer,Integer,Double[]>> allPartitions = matrixSource.flatMap(new MultiplyMatrix())
.withBroadcastSet(numSubTasks_,NUMBER_OF_PARALLEL_TASKS)
.name("Create A matrix partitions");
//adding Q random matrix to dataset
DataSet<Tuple5<Integer, Integer, Integer, Integer, Double[]>> allData = allPartitions
.groupBy(3)
.reduceGroup(new RandomMatrix())
.withBroadcastSet(numSubTasks_,NUMBER_OF_PARALLEL_TASKS)
.withBroadcastSet(k_,K)
.name("Create q as a random matrix");
//create outputs
//(Step 4.) if the following 2 lines are uncommented then the program cannot create the Plan
DataSet<Tuple4<Integer,Double,Double,Double>> qOut = allData.flatMap(new OutputQ());
qOut.writeAsCsv(output+"/qOutput", "\n", "|");
//(Step 5.) if the above 2 lines are commented then the program succesfully creates the plan and outputs only the partitions
DataSet<Tuple3<Integer,Integer,Double>> partitionOut = allPartitions.flatMap(new OutputPartition());
partitionOut.writeAsCsv(output+"/partitionOutput", "\n", "|");
env.setDegreeOfParallelism(numSubTasks);
env.execute("CustomALS");
}
public static final class OutputQ extends FlatMapFunction<Tuple5<Integer, Integer, Integer, Integer, Double[]>,
Tuple4<Integer,Double,Double,Double>> {
@Override
public void flatMap(Tuple5<Integer,Integer,Integer,Integer,Double[]> record, Collector<Tuple4<Integer,Double,Double,Double>> out)
throws Exception {
if(record.f1 == THREE) {
Double[] elements = record.f4;
out.collect(new Tuple4(record.f2,elements[0],elements[1],elements[2]));
}
}
}
public static final class OutputPartition extends FlatMapFunction<Tuple5<Integer, Integer, Integer, Integer, Double[]>,
Tuple3<Integer,Integer,Double>> {
@Override
public void flatMap(Tuple5<Integer,Integer,Integer,Integer,Double[]> record, Collector<Tuple3<Integer,Integer,Double>> out)
throws Exception {
if(record.f1 == TWO || record.f1 == ONE) {
Double[] elements = record.f4;
out.collect(new Tuple3(record.f2,record.f3,elements[0]));
}
}
}
public static final class MultiplyMatrix extends FlatMapFunction<Tuple3<Integer,Integer,Double>,
Tuple5<Integer,Integer,Integer,Integer,Double[]>> {
private static int numOfTasks;
@Override
public void open(Configuration parameters) throws Exception {
Collection<Integer> numOfTasks_ = getRuntimeContext().getBroadcastVariable(NUMBER_OF_PARALLEL_TASKS);
numOfTasks = numOfTasks_.iterator().next();
super.open(parameters);
}
@Override
public void flatMap(Tuple3<Integer,Integer, Double> record, Collector<Tuple5<Integer,Integer,Integer,Integer,Double[]>> out)
throws Exception {
Integer rowIndex = record.f0;
Integer colIndex = record.f1;
Double[] element = new Double[1];
element[0] =(Double) record.f2;
//collect rowPartitoned A for P iterations
Integer machineIndex1 = rowIndex % numOfTasks;
Tuple5<Integer,Integer,Integer,Integer,Double[]> output1 = new Tuple5(machineIndex1,ONE,rowIndex,colIndex,element);
out.collect(output1);
//collect colPartitioned A for Q iterations
Integer machineIndex2 = colIndex % numOfTasks;
Tuple5<Integer,Integer,Integer,Integer,Double[]> output2 = new Tuple5(machineIndex2,TWO,rowIndex,colIndex,element);
out.collect(output2);
}
}
public static final class RandomMatrix extends GroupReduceFunction<Tuple5<Integer,Integer,Integer,Integer,Double[]>,
Tuple5<Integer,Integer,Integer, Integer, Double[]>> {
private static int k;
private static int numOfTasks;
private Double[] vector_elements;
private final Random RANDOM = new Random();
@Override
public void open(Configuration parameters) throws Exception {
Collection<Integer> numOfTasks_ = getRuntimeContext().getBroadcastVariable(NUMBER_OF_PARALLEL_TASKS);
numOfTasks = numOfTasks_.iterator().next();
Collection<Integer> k_ = getRuntimeContext().getBroadcastVariable(K);
k = k_.iterator().next();
super.open(parameters);
}
@Override
public void reduce(Iterator<Tuple5<Integer, Integer, Integer, Integer, Double[]>> elements,
Collector<Tuple5<Integer, Integer, Integer, Integer, Double[]>> out) throws Exception {
//generate the Q column vector with this id and send it to all machines
Tuple5<Integer, Integer, Integer, Integer, Double[]> element = elements.next();
vector_elements = new Double[k];
for (int i = 0; i < k; ++i) {
vector_elements[i] = 1 + RANDOM.nextDouble() / 2;
}
for(int i=0; i<numOfTasks; i++){
Tuple5<Integer, Integer, Integer, Integer, Double[]> vector = new Tuple5(i,THREE, element.f3,ZERO, vector_elements);
out.collect(vector);
}
//after this we send forward all the other A partitions
while(elements.hasNext()) {
out.collect(elements.next());
}
}
}
}