Distributed MapReduce in Go, is it possible?

3,971 views
Skip to first unread message

Anh Hai Trinh

unread,
May 24, 2013, 8:00:29 AM5/24/13
to minux, Paulo Pinto, golang-nuts
> Can you please give an example that couldn't be "easily" done in a strong typed > modern language? (how easy could be regarded as "easily"?)

Instead I will give two that cannot be "easily" done with Go. I'd love to be proven wrong here:

1. MapReduce.

2. Spark (http://spark-project.org/), or distributed & resilient in-memory MR

In Spark you'd have a distributed collection e.g. RDD[Double] which can be reduce(_+_) to sum.

I really do mean it would be wonderful to be able to do a Go clone of Spark that is efficient. For example each RDD[Int] in Spark, when cached in memory, is really stored as primitive integers without boxing memory/GC overhead (which is really a big deal when you try to cache a 100GB dataset into a cluster's memory)

Someone has already done a Spark clone in Python which is used in production (https://github.com/douban/dpark). The Berkeley guys provide a Python API on top of Spark as well.

Can Go do this?


On Thu, May 23, 2013 at 11:37 PM, minux <minu...@gmail.com> wrote:

On Fri, May 24, 2013 at 12:23 AM, Paulo Pinto <paulo....@gmail.com> wrote:
That belongs to  "could be easily done in any strong typed modern
language".
Let's ask the opposite question:
Can you please give an example that couldn't be "easily" done in a strong typed modern
language? (how easy could be regarded as "easily"?)

On 23 Mai, 17:37, minux <minux...@gmail.com> wrote:
> On Thu, May 23, 2013 at 10:50 PM, Paulo Pinto <paulo.jpi...@gmail.com>wrote:
>
> > Well to be honest, all the projects that are known where Go is being
> > used at Google, could be easily
> > done in any strong typed modern language free from C and C++ pre-
> > historic toolchains.
>
> > Until a big Google project is shown being done in Go, people will
> > doubt how much it is really being used
>
> code.google.com/p/vitess ?

--
You received this message because you are subscribed to the Google Groups "golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 



--
@chickamade

Sanjay

unread,
May 24, 2013, 12:55:58 PM5/24/13
to golan...@googlegroups.com, minux, Paulo Pinto
I believe the answer to both of your questions is yes. The reason is, strangely enough, existence proofs from a language with generics, Java.

In the previous thread, you made generics sound like a requirement for MapReduce. I invite you to take a look at the Cascading API, which is a Java implementation of a stream processing library that resolves streams to a series of MapReduce jobs. Specifically, take a look at how Tuples are passed around in Cascading: http://docs.concurrentinc.com/cascading/2.2/javadoc/cascading/tuple/Tuple.html. It's literally an object with a getBoolean, getInteger, etc series of methods. Cascading is one of the most popular MR frameworks that I know of. 

The even more obvious example is if you look at Hadoop Streaming: http://hadoop.apache.org/docs/stable/streaming.html#Hadoop+Streaming. That definitely works for Go. The only annoyance is the (by default) line-oriented output, rather than a more compact encoding; although I believe this is configurable.

Generics aren't required for MR.

As for Spark, as with LINQ, that kind of terse functional method-calling in Go is, AFAIK, impossible. But, again, look at Storm's API for doing realtime computation: https://github.com/nathanmarz/storm/blob/master/storm-core/src/jvm/storm/trident/tuple/TridentTuple.java. You see the exact same kind of thing. If Storm can do it with a low-level API thats basically untyped, I don't see why Go couldn't.

I hope these two examples have shown your implication "JVM languages have generics and hence MapReduce" as not causally related as these APIs clearly do not make use of generics for MR or for realtime stream processing.

Cheers
Sanjay

Ziad Hatahet

unread,
May 24, 2013, 3:44:52 PM5/24/13
to Sanjay, golan...@googlegroups.com, minux, Paulo Pinto
On Fri, May 24, 2013 at 9:55 AM, Sanjay <balas...@gmail.com> wrote:

As for Spark, as with LINQ, that kind of terse functional method-calling in Go is, AFAIK, impossible. But, again, look at Storm's API for doing realtime computation: https://github.com/nathanmarz/storm/blob/master/storm-core/src/jvm/storm/trident/tuple/TridentTuple.java. You see the exact same kind of thing. If Storm can do it with a low-level API thats basically untyped, I don't see why Go couldn't.


Though in Scala you could do something along the lines of:

val i = tuple.get[Int](0) // Returned value is of type Int
val s = tuple.get[String](0) // Returned value is of type String

As such, you have a clean, single method interface, instead of exposing multiple getters (getXXX).


--
Ziad

Sanjay

unread,
May 24, 2013, 10:32:02 PM5/24/13
to golan...@googlegroups.com, Sanjay, minux, Paulo Pinto
I am not contesting the fact that you can make nicer APIs with generics; I am contesting the notion that generics are a necessary prerequisite to writing MapReduces.

Sanjay

Anh Hai Trinh

unread,
May 25, 2013, 2:31:15 AM5/25/13
to Sanjay, golang-nuts, minux, Paulo Pinto
> I am not contesting the fact that you can make nicer APIs with generics; I am contesting the notion that generics are a necessary prerequisite to writing MapReduces.

I agree with this statement per se.

However, this take the world view that "everything is just a byte stream". Only in this sense that generics are not required for MapReduce.

HadoopStreaming has to be all byte stream per line in order to support all languages where the common denominator is just stdin/stdout. Don't get me wrong: stdin/stdout byte stream is absolutely brilliant from an operating system point of view.

However, the programming languages world has evolved from this byte-oriented world view since ASM and the B language. That's why we have types. Go channels is supposed to be typed system pipes, no?

MapReduce has its roots in high-order (generic) functions, writing Java MapReduce programs ensure that map/reduce input/output are correctly matched, i.e. _type-safe_, and also more _efficient_.

Regarding, Spark, terse syntax is just a nicety, Spark has a Java API which is also typesafe (less efficient than Scala due to Java generics requiring reference type, but still more efficient than having to ser/de): http://spark-project.org/docs/latest/java-programming-guide.html

    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String s) {
        return Arrays.asList(s.split(" "));
      }
    });
    
    JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });
    
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });

Sanjay

unread,
May 25, 2013, 3:02:11 AM5/25/13
to golan...@googlegroups.com, Sanjay, minux, Paulo Pinto
I gave 3 examples. The first was a Java API for MR that did not use generics in the way that you suggested. My experience leads me to believe that it is the most popular library for doing MR, particularly if you also count the users of Scalding, Cascalog and PyCascading. That API could be mapped (no pun intended) almost one-to-one to a Go API. My third example was of Storm, which also used essentially untyped tuples. Again, this API could be mapped almost one-to-one to a Go API. You asked if this could be done in Go. I say yes. What about these two APIs makes you think it cannot be mapped to Go?

I do think Hadoop Streaming was a poorer example than the two above, although I contest your notion that going through a byte-stream phase is so difficult. Remember, I am just contesting the notion that you _require_ generics to have MapReduce. That implies that without generics, MapReduce is impossible. I employed my favorite proof method, proof by example; in particular, I provided three examples, feel free to ignore Hadoop Streaming, if you'd like. But certainly tell me what about Cascading or Storm you think would be impossible to replicate in Go. 

Also, I think you are confused as to how your tools work. You say that it doesn't require serialization and deserialization when you have strongly-typed generic code, and that its more efficient; how exactly do you think the data gets across the network?

Cheers,
Sanjay

Anh Hai Trinh

unread,
May 25, 2013, 3:25:06 AM5/25/13
to Sanjay, golang-nuts, minux, Paulo Pinto
Hey Sanjay,

I will take closer look into Cascalding and Storm.

Re getting the data across the network: yes at some point you must shuffle data, but pipelined transformation can be run on native types before going out right? Combiner can work directly on the output of the Map phases without serde.

This is especially relevant for Spark's most revealing use case: running multi iterations machine learning algorithms on a distributed collection, where it can be 100X faster than Hadoop. Here you have say a in-memory collection RDD[Double] being map/reduced on, where the collection are native values in JVM hence no serde for map/combine phases and only serde of the the post-combiner values.

So the overhead may be 2-5x, multiply that by 100 iterations and you've got a job that finishes in 5-10m instead of 1-2hour (Spark also has other optimization of course).


--
You received this message because you are subscribed to the Google Groups "golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 



--
@chickamade

Sanjay

unread,
May 25, 2013, 4:34:19 AM5/25/13
to golan...@googlegroups.com, Sanjay, minux, Paulo Pinto
Cascading does use Map-side aggregation: http://docs.cascading.org/cascading/1.2/javadoc/cascading/pipe/assembly/AggregateBy.html

It also uses the same Tuple class for this purpose, and as far as I could tell, could be implemented exactly the same in Go, without any need for serialization and deserialization (until the shuffle phase, of course).  

I have no experience with Spark, so I don't know enough about it to comment. But from my casual perusal of their website, I believe that Storm solves a harder problem than Spark. Some of the teams at Twitter have built some truly incredible realtime streaming computation pipelines using Storm. 

Also, out of curiosity, how does Spark manage to have generics of value types? As far as I know, the limitation is not a Java limitation, but a JVM one, so unless Spark has a custom compiler and runtime, I don't see how they could be doing generics without boxing, and allocation.

Sanjay

Anh Hai Trinh

unread,
May 25, 2013, 8:03:04 AM5/25/13
to Sanjay, golang-nuts, minux, Paulo Pinto
On Sat, May 25, 2013 at 3:34 PM, Sanjay <balas...@gmail.com> wrote:
It also uses the same Tuple class for this purpose, and as far as I could tell, could be implemented exactly the same in Go, without any need for serialization and deserialization (until the shuffle phase, of course).  

So it looks like a Tuple is a container for some boxed java Object with some helper func to get a casted value at some index, right? Basically a dynamic-typed container that could raise runtime exceptions?
 
I have no experience with Spark, so I don't know enough about it to comment. But from my casual perusal of their website, I believe that Storm solves a harder problem than Spark.

They each solve a very different problem: iterative machine learning algo on big data vs real time analytics. SparkStreaming (http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) would be comparable to Storm however it is much less mature.
 
Also, out of curiosity, how does Spark manage to have generics of value types? As far as I know, the limitation is not a Java limitation, but a JVM one, so unless Spark has a custom compiler and runtime, I don't see how they could be doing generics without boxing, and allocation.

Right, that would be the Scala compiler, which can generate specialzed class. See: http://www.artima.com/pins1ed/combining-scala-and-java.html#29.1 (Value types section) and https://groups.google.com/forum/?fromgroups#!topic/spark-users/PBgn4V_Fcu8

--
@chickamade

Damian Gryski

unread,
May 25, 2013, 5:23:32 PM5/25/13
to golan...@googlegroups.com
There are at least two Go libraries that make working with hadoop streaming easy and type safe modulo the requirement to indicate the structure of what you're unpacking to the marshaling layer.

https://github.com/dgryski/dmrgo
https://github.com/jehiah/gomrjob

Damian

Dan Kortschak

unread,
May 25, 2013, 6:11:04 PM5/25/13
to Anh Hai Trinh, minux, Paulo Pinto, golang-nuts
Some time ago, before I knew about MR, I wrote a very lightweight thing that is really an MR implementation for small data[1] - this was one of those learning Go early projects. I could be used to build a distributed MR using the RPC functionality in the standard library.

Handling streaming data was not in the design, but adding this would not be onerous if a work queue were added to the front.

Dan

[1]http://godoc.org/code.google.com/p/biogo/concurrent#Map


On 24/05/2013, at 9:31 PM, "Anh Hai Trinh" <anh.ha...@gmail.com> wrote:

> Can Go do this?

Tobia

unread,
May 26, 2013, 8:01:48 PM5/26/13
to golan...@googlegroups.com, minux, Paulo Pinto
Hai-Anh Trinh wrote:
I will give two that cannot be "easily" done with Go. I'd love to be proven wrong here:
1. MapReduce.

Let me oblige you! There is no reason a generic framework cannot be written to make MapReduce easier.

Any generic computation framework will of course need to be based on reflection and interface{}, for the time being, but the lambdas or inner functions you pass to that framework can be defined (and compiled for) the native types. Therefore any "code ugliness" will be confined to the framework itself, not to its users, and any reflection overhead will be limited to the bookkeeping / dispatching code, not to the actual computations.

Here is a trivial concurrent "map" (in the functional sense) I threw together to prove my point. Look at the usage in main(). Notice how the invocation is straightforward to write and the inner function is defined / compiled for the specific type. The only hint the user has that there is some sort of reflection going on is the trailing type assertion (not a type conversion.)


Tobia

yahiah...@gmail.com

unread,
Nov 22, 2018, 1:34:52 PM11/22/18
to golang-nuts


در شنبه 25 مهٔ 2013، ساعت 0:14:52 (UTC+4:30)، Ziad Hatahet نوشته:

yahiah...@gmail.com

unread,
Nov 22, 2018, 3:32:51 PM11/22/18
to golang-nuts
Hi how are you can you help me I need distributed mapreduce code for golan
Thank you

Mandolyte

unread,
Nov 23, 2018, 8:49:29 AM11/23/18
to golang-nuts
Reply all
Reply to author
Forward
0 new messages