However, after matchBlocks has “completed”, it chokes (I monitor progress in the candidates_gen function by print()ing cluster counts every 5k clusters). I am trying to write the clustered_dupes output to a flat CSV file on the system, which I will then ingest back with a Hadoop load. When it chokes, this is what I see:
· My 24 Python processes (monitored with top in a Unix terminal) drop to a single Python process showing 100% CPU utilization and over 160 GB of virtual memory.
· One process that examined 285k clusters/blocks took about 20 minutes after it completed clustering to finally output the flat file.
· However, other processes that examined 480k+ clusters/blocks will sit for 28 hours without coming back.
So, the big question: is there any way to better output the clustered_dupes, or make better use of the generator? I am about to dive into the API and modify it to see if I can stream the output to a flat file so I don't run into these constraints, but that doesn't sound like an ideal solution.
#!/usr/bin/python
# Import, setup, settings and training file into StaticDedupe
## Clustering
# candidates_gen yields one list of (record_id, record, smaller_ids) tuples per block,
# which is the input matchBlocks expects
def candidates_gen(result_set):
    lset = set

    print("started clustering at ", time.time() - start_time, "seconds")
    block_id = None
    records = []
    i = 0
    n = 0
    for row in result_set:
        #print(row)
        n += 1
        if n % 50000 == 0:
            print(n, "rows at ", time.time() - start_time, "seconds", flush=True)

        if row['block_id'] != block_id:
            if records:
                yield records

            block_id = row['block_id']
            records = []
            i += 1

            if i % 1000 == 0:
                print(i, "clusters/blocks at ", time.time() - start_time, "seconds", flush=True)

        smaller_ids = row['smaller_ids']
        if smaller_ids:
            smaller_ids = lset(smaller_ids.split(','))
        else:
            smaller_ids = lset([])

        records.append((row['er_id'], row, smaller_ids))

    print("ended clustering at ", time.time() - start_time, "seconds")
    if records:
        yield records

# c, dict_factory, thisThreshold and start_time come from the setup omitted above
temp_c = ((dict_factory(c, row)) for i, row in enumerate(c))

clustered_dupes = deduper.matchBlocks(candidates_gen(temp_c), threshold=thisThreshold)
# This is where it chokes... any kind of output does, even the simplistic islice example below.
# I have 24 cores running, showing XXXX clusters/blocks at TIME,
# then it hangs when it gets to the point where it needs to output.
print(clustered_dupes)
print(list(itertools.islice(clustered_dupes, 100)), flush=True)

# iterate through until we run out of slices!
# ('rows' is the clustered_dupes generator and 'csv_file' an already-open output file,
#  both set up in code omitted above)
print('Writing clusters to file...', flush=True)
i = 0
while True:
    i += 1
    if i % 5000 == 0:
        print(i, "clusters/blocks at ", time.time() - start_time, "seconds", flush=True)
    try:
        myblock = next(rows)
        cluster, scores = myblock
        cluster_id = cluster[0]
        for er_id, score in zip(cluster, scores):
            #print("er_id", str(er_id), flush=True)
            #print("cluster_id", str(cluster_id), flush=True)
            #print("score", str(score), flush=True)
            csv_file.write(str(er_id) + '\001' + str(cluster_id) + '\001' + str(score) + '\n')
    except StopIteration:
        # generator is exhausted, so break out of the while loop
        break

csv_file.flush()
print('Finished writing clusters to file...', flush=True)
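For comparison, the plainest drain I can think of is a simple for loop over the generator with the csv module, writing each cluster as it arrives. This is only a sketch: it assumes clustered_dupes yields (record_ids, scores) tuples as unpacked above, and 'clusters_out.csv' is just a placeholder path. It also won't help if the hang is inside matchBlocks itself rather than in my output loop.

import csv

# Sketch: stream clusters straight to disk as the generator yields them.
# Assumes clustered_dupes yields (record_ids, scores) tuples; the path is a placeholder.
with open('clusters_out.csv', 'w', newline='') as out:
    writer = csv.writer(out, delimiter='\001')
    for n, (cluster, scores) in enumerate(clustered_dupes, 1):
        cluster_id = cluster[0]  # first record id stands in as the cluster id
        for er_id, score in zip(cluster, scores):
            writer.writerow([er_id, cluster_id, score])
        if n % 5000 == 0:
            print(n, "clusters written", flush=True)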
--
I started slicing the data into separate, non-coreferent groups. However, I am still choking at matchBlocks sometimes, even with what I would consider small data: 50k rows of raw data, which makes 50k blocks feeding into matchBlocks. I would definitely expect this to go through. I can leave it running for a day or more, churning at 100 percent CPU on a single core, to no avail. thresholdBlocks runs without issue, if that has any bearing. Using the debug logger, the last message I get is DEBUG:dedupe.api:matching done, begin clustering.
Would complicated predicate rules, such as those in the above posts, be enough to choke the program with such a relatively small amount of data?
How can I better investigate what's happening here? Or, where would I go from here to get my matching back on track?
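For reference, my logging setup is nothing special; it is roughly the following, which is how I get the dedupe.api message above (this assumes dedupe logs under the standard 'dedupe' logger namespace, which the message prefix suggests):

import logging

# Turn on verbose logging so dedupe's internal messages are visible,
# e.g. "DEBUG:dedupe.api:matching done, begin clustering".
logging.basicConfig(level=logging.DEBUG)

# Or, to limit the noise, raise only dedupe's own logger to DEBUG
# (assumes the 'dedupe' logger namespace, as the message prefix suggests).
logging.getLogger('dedupe').setLevel(logging.DEBUG)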
Thank you!
Matt Z
blocker(data[, target=True]) helps reduce the number of blocks and subsequent pairs. train([recall=0.95[, index_predicates=False]]) will create simple predicates and reduce the amount of downstream processing.
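Put together, that suggestion would look something like the sketch below. It is only illustrative: it assumes the 1.x-style dedupe API where train() and the blocker take these arguments, and deduper / data_d (a {record_id: record} dict) are placeholders.

# Sketch only, assuming the 1.x-style dedupe API referenced above.
# 'deduper' is a dedupe.Dedupe instance and 'data_d' a {record_id: record} dict.

# Keep the recall target modest and skip index predicates so training
# produces simple predicates and less downstream processing.
deduper.train(recall=0.95, index_predicates=False)

# Generate (block_key, record_id) pairs, blocking only against the target
# records to cut down the number of blocks and candidate pairs.
for block_key, record_id in deduper.blocker(data_d.items(), target=True):
    pass  # e.g. write the pair out for the candidate-generation step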
@matt, can you share what you did to separate out the 3-digit zip? We are facing a similar problem, and I'm not quite sure how to break it up.