Dedupe - large datasets choke trying to output flat file from matchBlocks generator


mza...@clarityinsights.com

Apr 28, 2018, 9:14:14 PM
to open source deduplication
Love the dedupe Python package - thank you very much for developing it!  I need some help with outputting results from large datasets with lots of blocks:

My process is "choking" when I try to consume the generator returned by StaticDedupe.matchBlocks (clustered_dupes) and write it out to a flat CSV file with csv_file.write() -- this is the entity_map output stage.

I am implementing this solution on a large dataset (anywhere from 2-8 million records) using a number of address component fields (parsed with libpostal!).  I am running on a powerful EC2 Linux instance (40 vCPU cores, 100 GB RAM, 1 TB storage), and with num_cores = 24 I run quickly through the blocking for predicates/indexes, etc.  I am also able to run efficiently through thresholdBlocks (to compute the threshold score) and then, initially, through matchBlocks.
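For reference, the setup looks roughly like the sketch below (the settings file name is just a placeholder; my real import/setup section is elided in the code further down):

import dedupe

# load a previously trained settings file into StaticDedupe so no training
# is needed at run time; num_cores controls the parallel pair scoring
with open('my_settings_file', 'rb') as sf:
    deduper = dedupe.StaticDedupe(sf, num_cores=24)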

However, after matchBlocks has “completed” it chokes (I monitor progress in the candidates_gen function by printing cluster counts every 5k clusters).  I am trying to write the clustered_dupes output to a flat CSV file on the system, which I will ingest back with a Hadoop load.  When it chokes, here is what I see:

  • My 24 Python processes (monitored with top in a Unix terminal) drop to a single Python process showing 100% utilization and over 160 GB of virtual memory.

  • One process that examined 285k clusters/blocks took about 20 minutes after clustering completed to finally write the flat file.

  • Other processes that examined 480k+ clusters/blocks will sit for 28 hours without coming back.

 

So, the big question: is there any way to better output the clustered_dupes, or to better use the generator?  I am about to dive into the API and modify it to see if I can stream output to a flat file so I don’t run into these constraints.  However, this doesn’t sound like an ideal solution.


Another option I'm considering is breaking apart the blocks that go into matchBlocks(), but besides potentially missing out on calculating the smaller block IDs, would I be missing out on anything else?

Code is below; the screenshot captions further down show the choked process:


#!/usr/bin/python

import itertools
import time

# Import, setup, settings and training file into StaticDedupe
# (this elided section defines start_time, the database cursor `c`,
#  dict_factory, csv_file, thisThreshold and the StaticDedupe `deduper`)


## Clustering

def candidates_gen(result_set) :
    lset = set  # local alias for the set builtin, as in the dedupe MySQL example

    print("started clustering at ", time.time() - start_time, "seconds")
    block_id = None
    records = []
    i = 0
    n = 0
    for row in result_set :
        #print(row)
        n += 1
        if n % 50000 == 0 :
            print(n, "rows at ", time.time() - start_time, "seconds", flush=True)
        if row['block_id'] != block_id :
            if records :
                yield records

            block_id = row['block_id']
            records = []
            i += 1

            if i % 1000 == 0 :
                print(i, "clusters/blocks at ", time.time() - start_time, "seconds", flush=True)

        smaller_ids = row['smaller_ids']
        
        if smaller_ids :
            smaller_ids = lset(smaller_ids.split(','))
        else :
            smaller_ids = lset([])
    
        records.append((row['er_id'], row, smaller_ids))

    print("ended clustering at ", time.time() - start_time, "seconds")
    if records :
        yield records

temp_c = (dict_factory(c, row) for row in c)

clustered_dupes = deduper.matchBlocks(candidates_gen(temp_c), threshold=thisThreshold)


# This is where it chokes...  Any type of output chokes, even this simplistic example with islice.
# I have 24 cores running, showing XXXX clusters/blocks at TIME,
# then it hangs when it gets to the point where it needs to output.

print(clustered_dupes)
print(list(itertools.islice(clustered_dupes, 100)), flush=True)




I have also tried to output by calling next() on clustered_dupes, to no avail...

        # rows here is the clustered_dupes generator returned by matchBlocks
        # iterate through until the generator runs out
        print('Writing clusters to file...', flush=True)
        i = 0
        while True :
            i += 1
            if i % 5000 == 0 :
                print(i, "clusters/blocks at ", time.time() - start_time, "seconds", flush=True)
            try:
                myblock = next(rows)
                cluster, scores = myblock
                cluster_id = cluster[0]
                for er_id, score in zip(cluster, scores) :
                    #print("er_id", str(er_id), flush=True)
                    #print("cluster_id", str(cluster_id), flush=True)
                    #print("score", str(score), flush=True)
                    csv_file.write(str(er_id) + '\001' + str(cluster_id) + '\001' + str(score) + '\n')
            except StopIteration:
                # generator exhausted, so break out of the while loop
                break

        csv_file.flush()
        print('Finished writing clusters to file...', flush=True)
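(An equivalent, slightly tidier way to drain the generator is a plain for loop over clustered_dupes; this is just a sketch of the same streaming write, not a fix for the hang:)

        # same streaming CSV write, expressed as a for loop over the generator
        for i, (cluster, scores) in enumerate(clustered_dupes, 1) :
            if i % 5000 == 0 :
                print(i, "clusters/blocks at ", time.time() - start_time, "seconds", flush=True)
            cluster_id = cluster[0]
            for er_id, score in zip(cluster, scores) :
                csv_file.write(str(er_id) + '\001' + str(cluster_id) + '\001' + str(score) + '\n')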

[Screenshot: top showing the 24 running cores]

[Screenshot: output while running; it gets stuck on the last clusters (when matchBlocks actually kicks off or finishes??)]

[Screenshot: this is where it chokes -- drops from 24 cores to 1 core]


Thank you in advance!


Forest Gregg

Apr 30, 2018, 3:26:00 PM
to open-source-...@googlegroups.com
Hi Zajack,

Clustering proceeds in three phases.

1. Comparing every pair of records that are blocked together
2. Treating each pair of records as an edge in a network and finding the connected components.
3. For each connected component, using hierarchical agglomerative clustering to make groups of co-referent records.

Step 1 is already parallelized.

Step 2 is not parallelized. It could be done, but would require a special algorithm. Something like https://people.csail.mit.edu/jshun/6886-s18/papers/MGB13.pdf

Step 3 is very easy to parallelize. But we haven't gotten around to it.
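Roughly, step 2 amounts to a union-find over the scored pairs, sketched here for illustration only (this is not dedupe's actual implementation):

# illustrative union-find over scored pairs, where each scored pair is
# ((id_a, id_b), score); returns groups of transitively linked record ids
def connected_components(scored_pairs):
    parent = {}

    def find(x):
        # find the root of x, compressing the path as we go
        while parent.setdefault(x, x) != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    def union(a, b):
        parent[find(a)] = find(b)

    for (id_a, id_b), score in scored_pairs:
        union(id_a, id_b)

    components = {}
    for record_id in parent:
        components.setdefault(find(record_id), []).append(record_id)
    return list(components.values())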

Hope that helps.

Best,

Forest





mza...@clarityinsights.com

May 1, 2018, 1:33:59 PM
to open source deduplication
Forest,
Thank you for the quick response.  I really appreciate the background here, and it does help me understand the hang-ups I am experiencing, but it does not get to the heart of my problem.  I am looking for input on best or valid practices for the approach I am taking -- let me know if this is out of scope for the group.

I am using the Dedupe MySQL example as my base, so that is where my terminology comes from; however, my setup writes CSV output for Hive on Hadoop.  To relate the MySQL example back to your description -- bottom line, is all of this occurring in .matchBlocks()?
  1. Step 1 uses the output of the blocking table (that was done in MySQL and is very efficient); the actual comparison occurs in the .matchBlocks() method, right?  And that's the step that obeys num_cores and is parallelized.
  2. Step 2 -- really cool, I hadn't thought of this in the context of graph theory [duh! to me], and I have a skill gap on this topic which I need to close.  Step 2 is also happening in the .matchBlocks() call, right?  And this is where the cores drop back to a single Python process.
  3. Step 3 -- this is what creates the scores for each cluster (I have been thinking of it as a centroid, but that may be incorrect), which then have to meet or fail the threshold value.  This also happens inside the .matchBlocks() call, correct?

Rounding back to my issue and proposed solutions -- the data is just too damn big to fit into a single .matchBlocks() run (too many features, comparisons, rows, blocks, etc.) without further parallelization to break up the memory and CPU requirements.  It is just going to choke.

So... it sounds like my latter idea is the most immediately tangible solution (best bet): I need to break up the amount of data being fed into matchBlocks().  Sound right?  I will try to do this by:
  • Breaking up the output from the smaller_coverage table into tables/chunks of maybe 100k records...  (I can get that much data to work, no problem.)
  • But I have to break them up in such a way that all records that have a block_id contained in a smaller_ids list end up in the same table/chunk.
  • This will still allow me to use a single threshold and will keep all the comparisons on the same playing field.  Correct?

I feel like this is getting close to resolving... I am going to try to develop this proposed solution over the next couple of days.  Has no one else run into an issue with running through so much data?

Thank you,
Matt Z

Forest Gregg

May 1, 2018, 2:04:35 PM
to open-source-...@googlegroups.com
Your proposed solution won't help, because it will only affect the scoring of individual pairs. Your bottleneck is after that.

If you could find a way of separating records into chunks that you know will not be coreferent, that would be a way of working around your problem (this is, approximately, what the connected components algorithm is doing now, and that is where your bottleneck is).



mza...@clarityinsights.com

May 1, 2018, 5:17:30 PM
to open source deduplication
You are right!  I think we are totally on the same page about what needs to happen.  I was hoping to accomplish this by looking at groups/chunks that are not coreferent and thought I could do it as I described.  

Sanity check:  I understand NOT COREFERENT to mean the records that have _no_ common blocks; that is, nothing in common in the blocking_map table and therefore in any downstream tables such as plural_key, plural_block, and finally smaller_coverage.  Am I on the right track here?

So, I will grab records (ER_ID, in my case) that have common blocks and keep them in the same dataset for processing.  Those that have no common blocks can go into separate datasets of whatever size accommodates their potential partner (referent??) records -- for my purposes this is likely around 100k blocks.  I thought I could do that with the smaller_coverage table, but you are saying I'm on the wrong track there.  If I have to back up a bit further, I think that is a very doable process.  Does that sound right?

Thanks again, Forest.  I would love to pick your brain sometime.
By the way... Coreferent!  There's a $5 word.  So precise and descriptive.  Love it.

Forest Gregg

May 2, 2018, 10:18:16 AM
to open-source-...@googlegroups.com
What would you like to happen in this scenario:

Record 1 and record 2 are blocked together
Record 2 and record 3 are blocked together
Record 1 and record 3 are not blocked together.

Even if you want to assume that every pair of records that should match will be blocked together, you'll still want to consider all three records together to decide whether 1 and 2 belong together or 2 and 3.

So you are back to needing to do connected components on the blocked records.  Because this is a larger data set, it will be even worse than finding connected components of the scored records.



mza...@clarityinsights.com

May 2, 2018, 5:41:24 PM
to open source deduplication
Ok, I think I follow...  Even if two records are not blocked together, they may be transitively coreferent (is this a proper term??) and need to be included in the same dataset during the matchBlocks() operation.  That makes this a lot harder than I had originally anticipated, and that makes me sad. :(

I went back to your original reply and read the MGB13 paper on the new parallel algorithm... it's definitely going to take some time for me to wrap my head around it; unfortunately, I won't be implementing it anytime soon on my own.  Though I would like to take a crack at it someday... What's the best way to get involved and take a deeper look at something like this?  Just pull the GitHub repo and go at it?

Back to evaluating my options...  It looks like I need to pare this down (no pun intended) in one or more ways to get it to work.  I would appreciate input on my thoughts below:

  1. Run with fewer rows, i.e. sample down the data.  However, this means I won't be able to perform the deduplication on all my records, just those in the sample dataset.  This is really a no-go.
  2. Knowing my data very well, maybe I can still break it up into non-coreferent datasets for processing, but this will be very specific to my dataset.  I'm not sure of the best way to go about this, though, since the point is to let the suite of probabilistic algorithms in dedupe tell me.
  3. Reduce the number of predicates calculated -- would this work?  What's the best way to do this -- would setting index_predicates=False during training reduce the complexity (see the sketch after this list)?  I had run early versions of this with much simpler predicate rules; e.g. I originally had only 2 rules on 2 fields, but the process is choking now because I broke a single address into 7 component fields (see my matching strategy below; it got complicated once I added the additional fields).
  4. Reduce the number of fields in the model -- similar to 3 above, but get fewer predicates by removing the fields likely to have the lowest value for dedupe.  A couple of fields are mostly None (PO Box, House, Entrance, and Level are poorly populated), so I would just rely on the big ones like house number, road, unit, city, state, zip, and name.
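For option 3, the retraining I have in mind would look roughly like this sketch (fields, data and the sample size are placeholders for my actual setup):

# sketch of option 3: retrain without index predicates to get simpler blocking rules
deduper = dedupe.Dedupe(fields, num_cores=24)
deduper.sample(data, 15000)
dedupe.consoleLabel(deduper)          # active labeling, as in the examples
deduper.train(recall=0.95, index_predicates=False)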
Thank you for all the help, Forest.  This has definitely helped me iterate through potential options a bit faster than trial and error.  Though, I'm at the same point I was on Sunday.

My latest model matching strategy (output from training) is below; it got complicated once I broke out the component address fields:

Forest Gregg

May 2, 2018, 11:35:26 PM
to open-source-...@googlegroups.com
You have run into a bottleneck in dedupe -- the current implementation of connected components.  In order to overcome that bottleneck, that implementation needs to change.  If you can do it, great!  DataMade can also prioritize work on the library on a paid consulting basis.

mza...@clarityinsights.com

May 21, 2018, 5:20:47 AM
to open source deduplication
I have been jumping on and off this work the past few weeks, but I still haven't come to a good solution.

I started slicing the data into separate non-coreferent groups.  However, I am still choking at matchBlocks, sometimes with what I would consider small data: 50k rows of raw data, which makes 50k blocks feeding into matchBlocks.  I would definitely expect this to go through.  I can leave it running for a day or more, churning at 100 percent CPU on a single core, to no avail.  thresholdBlocks runs without issue, if that has any impact.  Using the debug logger, the last message I get is DEBUG:dedupe.api:matching done, begin clustering.
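(I turned on that debug output with the standard logging module, roughly like this:)

import logging
logging.basicConfig(level=logging.DEBUG)   # surfaces dedupe's DEBUG messages, e.g. from dedupe.api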

Would complicated predicate rules, such as those in the above posts, be enough to choke the program with such a relatively small amount of data?

How can I better investigate what's happening here? Or, where would I go from here to get my matching back on track?

Thank you!
Matt Z

mza...@clarityinsights.com

Jul 12, 2018, 4:37:53 PM
to open source deduplication
Update on this, quite a bit later.  I worked around it in a couple of ways:
  • No way to push through the CLUSTERING.  Like Forest said earlier, it's the technical bottleneck in Dedupe.  This becomes much more pronounced with "complex" matching rules; it takes relatively few rows to choke it with complex rules.  And there is no way to debug it, because it just spins inside the single algorithm... it will also not return even after a week or more with 20+ vcores, 120 GB RAM, etc.  It will eventually fail (it's not parallel).
  • Simplify the blocking -- using blocker(data, target=True) helps reduce the number of blocks and subsequent pairs.
  • Simplify the training -- train(recall=0.95, index_predicates=False) will create simple predicates and reduce the amount of downstream processing.
  • Break up the data into non-coreferent sets -- I did this by separating the blocking on 3-digit ZIP, since the ZIP field is pretty accurate across my data (see the sketch after this list).
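Roughly, the 3-digit ZIP partitioning looked like the sketch below (field names and the per-chunk helper are illustrative, not my production code):

# split records into chunks by the first three digits of the ZIP; records in
# different chunks are assumed non-coreferent, so each chunk can be blocked,
# scored and clustered through matchBlocks independently
from collections import defaultdict

chunks = defaultdict(dict)
for er_id, record in data.items():
    zip3 = (record.get('zip') or '')[:3] or 'missing'
    chunks[zip3][er_id] = record

for zip3, chunk in chunks.items():
    run_dedupe_on_chunk(zip3, chunk)   # hypothetical helper: block, score and cluster one chunk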
Even with simple predicates I was able to get really good matching by training on "A LOT" of data.  I can't stress that enough.  I was originally doing active labeling and had done as many as 200 pairs.  It worked OK on the training data, but not so great on a larger dataset.  I ended up using matchBlocks with data I had paired in Excel using previously matched data -- a set of 15k records that were grouped together appropriately.  This took a long time to train but provided amazingly accurate results across the entire dataset of 8 million records.
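One way to feed that kind of pre-labeled data into training (instead of console labeling) is markPairs; a sketch under that assumption, with load_labeled_pairs as a hypothetical helper that reads the spreadsheet export into lists of record-dict pairs:

# hand-labeled pairs exported from Excel, fed in as training examples
matches, distincts = load_labeled_pairs('labeled_pairs.csv')   # hypothetical helper
deduper.markPairs({'match': matches, 'distinct': distincts})
deduper.train(recall=0.95, index_predicates=False)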

Cheers,
Matt Z

rachel...@gettectonic.com

Aug 15, 2018, 2:50:16 PM
to open source deduplication
@matt, can you share what you did to separate out the 3-digit zip?  We are facing a similar problem and I'm not quite sure how to break it up.

Thanks!
Rachel
 
