Here is my attempt to write MapReduce in Perl.
This assumes you have a list of hosts (host1, host2, host3), each running
a web server. Copy
rpc.pl into the cgi-bin directory of each server,
and then run
dgrep.pl. The contents of each file are given below.
===================
rpc.pl==================
#!/usr/bin/perl
# rpc.pl - CGI endpoint that runs a serialized Perl code block on this host.
#
# Request parameters:
#   fn   - Perl source text of a "{ ... }" block (as produced by
#          B::Deparse::coderef2text on the calling side)
#   args - Storable-frozen array reference holding the argument list
#
# Response:
#   200 + the Storable-frozen reference returned by the block, or
#   500 + a plain-text diagnostic when the block is missing, dies, returns
#   something that is not a reference, or returns something that cannot be
#   frozen.
#
# SECURITY: this script evals code and thaws data taken straight from the
# HTTP request. Anyone who can reach it can run arbitrary Perl as the
# web-server user. Only deploy it on a trusted, isolated network.
use strict;
use warnings;
use Data::Dumper;
use CGI;
use Storable;

my $query = CGI->new;

# Force scalar context: param() in list context can return several values.
my $fn   = $query->param('fn');
my $args = eval { Storable::thaw(scalar $query->param('args')) };
my @call_args = ref $args eq 'ARRAY' ? @{$args} : ();

# Runs the submitted block. The string eval is deliberate: $fn is a bare
# "{ ... }" block that reads its arguments from this sub's @_.
sub stub {
    return eval $fn;
}

#print "args ".Dumper($args);
my ($result, $error);
if (!defined $fn || $fn eq '') {
    $error = "missing 'fn' parameter";
}
else {
    # List context, first value kept: same value the original handed to freeze.
    ($result) = stub(@call_args);
    if ($@) {
        $error = "remote code failed: $@";
    }
    elsif (!ref $result) {
        # Storable::freeze only accepts references.
        $error = 'remote code did not return a reference';
    }
}

my $frozen;
if (!defined $error) {
    $frozen = eval { Storable::freeze($result) };
    $error = "cannot serialize result: $@" unless defined $frozen;
}

if (defined $error) {
    print $query->header(
        -type   => 'text/plain',
        -status => '500 Internal Server Error',
    );
    print "rpc.pl: $error\n";
    exit;
}

print $query->header({-type=>'text/plain'});
binmode STDOUT;    # the frozen image is binary data
print $frozen;
exit;
=======
==============
dgrep.pl ===============
#!/usr/bin/perl
# dgrep.pl - distributed grep built on Mapreduce.
#
# Usage: dgrep.pl PATTERN
#
# Each input file (1.txt .. 5.txt) is fetched from http://host1/ by a map
# task, and every line matching PATTERN is emitted as "file:line: text".
# The matches are printed one per line, in no particular order.
use strict;
use warnings;
use Data::Dumper;
use Mapreduce;

my $grep = $ARGV[0];
die "usage: $0 PATTERN\n" unless defined $grep && length $grep;

# Reject a malformed pattern here instead of letting every map task fail.
eval { qr/$grep/; 1 } or die "invalid pattern '$grep': $@";

# Map: ($emit, $file, $pattern). The sub is shipped as source text, so it
# must be self-contained; "require" (unlike "use") survives deparsing.
my $grep_map = fnserialize(sub {
    require LWP::UserAgent;
    my ($em, $url, $val) = @_;
    my $browser = LWP::UserAgent->new(
        agent => 'Mozilla/4.0 (compatible; urlget; MSIE 6.0; Windows NT 5.1; SV1)',
    );
    my $response = $browser->get('http://host1/' . $url);
    return unless $response->is_success;

    # Keep trailing fields (-1) and drop only the empty piece after a final
    # newline, so a last line without "\n" is still searched.
    my @lines = split /\n/, $response->content, -1;
    pop @lines if @lines && $lines[-1] eq '';

    my $ln = 0;
    for my $line (@lines) {
        $ln++;
        $em->("$url:$ln: " . $line, 1) if $line =~ m/$val/;
    }
    return;
});

# Reduce: ($emit, $key, $values). Not run here because the "noreduce" flag
# below is set, but mapreduce() still expects the argument.
my $grep_reduce = fnserialize(sub {
    my ($em, $key, $val) = @_;
    $em->($key, 0);
});

my %pattern_for = map { ("$_.txt" => $grep) } 1 .. 5;

my $r = mapreduce($grep_map, $grep_reduce, \%pattern_for, 1);
die "mapreduce failed: no result returned\n" unless ref $r eq 'HASH';

print $_ . "\n" for keys %{$r};
==============
================= Mapreduce.pm ================
package Mapreduce;
use strict;
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT = qw(fnserialize mapreduce);
our $VERSION = 1.00;
use B::Deparse;
use LWP::UserAgent;
use URI::Escape;
use Storable;
# fnserialize BLOCK
# fnserialize(sub { ... })
#
# Converts a code reference into Perl source text via B::Deparse and returns
# that text (a "{ ... }" block). Only the code is captured, so the sub should
# not rely on variables from its enclosing scope.
sub fnserialize(&) {
    my ($code) = @_;
    return B::Deparse->new->coderef2text($code);
}
# rpc BLOCK LIST
#
# Runs BLOCK on host1 with LIST as its arguments.
#
# BLOCK is turned into source text with B::Deparse and LIST is frozen with
# Storable; both are POSTed as form fields ("fn" and "args") to
# http://host1/cgi-bin/rpc.pl. The response body is thawed and returned.
#
# Returns the thawed reference, or undef when the request fails, the body is
# empty, or the body cannot be thawed. Always returns exactly one scalar.
sub rpc(&@) {
    my $fn  = shift;
    my $map = B::Deparse->new->coderef2text($fn);

    my $browser = LWP::UserAgent->new(
        agent => 'Mozilla/4.0 (compatible; mapreduce; MSIE 6.0; Windows NT 5.1; SV1)',
    );

    my $req = HTTP::Request->new(POST => 'http://host1/cgi-bin/rpc.pl');
    # The body is form-encoded; say so instead of relying on server defaults.
    $req->content_type('application/x-www-form-urlencoded');
    $req->content(
        'fn=' . uri_escape($map) . '&args=' . uri_escape(Storable::freeze(\@_))
    );

    my $response = $browser->request($req);

    my $result;
    if ($response->is_success) {
        my $content = $response->content;
        if (length($content) > 0) {
            # thaw can croak on a corrupt image; treat that like no result.
            $result = eval { Storable::thaw($content) };
        }
    }
    return $result;
}
# mapreduce($map_src, $reduce_src, \%data, $noreduce)
#
# Runs a map/reduce job across the worker hosts.
#
#   $map_src    - source text of the map function (see fnserialize); it is
#                 called as ($emit, $key, $value) for every pair in %data
#   $reduce_src - source text of the reduce function; it is called as
#                 ($emit, $key, \@values) for every intermediate key
#   \%data      - input key/value pairs, one map task per key
#   $noreduce   - when true, skip the reduce phase
#
# Returns a hash reference: key => [emitted values] when $noreduce is true,
# otherwise key => the last value the reduce function emitted for that key.
# Returns undef when the call to the coordinator fails (see rpc).
#
# IMPORTANT: the whole block below is deparsed by rpc() and executed on
# host1, and the $remmap / $remred subs inside it are deparsed again and
# executed on the workers. Nothing in it may refer to names defined outside
# itself, which is why modules are loaded with "require", functions are
# called fully qualified, and helpers are closures rather than package subs.
sub mapreduce {
    my $result = rpc {
        my $map      = shift;
        my $reduce   = shift;
        my $data     = shift;
        my $noreduce = shift;

        require B::Deparse;
        require LWP::Parallel::UserAgent;
        require HTTP::Request;
        require URI::Escape;
        require Storable;
        require Data::Dumper;

        my @hostlist = (
            'http://host1/cgi-bin/rpc.pl',
            'http://host2/cgi-bin/rpc.pl',
            'http://host3/cgi-bin/rpc.pl',
        );

        # Code reference -> source text.
        my $funser = sub {
            my $fn = shift;
            return B::Deparse->new->coderef2text($fn);
        };

        # Builds the form-encoded POST body for one remote call:
        # first argument is the sub to run, the rest are its arguments.
        my $remcall = sub {
            my $fn = shift;
            return 'fn=' . URI::Escape::uri_escape($funser->($fn))
                 . '&args=' . URI::Escape::uri_escape(Storable::freeze(\@_));
        };

        # Runs on a worker: applies the map function to one key/value pair
        # and returns { emitted key => [emitted values] }.
        my $remmap = sub {
            my ($mapfn, $k, $v) = @_;
            my %interhash = ();
            my $mapemit = sub {
                my ($key, $val) = @_;
                push @{ $interhash{$key} }, $val;
            };
            my $evalstub = sub {
                my $fn = shift;
                # The evaluated block reads ($emit, $key, $value) from @_.
                eval $fn;
                warn "mapreduce: map function failed: $@" if $@;
            };
            $evalstub->($mapfn, $mapemit, $k, $v);
            return \%interhash;
        };

        # Runs on a worker: applies the reduce function to one key and its
        # value list and returns { emitted key => last emitted value }.
        my $remred = sub {
            my ($redfn, $k, $v) = @_;
            my %finalhash = ();
            my $redemit = sub {
                my ($key, $val) = @_;
                $finalhash{$key} = $val;
            };
            my $evalstub = sub {
                my $fn = shift;
                # The evaluated block reads ($emit, $key, \@values) from @_.
                eval $fn;
                warn "mapreduce: reduce function failed: $@" if $@;
            };
            $evalstub->($redfn, $redemit, $k, $v);
            return \%finalhash;
        };

        # Sends one task per key of %$input to the workers (round-robin over
        # @hostlist), waits for all of them, and returns the hash references
        # thawed from the successful responses. Failed or unreadable
        # responses are skipped.
        my $scatter = sub {
            my ($agent, $worker, $fn_src, $input) = @_;
            my @keys = keys %{ $input || {} };
            return unless @keys;

            my $pua = LWP::Parallel::UserAgent->new;
            $pua->agent($agent);

            my $host_ind = 0;
            for my $key (@keys) {
                my $url = $hostlist[ $host_ind++ % @hostlist ];
                $pua->register(
                    HTTP::Request->new(
                        'POST', $url,
                        [ 'Content-Type' => 'application/x-www-form-urlencoded' ],
                        $remcall->($worker, $fn_src, $key, $input->{$key}),
                    )
                );
            }

            my $entries = $pua->wait();
            my @partials;
            for my $entry (values %{ $entries || {} }) {
                my $res = $entry->response;
                next unless $res->is_success;
                # thaw can croak on a corrupt image; skip such responses.
                my $partial = eval { Storable::thaw($res->content) };
                push @partials, $partial if ref $partial eq 'HASH';
            }
            return @partials;
        };

        # Map phase: merge every worker's { key => [values] } into one hash.
        my %maphash = ();
        my @mapped = $scatter->(
            'Mozilla/4.0 (compatible; map; MSIE 6.0; Windows NT 5.1; SV1)',
            $remmap, $map, $data,
        );
        for my $ihash (@mapped) {
            for my $ih (keys %{$ihash}) {
                push @{ $maphash{$ih} }, @{ $ihash->{$ih} };
            }
        }
        return \%maphash if $noreduce;

        # Reduce phase: one task per intermediate key.
        my %redhash = ();
        my @reduced = $scatter->(
            'Mozilla/4.0 (compatible; reduce; MSIE 6.0; Windows NT 5.1; SV1)',
            $remred, $reduce, \%maphash,
        );
        for my $ihash (@reduced) {
            $redhash{$_} = $ihash->{$_} for keys %{$ihash};
        }
        return \%redhash;
    } @_;
    return $result;
}
1;
================