InputStream and javaSparkStream in local mode?


Michael Chung

Jul 21, 2013, 7:52:47 PM
to spark...@googlegroups.com
So my constraints are an InputStream and 1GB of memory.

I had already solved my map reduce problem using Spark; it works really well using textFile().
The problem is that I have to use an InputStream instead of textFile(), and I have a memory constraint.

I don't have access to the file system, so I must satisfy the InputStream contract.

InputStream => String => JavaRDD using SparkContext.parallelize => perform map reduce => solution to the problem.
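In rough code, that pipeline is something like the following (a minimal sketch with hypothetical names; the import package depends on the Spark release, e.g. the pre-Apache 0.7.x releases used spark.* instead of org.apache.spark.*):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class NaiveStreamLoad {
  // Buffers the ENTIRE stream as one String on the driver before Spark ever sees it.
  public static JavaRDD<String> load(JavaSparkContext sc, InputStream in) throws IOException {
    BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
    StringBuilder all = new StringBuilder();
    String line;
    while ((line = reader.readLine()) != null) {
      all.append(line).append('\n');     // the whole stream accumulates in driver memory
    }
    // String => JavaRDD: split into records and parallelize from the driver
    List<String> records = Arrays.asList(all.toString().split("\n"));
    return sc.parallelize(records);
  }
}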

So using that strategy above, I run into a heap error while trying to copy the string into memory.

Does anyone know a workaround in which I can still use the InputStream?

I have seen the JavaQueueStream example from the Spark Streaming Java examples:

// Imports for the example (package names depend on the Spark release;
// the pre-Apache 0.7.x releases used spark.* instead of org.apache.spark.*):
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import scala.Tuple2;

import com.google.common.collect.Lists;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class JavaQueueStream {
  public static void main(String[] args) throws InterruptedException {
    if (args.length < 1) {
      System.err.println("Usage: JavaQueueStream <master>");
      System.exit(1);
    }

    // Create the context
    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
            System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));

    // Create the queue through which RDDs can be pushed to
    // a QueueInputDStream
    Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();

    // Create and push some RDDs into the queue
    List<Integer> list = Lists.newArrayList();
    for (int i = 0; i < 1000; i++) {
      list.add(i);
    }

    for (int i = 0; i < 30; i++) {
      rddQueue.add(ssc.sc().parallelize(list));
    }


    // Create the QueueInputDStream and use it do some processing
    JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
    JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
        new PairFunction<Integer, Integer, Integer>() {
          @Override
          public Tuple2<Integer, Integer> call(Integer i) throws Exception {
            return new Tuple2<Integer, Integer>(i % 10, 1);
          }
        });
    JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) throws Exception {
          return i1 + i2;
        }
    });

    reducedStream.print();
    ssc.start();
  }
}

but 1. I am not sure if this works in local mode, and
2. I am not sure that I can use the InputStream correctly with it.






Mark Hamstra

Jul 21, 2013, 8:45:10 PM
to spark...@googlegroups.com
First, getting all of your data into the cluster via the driver and SparkContext.parallelize is a really broken way to operate for pretty much anything except testing or toy examples -- forcing everything through that bottleneck and across the network from the driver node just won't scale for any significant amount of data.

With that in mind, if you still want to read everything into your driver node from your InputStream and then push it out to the cluster from there, you should be able to do it much like you were trying to, except in pieces so that you don't run out of driver memory.  Just read in however much you can fit into memory from your InputStream, parallelize it out to the cluster, and keep doing that for additional RDDs until you have reached the end of your stream.  Then you can use SparkContext.union with a List of the RDDs you just constructed from your InputStream to glue the pieces together into a single RDD for further processing.
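A minimal sketch of that idea, assuming line-oriented text and using hypothetical names (chunkSize is just however many lines the driver can afford to buffer at once; the import package again depends on your Spark release):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ChunkedStreamLoader {
  // Reads the stream in bounded chunks so the driver never holds more than
  // chunkSize lines at once, parallelizes each chunk, and unions the pieces.
  public static JavaRDD<String> load(JavaSparkContext sc, InputStream in, int chunkSize)
      throws IOException {
    BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
    JavaRDD<String> result = null;
    List<String> chunk = new ArrayList<String>(chunkSize);
    String line;
    while ((line = reader.readLine()) != null) {
      chunk.add(line);
      if (chunk.size() == chunkSize) {
        JavaRDD<String> piece = sc.parallelize(chunk);
        result = (result == null) ? piece : result.union(piece);
        chunk = new ArrayList<String>(chunkSize);   // fresh buffer for the next chunk
      }
    }
    if (!chunk.isEmpty()) {                          // flush the final partial chunk
      JavaRDD<String> piece = sc.parallelize(chunk);
      result = (result == null) ? piece : result.union(piece);
    }
    return result;   // one RDD covering the whole stream; null if the stream was empty
  }
}

(Chaining union() like that adds one lineage link per chunk; calling SparkContext.union over a List of the pieces, as mentioned above, does the same thing in a single step.)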

It's a hack, but you were already in hack territory as soon as you went the route of funneling all of the data through the driver node.

Michael Chung

Jul 21, 2013, 9:38:59 PM
to spark...@googlegroups.com
Hey Mark,

Thanks for replying.

I am using this library's map, shuffle, sort, and reduce to solve a toy problem; by no means is this a production problem, though it may become one in the future when I revisit the library.
There are no other libraries I can find on Google that have map, merge sort (shuffle), and reduce (fold) in place as well as Spark does.
I looked at the Google Guava library, but I was unsure how to parallelize the processing; I do not know Java that well.
I was hoping that Spark would give me a better performance gain in this respect.

So I have a bit of trouble visualizing how the library works.

This is my thought process from what you said:

// Feed an RDD from the input stream until memory is partly full.

// map reduce on it

// Feed another RDD from the input stream until memory is partly full.

// map reduce

... until finished with the stream.

// union the RDDs

I think with the above I would lose the performance gains and parallelism, yeah?

In the source code, I am guessing that textFile() must convert the file into some kind of stream. Is there a way I could override that function, inherit from the Spark context, and pass the stream in as a byte array?

I mean, when I use textFile() it works and appears to load the whole file into memory for processing, staying under the 230 MB memory limit.

Mark Hamstra

Jul 21, 2013, 10:56:36 PM
to spark...@googlegroups.com
Okay, I'm not completely sure what you are trying to do, or what constraints you are trying to work within, so I'll try to explain how Spark would normally be used for such a problem, and then you'll have to try to translate that into your use case.

First, let me start off by saying that I am not understanding what kind of parallelism gain you expect to see if I am reading you correctly that you are working from a single InputStream on a single node (local mode) with very little memory (1GB) available.  Whether you are working with Scala's parallel collections or with Spark's distributed collections, having that little memory available is going to be very difficult.  Spark doesn't magically solve that problem for you -- especially not if you really "don't have access to the file system" and thus don't have any chance of spilling to disk.

Anyway, what I was trying to say previously was that if you have a cluster that is capable of handling your data once it gets out to the cluster, but that your problem is that you first need to funnel the data from an InputStream at the driver process out to the worker processes, and that the driver process only has access to a small amount of memory (i.e. not enough space to dump out the entire contents of the stream before pushing it to the cluster in a single parallelize/makeRDD operation), then you can materialize on the driver small chunks of data read from the InputStream, push them out to the cluster as separate RDDs created with SparkContext.parallelize, and then reassemble all of those RDDs into a single RDD with union().  After that, your problem would be the same as if you had created a single RDD directly from a file containing the same data by using SparkContext.textFile.
 
If your problem really is that you need to contain all of the driver and cluster processes within your severe memory constraints, then I'm not sure that Spark really is your best option -- adding the resource overhead of a complete distributed computing framework looks to me like a good way to consume much of your limited resources without accomplishing much useful.  However, if your data actually can be processed piecemeal in independent chunks, then you could use much the same idea as I outlined above, but skip the unioning to instead process each piece independently.  My guess, though, is that your pieces can't really be arbitrarily partitioned into chunks that are completely independent, and/or that the results of processing such chunks won't be meaningful without further resolution of dependencies among the resultant pieces.  Without the ability to handle a significant amount of data within the aggregate memory of your cluster, you're not going to be able to take full advantage of Spark's ability to shuffle, join, reduce, etc. the results.  In such a severely constrained environment, you'd probably end up working against the grain of how Spark is designed to be used -- which leads me back to my concern that Spark may not really be appropriate to your use case.
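If the data really can be partitioned like that, a hedged sketch of the piecemeal variant looks roughly like this: a hypothetical line-count, assuming the per-chunk results are small enough to merge on the driver, and written against the same older map(PairFunction) Java API as the JavaQueueStream example quoted above (later releases renamed it mapToPair).

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

public class PiecewiseCount {
  // Counts occurrences of each distinct line, processing one bounded chunk at a
  // time and keeping only the small per-chunk results on the driver.
  public static Map<String, Integer> count(JavaSparkContext sc, InputStream in, int chunkSize)
      throws IOException {
    BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
    Map<String, Integer> totals = new HashMap<String, Integer>();
    List<String> chunk = new ArrayList<String>(chunkSize);
    String line;
    while ((line = reader.readLine()) != null) {
      chunk.add(line);
      if (chunk.size() == chunkSize) {
        mergeChunk(sc, chunk, totals);
        chunk = new ArrayList<String>(chunkSize);
      }
    }
    if (!chunk.isEmpty()) {
      mergeChunk(sc, chunk, totals);
    }
    return totals;
  }

  private static void mergeChunk(JavaSparkContext sc, List<String> chunk, Map<String, Integer> totals) {
    JavaPairRDD<String, Integer> counts = sc.parallelize(chunk).map(
        new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer a, Integer b) {
            return a + b;
          }
        });
    // The reduced result for one chunk is small; fold it into the running totals.
    for (Tuple2<String, Integer> kv : counts.collect()) {
      Integer prev = totals.get(kv._1());
      totals.put(kv._1(), prev == null ? kv._2() : kv._2() + prev);
    }
  }
}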