Hi!
I am trying to use Modin to generate predictions with a machine learning model. In general, I would like to do something like the following:
model = MyComplexModel()


def predict(row):
    """Return the model's prediction for a single DataFrame row.

    ``apply`` is called with ``axis=1`` below, so ``row`` is one row of
    ``data_df``; only its ``'SERIES1'`` and ``'SERIES2'`` fields are passed
    on to ``model.predict``.
    """
    return model.predict(row['SERIES1'], row['SERIES2'])


# NOTE: Modin hands ``predict`` to its execution engine before running it. The
# traceback below shows the function being scattered to the Dask workers via
# ``DaskWrapper.put(func, hash=False, broadcast=True)``. ``predict`` refers to
# ``model`` (~3 GB), so the model presumably has to be serialized along with
# it. TODO: confirm.
data_df['RESULT'] = data_df.apply(predict, axis=1)
`model` is a complex object comprising several encoders and a TensorFlow model; in total it consumes about 3 GB of memory. When I run this code, I get the stack trace below. Is there anything special I need to consider when doing something like this?
Thanks!
Stacktrace:
[2022-11-23, 21:58:43 UTC] {logging_mixin.py:117} WARNING - 2022-11-23, 21:58:43 UTC - distributed.core - ERROR - Exception while handling op scatter
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 770, in _handle_comm
result = await result
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/scheduler.py", line 5454, in scatter
keys, who_has, nbytes = await scatter_to_workers(
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils_comm.py", line 142, in scatter_to_workers
out = await All(
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 237, in All
result = await tasks.next()
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 1154, in send_recv_from_rpc
return await send_recv(comm=comm, op=key, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 946, in send_recv
raise Exception(response["exception_text"])
Exception: RecursionError('maximum recursion depth exceeded')
[2022-11-23, 21:58:43 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py", line 188, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 193, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/market_salary.py", line 38, in process_data
task.execute(data_dir, prediction_dir)
File "/home/airflow/.local/lib/python3.9/site-packages/models/market_salary/task.py", line 105, in execute
data_df['RESULT'] = data_df.apply(predict, axis=1)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
return obj(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/_compat/pandas_api/latest/dataframe.py", line 69, in apply
return self._apply(
File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
return obj(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/pandas/dataframe.py", line 373, in _apply
query_compiler = super(DataFrame, self)._apply(
File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
return obj(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/pandas/base.py", line 848, in _apply
query_compiler = self._query_compiler.apply(
File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
return obj(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/storage_formats/pandas/query_compiler.py", line 2430, in apply
return self._callable_func(func, axis, *args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
return obj(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/storage_formats/pandas/query_compiler.py", line 2551, in _callable_func
new_modin_frame = self._modin_frame.apply_full_axis(
File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
return obj(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 125, in run_f_on_minimally_updated_metadata
result = f(self, *args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 2097, in apply_full_axis
return self.broadcast_apply_full_axis(
File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
return obj(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 125, in run_f_on_minimally_updated_metadata
result = f(self, *args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 2538, in broadcast_apply_full_axis
new_partitions = self._partition_mgr_cls.broadcast_axis_partitions(
File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
return obj(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 58, in wait
result = func(cls, *args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 443, in broadcast_axis_partitions
preprocessed_map_func = cls.preprocess_func(apply_func)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
return obj(*args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 120, in preprocess_func
return cls._partition_class.preprocess_func(map_func)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py", line 257, in preprocess_func
return DaskWrapper.put(func, hash=False, broadcast=True)
File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/execution/dask/common/engine_wrapper.py", line 98, in put
return client.scatter(data, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/client.py", line 2441, in scatter
return self.sync(
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 339, in sync
return sync(
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 406, in sync
raise exc.with_traceback(tb)
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 379, in f
result = yield future
File "/home/airflow/.local/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/client.py", line 2321, in _scatter
await self.scheduler.scatter(
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 1154, in send_recv_from_rpc
return await send_recv(comm=comm, op=key, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 944, in send_recv
raise exc.with_traceback(tb)
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 770, in _handle_comm
result = await result
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/scheduler.py", line 5454, in scatter
keys, who_has, nbytes = await scatter_to_workers(
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils_comm.py", line 142, in scatter_to_workers
out = await All(
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 237, in All
result = await tasks.next()
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 1154, in send_recv_from_rpc
return await send_recv(comm=comm, op=key, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 946, in send_recv
raise Exception(response["exception_text"])
Exception: RecursionError('maximum recursion depth exceeded')