Using the multiprocessing library gets stuck indefinitely on a Dataproc cluster

R1

Aug 8, 2022, 9:45:15 AM
to Google Cloud Dataproc Discussions
So I have the following code for moving a blob from one folder to another:

from google.cloud import storage

def move_blob(input):
    # input is a (bucket_name, source_path, destination_path) tuple
    try:
        client = storage.Client()
        bucket = client.get_bucket(input[0])
        blob = bucket.blob(input[1])
        # copy_blob returns the newly created blob in the destination path
        out = bucket.copy_blob(blob, bucket, input[2])
        blob.delete()
    except Exception as e:
        return None, str(e)

Here input[0] contains the bucket name, input[1] the source file path, and input[2] the destination file path. I am running it in parallel with the multiprocessing library:

with multiprocessing.Pool(processes=60) as pool:
    outputs = pool.map(move_blob, inputs)

Here inputs is a list of about 1000 tuples, each containing (bucket_name, src_path, destination_path). The problem is that this multiprocessing loop runs indefinitely.
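For reference, a minimal self-contained sketch of the driver part is below (the bucket and object names are placeholders for the real 1000-element list, move_blob is the function above, and the pool creation is wrapped in a __main__ guard as is usual for multiprocessing scripts):

import multiprocessing

# Placeholder inputs; the real list contains roughly 1000 such tuples.
inputs = [
    ("my-bucket", "folder_a/file1.csv", "folder_b/file1.csv"),
    ("my-bucket", "folder_a/file2.csv", "folder_b/file2.csv"),
]

if __name__ == "__main__":
    with multiprocessing.Pool(processes=60) as pool:
        outputs = pool.map(move_blob, inputs)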

Any suggestions?

Alexander Goida

Aug 9, 2022, 9:40:05 AM
to Google Cloud Dataproc Discussions
Hi,

I've achieved something similar, but without specifying the number of processes; I'm using concurrent.futures. The code below moves objects from one bucket to another.

import concurrent.futures as ft
from google.cloud import storage

client = storage.Client()

def mv(src_b, dst_b, src_bl, dst_bl):
  # Copy the blob into the destination bucket, then delete it from the source.
  from_bucket = client.get_bucket(src_b)
  to_bucket = client.get_bucket(dst_b)
  moving_blob = from_bucket.get_blob(src_bl)
  from_bucket.copy_blob(moving_blob, to_bucket, dst_bl)
  moving_blob.delete()

# data_items is a list of dicts describing the objects to move.
with ft.ThreadPoolExecutor() as executor:
  futures = []
  for data_item in data_items:
    futures.append(executor.submit(
      mv,
      src_b=data_item["src_bucket_name"],
      dst_b=data_item["dst_bucket_name"],
      src_bl=data_item["src_blob_name"],
      dst_bl=data_item["dst_blob_name"]))
  ft.wait(futures)
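One note on the snippet above, in case it's useful: ft.wait only blocks until the futures finish; it does not re-raise exceptions from the workers. To surface failures (for example when get_blob returns None because the source object doesn't exist), you can inspect each future afterwards. A small sketch, assuming the futures list from the code above:

# future.exception() returns the exception raised inside mv, or None on success.
errors = [f.exception() for f in futures if f.exception() is not None]
for err in errors:
  print(err)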

Let me know if it helped.

R1

Aug 9, 2022, 11:45:17 AM
to Google Cloud Dataproc Discussions
Hi Alex,

Thank you very much, bro. This worked 👍. The threading approach is much better than multiprocessing here.