perl http mapreduce

2 views
Skip to first unread message

Nikhil S

unread,
Jul 17, 2008, 8:31:14 AM7/17/08
to httpmr-discuss
Here is my attempt to write mapreduce in perl

This assumes you have a list of hosts (host1, host2, host3), each running
a web server. Copy rpc.pl into the cgi-bin directory of each server,
and then run dgrep.pl. The contents of each file are given below.

===================rpc.pl==================
#!/usr/bin/perl
# rpc.pl -- generic CGI "remote procedure call" endpoint for the Perl
# mapreduce demo.  Expects two request parameters:
#   fn   -- source text of a Perl block (produced by B::Deparse on the client)
#   args -- a Storable::freeze()d reference to the argument list
# The block is string-eval'd with the thawed arguments in @_, and its
# (reference) result is frozen and written back as text/plain.
#
# SECURITY: this executes arbitrary Perl received over HTTP (string eval of
# untrusted input).  Deploy only on a fully trusted, private network.

use strict;
use warnings;
use Data::Dumper;
use CGI;
use Storable;

my $query = CGI->new;    # direct call syntax; `new CGI` is indirect-object style
my $fn    = $query->param('fn');
my $args  = Storable::thaw($query->param('args'));
print $query->header({-type=>'text/plain'});

# Fail early with a clear message instead of eval'ing undef below or
# dying on @{undef} under strict refs.
die "rpc.pl: missing 'fn' parameter\n"        unless defined $fn;
die "rpc.pl: missing or bad 'args' parameter\n" unless ref $args eq 'ARRAY';

# eval STRING shares the enclosing sub's @_, so the deparsed block's own
# `my (...) = @_;` line picks up the arguments passed to stub().
sub stub {
    eval $fn;
}

#print "args ".Dumper($args);
print Storable::freeze(stub(@{$args}));
exit;
=======

============== dgrep.pl ===============
#!/usr/bin/perl
# dgrep.pl -- distributed grep client for the Perl mapreduce demo.
#
# Usage: ./dgrep.pl <pattern> [url-path ...]
#
# Each url-path is fetched (relative to http://host1/) by a remotely executed
# map function; every line matching <pattern> is emitted as "path:lineno: text".
# With no url-paths given, the original demo set 1.txt .. 5.txt is used.
#
# NOTE(review): the module file in this thread is named MapReduce.pm but
# declares `package Mapreduce` -- on a case-sensitive filesystem the file
# must be named Mapreduce.pm for this `use` to work; confirm on deploy.

use strict;
use warnings;
use Data::Dumper;
use Mapreduce;

my $grep = $ARGV[0];
die "usage: $0 <pattern> [url-path ...]\n" unless defined $grep;

# Paths to fetch.  Command-line paths win; fall back to the demo files so
# existing invocations keep working unchanged.
my @paths = @ARGV[1 .. $#ARGV];
@paths = map { "$_.txt" } 1 .. 5 unless @paths;

my $r = mapreduce(
    # Map function (runs remotely): fetch one URL, emit each matching line.
    fnserialize(sub {
        eval 'use LWP::UserAgent;';    # loaded on the remote side, where this block executes
        my ($em, $url, $val) = @_;
        my $browser = LWP::UserAgent->new(
            agent => 'Mozilla/4.0 (compatible; urlget; MSIE 6.0; Windows NT 5.1; SV1)',
        );
        my $response = $browser->get('http://host1/' . $url);
        if ($response->is_success) {
            my $ct = $response->content;
            my $ln = 1;
            # split keeps a final line that has no trailing newline, which the
            # old m/(.*)\n/g scan silently dropped.
            for my $l (split /\n/, $ct) {
                if ($l =~ m/$val/) {
                    $em->("$url:$ln: " . $l, 1);
                }
                $ln++;
            }
        }
    }),
    # Reduce function (runs remotely): collapse duplicate emissions; only
    # the keys are printed below, so the value is irrelevant.
    fnserialize(sub {
        my ($em, $key, $val) = @_;
        $em->($key, 0);
    }),
    { map { $_ => $grep } @paths },    # input: path => pattern
    1,                                 # noreduce flag, as in the original demo
);

print "$_\n" for keys %{$r};
==============
================= MapReduce.pm ================
package Mapreduce;
# Mapreduce -- toy HTTP-based MapReduce library.
#
# The client deparses a map function and a reduce function to source text
# (fnserialize) and ships the whole job -- function sources, input hash,
# flags -- to a single coordinating rpc.pl endpoint (rpc).  The coordinator
# block, itself shipped as deparsed source, fans map and reduce calls out in
# parallel to every host in its @hostlist via LWP::Parallel::UserAgent.
#
# SECURITY: rpc.pl executes arbitrary Perl received over HTTP; use this
# only on a trusted, private network.

use strict;
use warnings;

require Exporter;

our @ISA     = qw(Exporter);
our @EXPORT  = qw(fnserialize mapreduce);
our $VERSION = 1.00;

use B::Deparse;
use LWP::UserAgent;
use HTTP::Request;
use URI::Escape;
use Storable;

# fnserialize BLOCK
# Return the source text of a code reference (via B::Deparse) so it can be
# sent over the wire and string-eval'd by rpc.pl on a remote host.
sub fnserialize(&) {
    my $fn      = shift;
    my $deparse = B::Deparse->new;
    return $deparse->coderef2text($fn);
}

# rpc BLOCK, ARGS...
# Deparse BLOCK, POST it together with the frozen ARGS to the coordinator's
# rpc.pl, and thaw the response.  Returns the thawed reference, or undef on
# HTTP failure / empty response.
sub rpc(&@) {
    my $fn      = shift;
    my $deparse = B::Deparse->new;
    my $code    = $deparse->coderef2text($fn);
    my $browser = LWP::UserAgent->new(
        agent => 'Mozilla/4.0 (compatible; mapreduce; MSIE 6.0; Windows NT 5.1; SV1)',
    );
    # Direct constructor syntax; the original used indirect `new HTTP::Request`.
    my $req = HTTP::Request->new(POST => 'http://host1/cgi-bin/rpc.pl');
    my $postdata = 'fn=' . uri_escape($code)
                 . '&args=' . uri_escape(Storable::freeze(\@_));
    $req->content_type('application/x-www-form-urlencoded');    # so CGI.pm parses the body
    $req->content($postdata);
    my $response = $browser->request($req);
    if ($response->is_success) {
        my $content = $response->content;
        if (length($content) > 0) {
            return Storable::thaw($content);
        }
    }
    return undef;    # kept as-is: existing callers test for undef
}

# mapreduce MAPSRC, REDUCESRC, DATAREF, NOREDUCE
# MAPSRC/REDUCESRC are deparsed function sources (from fnserialize);
# DATAREF is a hashref of input key => value pairs; if NOREDUCE is true the
# merged intermediate hash is returned without running the reduce phase.
# Returns a hashref of results, or undef if the coordinator call fails.
sub mapreduce {
    rpc {
        # NOTE: this entire block is deparsed and executed on the
        # coordinator host by rpc.pl, not in the client process.
        my $map      = shift;    # deparsed source of the map function
        my $reduce   = shift;    # deparsed source of the reduce function
        my $data     = shift;    # hashref: input key => value
        my $noreduce = shift;    # truthy: skip the reduce phase

        # Loaded via string eval so the `use` statements travel inside the
        # deparsed text and execute on the coordinator's interpreter.
        eval 'use B::Deparse; use LWP::Parallel::UserAgent; use HTTP::Request; use URI::Escape; use Storable; use Data::Dumper;';

        my @hostlist = (
            'http://host1/cgi-bin/rpc.pl',
            'http://host2/cgi-bin/rpc.pl',
            'http://host3/cgi-bin/rpc.pl',
        );

        # Local copy of fnserialize: only this block's text exists remotely,
        # so it cannot call the package's subs.
        my $funser = sub {
            my $fn      = shift;
            my $deparse = B::Deparse->new;
            return $deparse->coderef2text($fn);
        };
        # Build the form-encoded request body for one worker rpc.pl call.
        my $remcall = sub {
            my $fn = shift;
            return 'fn=' . uri_escape($funser->($fn))
                 . '&args=' . uri_escape(Storable::freeze(\@_));
        };

        # Worker-side wrapper around one map call: collects emitted
        # (key, value) pairs into a hash of arrays and returns it.
        my $remmap = sub {
            my ($mapfn, $k, $v) = @_;
            my %interhash = ();
            my $mapemit = sub {
                my ($key, $val) = @_;
                push(@{$interhash{$key}}, $val);
            };
            my $evalstub = sub {
                my $fn = shift;
                # String eval shares this sub's @_, so the map source's own
                # `my (...) = @_;` sees ($mapemit, $k, $v).
                eval $fn;
            };
            $evalstub->($mapfn, $mapemit, $k, $v);
            return \%interhash;
        };
        # Worker-side wrapper around one reduce call: last emit per key wins.
        my $remred = sub {
            my ($redfn, $k, $v) = @_;
            my %finalhash = ();
            my $redemit = sub {
                my ($key, $val) = @_;
                $finalhash{$key} = $val;
            };
            my $evalstub = sub {
                my $fn = shift;
                eval $fn;
            };
            $evalstub->($redfn, $redemit, $k, $v);
            return \%finalhash;
        };

        # --- map phase: one request per input key, round-robin over hosts ---
        my $host_ind = 0;
        my $host_len = @hostlist;
        my @reqs = map {
            $host_ind = 0 if $host_ind >= $host_len;
            HTTP::Request->new('POST', $hostlist[$host_ind++],
                ['Content-Type' => 'application/x-www-form-urlencoded'],
                $remcall->($remmap, $map, $_, $data->{$_}));
        } keys %{$data};

        my $pua = LWP::Parallel::UserAgent->new;
        $pua->agent('Mozilla/4.0 (compatible; map; MSIE 6.0; Windows NT 5.1; SV1)');
        foreach my $req (@reqs) {
            $pua->register($req);
        }
        my $entries = $pua->wait();

        # Merge the per-worker intermediate hashes by key.  Failed workers
        # are silently skipped (best-effort, as in the original).
        my %maphash = ();
        foreach my $et (keys %$entries) {
            my $res = $entries->{$et}->response;
            if ($res->is_success) {
                my $ihash = Storable::thaw($res->content);
                foreach my $ih (keys(%{$ihash})) {
                    foreach my $mi (@{$ihash->{$ih}}) {
                        push(@{$maphash{$ih}}, $mi);
                    }
                }
            }
        }
        if ($noreduce) {
            return \%maphash;
        }

        # --- reduce phase: one request per intermediate key ---
        $host_ind = 0;
        @reqs = map {
            $host_ind = 0 if $host_ind >= $host_len;
            HTTP::Request->new('POST', $hostlist[$host_ind++],
                ['Content-Type' => 'application/x-www-form-urlencoded'],
                $remcall->($remred, $reduce, $_, $maphash{$_}));
        } keys %maphash;

        $pua = LWP::Parallel::UserAgent->new;
        $pua->agent('Mozilla/4.0 (compatible; reduce; MSIE 6.0; Windows NT 5.1; SV1)');
        my %redhash = ();
        foreach my $req (@reqs) {
            $pua->register($req);
        }
        $entries = $pua->wait();
        foreach my $et (keys %$entries) {
            my $res = $entries->{$et}->response;
            if ($res->is_success) {
                my $ihash = Storable::thaw($res->content);
                $redhash{$_} = $ihash->{$_} for keys %{$ihash};
            }
        }
        return \%redhash;
    } @_;
}

1;
================

Peter Dolan

unread,
Jul 17, 2008, 2:06:38 PM7/17/08
to httpmr-...@googlegroups.com
Pretty neat, it's true what they say about Perl and its amazing readability properties.

I don't pretend I followed all the details, but it looks to me like you're executing everything in a single request.  Is that right?

Is this code available somewhere in a repository, and what's the license?  Would you be interested in writing a parallel Perl implementation of HTTPMR?  At first glance, I think it would involve some work on factoring things out to rely on interfaces (so that storage systems can be dropped in as needed) and some work on sharding the processing across requests.

Thanks!
- Peter

Nikhil S

unread,
Jul 17, 2008, 2:56:00 PM7/17/08
to httpmr-discuss
rpc.pl is a script that takes two arguments one is "fn" which is the
code of the function to be executed and other is "args" which is the
serialized arguments list. then it executes this code using "eval".
the reason for this is you just have to distribute rpc.pl on all the
machines on the cluster, and the mapping/reducing function need not be
distributed to all the machines. i just pass the code of those
functions using HTTP. Perl allows you to de-parse a function (B::Deparse is
the module).

it does do parallel HTTP requests to rpc.pl on various machines. CPAN
has a module Parallel:UserAgent which does parallel HTTP requests.
i will send those files in a separate mail as attachments.

This can be enhanced to use memcache/Amazon's simpledb to store
intermediate key-value pairs.

Just for my info. Is there a way to de-parse a function in python (i
dont know much of python) ?. i mean to get the code in string form.

thanks and regards
nikhil
> > rpc.pl <http://host1/cgi-bin/rpc.pl>";
> > my $postdata =
> > 'fn='.uri_escape($map)."&args=".uri_escape(Storable::freeze(\@_));
> > $req->content($postdata);
> > my $response = $browser->request($req);
> > if ($response->is_success){
> > my $content = $response->content;
> > if(length($content) > 0){
> > return Storable::thaw($content);
> > }
> > }
> > return undef;
> > }
>
> > sub mapreduce {
> > rpc {
> > my $map = shift;
> > my $reduce = shift;
> > my $data = shift;
> > my $noreduce = shift;
> > eval 'use B::Deparse; use LWP::Parallel::UserAgent;
> > use HTTP::Request; use URI::Escape; use Storable; use Data::Dumper;';
> > my @hostlist = ('http://host1/cgi-bin/rpc.pl', 'http://
> > host2/cgi-bin/rpc.pl', 'http://host3/cgi-bin/rpc.pl'<http://host3/cgi-bin/rpc.pl%27>

Peter Dolan

unread,
Jul 17, 2008, 7:06:43 PM7/17/08
to httpmr-...@googlegroups.com
rpc.pl is a script that takes two arguments one is "fn" which is the
code of the function to be executed and other is "args" which is the
serialized arguments list. then it executes this code using "eval".
the reason for this is you just have to distribute rpc.pl on all the
machines on the cluster, and the mapping/reducing function need not be
distributed to all the machines. i just pass the code of those
functions using HTTP. perl allows to de-parse a function (B:Deparse is
the module).

it does do parallel HTTP requests to rpc.pl on various machines. CPAN
has a module Parallel:UserAgent which does parallel HTTP requests.
i will send those files in a separate mail as attachments.

Neat, I'll try to wade through the code a little more; Perl's something I've never worked with.
 

This can be enhanced to use memcache/Amazon's simpledb to store
intermediate key-value pairs.

I've considered writing in support for the AppEngine's memcache API, but decided against it for a couple reasons:
  • Memcache has no support for scanning semantics, only get(key) and put(key, value), so you need to know the keys you're going to be reducing before you can reduce them.  Doing that would require the mappers to somehow register the set of keys that had been output.  It's possible that you could do so by shoving the keys into memcache, and a few implementations might work:
    1. One memcache key, all mappers do a get-update-put on the value, which is a list of keys
    2. One memcache key per mapper output key, an incrementing integer.  Next integer to use is maintained in a special memcache key, and mappers request that next key every time they want to output a value.  Doesn't solve duplicate keys, and sharding the reducers has to get into modulo-arithmetic (reducer 5 of 20 takes every (20x + 5)th key), and that kinda makes my head hurt.
    3. Variant of (1), but with each mapper getting its own key to which to write.  Better concurrency, but doesn't handle uniqueness and would thus require a second pass.
  • AppEngine's memcache API doesn't give you any guarantees about how much data you can put into it, so you face the possibility that your reducers won't get all of the mapper output.  I'm not familiar with Amazon's memcache, but I expect it's gonna have similar problems -- you'll be limited in how much intermediate data you can get away with.
Even though it could be feasible for people with small jobs, the difficulty of the first bullet point makes me hesitate to try implementing it at the expense of other work that needs to be done.


Just for my info. Is there a way to de-parse a function in python (i
dont know much of python) ?. i mean to get the code in string form.

Yep, Python can do that: eval("1 + 1") == 2.  See http://docs.python.org/lib/built-in-funcs.html.

- Peter

Nikhil S

unread,
Jul 18, 2008, 5:42:22 AM7/18/08
to httpmr-discuss
i have uploaded the files under this group's files section
http://groups.google.com/group/httpmr-discuss/files

thanks and regards
nikhil

On Jul 18, 4:06 am, "Peter Dolan" <peterjdo...@gmail.com> wrote:
> > rpc.pl is a script that takes two arguments one is "fn" which is the
> > code of the function to be executed and other is "args" which is the
> > serialized arguments list. then it executes this code using "eval".
> > the reason for this is you just have to distribute rpc.pl on all the
> > machines on the cluster, and the mapping/reducing function need not be
> > distributed to all the machines. i just pass the code of those
> > functions using HTTP. perl allows to de-parse a function (B:Deparse is
> > the module).
>
> > it does do parallel HTTP requests to rpc.pl on various machines. CPAN
> > has a module Parallel:UserAgent which does parallel HTTP requests.
> > i will send those files in a separate mail as attachments.
>
> Neat, I'll try to wade through the code a little more; Perl's something I've
> never worked with.
>
>
>
> > This can be enhanced to use memcache/Amazon's simpledb to store
> > intermediate key-value pairs.
>
> I've considered writing in support for the AppEngine's memcache API, but
> decided against it for a couple reasons:
>
> - Memcache has no support for scanning semantics, only get(key) and
> put(key, value), so you need to know the keys you're going to be reducing
> before you can reduce them. Doing that would require the mappers to somehow
> register the set of keys that had been output. It's possible that you could
> do so by shoving the keys into memcache, and a few implementations might
> work:
> 1. One memcache key, all mappers do a get-update-put on the value,
> which is a list of keys
> 2. One memcache key per mapper output key, an incrementing integer.
> Next integer to use is maintained in a special memcache key, and mappers
> request that next key every time they want to output a value.
> Doesn't solve
> duplicate keys, and sharding the reducers has to get into
> modulo-arithmetic
> (reducer 5 of 20 takes every (20x + 5)th key), and that kinda
> makes my head
> hurt.
> 3. Variant of (1), but with each mapper getting its own key to which
> to write. Better concurrency, but doesn't handle uniquness and
> would thus
> require a second pass.
> - AppEngine's memcache API doesn't give you any guarantees about how much

Nikhil S

unread,
Jul 18, 2008, 6:27:35 AM7/18/08
to httpmr-discuss
you can get ParallelUserAgent from http://search.cpan.org/~marclang/ParallelUserAgent-2.57/
it is not installed by default.

to try this out, copy rpc.pl on all the machines under cgi-bin
directory.
dgrep.pl and MapReduce.pm should be on same directory on a client
machine (can be laptop also). then do ./dgrep.pl <patterns> <list-of-
urls>

thanks and regards
nikhil

On Jul 18, 2:42 pm, Nikhil S <nik...@gmail.com> wrote:
> i have uploaded the files under this group's files sectionhttp://groups.google.com/group/httpmr-discuss/files
Reply all
Reply to author
Forward
0 new messages