How to read all parquet partition files written by modin df.to_parquet


Leon

May 22, 2022, 12:03:54 AM5/22/22
to d...@modin.org, devin.p...@intel.com

Hi Modin team,

Thanks for the great project! I'm new to Modin and I'm currently benchmarking Modin against pandas for our project. Below is the problem I encountered with Modin. Could you please shed some light on it? Thanks a lot!

System information

  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04): CentOS 7.6.1810
  • Modin version (modin.__version__): 0.12.1
  • Python version: 3.7.6
  • Ray version: 1.12.0 (Ray cluster: 1 head node + 2 worker nodes)
  • Code we can use to reproduce:
import os
os.environ["MODIN_ENGINE"] = "ray"
import ray
ray.init(address="auto")
import modin.pandas as mpd
from collections import Counter
import pandas as pd
import time


@ray.remote
def modin_csv_parquet_perf(csv_file_prefix: str):
    # modin read csv
    start_time = time.time()
    modin_df = mpd.read_csv(csv_file_prefix + '.csv')
    print('===>Time cost(s) - modin read_csv: ' + str(time.time() - start_time))
    print(f'Modin csv DF len = {len(modin_df)}')

    # # modin write csv
    # start_time = time.time()
    # modin_df.to_csv(csv_file_prefix + '2.csv', index=False)
    # print('===>Time cost(s) - modin to_csv: ' + str(time.time() - start_time))
    #
    # start_time = time.time()
    # modin_df = mpd.read_csv(csv_file_prefix + '2.csv')
    # print('===>Time cost(s) - modin read_csv: ', time.time() - start_time)

    # modin write parquet
    start_time = time.time()
    modin_df.to_parquet(csv_file_prefix + '.parquet', engine='fastparquet')
    print('===>Time cost(s) - modin to_parquet: ' + str(time.time() - start_time))

    # modin read parquet
    start_time = time.time()
    modin_df = mpd.read_parquet(csv_file_prefix + '.parquet', engine='fastparquet')
    print('===>Time cost(s) - modin read_parquet: ' + str(time.time() - start_time))
    print(f'Modin parquet DF len = {len(modin_df)}')
    # Note: this task returns None, which is why the summary below
    # prints "1 tasks on None".


if __name__ == '__main__':
    print('''This cluster consists of
{} nodes in total
{} CPU resources in total
'''.format(len(ray.nodes()), ray.cluster_resources()['CPU']))

    CSV_FILE = '/home/term/wanlu/500w50f'

    object_ids = [modin_csv_parquet_perf.remote(CSV_FILE) for _ in range(1)]
    ip_addresses = ray.get(object_ids)
    print('Tasks executed:')
    for ip_address, num_tasks in Counter(ip_addresses).items():
        print(' {} tasks on {}'.format(num_tasks, ip_address))



Describe the problem

modin_df.to_parquet writes the dataframe (5 million records) to parquet files that are automatically partitioned across the 3 Ray nodes. But when I call mpd.read_parquet, I can only read the records (2,499,994) from the head Ray node where the script was run. How can I get all 5 million records from the 3 nodes? I don't see any documentation about these details. Many thanks.
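
For reference, a quick way to confirm this is to list the output directory on the head node; if each node writes its partition files to its own local disk, the head node will only see part of the data. A minimal diagnostic sketch (the path is taken from the repro script above):

import os

# The output directory from the repro above; on the head node this will
# only contain the partition files that were written on this machine.
out_dir = '/home/term/wanlu/500w50f.parquet'
for name in sorted(os.listdir(out_dir)):
    print(name)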

Source code / logs

[term@dev-ctb-xs-196-65 wanlu]$ python3 testmodinpandas.py
This cluster consists of
3 nodes in total
24.0 CPU resources in total

(modin_csv_parquet_perf pid=2961) ===>Time cost(s) - modin read_csv: 12.558054447174072
(modin_csv_parquet_perf pid=2961) Modin csv DF len = 5000000
(modin_csv_parquet_perf pid=2961) ===>Time cost(s) - modin to_parquet: 8.699442386627197
Tasks executed:
1 tasks on None
(modin_csv_parquet_perf pid=2961) ===>Time cost(s) - modin read_parquet: 6.788694381713867
(modin_csv_parquet_perf pid=2961) Modin parquet DF len = 2499994

Mahesh Vashishtha

Nov 29, 2022, 2:40:19 PM11/29/22
to modin-dev
Hi Leon, I think you might have reported this issue here: https://github.com/modin-project/modin/issues/4479

Writing to a local path when you're using a Ray or Dask cluster with multiple nodes is not recommended, and this draft PR should warn users not to try that.
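
A sketch of the recommended direction, for anyone hitting the same issue: write to storage that every node can reach, such as an S3 bucket or a shared NFS mount. This is only an illustration, not code from the draft PR; the bucket name is hypothetical, and it assumes s3fs is installed so that s3:// paths resolve:

import os
os.environ["MODIN_ENGINE"] = "ray"
import ray
import modin.pandas as mpd

ray.init(address="auto")

# Writing to a location every node can reach (an S3 prefix here, but a
# shared NFS mount works the same way) keeps all partition files together.
df = mpd.read_csv('/home/term/wanlu/500w50f.csv')
df.to_parquet('s3://my-bucket/500w50f.parquet')  # hypothetical bucket

# Reading the same prefix back should now return all rows, not just the
# partitions that happened to land on the head node.
df2 = mpd.read_parquet('s3://my-bucket/500w50f.parquet')
assert len(df2) == len(df)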