loading/writing to/from cassandra


Roland Gude

Jan 3, 2012, 6:11:38 AM
to peregrine...@googlegroups.com
Hi,

i saw the announcement on the cassandra users list and am very interested in this framework but unfortunately i could not find any pointers on how to read/write directly from cassandra (or any other system than the fs). If you would be so kind as to provide some help here, i will gladly evaluate peregrine for our use cases.

greetings,

roland

Kevin Burton

Jan 3, 2012, 4:49:19 PM
to peregrine...@googlegroups.com
Hey Roland.

This is a good question.

Actually my data source *IS* Cassandra so I'm motivated to get this resolved next.

I have a bug for it here:


The theory is to have loaders/storers similar to pig but with some changes.

I'm modeling Peregrine after Extract / Transform / Load jobs.  

So I want to have peregrine do the 'Transform' stage and have a driver that provides the extract and load stages of Peregrine.

The driver would register URI handlers to Cassandra so that you specify an input source like:

cassandra:my data instead of a file stored in PFS which would normally be file:/path/to/file.dat

It should probably have semantics similar to Pig for loading and storing.  We should just borrow their mechanism for specifying authentication information, etc.

One key difference is that I don't want to write to the filesystem during this operation.  

The way Pig/Hadoop do it is that they first write to the filesystem and then read the data off disk.  This is too expensive considering you previously actually read the data directly from the source.

Instead we are going to map directly over the data as it is being read.

This is a bit tricky though and has a number of issues we need to be careful with:

- We need some sort of mechanism to resume in drivers (if they support it).  For some systems this could be just keeping a pointer to the data as we are moving through it but of course some other systems might not have support for this.

- If the system supports snapshots we might want to use this so that we can get one consistent copy of the data.

- If we replay the map again we may end up writing duplicate data if there is no 'resume' in the driver.  The reduce job would need to be aware of removing duplicate data… (only accept the first record, remove duplicate records, etc)..  Though in some situations this might not be possible.  
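For illustration, a minimal sketch of the "only accept the first record" idea from the last point. It uses plain Java collections rather than Peregrine's reduce API, and the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Peregrine's API: a reduce step that tolerates
// a replayed map by keeping only the first value seen for each key.
final class FirstRecordReducer {

    // Each record is a {key, value} pair. A replayed map may emit a pair twice.
    static Map<String, String> reduce(List<String[]> records) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String[] record : records) {
            // putIfAbsent keeps the first value and ignores later duplicates.
            result.putIfAbsent(record[0], record[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {"row1", "a"});
        records.add(new String[] {"row2", "b"});
        records.add(new String[] {"row1", "a"}); // duplicate from a replayed map
        System.out.println(reduce(records));
    }
}
```

This only helps when a later record for a key really is a replay of an earlier one, which is why it won't be possible in every situation.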

I think I could bang out a first round of a driver pretty quickly actually. I'll work on it now.

I can get a demo working by building a URI register mechanism and then have it install a driver that supports the ChunkReader interface… 

--

Founder/CEO Spinn3r.com

Location: San Francisco, CA
Skype: burtonator

Skype-in: (415) 871-0687


Roland Gude

Jan 4, 2012, 10:10:27 AM
to peregrine...@googlegroups.com
ok,

you really lost me in the details but the example of an extract/load driver would be great
furthermore i would appreciate a little conceptual insight into this driver mechanism as you intend it (although javadocs etc for the example might be enough)

if it seems suitable for our needs, maybe we might dedicate some developer time towards peregrine.

Kevin Burton

Jan 4, 2012, 3:48:41 PM
to peregrine...@googlegroups.com
On Wed, Jan 4, 2012 at 7:10 AM, Roland Gude <r...@ndgu.de> wrote:
ok,

you really lost me in the details but the example of an extract/load driver would be great

Great… I'm working on one now that just 'Extracts' a static dataset and then Loads it back into a hash table… 

Then writing others for different backends should be easy and straightforward with some example code.
 
furthermore i would appreciate a little conceptual insight into this driver mechanism as you intend it (although javadocs etc for the example might be enough)

Well basically there is a ChunkReader interface and a ChunkWriter interface.



And this is just a plugin mechanism so we can map from something like:

cassandra:foo 

to a ChunkReader for doing an Extract and then when we're done with our reduce and want to write the data we use a ChunkWriter to put it back in… 
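Roughly, the plugin mechanism could look like the sketch below. This is only an illustration under assumptions: `DriverRegistry`, `Reader`, and `staticReader` are made-up names standing in for the real ChunkReader plumbing, which isn't shown in this thread.

```java
import java.net.URI;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: these names are NOT Peregrine's real interfaces.
final class DriverRegistry {

    // Stand-in for a ChunkReader: iterates over key/value pairs from a source.
    interface Reader extends Iterator<Map.Entry<String, String>> {}

    private final Map<String, Function<URI, Reader>> drivers = new HashMap<>();

    // Associates a URI scheme such as "cassandra" with a reader factory.
    void register(String scheme, Function<URI, Reader> factory) {
        drivers.put(scheme, factory);
    }

    // Resolves an input like "cassandra:foo" to the reader of its driver.
    Reader open(String uri) {
        URI parsed = URI.create(uri);
        Function<URI, Reader> factory = drivers.get(parsed.getScheme());
        if (factory == null) {
            throw new IllegalArgumentException("No driver for: " + uri);
        }
        return factory.apply(parsed);
    }

    // A toy "extract" driver that serves a static in-memory dataset.
    static Reader staticReader(List<Map.Entry<String, String>> rows) {
        Iterator<Map.Entry<String, String>> it = rows.iterator();
        return new Reader() {
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public Map.Entry<String, String> next() { return it.next(); }
        };
    }

    public static void main(String[] args) {
        DriverRegistry registry = new DriverRegistry();
        List<Map.Entry<String, String>> rows = new ArrayList<>();
        rows.add(new SimpleEntry<>("key1", "value1"));
        registry.register("cassandra", uri -> staticReader(rows));
        Reader reader = registry.open("cassandra:foo");
        while (reader.hasNext()) {
            System.out.println(reader.next());
        }
    }
}
```

A writer side would presumably mirror this with a second map from scheme to writer factory.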
 
Does  that make sense?


if it seems suitable for our needs, maybe we might dedicate some developer time towards peregrine.


What are your needs?  What are you building?

My initial thinking is that for every 1000-node cluster there are 100 100-node clusters… 

So I'm roughly trying to make the system faster for smaller clusters right now.

Iterative and pipelined jobs should also be blazingly fast …

Kevin

Roland Gude

Jan 5, 2012, 8:27:50 AM
to peregrine...@googlegroups.com
currently our clusters are rather small, so optimizing for small systems would be a good thing.

We are working on a recommender system and are researching the benefits and pitfalls of map/reduce and different map/reduce frameworks for us.
hadoop for instance (and mahout) are pretty close to what we need but way too complex to set up, manage and maintain while being optimized for really large compute clusters.

currently we do a lot of index queries to cassandra - iterating over subsets of the stored data - to build different models (a model describes the relationships between different items).
A first go would be to iterate over the complete local data and map the entries to the correct subsets. this should already provide a serious computation speedup and eliminate several bottlenecks. And we would work our way up from there towards a full-fledged map/reduce logic. 

So key requirements for us are
  • cassandra integration (best with data locality)
  • ease of operations (setup/maintain map/reduce cluster)
  • elimination of single points of failure
hadoop has a cassandra integration with data locality but cannot deliver on the other requirements
cloud-map-reduce has no single-point-of-failure but lacks cassandra integration


Kevin Burton

Jan 5, 2012, 3:48:14 PM
to peregrine...@googlegroups.com
On Thu, Jan 5, 2012 at 5:27 AM, Roland Gude <r...@ndgu.de> wrote:
currently our clusters are rather small, so optimizing for small systems would be a good thing.


How many machines?  'small' is relative if you're used to 4000 node clusters :-P
 
We are working on a recommender system and are researching the benefits and pitfalls of map/reduce and different map/reduce frameworks for us.
hadoop for instance (and mahout) are pretty close to what we need but way too complex to set up, manage and maintain while being optimized for really large compute clusters.


Yes… and Hadoop is missing a number of optimizations for both smaller installations and optimum per-node performance (mmap, fallocate, etc)

 
A first go would be to iterate over the complete local data and map the entries to the correct subsets. this should already provide a serious computation speedup and eliminate several bottlenecks.

So you're storing the Cassandra data locally on the same machines that you want to run your reduces over?

This should probably be ok if your Cassandra boxes were not doing writes while the job is running and you had sufficient memory.

Another option is to buy a set of physical disks for Peregrine.

It runs on the assumption that you're not doing multi tenancy on the disks as IO performance can quickly plummet when doing lots of random seeks.
 
Another alternative if you are running MR about 30% of the time is to just have dedicated hardware.  At least in our situation running on dedicated hardware at Softlayer (vs Amazon) is actually cheaper due to Amazon being so expensive.

And we would work our way up from there towards a full-fledged map/reduce logic. 


hm….. so I imagine your jobs are more like map only jobs for now?
 
So key requirements for us are
  • cassandra integration (best with data locality)

I released 0.5.1 yesterday which has an example IODriver implementation. The same interface should work for Cassandra, but I haven't yet written the Cassandra implementation.

Which Cassandra driver do you use?  

I need to figure out how we're going to map the Cassandra schema to key/value pairs because there are arbitrarily many ways to do this… 
  • ease of operations (setup/maintain map/reduce cluster)

Yeah… Peregrine should nail that.  Right now the system is very straightforward in terms of operations and ease of use.

There is just a pfsd node that you run and the controller (which schedules and runs jobs) is just embedded in the code that you launch to run your job.
 
There is also the peregrine.conf file which is pretty straightforward...
  • elimination of single points of failure

The controller is the only SPoF right now… the PFS nodes all work with multiple replicas but failure handling isn't implemented right now.

Since we are initially designing for smaller clusters having a job fail occasionally is ok if you just restart it… 

ALL the design for handling failure is done but I wanted to get feature complete first as I think having this system in production makes it easy to justify investing in continued development.
 
hadoop has a cassandra integration with data locality but cannot deliver on the other requirements
cloud-map-reduce has no single-point-of-failure but lacks cassandra integration


Ah… I should look at how pig handles mapping the Cassandra schema to Pig schema because we have a similar problem.

We need to take the Cassandra schema layout and map it to key/value pairs for Peregrine. 
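One possible mapping, sketched under assumptions (this is neither Pig's nor Peregrine's actual scheme, and `RowFlattener` is a made-up name): emit one key/value pair per column, with the row key and column name combined into the key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: one of several possible schema mappings.
final class RowFlattener {

    // Emits one {"rowKey:columnName", value} pair per column of a row.
    static List<String[]> flatten(String rowKey, Map<String, String> columns) {
        List<String[]> pairs = new ArrayList<>();
        // Copying into a TreeMap gives a deterministic order, sorted by column name.
        for (Map.Entry<String, String> column : new TreeMap<>(columns).entrySet()) {
            pairs.add(new String[] {rowKey + ":" + column.getKey(), column.getValue()});
        }
        return pairs;
    }

    public static void main(String[] args) {
        Map<String, String> columns = new TreeMap<>();
        columns.put("title", "hello");
        columns.put("score", "42");
        for (String[] pair : flatten("item1", columns)) {
            System.out.println(pair[0] + " -> " + pair[1]);
        }
    }
}
```

The alternative is one pair per row with all columns serialized into the value, which keeps a row together but pushes parsing into the map function.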

Kevin Burton

Jan 5, 2012, 4:00:10 PM
to peregrine...@googlegroups.com
I just looked at the Hadoop+Cassandra support… it may be possible to just take the Cassandra + Hadoop code and wrap it, as Peregrine has an analogous interface.

If there aren't any weird Hadoop dependencies it should be straightforward.

Also, it may be a way to easily add support for other systems….

Kevin



Brian O'Neill

Jan 5, 2012, 4:05:34 PM
to peregrine...@googlegroups.com

We use Cassandra+Hadoop everywhere.  Let me know if you run into any trouble.

If you want to see a simple use of it, you can check out this class:

That job races over a column family, executes a ruby script against the data (map invocation per row), then writes the result to a new column family.

-brian

---- 
Brian O'Neill
Lead Architect, Software Development
Health Market Science | 2700 Horizon Drive | King of Prussia, PA 19406
p: 215.588.6024


burtonator

Jan 5, 2012, 4:46:29 PM
to peregrine...@googlegroups.com
Interesting that both examples used Thrift directly... I imagine this is the best way to go to avoid the driver selection issue.

Further if it is already done it's pretty easy to just adapt the code to Peregrine since our interface is pretty small.

BTW here is the driver interface for writing these things:



... pretty tight so should be easy to implement support for Cassandra, etc.

Brian O'Neill

Jan 5, 2012, 5:01:13 PM
to peregrine...@googlegroups.com
Also, it might be worth noting that DataStax has a version of Cassandra in DataStax Enterprise that embeds the Hadoop runtime into the Cassandra JVM.  It can then take advantage of data locality, divvying up the jobs appropriately so input data need not traverse the network.  

When using the Thrift client directly and the "out-of-the-box" Hadoop Support in Cassandra, I don't believe you get the data locality, which means you could be (and most likely are) slurping data over the network. (could be expensive)

Of course this only matters if you are co-locating the analytics processes (Hadoop) with the data (Cassandra).

-brian
 
--
Brian ONeill
Lead Architect, Health Market Science (http://healthmarketscience.com)
mobile:215.588.6024

Kevin Burton

Jan 5, 2012, 7:23:12 PM
to peregrine...@googlegroups.com
On Thu, Jan 5, 2012 at 2:01 PM, Brian O'Neill <bo...@alumni.brown.edu> wrote:
Also, it might be worth noting that DataStax has a version of Cassandra in DataStax Enterprise that embeds the Hadoop runtime into the Cassandra JVM.  It can then take advantage of data locality, divvying up the jobs appropriately so input data need not traverse the network.  


Funny that I was thinking the same thing but I think the complexity might not be worth the performance advantage.

It's nice to have zero copy but this can't really be THAT much of a bottleneck… maybe 10-15% CPU and if you're using loopback to communicate to the second JVM you're probably just fine.
 
When using the Thrift client directly and the "out-of-the-box" Hadoop Support in Cassandra, I don't believe you get the data locality, which means you could be (and most likely are) slurping data over the network. (could be expensive)


Yes… I believe you're just going to copy it over the network… I think the trick is to make sure that the Peregrine key ranges and the Cassandra key ranges are identical so if you're hosting Peregrine and Cassandra on the same box then you're not moving data over the network but are just using the loopback adapter.
 
Of course this only matters if you are co-locating the analytics processes (Hadoop) with the data (Cassandra).

I wonder what the typical configuration is…

I was thinking of creating a poll and posting it to the Hadoop list to figure out what the typical cluster size looks like.

Kevin

-- 

Roland Gude

Jan 6, 2012, 9:41:18 AM
to peregrine...@googlegroups.com
ok this looks very promising

meanwhile i was able to run the examples and port one simple algorithm of ours to peregrine (with random data). using one node it already works but i still have issues when using more nodes (shuffle input not available). should keys always be hashcodes to make sure that every node gets part of the data?

Kevin Burton

Jan 6, 2012, 3:26:37 PM
to peregrine...@googlegroups.com
On Fri, Jan 6, 2012 at 6:41 AM, Roland Gude <r...@ndgu.de> wrote:
ok this looks very promising

meanwhile i was able to run the examples and port one simple algorithm of ours to peregrine (with random data).

Nice!

I also designed it for ease of development so you can embed Peregrine inside your local environment and iterate quickly…  
 
using one node it already works but i still have issues when using more nodes (shuffle input not available). should keys always be hashcodes to make sure that every node gets part of the data?

Yes… for now your keys should be hash codes so that every node gets some of the data.  

What exception did you get?  I think I need to init an empty shuffle in this case.  I hit this bug the other day so I will write a test for it now.

You have unbalanced data at this point, which isn't a good thing, but this is still a bad failure condition.

Using a hash code should fix it… and all your data will be evenly distributed.
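As a rough sketch of what "keys as hash codes" can mean in practice: derive a 64-bit hash from the original key and route on that. The MD5 choice, the class name, and the modulo routing are all assumptions made for illustration; Peregrine's real hashing and routing may differ.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical sketch; not Peregrine's actual key handling.
final class HashedKeys {

    // Derives a 64-bit hash code from the first 8 bytes of the key's MD5 digest.
    static long hashCode64(String key) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(key.getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.wrap(digest).getLong();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Maps a hash code onto one of nrPartitions partitions (hash treated as unsigned).
    static int partitionFor(long hash, int nrPartitions) {
        return (int) Long.remainderUnsigned(hash, nrPartitions);
    }

    public static void main(String[] args) {
        int[] counts = new int[4];
        for (int i = 0; i < 10000; i++) {
            counts[partitionFor(hashCode64("item-" + i), counts.length)]++;
        }
        // Counts per partition should come out roughly equal.
        System.out.println(Arrays.toString(counts));
    }
}
```

With a constant key such as 0, every record lands in the same partition and the others receive no shuffle input at all, which is the situation that triggered the bug.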

I also need to implement a sampler so that you can have custom range routing.  Sorting by rank for example is something I want to implement but the key distribution will need to be sampled so that the ranges evenly distribute the data across the cluster.

But both are somewhat easy… 
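To make the sampler idea concrete, here is a small sketch under assumptions (nothing like this exists in the thread; `RangeSampler` and its methods are hypothetical): sort a sample of keys, take evenly spaced split points, and route each key to the range it falls into.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of range routing from a key sample; not Peregrine code.
final class RangeSampler {

    // Picks nrPartitions - 1 split points so that each range holds roughly the
    // same number of sampled keys. The sample must not be empty.
    static List<Long> splitPoints(List<Long> sample, int nrPartitions) {
        List<Long> sorted = new ArrayList<>(sample);
        Collections.sort(sorted);
        List<Long> splits = new ArrayList<>();
        for (int i = 1; i < nrPartitions; i++) {
            splits.add(sorted.get(i * sorted.size() / nrPartitions));
        }
        return splits;
    }

    // Routes a key to the first range whose split point is greater than the key.
    static int partitionFor(long key, List<Long> splits) {
        int partition = 0;
        while (partition < splits.size() && key >= splits.get(partition)) {
            partition++;
        }
        return partition;
    }

    public static void main(String[] args) {
        // A skewed sample: most keys are small, a few are large.
        List<Long> sample = Arrays.asList(1L, 2L, 3L, 4L, 100L, 200L, 300L, 400L);
        List<Long> splits = splitPoints(sample, 4);
        System.out.println("splits: " + splits);
        System.out.println("key 150 -> partition " + partitionFor(150L, splits));
    }
}
```

Unlike hash routing, this keeps keys in order across partitions, which is what a sort by rank needs.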

Kevin

--

burtonator

Jan 6, 2012, 6:25:31 PM
to peregrine...@googlegroups.com
I have a fix for this bug in hg now and I'll push it in the next release... 

I need to figure out the right way to handle reading from cassandra.

The easiest way is to read all fields directly.  However, I can see people not needing all the fields in the export but doing this filtering could itself be a very complex task requiring a turing complete language.

Perhaps the solution here would be to extend the basic cassandra data source and then add your own logic.

burtonator

Jan 6, 2012, 10:08:04 PM
to peregrine...@googlegroups.com
I spent some more time looking at this and I think our best shot is probably to fork this:


There are some reasons why I think it makes sense to fork it...

1.  There are some hadoop dependencies.  I don't think it makes sense to embed hadoop as a dependency just to reuse one of their adapters.  It's mostly around configuration directives.

2.  We're going to want to modify the code so that Cassandra key ranges in theory line up with peregrine key ranges so that if you want to run Peregrine and Cassandra on the same box it's basically a local reduce....

Though I wonder how many people would want to do #2... I keep going back and forth on whether this is a good idea.  

I guess at the end of the day though the code could probably be pruned down to maybe 100-200 lines of code which isn't really the end of the world.

Roland Gude

Jan 8, 2012, 4:28:59 PM
to peregrine...@googlegroups.com
wow great, i'll pull the changes and try it out first thing on monday.

i am a bit puzzled though on one thing.

the algorithm i have ported - a simple popularity count - needs a last reduce step, where all data goes to the same reducer (i.e. key is 0) (actually i am not sure this is required but i could not think of another way. i just need to sort the output from the other reducers. probably possible in parallel - but how?).

on my last test this led to Exceptions (i think it was shuffle data does not exist and sometimes nullpointer exceptions) on some nodes, while producing the correct output on the responsible node. still, the job output led me to believe my job failed.

I will reproduce the exceptions and retry with the new version maybe this is already resolved.

The class you are referring to is exactly the one i meant when i talked about cassandra/hadoop integration.
i think forking that one is probably a good idea

---

I don't think that a turing complete language is needed for the filtering. you just need a way to provide the desired range and the desired slice, or the slice for a given rowkey, so basically just one interface a bit like this

public boolean isKeyInRange(ByteBuffer key);
public SliceRange getSliceRange(ByteBuffer key);
or
public boolean isNameInSliceRange(ByteBuffer key, ByteBuffer name);
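A self-contained sketch of how such a filter could be implemented, using the second variant so it does not depend on the Thrift SliceRange type. The `CassandraFilter` and `RangeFilter` names and the inclusive string bounds are assumptions made for illustration only:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical interface built around the method names proposed above.
interface CassandraFilter {
    boolean isKeyInRange(ByteBuffer key);
    boolean isNameInSliceRange(ByteBuffer key, ByteBuffer name);
}

// Accepts rows whose key lies in [startKey, endKey] and, within those rows,
// columns whose name lies in [startName, endName].
final class RangeFilter implements CassandraFilter {
    private final ByteBuffer startKey;
    private final ByteBuffer endKey;
    private final ByteBuffer startName;
    private final ByteBuffer endName;

    RangeFilter(String startKey, String endKey, String startName, String endName) {
        this.startKey = bytes(startKey);
        this.endKey = bytes(endKey);
        this.startName = bytes(startName);
        this.endName = bytes(endName);
    }

    static ByteBuffer bytes(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    // Inclusive bounds. ByteBuffer.compareTo compares bytes as signed values,
    // which is fine for ASCII but differs from an unsigned byte ordering.
    private static boolean between(ByteBuffer value, ByteBuffer start, ByteBuffer end) {
        return value.compareTo(start) >= 0 && value.compareTo(end) <= 0;
    }

    @Override
    public boolean isKeyInRange(ByteBuffer key) {
        return between(key, startKey, endKey);
    }

    @Override
    public boolean isNameInSliceRange(ByteBuffer key, ByteBuffer name) {
        return isKeyInRange(key) && between(name, startName, endName);
    }
}
```

A data source could then call `isKeyInRange` once per row and `isNameInSliceRange` once per column while reading.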



Roland Gude

Jan 8, 2012, 5:40:54 PM
to peregrine...@googlegroups.com
btw. i was wondering why the partitioner does not apply the "hashcode" stuff to the key (like randompartitioner in cassandra) so that a key could still contain information for the computation.

Kevin Burton

Jan 8, 2012, 6:32:44 PM
to peregrine...@googlegroups.com
I think there are two ways to do it… 

One is to have the key AS the hash code… 

The other is to compute the hash code from the key.

However, if the key is ALREADY a hash code it's a waste of CPU to hash it again.

Peregrine used to have a keyIsHashcode() method to figure this out… 

I might add it back in as a job conf parameter so that you could do it either way.

But for our initial use case (page rank) having the keys as hash codes is the default operation so I just went down that path.





Kevin Burton

Jan 8, 2012, 6:43:07 PM
to peregrine...@googlegroups.com
On Sun, Jan 8, 2012 at 1:28 PM, Roland Gude <r...@ndgu.de> wrote:
wow great, i'll pull the changes and try it out first thing on monday.


Sweet… I'm going to work a bit more on it today. 

 
the algorithm i have ported - a simple popularity count - needs a last reduce step, where all data goes to the same reducer (i.e. key is 0) (actually i am not sure this is required but i could not think of another way. i just need to sort the output from the other reducers. probably possible in parallel - but how?).


Oh… so for example you want to take values from ALL the reducers and send them to a sort of global value?

Look at broadcast values in the page rank example (there is also documentation in /design) for these.

If they aren't clear I'll write up some more documentation and examples.

Basically, it's a way to broadcast a global value to the next iteration (or as a final step).

This could be used to perform a computation against EVERY value in the entire cluster and then broadcast these results so that every partition can read the value.

Usually this is ONE record… 

In PR we do this to count the total number of unique nodes in the graph as nr_nodes.

Then we broadcast this to the entire cluster so that nr_nodes is then a variable that every partition can read when it's doing a map or reduce.
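In plain Java the nr_nodes idea looks something like the sketch below. It is not Peregrine's broadcast API (that lives in the page rank example and /design); `BroadcastSketch` is a made-up name, and the `initialRank` use of the value is just one common PageRank convention, shown as an assumed example of a reader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of computing one global value and sharing it.
final class BroadcastSketch {

    // Each partition holds a distinct set of node ids. Assuming a node id lives
    // in exactly one partition, the per-partition counts can simply be summed.
    static long nrNodes(List<Set<Long>> partitions) {
        long total = 0;
        for (Set<Long> partition : partitions) {
            total += partition.size();
        }
        return total;
    }

    // Once broadcast, every partition can read the value during map or reduce,
    // for example to start each node at rank 1 / nr_nodes.
    static double initialRank(long nrNodes) {
        return 1.0 / nrNodes;
    }

    public static void main(String[] args) {
        List<Set<Long>> partitions = new ArrayList<>();
        partitions.add(new HashSet<>(Arrays.asList(0L, 2L)));
        partitions.add(new HashSet<>(Arrays.asList(1L, 3L)));
        long nrNodes = nrNodes(partitions);
        System.out.println("nr_nodes=" + nrNodes + " initial rank=" + initialRank(nrNodes));
    }
}
```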

Does that make sense?

on my last test this led to Exceptions (i think it was shuffle data does not exist and sometimes nullpointer exceptions) on some nodes, while producing the correct output on the responsible node. still, the job output led me to believe my job failed.


Hm.. BTW… I have a fix for this.  I will try to do another push tonight so that you get those changes.. 
 
I will reproduce the exceptions and retry with the new version maybe this is already resolved.


OK.. if you could post your code that could help too...
 
The class you are referring to is exactly the one i meant when i talked about cassandra/hadoop integration.
i think forking that one is probably a good idea


Cool.  That's what I was thinking too.

I think there will be more work needed to align the cassandra and peregrine partitions.

In some situations I could see that the cassandra and peregrine partitions don't line up exactly so you still want to get as much of the peregrine data aligned with as much cassandra data as possible.

There is ALSO the issue of data locality.  Even though it's on a different machine you probably want to pull it from a machine on the SAME rack.

Peregrine doesn't yet have rack awareness but it's because we're targeting smaller clusters AND I think there's more work I want to do in this area as I don't think Hadoop is doing things correctly.
 
I don't think that a turing complete language is needed for the filtering. you just need a way to provide the desired range and the desired slice, or the slice for a given rowkey, so basically just one interface a bit like this

public boolean isKeyInRange(ByteBuffer key);
public SliceRange getSliceRange(ByteBuffer key);
or
public boolean isNameInSliceRange(ByteBuffer key, ByteBuffer name);


The other trick is to get the 'splits' to line up so that you are reading from one key range on the right host, so that if you're using an ordered partitioner you just line these up directly.

I might bang out a rough version just to get it working and then we can worry about efficiency once that is up …

I also realized that we can actually do merge() joins DIRECTLY against Cassandra if they are using the ordered partitioner.

The interface just has to have a ChunkReader with key/value pairs that are already sorted.  

That would be cool because you could have a static input stored in peregrine already and then merge join directly against cassandra instead of having to read the data in and then re-sort it with another map/reduce.
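A minimal sketch of such a merge join over two inputs that are already sorted by key. It uses plain iterators instead of ChunkReader, assumes keys are unique within each input, and the `MergeJoin` name and output format are made up for illustration:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a sorted merge join; not Peregrine's merge() code.
final class MergeJoin {

    private static <T> T advance(Iterator<T> it) {
        return it.hasNext() ? it.next() : null;
    }

    // Both inputs must be sorted ascending by key. Emits "key:left,right" for
    // every key present on both sides, in a single pass and without re-sorting.
    static List<String> join(Iterator<Map.Entry<Long, String>> left,
                             Iterator<Map.Entry<Long, String>> right) {
        List<String> joined = new ArrayList<>();
        Map.Entry<Long, String> l = advance(left);
        Map.Entry<Long, String> r = advance(right);
        while (l != null && r != null) {
            int cmp = Long.compare(l.getKey(), r.getKey());
            if (cmp == 0) {
                joined.add(l.getKey() + ":" + l.getValue() + "," + r.getValue());
                l = advance(left);
                r = advance(right);
            } else if (cmp < 0) {
                l = advance(left);   // left key is smaller, no match for it
            } else {
                r = advance(right);  // right key is smaller, no match for it
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> stored = new TreeMap<>();    // stands in for a PFS file
        TreeMap<Long, String> cassandra = new TreeMap<>(); // stands in for an ordered scan
        stored.put(1L, "a");
        stored.put(2L, "b");
        cassandra.put(2L, "x");
        cassandra.put(3L, "y");
        System.out.println(join(stored.entrySet().iterator(), cassandra.entrySet().iterator()));
    }
}
```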

Kevin

Roland Gude

Jan 9, 2012, 4:43:46 AM
to peregrine...@googlegroups.com
ok the irritating errors are gone with the changes,

but i still get NullPointerExceptions if i do a reduce where the input is a shuffle and the output is another shuffle (i.e. new ReducerX(new Input("shuffle:step1"),new Output("shuffle:step2"))). this will give me an NPE

2012-01-09 10:42:55,261  ERROR [main] peregrine.task.Scheduler Failed to handle task: rahn:11112:partition:00000002
 java.io.IOException: Unable to merge chunks: [file: null, length (in bytes): 2,058, size: 7]
        at peregrine.reduce.merger.ChunkMerger.merge(ChunkMerger.java:160)
        at peregrine.reduce.merger.ChunkMerger.merge(ChunkMerger.java:110)
        at peregrine.reduce.ReduceRunner.finalMerge(ReduceRunner.java:165)
        at peregrine.reduce.ReduceRunner.sort(ReduceRunner.java:98)
        at peregrine.task.ReducerTask.doCall(ReducerTask.java:88)
        at peregrine.task.BaseTask.call(BaseTask.java:156)
        at peregrine.task.ReducerTask.call(ReducerTask.java:58)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.RuntimeException: Reduce failed:
        at peregrine.task.ReducerTask$ReducerTaskSortListener.onFinalValue(ReducerTask.java:107)
        at peregrine.reduce.SortResult.emit(SortResult.java:80)
        at peregrine.reduce.SortResult.accept(SortResult.java:57)
        at peregrine.reduce.merger.ChunkMerger.merge(ChunkMerger.java:142)
        ... 11 more
Caused by: java.lang.NullPointerException
        at peregrine.io.driver.shuffle.ShuffleJobOutputDirect.emit(ShuffleJobOutputDirect.java:55)
        at peregrine.io.driver.shuffle.ShuffleJobOutputDirect.emit(ShuffleJobOutputDirect.java:47)
        at peregrine.io.driver.shuffle.ShuffleJobOutput.emit(ShuffleJobOutput.java:69)
        at peregrine.task.BaseJobDelegate.emit(BaseJobDelegate.java:42)
        at peregrine.app.popular.Popular$Reduce.reduce(Popular.java:99)
        at peregrine.task.ReducerTask$ReducerTaskSortListener.onFinalValue(ReducerTask.java:103)
        ... 14 more


same code works if output is a file

Kevin Burton

Jan 9, 2012, 3:14:22 PM
to peregrine...@googlegroups.com
On Mon, Jan 9, 2012 at 1:43 AM, Roland Gude <r...@ndgu.de> wrote:
ok the irritating errors are gone with the changes,


Ah… are you running from an hg build?
 
but i still get NullPointerExceptions if i do a reduce where the input is a shuffle and the output is another shuffle (i.e. new ReducerX(new Input("shuffle:step1"),new Output("shuffle:step2"))). this will give me an NPE

same code works if output is a file


Ah.. we really don't support ReduceReduce right now...

I guess the issue is that during your Reduce you're writing keys that you then need to shuffle?

So your flow looks like:

map emitting to shuffle:step1:

  0:1
  1:2 
  2:3
  3:4
   
so then what happens is that your shuffle data looks like: 

  partition0: 0:1, 2:3
  partition1: 1:2, 3:4
  
And then the emit key isn't destined for the same partition so you don't want to emit to a file but a shuffle.
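The flow above can be reproduced with a toy shuffle. The routing rule here (key modulo number of partitions) is an assumption that happens to match the example; Peregrine's actual routing is not spelled out in this thread, and `ShuffleSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of routing emitted pairs to partitions.
final class ShuffleSketch {

    // Groups {key, value} pairs by target partition, rendering each as "key:value".
    static List<List<String>> shuffle(long[][] pairs, int nrPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < nrPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (long[] pair : pairs) {
            // floorMod keeps the result non-negative even for negative keys.
            int target = (int) Math.floorMod(pair[0], (long) nrPartitions);
            partitions.get(target).add(pair[0] + ":" + pair[1]);
        }
        return partitions;
    }

    public static void main(String[] args) {
        long[][] emitted = {{0, 1}, {1, 2}, {2, 3}, {3, 4}};
        List<List<String>> partitions = shuffle(emitted, 2);
        for (int i = 0; i < partitions.size(); i++) {
            System.out.println("partition" + i + ": " + partitions.get(i));
        }
    }
}
```

If the reduce on partition0 then emits a key that maps to partition1, that output has to go through another shuffle rather than a local file, which is the ReduceReduce case.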

I think that this is redundant isn't it?  Couldn't you in theory reduce over a file which has the original values?

This actually might be a good feature to add.. Take an input file that is key/value pair and reduce it directly (if it's already sorted and probably will be) and then run the supplied reduce function.

What you mention IS possible but one of the problems is trying to figure out how to size the shuffle buffer and sort buffers.

They actually need to be 1/2 the size … I'm planning on adding this feature soon as we need to support ReduceMap … which runs a map() function directly on the output of a shuffle.  

There's also the issue of what to do if MANY shuffle outputs are specified in a map…  They might not be the same size so you want to balance the shuffle buffer accordingly but there's no solid way to know this a priori without sampling.  Maybe some type of system where we look at the last 5 minutes of data and balance accordingly.

Kevin

Roland Gude

Jan 9, 2012, 4:15:36 PM
to peregrine...@googlegroups.com
yes i am running from hg.

Wow... i did not know i was tripping over a conceptual error.

I think, I currently do not yet understand the concept of shuffle (i actually have no thorough knowledge about the whole map and reduce thingy, but i am just picking it up).
but basically what i get now is that shuffle means to divide the data between the nodes (a bit like MPI scatter) while writing to a file means "stay on  the node where you are".

Kevin Burton

Jan 9, 2012, 4:46:13 PM
to peregrine...@googlegroups.com
On Mon, Jan 9, 2012 at 1:15 PM, Roland Gude <r...@ndgu.de> wrote:
yes i am running from hg.


Cool… 
 
Wow... i did not know i was tripping over a conceptual error.

I think, I currently do not yet understand the concept of shuffle (i actually have no thorough knowledge about the whole map and reduce thingy, but i am just picking it up).
but basically what i get now is that shuffle means to divide the data between the nodes (a bit like MPI scatter) while writing to a file means "stay on  the node where you are".

Well the idea is that you have an output key which isn't on this given partition.. so you then move it to the master responsible for this partition and then do the reduce there.

In Peregrine writing to a file does mean that the output value is specific to that partition.

So basically if that key you emit is NOT specific to that partition you have to shuffle it… but if it IS specific to that partition you need to write it to a file.

Map only jobs could write to a file for example if the key is on the same partition.  Other than that they should write to a shuffle.

Reduce jobs can write to a file… 

I'm going to also add the concept of pipes soon so that you can Reduce to a pipe which will be faster when you don't care about the intermediate data vanishing.

Kevin

--

Roland Gude

Jan 10, 2012, 2:54:17 AM
to peregrine...@googlegroups.com
ok, got it

pipes would be great i guess