Hi modin team,
Thanks for the great project! I'm new to Modin, and I'm currently benchmarking Modin against pandas for our project. Below is the problem I encountered with Modin. Could you please help shed some light on it? Thanks a lot!
modin.__version__: 0.12.1

import os
os.environ["MODIN_ENGINE"] = "ray"
import ray
ray.init(address="auto")
import modin.pandas as mpd
from collections import Counter
import pandas as pd
import time
@ray.remote
def modin_csv_parquet_perf(csv_file_prefix: str) -> str:
    """Benchmark Modin's CSV read and Parquet round-trip for one file prefix.

    Reads ``<csv_file_prefix>.csv``, writes it back out as
    ``<csv_file_prefix>.parquet`` (fastparquet engine), re-reads that
    Parquet file, and prints the wall-clock time of each step along with
    the resulting DataFrame lengths.

    Args:
        csv_file_prefix: Path prefix of the input file, without the
            ``.csv`` / ``.parquet`` extension.

    Returns:
        The IP address of the Ray node this task executed on, so the
        caller can tally where tasks ran.
    """
    # Modin read_csv
    start_time = time.time()
    modin_df = mpd.read_csv(csv_file_prefix + '.csv')
    print(f'===>Time cost(s) - modin read_csv: {time.time() - start_time}')
    print(f'Modin csv DF len = {len(modin_df)}')

    # Modin write parquet
    start_time = time.time()
    modin_df.to_parquet(csv_file_prefix + '.parquet', engine='fastparquet')
    print(f'===>Time cost(s) - modin to_parquet: {time.time() - start_time}')

    # Modin read parquet
    start_time = time.time()
    modin_df = mpd.read_parquet(csv_file_prefix + '.parquet', engine='fastparquet')
    print(f'===>Time cost(s) - modin read_parquet: {time.time() - start_time}')
    print(f'Modin parquet DF len = {len(modin_df)}')

    # Fix: the original implicitly returned None, so the caller's
    # Counter over the results printed "1 tasks on None".  Return the
    # node IP this task actually ran on, matching the Ray example the
    # driver code was adapted from.
    return ray.util.get_node_ip_address()
if __name__ == '__main__':
    # Report the cluster size before launching the benchmark task(s).
    node_count = len(ray.nodes())
    cpu_total = ray.cluster_resources()['CPU']
    print(f'''This cluster consists of
{node_count} nodes in total
{cpu_total} CPU resources in total
''')

    CSV_FILE = '/home/term/wanlu/500w50f'

    # Launch the benchmark task(s) and block until all of them finish.
    futures = [modin_csv_parquet_perf.remote(CSV_FILE) for _ in range(1)]
    ip_addresses = ray.get(futures)

    # Tally how many tasks ran on each node.
    print('Tasks executed:')
    for addr, count in Counter(ip_addresses).items():
        print(f'    {count} tasks on {addr}')
modin_df.to_parquet writes the DataFrame (5 million records) to Parquet files that are automatically partitioned across the 3 Ray nodes. But when calling mpd.read_parquet, I can only read the Parquet records (2,499,994) stored on the head Ray node where the script was run. How can I get all 5 million records from the 3 nodes? I don't see any documentation covering these details. Many thanks.
[term@dev-ctb-xs-196-65 wanlu]$ python3 testmodinpandas.py
This cluster consists of
3 nodes in total
24.0 CPU resources in total
(modin_csv_parquet_perf pid=2961) ===>Time cost(s) - modin read_csv: 12.558054447174072
(modin_csv_parquet_perf pid=2961) Modin csv DF len = 5000000
(modin_csv_parquet_perf pid=2961) ===>Time cost(s) - modin to_parquet: 8.699442386627197
Tasks executed:
1 tasks on None
(modin_csv_parquet_perf pid=2961) ===>Time cost(s) - modin read_parquet: 6.788694381713867
(modin_csv_parquet_perf pid=2961) Modin parquet DF len = 2499994