join reducer not being reused for different keys?


Tim Sell

Jun 6, 2009, 7:30:00 PM
to dumbo...@googlegroups.com
I have this dumbo program

#############
from dumbo import *

def mapper(key, value):
   flds = value.split("\t")
   k = flds[0]
   v = flds[1:]
   yield k,v

class reducer(JoinReducer):
   def __init__(self):
       self.accType = None
   def primary(self, key, values):
       for value in values:
           self.accType = key+value[0]
   def secondary(self, key, values):
       accType = self.accType
       self.accType = None
       for value in values:
           yield key, value + [accType]


def runner(job):
   multimapper = MultiMapper()
   multimapper.add("userlogs", secondary(mapper))
   multimapper.add("userAccType", primary(mapper))
   job.additer(multimapper, reducer)

def starter(prog):
   pass

if __name__ == "__main__":
   from dumbo import main
   main(runner, starter)

#############

I'm using these input files...
userlogs:
userid1 normalstuff
userid2 morenormalstuff
userid3 specialstuff

userAccType:
id1 NORMALUSER
id2 NORMALUSER
id3 SPECIALUSER

Now with the above program, this should create a file where "None" is
joined to each userlog entry as obviously the keys don't match.
But... I get this output:
'userid1'       ['normalstuff', 'SPECIALUSER']
'userid2'       ['morenormalstuff', None]
'userid3'       ['specialstuff', None]

wot?!!
It looks like it's reusing the reducer with different keys.
Interestingly, if I changed the keys in userAccType to be wrongid1/2/3
(instead of just id1/2/3), then all the joins were None. Which seems
very weird.

Is this a bug?

And of course, if I use ids that do match, it does work as expected.
But this has me worried that some keys that don't have any joined
data might get some anyway.

A work around I suppose would be to store the last used key in the
reducer as well as accType. but that seems very inelegant?

I think every primary key that is a substring of the secondary key is
being sent to the same JoinReducer.

~Tim.

Klaas Bosteels

Jun 7, 2009, 4:22:30 AM
to dumbo...@googlegroups.com
On Sun, Jun 7, 2009 at 1:30 AM, Tim Sell<trs...@gmail.com> wrote:
> It looks like it's reusing the reducer with different keys.
> Interestingly, if I changed the keys in userAccType to be wrongid1/2/3
> (instead of just id1/2/3), then all the joins were None. Which seems
> very weird.
>
> Is this a bug?

I'm afraid it's not. The only guarantees you really have are that:

1. the primary and secondary keys with the same value go to the same
reducer, and
2. "reducer.primary" gets called before "reducer.secondary" for each key.

So you can't run into problems when the considered set of primary keys
is a superset of the secondary ones, but when that's not the case you
might indeed get weird results.
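To make the effect concrete, here is a minimal pure-Python simulation (no Dumbo or Hadoop involved). It assumes, as a simplification, that all six keys from Tim's example land in the same reduce task and are visited in sorted order. `LeakyReducer` and `run_reduce_task` are hypothetical names invented for this illustration, not Dumbo APIs.

```python
class LeakyReducer(object):
    """Mimics the reducer from the original post: one instance, many keys."""

    def __init__(self):
        self.acc_type = None

    def primary(self, key, values):
        for value in values:
            self.acc_type = value

    def secondary(self, key, values):
        acc_type = self.acc_type
        self.acc_type = None
        for value in values:
            yield key, [value, acc_type]


def run_reduce_task(reducer, primary_rows, secondary_rows):
    """Hypothetical stand-in for one reduce task: visit keys in sorted order,
    calling primary() before secondary() whenever a key has both."""
    output = []
    for key in sorted(set(primary_rows) | set(secondary_rows)):
        if key in primary_rows:
            reducer.primary(key, iter(primary_rows[key]))
        if key in secondary_rows:
            output.extend(reducer.secondary(key, iter(secondary_rows[key])))
    return output


primary_rows = {
    "id1": ["NORMALUSER"],
    "id2": ["NORMALUSER"],
    "id3": ["SPECIALUSER"],
}
secondary_rows = {
    "userid1": ["normalstuff"],
    "userid2": ["morenormalstuff"],
    "userid3": ["specialstuff"],
}

result = run_reduce_task(LeakyReducer(), primary_rows, secondary_rows)
for key, value in result:
    print(key, value)
```

In this simplified model the value stored for `id3` is still sitting in the reducer when `userid1` arrives, which matches the output Tim reported. It would also be consistent with the `wrongid1/2/3` observation: those keys sort after every `userid*` key, so nothing is stored before the secondary calls run.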

> And of course, if I use ids that do match, it does work as expected.
> But this has me worried that some keys that don't have any joined
> data, might get some anyway.

You don't have to worry if you make sure that there is a primary key
for each secondary one, which is a reasonable assumption to make I
think. In most practical cases it should be possible to structure the
program in such a way that the primary keys are indeed a superset of
the secondary ones. In case of your example, for instance, I'd expect
"userAccType" to contain the type for each existing user, and if
that's not the case you could always join it with a complete list of
all user ids first (using the keys from the complete lists as primary
keys) and then join "userlogs" with the output (using the keys from
the output as primary keys).

> A work around I suppose would be to store the last used key in the
> reducer as well as accType. but that seems very inelegant?

Yeah, that's actually the only way I can think of for getting valid
results when you can't be sure that the primary keys are a superset of
the secondary ones.
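A minimal sketch of that workaround, written as a plain Python class so it can be run without Dumbo. In a real job it would presumably subclass `JoinReducer`; the name `KeyCheckingReducer` and its attributes are made up for this example.

```python
class KeyCheckingReducer(object):
    """Remembers which key the stored primary value belongs to."""

    def __init__(self):
        self.primary_key = None
        self.acc_type = None

    def primary(self, key, values):
        # Record the key together with the value, dropping any older state.
        self.primary_key = key
        self.acc_type = None
        for value in values:
            self.acc_type = value

    def secondary(self, key, values):
        # Only trust the stored value if it was set for this exact key.
        acc_type = self.acc_type if self.primary_key == key else None
        for value in values:
            yield key, [value, acc_type]


reducer = KeyCheckingReducer()

# A primary key with no matching secondary key, followed by an unrelated one.
reducer.primary("id3", iter(["SPECIALUSER"]))
unmatched = list(reducer.secondary("userid1", iter(["normalstuff"])))

# A key that is present on both sides.
reducer.primary("userid2", iter(["NORMALUSER"]))
matched = list(reducer.secondary("userid2", iter(["morenormalstuff"])))

print(unmatched)
print(matched)
```

Because the comparison is made per key, this variant does not depend on the primary keys being a superset of the secondary ones.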

> I think every primary key that is a substring of the secondary key is
> being sent to the same JoinReducer.

Could be. A single JoinReducer processes all the keys and values from
the same reduce task, and Hadoop decides which key goes to which
reduce task by hashing it (see
org.apache.hadoop.mapred.lib.HashPartitioner for the details).
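As a rough illustration of hash partitioning, the sketch below follows the hash, mask, modulo pattern that `HashPartitioner` uses. `zlib.crc32` is only a deterministic stand-in for Java's `key.hashCode()`, so the actual partition numbers will differ from what Hadoop computes, and `partition_for` is a hypothetical helper.

```python
import zlib


def partition_for(key, num_reduce_tasks):
    """Pick a reduce task for a key: hash it, clear the sign bit, take the modulo."""
    # Assumption: crc32 replaces Java's hashCode() purely for illustration.
    key_hash = zlib.crc32(key.encode("utf-8"))
    return (key_hash & 0x7FFFFFFF) % num_reduce_tasks


keys = ["id1", "id2", "id3", "userid1", "userid2", "userid3"]
assignment = {key: partition_for(key, 4) for key in keys}
for key in keys:
    print(key, assignment[key])
```

Note that with a single reduce task every key maps to partition 0, so one reducer instance sees all the keys regardless of how they hash.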


I'm glad you brought this up, Tim, because to be honest I hadn't yet
realized that you have to be this careful when joining. The examples I
used in the posts on dumbotics.com are probably also kind of misleading.
I'll try to rewrite those a bit where necessary.

-Klaas

Klaas Bosteels

Jun 8, 2009, 3:56:04 AM
to dumbo...@googlegroups.com
Actually, I think the requirements can be relaxed a bit. A reducer like, e.g.,

class Reducer(JoinReducer):
    def primary(self, key, values):
        self.hostname = values.next()
    def secondary(self, key, values):
        key = self.hostname
        for value in values:
            yield key, value

indeed only works when the primary keys are a superset of the
secondary ones, but when you adapt it to

class Reducer(JoinReducer):
    def __init__(self):
        self.hostname = "unknown"
    def primary(self, key, values):
        self.hostname = values.next()
    def secondary(self, key, values):
        key = self.hostname
        self.hostname = "unknown"
        for value in values:
            yield key, value

it also works when the primary keys are a subset of the secondary
ones, since you can then be sure that "self.hostname" will always be
reset to "unknown" right after it got set by the "primary" method.

So in general I think that joining can be made to work when the
primary keys are either a superset or a subset of the secondary ones.
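The rule of thumb can be written down as a small check. This is only a sketch for small key samples (real key sets would usually be too large to compare this way), and `join_case` is a hypothetical helper, not part of Dumbo.

```python
def join_case(primary_keys, secondary_keys):
    """Classify how the primary key set relates to the secondary key set."""
    primary_keys = set(primary_keys)
    secondary_keys = set(secondary_keys)
    if primary_keys >= secondary_keys:
        # Every secondary key has a primary key: stored state is always fresh.
        return "superset"
    if primary_keys <= secondary_keys:
        # Every primary key is followed by its own secondary call,
        # so resetting the state in secondary() is enough.
        return "subset"
    # Some primary keys never get a secondary call: stale state can leak.
    return "neither"


print(join_case(["a", "b"], ["a"]))
print(join_case(["a"], ["a", "b"]))
print(join_case(["id1"], ["userid1"]))
```

The "neither" case is the one from the original post, where only an explicit key comparison in the reducer gives valid results.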

-Klaas

Samarth Gahire

Oct 10, 2012, 3:26:05 AM
to dumbo...@googlegroups.com
Does this mean we cannot handle keys that are not common to both input sets?
In one of my use cases I have two sets of keys; they have some keys in common, but each set also has keys that are unique to it. Is there any way to handle that with JoinReducer or some other approach?

Tobias Speckbacher

Oct 10, 2012, 3:59:17 AM
to dumbo...@googlegroups.com
You can do an outer join on the primary by using JoinCombiner instead of JoinReducer … that's about it.

This will allow you to produce output for all keys present in the primary data set, even if there are no corresponding records in the secondary data set.

A full outer join is not possible.




Samarth Gahire

Oct 10, 2012, 8:00:51 AM
to dumbo...@googlegroups.com
Thank you so much, Tobias, for the quick reply. Could you please elaborate on this using the hostip example above, so that the flow of the Hadoop job is easier to understand? What exactly would the reducer and combiner look like in this case? And how do I handle primary keys that are not present in the secondary set?

Tobias Speckbacher

Oct 10, 2012, 2:40:07 PM
to dumbo...@googlegroups.com
It is actually outer on the secondary set. The key point is that execution of the secondary reducer is not blocked if there is no matching key in the primary set.
I had a better code sample somewhere, but I can't find it right now; this should illustrate the point. Be careful with managing the data: as with JoinReducer, the instance of the class is not isolated to a single key, so make sure state you saved for one key does not pollute the execution of the next key.

import dumbo
from dumbo.decor import primary, secondary
from dumbo.lib import MultiMapper, JoinCombiner


def id_value_mapper(key, value):
    # Each input line looks like "<id>,<name>".
    record_id, name = value.split(',')
    yield record_id, name


class IdJoinReducer(JoinCombiner):
    # One instance handles many keys, so pk records which key depts belongs to.
    depts = []
    pk = None

    def primary(self, key, values):
        if self.pk != key:
            # New primary key: drop the departments of the previous key.
            self.depts = []
        for value in values:
            self.depts.append(value)
        self.pk = key

    def secondary(self, key, values):
        if self.pk != key:
            # No primary records for this key: clear stale state, emit None.
            self.depts = []
            for value in values:
                yield key, [key, None, value]
            return
        for value in values:
            for dept in self.depts:
                yield key, [key, dept, value]


if __name__ == '__main__':
    job = dumbo.Job()
    mm = MultiMapper()
    mm.add('department', primary(id_value_mapper))
    mm.add('users', secondary(id_value_mapper))
    job.additer(mm, IdJoinReducer)
    job.run()

users:

1,tobias
2,alexander
3,albert
4,roger

department:

1,engineering
2,accounting
4,operations
2,operations

output:

'1' ['1', 'engineering', 'tobias']
'2' ['2', 'accounting', 'alexander']
'2' ['2', 'operations', 'alexander']
'3' ['3', None, 'albert']
'4' ['4', 'operations', 'roger']







Gilles Vandelle

Oct 11, 2012, 11:17:27 AM
to dumbo...@googlegroups.com
You can emulate the join yourself by tagging each record with the set it came from:

def mapper_set1(key, value):
    yield key, ("set1", value)

def mapper_set2(key, value):
    yield key, ("set2", value)

def reducer(key, values):
    for setkey, value in values:
        if setkey == "set1":
            pass  # handle a record from the first set
        elif setkey == "set2":
            pass  # handle a record from the second set
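The tagging idea can be carried through to a full outer join. The sketch below is plain Python with a hypothetical local driver (`run_locally`) standing in for the shuffle, and it buffers all values of a key in memory, which is an assumption that only holds when each key has a modest number of records.

```python
def mapper_set1(key, value):
    yield key, ("set1", value)


def mapper_set2(key, value):
    yield key, ("set2", value)


def reducer(key, values):
    """Full outer join for one key; buffers that key's values in memory."""
    left, right = [], []
    for setkey, value in values:
        if setkey == "set1":
            left.append(value)
        elif setkey == "set2":
            right.append(value)
    # Pad the missing side with None so unmatched keys still produce output.
    for left_value in left or [None]:
        for right_value in right or [None]:
            yield key, (left_value, right_value)


def run_locally(set1, set2):
    """Hypothetical local driver: group the tagged mapper output by key."""
    grouped = {}
    for mapper, rows in ((mapper_set1, set1), (mapper_set2, set2)):
        for key, value in rows:
            for out_key, tagged in mapper(key, value):
                grouped.setdefault(out_key, []).append(tagged)
    output = []
    for key in sorted(grouped):
        output.extend(reducer(key, iter(grouped[key])))
    return output


joined = run_locally(
    [("1", "engineering"), ("2", "accounting")],
    [("1", "tobias"), ("3", "albert")],
)
for row in joined:
    print(row)
```

Keys that appear in only one set come out with `None` on the other side, which is the behaviour a full outer join needs.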




Tobias Speckbacher

Oct 11, 2012, 11:36:35 AM
to dumbo...@googlegroups.com
Right, a record type will do :).

When I needed this, my data was a bit different. In the secondary I had the equivalent of a web log, and in the primary I had IP geolocation data organized in network blocks. For both sets, the key was converted to the /24 network address it belongs to. This meant that the primary reducer would be fed n rows containing network ranges, and the secondary was fed millions of rows of log data per key.
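For illustration, mapping an IPv4 address to its /24 network address could look like the sketch below. It uses the `ipaddress` module from the Python 3 standard library, which is an assumption about the environment and not necessarily what the original job used; `slash24_key` is a hypothetical helper name.

```python
import ipaddress


def slash24_key(ip):
    """Return the /24 network address that an IPv4 address belongs to."""
    # strict=False lets us pass a host address instead of a network address.
    network = ipaddress.ip_network("%s/24" % ip, strict=False)
    return str(network.network_address)


print(slash24_key("192.168.5.77"))
print(slash24_key("10.0.0.255"))
```

Using the same function in both mappers gives log lines and network blocks a common join key.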

Using a record type in this case is not practical, as it does not guarantee that records in the primary set will arrive in the reducer before those in the secondary set; thus, the data needed to complete the join may not be fully available yet. And with the sum of the data per key exceeding the memory allocation, it would not have worked.

Anyway, you are correct: the sample I provided is too complicated for an outer join with unique keys in the primary set.

-Tobias