Arbitrary length vector of basic types with tuple


Ferenc Béres

unread,
May 15, 2014, 6:21:11 AM5/15/14
to stratosp...@googlegroups.com
Hi,

I'm Ferenc Béres. I currently work in the data mining group of SZTAKI in Hungary. I would like to ask for your help with implementing the Alternating Least Squares (ALS) algorithm in Stratosphere 0.5 without the Record abstraction.

Currently I have a working implementation of ALS in Stratosphere 0.5.
As input I provide a sparse matrix A, which I would like to decompose into matrices P and Q of rank k, where k is also an input of the program.

I use Records of length k to store the columns of P and Q when I apply operators to them.

My goal is to completely eliminate the Record abstraction, along with IntValue, DoubleValue, etc., from my code in order to make it more compact.
I could use nested Tuples if I fixed the k parameter, but I would like to keep k as an arbitrary input parameter.

It seems that this could be a general issue whenever someone needs a "vector" of basic types to be passed between second-order functions.

So far I have not come up with a good solution, which is why I am asking for your help.
If you have any idea how the code could be rewritten without the Record and the former Value abstractions, please share it with me.

Thank you.

Fabian Hueske

unread,
May 15, 2014, 6:34:43 AM5/15/14
to stratosp...@googlegroups.com
Hi Ferenc,

the new Java API supports arrays of primitive types.
You can have data sets like

DataSet<int[]> intA;
DataSet<Tuple2<String, double[]>> tStringDoubleA;

The length of the array is dynamic at runtime.
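[Editor's note: the array length can stay dynamic because a serializer can simply write a length prefix before the elements. A minimal sketch of that idea in plain Java follows — an illustration of the general technique, not Stratosphere's actual wire format.]

```java
import java.io.*;

public class VarLengthArrayDemo {

    // Write an int[] with a length prefix so the reader knows how many
    // elements follow. This is what makes variable-length arrays possible.
    static void writeIntArray(DataOutput out, int[] a) throws IOException {
        out.writeInt(a.length);
        for (int v : a) out.writeInt(v);
    }

    static int[] readIntArray(DataInput in) throws IOException {
        int len = in.readInt();       // read the length prefix first
        int[] a = new int[len];
        for (int i = 0; i < len; i++) a[i] = in.readInt();
        return a;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeIntArray(new DataOutputStream(buf), new int[]{1, 2, 3});
        int[] back = readIntArray(new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(java.util.Arrays.toString(back)); // prints [1, 2, 3]
    }
}
```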

Does this answer your question?

Best,
Fabian



--
You received this message because you are subscribed to the Google Groups "stratosphere-dev" group.
To unsubscribe from this group and stop receiving emails from it, send an email to stratosphere-d...@googlegroups.com.
Visit this group at http://groups.google.com/group/stratosphere-dev.
For more options, visit https://groups.google.com/d/optout.

Gyula Fóra

unread,
May 21, 2014, 5:17:01 AM5/21/14
to stratosp...@googlegroups.com
Hi,

We ran into an error related to this. When we create a tuple containing an array, like Tuple1<int[]>, we get the following exception during serialization:

"Type at position 0 is not a basic type."

Do you have any ideas what might be the cause of this issue?

Thanks,
Gyula

Fabian Hueske

unread,
May 21, 2014, 5:57:07 AM5/21/14
to stratosp...@googlegroups.com
Hi,

I tried to reproduce the bug but did not succeed.
The program pasted below works fine for me.

Having a Tuple1<int[]> shouldn't cause problems with serialization. However, you cannot group on the int[] field, because a grouping key needs to be an atomic key type.

Can you give a bit more context?
- Are you using the latest snapshot version (master branch)?
- Can you sketch the program and post the full stack trace?

Best,
Fabian


//-----------------------------------------------

public static void main(String[] args) throws Exception {
       
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setDegreeOfParallelism(2);
       
        List<int[]> intAList = new ArrayList<int[]>();
        intAList.add(new int[]{1,2,3});
        intAList.add(new int[]{4,5,6,7});
        intAList.add(new int[]{4,5,6,7});
        intAList.add(new int[]{4,5,6,7});
        intAList.add(new int[]{4,5,6,7});
        intAList.add(new int[]{4,5,6,7});
        intAList.add(new int[]{4,5,6,7});
        intAList.add(new int[]{4,5,6,7});
        intAList.add(new int[]{4,5,6,7});
        intAList.add(new int[]{4,5,6,7});
        intAList.add(new int[]{8});
        intAList.add(new int[]{9,0});
       
        // get input data
        DataSet<int[]> intAs = env.fromCollection(intAList);
       
        DataSet<Integer> sums = intAs.map(new MapFunction<int[], Integer>() {
            @Override
            public Integer map(int[] ints) {
                int sum = 0;
                for(int i : ints) {
                    sum += i;
                }
                return sum;
            }
        }).setParallelism(8); // change DOP to enforce serialization of data
       
        sums.print();
       
        // execute program
        env.execute("Test Prog");
    }

//-----------------------------------------------


Gyula Fóra

unread,
May 21, 2014, 6:37:14 AM5/21/14
to stratosp...@googlegroups.com
Hello,

Maybe the whole problem lies in our use of the tuple for streaming. This is pretty much how we serialize our tuple:

ByteArrayOutputStream buff = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(buff);

Tuple1<int[]> tuple = new Tuple1<int[]>(new int[]{1});

Class[] types = new Class[] { int[].class };

TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(types);
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo.createSerializer();

SerializationDelegate<Tuple> serializationDelegate = new SerializationDelegate<Tuple>(tupleSerializer);
serializationDelegate.setInstance(tuple);
serializationDelegate.write(out);
DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
DeserializationDelegate<Tuple> dd = new DeserializationDelegate<Tuple>(tupleSerializer);
dd.setInstance(tupleSerializer.createInstance());
dd.read(in);
System.out.println(dd.getInstance());

Creating the Typeinformation object throws the exception. Maybe we should go about serializing tuples in a different way?

Thanks,
Gyula

Stephan Ewen

unread,
May 21, 2014, 6:42:29 AM5/21/14
to stratosp...@googlegroups.com
Hi!

The problem is this method: getBasicTupleTypeInfo(). It works only for basic types. There is actually no strong need for that restriction, but that is the current state.

Use the following:
Tuple1<int[]> tuple = new Tuple1<int[]>(new int[]{1});
TypeInformation<? extends Tuple> typeInfo = TypeExtractor.getForObject(tuple);

That should do it.

Gyula Fóra

unread,
May 21, 2014, 6:44:35 AM5/21/14
to stratosp...@googlegroups.com
Hi!
Amazing, we didn't know about the TypeExtractor.

Thank you!

Gyula Fóra

unread,
May 21, 2014, 6:53:31 AM5/21/14
to stratosp...@googlegroups.com
First I thought it solved our problems, but our case is actually a little different.

When we deserialize the tuple, we do not have the types of the original one. This basically means we somehow need to send type information along with the tuple. It would be nice to be able to serialize the TypeInformation and send it ahead of the tuple.

So TypeExtractor.getForObject works for serialization, but on the receiving side we are stuck with a serialized tuple whose type information we don't know.

Gyula

Stephan Ewen

unread,
May 21, 2014, 7:19:01 AM5/21/14
to stratosp...@googlegroups.com
Hey!

Okay, your paradigm is a bit different there. With the type information, we made the assumption that types are statically known for all operators when you submit them to the runtime.

If you know that all elements are of the same type, you could try sending an event with the serialized type information. I am not sure whether TypeInformation is serializable; if not, making it serializable should be quite possible.

If you want dynamic typing, you can try sending the type with the object. Doing this for every record adds quite an overhead, though. Libraries like Kryo solve this somewhat (they assign tags on the fly), and we could connect such a library to our DataOutputStream representation.
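[Editor's note: the per-record tagging Stephan describes can be sketched in plain Java with a one-byte type tag written before each payload. This is a hypothetical illustration of the idea, not Kryo's actual mechanism or Stratosphere code.]

```java
import java.io.*;

public class TaggedRecordDemo {
    // Hypothetical type tags written before each payload so the receiver
    // can pick the right decoder without static type information.
    static final byte TAG_INT_ARRAY = 1;
    static final byte TAG_STRING = 2;

    static void write(DataOutput out, Object value) throws IOException {
        if (value instanceof int[]) {
            int[] a = (int[]) value;
            out.writeByte(TAG_INT_ARRAY);
            out.writeInt(a.length);               // length prefix, then elements
            for (int v : a) out.writeInt(v);
        } else if (value instanceof String) {
            out.writeByte(TAG_STRING);
            out.writeUTF((String) value);
        } else {
            throw new IOException("unsupported type");
        }
    }

    static Object read(DataInput in) throws IOException {
        byte tag = in.readByte();                 // tag decides how to decode
        switch (tag) {
            case TAG_INT_ARRAY: {
                int[] a = new int[in.readInt()];
                for (int i = 0; i < a.length; i++) a[i] = in.readInt();
                return a;
            }
            case TAG_STRING:
                return in.readUTF();
            default:
                throw new IOException("unknown tag " + tag);
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        write(out, new int[]{1, 2});
        write(out, "hello");
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(java.util.Arrays.toString((int[]) read(in))); // prints [1, 2]
        System.out.println(read(in));                                    // prints hello
    }
}
```

As Stephan notes, paying one tag per record is the trade-off for dynamic typing; schemes like Kryo's registered-class IDs amortize this cost.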

Stephan

Gyula Fóra

unread,
May 21, 2014, 7:45:46 AM5/21/14
to stratosp...@googlegroups.com
Hey,

A quick workaround to get it working could be to store the information in a String, so that TypeInformation.parse(infoString) could retrieve the type information.

But we need to figure out whether we want to make types "static" to increase speed.

Gyula



Ferenc Béres

unread,
May 28, 2014, 8:56:50 AM5/28/14
to stratosp...@googlegroups.com
Hi,

First of all, thank you Fabian for the idea. I've been trying to make it work with Double[] arrays, but I ran into some difficulties.

If my code is simple enough it works fine, but when I use multiple operators on DataSets containing Tuples that have array fields, the Plan cannot be created. I give the exact details of the error below.

The code that I pasted at the end of this post is part of the CustomALS implementation where my goal is to parallelize the ALS algorithm by machine IDs.

Steps:
1. Create a dataset for the matrix
2. Partition this matrix by rows and columns with a FlatMap
3. Generate a random matrix by columns and put it into the same dataset as the partitions (it looks a bit artificial, but I also ran into errors when I tried to make it work with a Union of multiple DataSets)
4. Output the random matrix
5. Output the partitions

The partitions and the random matrix are identified through Integer IDs.


My problem is that if I only output the partitions (step 5), my program runs successfully. But if I also output the random matrix (step 4), the program Plan cannot be created and I get the following error:

"Caused by: eu.stratosphere.compiler.CompilerException: No plan meeting the requirements could be created @ GroupReduce (Create q as a random matrix) (1:null). Most likely reason: Too restrictive plan hints."

I tried different implementations but encountered this error several times.

If you have any idea why the Plan cannot be created in my case, please share it with me; it is crucial for me to have a working ALS algorithm as soon as possible.

Thank you,
Ferenc


public class CustomALS {

  public static final String K = "k";
  public static final String INDEX = "index";
  public static final String NUMBER_OF_PARALLEL_TASKS = "numOfTasks";
  public static final int ZERO = 0;
  public static final int ONE = 1; //ID for row partitioned A
  public static final int TWO = 2; //ID for column partitioned A
  public static final int THREE = 3; //ID for random Q matrix


  public static void main(String[] args) throws Exception {
   
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
     
    //parse parameters
    int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
    String matrixInput = (args.length > 1 ? args[1] : "");
    String output = (args.length > 2 ? args[2] : "");
    int k = 3; //now k is fixed

    //create datasets for broadcasting
    DataSet<Integer> k_ = env.fromElements(k);
    DataSet<Integer> numSubTasks_ = env.fromElements(numSubTasks);

    //Let A be a 3x3 full matrix
    Tuple3<Integer,Integer,Double> m1 = new Tuple3(0,0,1.0);
    Tuple3<Integer,Integer,Double> m2 = new Tuple3(0,1,2.0);
    Tuple3<Integer,Integer,Double> m3 = new Tuple3(0,2,1.0);
    Tuple3<Integer,Integer,Double> m4 = new Tuple3(1,0,2.0);
    Tuple3<Integer,Integer,Double> m5 = new Tuple3(1,1,2.0);
    Tuple3<Integer,Integer,Double> m6 = new Tuple3(1,2,3.0);
    Tuple3<Integer,Integer,Double> m7 = new Tuple3(2,0,1.0);
    Tuple3<Integer,Integer,Double> m8 = new Tuple3(2,1,1.0);
    Tuple3<Integer,Integer,Double> m9 = new Tuple3(2,2,3.0);

    DataSet<Tuple3<Integer,Integer,Double>> matrixSource = env.fromElements(m1,m2,m3,m4,m5,m6,m7,m8,m9);
/*   
    //read input from file
    DataSet<Tuple3<Integer,Integer,Double>> matrixSource = env.readCsvFile(matrixInput)
           .fieldDelimiter('|')
           .lineDelimiter("|\n")
           .includeFields(true, true, true)
           .types(Integer.class,Integer.class,Double.class);
*/
    //create the partitions of A for machines
    DataSet<Tuple5<Integer,Integer,Integer,Integer,Double[]>> allPartitions = matrixSource.flatMap(new MultiplyMatrix())
        .withBroadcastSet(numSubTasks_,NUMBER_OF_PARALLEL_TASKS)
            .name("Create A matrix partitions");
   
    //adding Q random matrix to dataset
    DataSet<Tuple5<Integer, Integer, Integer, Integer, Double[]>> allData = allPartitions
          .groupBy(3)
          .reduceGroup(new RandomMatrix())
          .withBroadcastSet(numSubTasks_,NUMBER_OF_PARALLEL_TASKS)
          .withBroadcastSet(k_,K)
          .name("Create q as a random matrix");
    
    //create outputs
   
    //(Step 4.) if the following 2 lines are uncommented then the program cannot create the Plan
    DataSet<Tuple4<Integer,Double,Double,Double>> qOut = allData.flatMap(new OutputQ());
    qOut.writeAsCsv(output+"/qOutput", "\n", "|");
  
    //(Step 5.) if the above 2 lines are commented out then the program successfully creates the plan and outputs only the partitions
    DataSet<Tuple3<Integer,Integer,Double>> partitionOut = allPartitions.flatMap(new OutputPartition());
    partitionOut.writeAsCsv(output+"/partitionOutput", "\n", "|");
   
    env.setDegreeOfParallelism(numSubTasks);
    env.execute("CustomALS");
  }

  public static final class OutputQ extends FlatMapFunction<Tuple5<Integer, Integer, Integer, Integer, Double[]>,
       Tuple4<Integer,Double,Double,Double>> {
 
    @Override
    public void flatMap(Tuple5<Integer,Integer,Integer,Integer,Double[]> record, Collector<Tuple4<Integer,Double,Double,Double>> out)
        throws Exception {
      if(record.f1 == THREE) {
        Double[] elements = record.f4;
        out.collect(new Tuple4(record.f2,elements[0],elements[1],elements[2]));       
      }
    }
  }

  public static final class OutputPartition extends FlatMapFunction<Tuple5<Integer, Integer, Integer, Integer, Double[]>,
       Tuple3<Integer,Integer,Double>> {
 
    @Override
    public void flatMap(Tuple5<Integer,Integer,Integer,Integer,Double[]> record, Collector<Tuple3<Integer,Integer,Double>> out)
        throws Exception {
      if(record.f1 == TWO || record.f1 == ONE) {
        Double[] elements = record.f4;
        out.collect(new Tuple3(record.f2,record.f3,elements[0]));       
      }
    }
  }


  public static final class MultiplyMatrix extends FlatMapFunction<Tuple3<Integer,Integer,Double>,
       Tuple5<Integer,Integer,Integer,Integer,Double[]>> {

    private static int numOfTasks;
 
    @Override
    public void open(Configuration parameters) throws Exception {
      Collection<Integer> numOfTasks_ = getRuntimeContext().getBroadcastVariable(NUMBER_OF_PARALLEL_TASKS);
      numOfTasks = numOfTasks_.iterator().next();
   
      super.open(parameters);
    }
 
    @Override
    public void flatMap(Tuple3<Integer,Integer, Double> record, Collector<Tuple5<Integer,Integer,Integer,Integer,Double[]>> out)
        throws Exception {
   
      Integer rowIndex = record.f0;
      Integer colIndex = record.f1;
      Double[] element = new Double[1];
      element[0] =(Double) record.f2;
  
      //collect rowPartitioned A for P iterations
      Integer machineIndex1 = rowIndex % numOfTasks;
      Tuple5<Integer,Integer,Integer,Integer,Double[]> output1 = new Tuple5(machineIndex1,ONE,rowIndex,colIndex,element);
      out.collect(output1);
   

      //collect colPartitioned A for Q iterations
      Integer machineIndex2 = colIndex % numOfTasks;
      Tuple5<Integer,Integer,Integer,Integer,Double[]> output2 = new Tuple5(machineIndex2,TWO,rowIndex,colIndex,element);
      out.collect(output2);
    }

  }

  public static final class RandomMatrix extends GroupReduceFunction<Tuple5<Integer,Integer,Integer,Integer,Double[]>,
       Tuple5<Integer,Integer,Integer, Integer, Double[]>> {

    private static int k;
    private static int numOfTasks;

    private Double[] vector_elements;
    private final Random RANDOM = new Random();
 
    @Override
    public void open(Configuration parameters) throws Exception {
   
      Collection<Integer> numOfTasks_ = getRuntimeContext().getBroadcastVariable(NUMBER_OF_PARALLEL_TASKS);
      numOfTasks = numOfTasks_.iterator().next();
   
      Collection<Integer> k_ = getRuntimeContext().getBroadcastVariable(K);
      k = k_.iterator().next();
       
      super.open(parameters);
    }

    @Override
    public void reduce(Iterator<Tuple5<Integer, Integer, Integer, Integer, Double[]>> elements,
        Collector<Tuple5<Integer, Integer, Integer, Integer, Double[]>> out) throws Exception {
   
      //generate the Q column vector with this id and send it to all machines
      Tuple5<Integer, Integer, Integer, Integer, Double[]> element = elements.next();
      vector_elements = new Double[k];
      for (int i = 0; i < k; ++i) {
        vector_elements[i] = 1 + RANDOM.nextDouble() / 2;
      }
   
      for (int i = 0; i < numOfTasks; i++) {
        Tuple5<Integer, Integer, Integer, Integer, Double[]> vector = new Tuple5(i, THREE, element.f3, ZERO, vector_elements);
        out.collect(vector);
      }
   
      //after this we send forward all the other A partitions
      while(elements.hasNext()) {
        out.collect(elements.next());
      }

    }
  }

}



Fabian Hueske

unread,
May 28, 2014, 9:19:54 AM5/28/14
to stratosp...@googlegroups.com
Hi Ferenc,

the bad news is that this seems to be a bug in the optimizer that needs to be fixed. :-(
The good news is that I found a simple way to get it to work. :-)

You are using broadcast variables to pass configuration values to your functions. This is not the intended purpose of BCVs (although it should work nonetheless).
A far easier and more comfortable way to set configuration parameters is via the constructor of your function. Alternatively, you can use the Configuration object, but this is less comfortable.
I adapted your code and it compiles and runs now (I did not check the result, though).

Cheers, Fabian

------------------


public class CustomALS {

      public static final String K = "k";
      public static final String INDEX = "index";
      public static final String NUMBER_OF_PARALLEL_TASKS = "numOfTasks";
      public static final int ZERO = 0;
      public static final int ONE = 1; //ID for row partitioned A
      public static final int TWO = 2; //ID for column partitioned A
      public static final int THREE = 3; //ID for random Q matrix


      public static void main(String[] args) throws Exception {
      
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        //parse parameters
        int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
        String matrixInput = (args.length > 1 ? args[1] : "");
        String output = (args.length > 2 ? args[2] : "");
        int k = 3; //now k is fixed

        //create datasets for broadcasting
        //DataSet<Integer> k_ = env.fromElements(k);
        //DataSet<Integer> numSubTasks_ = env.fromElements(numSubTasks);


        //Let A be a 3x3 full matrix
        Tuple3<Integer,Integer,Double> m1 = new Tuple3(0,0,1.0);
        Tuple3<Integer,Integer,Double> m2 = new Tuple3(0,1,2.0);
        Tuple3<Integer,Integer,Double> m3 = new Tuple3(0,2,1.0);
        Tuple3<Integer,Integer,Double> m4 = new Tuple3(1,0,2.0);
        Tuple3<Integer,Integer,Double> m5 = new Tuple3(1,1,2.0);
        Tuple3<Integer,Integer,Double> m6 = new Tuple3(1,2,3.0);
        Tuple3<Integer,Integer,Double> m7 = new Tuple3(2,0,1.0);
        Tuple3<Integer,Integer,Double> m8 = new Tuple3(2,1,1.0);
        Tuple3<Integer,Integer,Double> m9 = new Tuple3(2,2,3.0);

        DataSet<Tuple3<Integer,Integer,Double>> matrixSource = env.fromElements(m1,m2,m3,m4,m5,m6,m7,m8,m9);
    /*  
        //read input from file
        DataSet<Tuple3<Integer,Integer,Double>> matrixSource = env.readCsvFile(matrixInput)
               .fieldDelimiter('|')
               .lineDelimiter("|\n")
               .includeFields(true, true, true)
               .types(Integer.class,Integer.class,Double.class);
    */
        //create the partitions of A for machines
        DataSet<Tuple5<Integer,Integer,Integer,Integer,Double[]>> allPartitions = matrixSource.flatMap(new MultiplyMatrix(numSubTasks))
//            .withBroadcastSet(numSubTasks_,NUMBER_OF_PARALLEL_TASKS)

                .name("Create A matrix partitions");
      
        //adding Q random matrix to dataset
        DataSet<Tuple5<Integer, Integer, Integer, Integer, Double[]>> allData = allPartitions
              .groupBy(3)
              .reduceGroup(new RandomMatrix(numSubTasks, k))
//              .withBroadcastSet(numSubTasks_,NUMBER_OF_PARALLEL_TASKS)
//              .withBroadcastSet(k_,K)
              .name("Create q as a random matrix");

        // ... (outputs and env.execute() unchanged from the original version)
      }

      public static final class MultiplyMatrix extends FlatMapFunction<Tuple3<Integer,Integer,Double>,
           Tuple5<Integer,Integer,Integer,Integer,Double[]>> {

        private static int numOfTasks;

        public MultiplyMatrix(int numTasks) {
            numOfTasks = numTasks;
        }
     
//        @Override
//        public void open(Configuration parameters) throws Exception {
//          Collection<Integer> numOfTasks_ = getRuntimeContext().getBroadcastVariable(NUMBER_OF_PARALLEL_TASKS);
//          numOfTasks = numOfTasks_.iterator().next();
//      
//          super.open(parameters);
//        }

     
        @Override
        public void flatMap(Tuple3<Integer,Integer, Double> record, Collector<Tuple5<Integer,Integer,Integer,Integer,Double[]>> out)
            throws Exception {
      
          Integer rowIndex = record.f0;
          Integer colIndex = record.f1;
          Double[] element = new Double[1];
          element[0] =(Double) record.f2;
     
          //collect rowPartitoned A for P iterations
          Integer machineIndex1 = rowIndex % numOfTasks;
          Tuple5<Integer,Integer,Integer,Integer,Double[]> output1 = new Tuple5(machineIndex1,ONE,rowIndex,colIndex,element);
          out.collect(output1);
      

          //collect colPartitioned A for Q iterations
          Integer machineIndex2 = colIndex % numOfTasks;
          Tuple5<Integer,Integer,Integer,Integer,Double[]> output2 = new Tuple5(machineIndex2,TWO,rowIndex,colIndex,element);
          out.collect(output2);
        }

      }

      public static final class RandomMatrix extends GroupReduceFunction<Tuple5<Integer,Integer,Integer,Integer,Double[]>,
           Tuple5<Integer,Integer,Integer, Integer, Double[]>> {

        private static int k;
        private static int numOfTasks;

        private Double[] vector_elements;
        private final Random RANDOM = new Random();
     
        public RandomMatrix(int numTasks, int k) {
            this.numOfTasks = numTasks;
            this.k = k;
        }
       
//        @Override
//        public void open(Configuration parameters) throws Exception {
//      
//          Collection<Integer> numOfTasks_ = getRuntimeContext().getBroadcastVariable(NUMBER_OF_PARALLEL_TASKS);
//          numOfTasks = numOfTasks_.iterator().next();
//      
//          Collection<Integer> k_ = getRuntimeContext().getBroadcastVariable(K);
//          k = k_.iterator().next();
//          
//          super.open(parameters);
//        }

        // ... (reduce method and the rest of the class unchanged)
      }
    }

Ferenc Béres

unread,
May 28, 2014, 4:11:19 PM5/28/14
to stratosp...@googlegroups.com
Hi Fabian,

Wonderful, now it really creates the Plan and works fine, with one modification:

I had to change k and numOfTasks to non-static; otherwise their values weren't actually passed to the operator functions.
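[Editor's note: a likely explanation is that static fields are not part of Java object serialization, so a value set on the client does not travel with the serialized function object to the workers. A self-contained sketch of the effect — illustrative only, not Stratosphere code:]

```java
import java.io.*;

public class StaticFieldDemo {

    static class WithStatic implements Serializable {
        static int k;   // static: NOT serialized with the instance
        int n;          // instance field: serialized normally
        WithStatic(int k, int n) { WithStatic.k = k; this.n = n; }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(buf);
        out.writeObject(o);
        out.flush();
        return buf.toByteArray();
    }

    static Object deserialize(byte[] b) throws IOException, ClassNotFoundException {
        return new ObjectInputStream(new ByteArrayInputStream(b)).readObject();
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serialize(new WithStatic(3, 7));
        WithStatic.k = 0;  // simulate a worker JVM where the static was never set
        WithStatic back = (WithStatic) deserialize(bytes);
        // the instance field survives the round trip, the static value does not
        System.out.println(WithStatic.k + " " + back.n); // prints 0 7
    }
}
```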

Thank you!

Cheers,
Ferenc

Robert Metzger

unread,
May 31, 2014, 4:55:05 AM5/31/14
to stratosp...@googlegroups.com
Cool! Very good to hear that your code is working.

We have filed a bug report on GitHub to resolve the issue you initially reported: https://github.com/stratosphere/stratosphere/issues/880.
Thanks for helping to improve our project!