Hello
I'm running into all sorts of errors trying to get this map job to work.
First I got a MemoryError, so, as advised, I stopped yielding key, value and now yield only a composite key with a count of 1:
mykey = '::'.join([isp,code,time])
yield mykey, 1
The map phase then reaches 10%, but some jobs start getting killed with:
izip argument #1 must support iteration
This continues until the whole job gets killed.
So I went and removed time from mykey and did:
mykey = '::'.join([isp,code])
yield mykey, (1, int(time))
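My understanding, which is an assumption since I have not read dumbo's source closely, is that sumsreducer adds tuple values element-wise per key. If so, that might also be why the plain 1 values failed earlier with the izip error. A stand-in for what I expect it to compute (elementwise_sums is my own hypothetical helper, not a dumbo function):

```python
def elementwise_sums(values):
    # zip(*values) regroups the tuples column by column; each column is summed.
    return tuple(sum(column) for column in zip(*values))

# Made-up (count, time) values for a single key.
print(elementwise_sums([(1, 120), (1, 80), (1, 15)]))  # (3, 215)
```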
That ran into memory errors again, so after some googling I added a buffersize:
run(Mapper,sumsreducer,combiner=sumsreducer,buffersize=4096)
Now the job apparently finishes successfully, but the numbers do not add up:
$ dumbo cat input.log -hadoop /usr -hadooplib /usr/share/hadoop | wc -l
13595587
$ dumbo cat ip2isp -hadoop /usr -hadooplib /usr/share/hadoop | awk '{sum+=$2}END{print sum}'
1116666
I was expecting the sum of column #2 from ip2isp (the -output path from the dumbo start command) to equal the number of log lines in input.log.
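To make that expectation concrete, here is a small local sketch with made-up log lines. It reuses the regex from my mapper but replaces the pygeoip lookup with a placeholder key, and map_line and count_per_key are hypothetical helpers, so this is not the real job. It does suggest the counts can only add up to the number of lines the regex matches, which need not be every line in the file:

```python
import re
from collections import defaultdict

REGEX = re.compile(r'(?P<ip>[\d]+)\s(?P<response>[\d]+)\s(?P<time>[\d]+)$')

def map_line(line):
    """Mimic the mapper: emit (key, (1, time)) only for matching lines."""
    mo = REGEX.match(line)
    if mo:
        # Placeholder key: the real job looks up the ISP name with pygeoip.
        key = '::'.join(['isp-' + mo.group('ip'), mo.group('response')])
        yield key, (1, int(mo.group('time')))

def count_per_key(lines):
    """Sum only the count component of the emitted values, per key."""
    counts = defaultdict(int)
    for line in lines:
        for key, (count, _time) in map_line(line):
            counts[key] += count
    return dict(counts)

# Made-up sample input; the last line does not match the regex.
sample = [
    "3232235777 200 120",
    "3232235777 200 80",
    "3232235778 404 15",
    "not a log line",
]
counts = count_per_key(sample)
matched = sum(counts.values())
print(len(sample), matched)  # 4 3
```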
Here is the code for the job:
class Mapper:
    def __init__(self):
        from re import compile
        self.regex = compile(r'(?P<ip>[\d]+)\s(?P<response>[\d]+)\s(?P<time>[\d]+)$')
        from pygeoip import GeoIP, MEMORY_CACHE
        self.geoip = GeoIP("/usr/share/geoip/GeoIPISP.dat",flags=MEMORY_CACHE)

    def __call__(self,key,value):
        mo = self.regex.match(value)
        if mo:
            from socket import inet_ntoa
            from struct import pack
            isp = str(self.geoip.org_by_addr(inet_ntoa(pack("!L",int(mo.group("ip"))))))
            code, time = mo.group("response"), mo.group("time")
            mykey = '::'.join([isp,code])
            yield mykey, (1, int(time))

if __name__ == "__main__":
    from dumbo import run, sumsreducer
    run(Mapper,sumsreducer,combiner=sumsreducer,buffersize=4096)
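In case the IP handling is unclear: the ip field in my logs is a decimal integer, which the mapper turns into a dotted-quad string before the GeoIP lookup. A standalone sketch of just that step (int_to_dotted is a hypothetical helper name and the sample address is made up):

```python
from socket import inet_ntoa
from struct import pack

def int_to_dotted(ip_int):
    # "!L" packs an unsigned 32-bit integer in network (big-endian) byte order.
    return inet_ntoa(pack("!L", ip_int))

print(int_to_dotted(3232235777))  # 192.168.1.1
```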
Any advice?
Thanks!