RecursionError during dataframe apply

Chris Dedels

Nov 28, 2022, 8:13:23 AM
to modin-dev
Hi!
I am trying to use Modin to generate predictions with a machine learning model. Roughly, I would like to do the following:

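model = MyComplexModel()

def predict(row):
    # called once per row (axis=1)
    return model.predict(row['SERIES1'], row['SERIES2'])

# data_df is a modin.pandas DataFrame with SERIES1 and SERIES2 columns
data_df['RESULT'] = data_df.apply(predict, axis=1)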

The model is a complex object comprising several encoders and a TensorFlow model; in total it consumes about 3 GB of memory. When I run this code I get the stack trace below. Is there anything special I need to consider when doing something like this?

Thanks!

Stacktrace:
[2022-11-23, 21:58:43 UTC] {logging_mixin.py:117} WARNING - 2022-11-23, 21:58:43 UTC - distributed.core - ERROR - Exception while handling op scatter
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 770, in _handle_comm
    result = await result
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/scheduler.py", line 5454, in scatter
    keys, who_has, nbytes = await scatter_to_workers(
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils_comm.py", line 142, in scatter_to_workers
    out = await All(
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 237, in All
    result = await tasks.next()
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 1154, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 946, in send_recv
    raise Exception(response["exception_text"])
Exception: RecursionError('maximum recursion depth exceeded')
[2022-11-23, 21:58:43 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py", line 188, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 193, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/market_salary.py", line 38, in process_data
    task.execute(data_dir, prediction_dir)
  File "/home/airflow/.local/lib/python3.9/site-packages/models/market_salary/task.py", line 105, in execute
    data_df['RESULT'] = data_df.apply(predict, axis=1)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/_compat/pandas_api/latest/dataframe.py", line 69, in apply
    return self._apply(
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/pandas/dataframe.py", line 373, in _apply
    query_compiler = super(DataFrame, self)._apply(
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/pandas/base.py", line 848, in _apply
    query_compiler = self._query_compiler.apply(
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/storage_formats/pandas/query_compiler.py", line 2430, in apply
    return self._callable_func(func, axis, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/storage_formats/pandas/query_compiler.py", line 2551, in _callable_func
    new_modin_frame = self._modin_frame.apply_full_axis(
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 125, in run_f_on_minimally_updated_metadata
    result = f(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 2097, in apply_full_axis
    return self.broadcast_apply_full_axis(
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 125, in run_f_on_minimally_updated_metadata
    result = f(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 2538, in broadcast_apply_full_axis
    new_partitions = self._partition_mgr_cls.broadcast_axis_partitions(
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 58, in wait
    result = func(cls, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 443, in broadcast_axis_partitions
    preprocessed_map_func = cls.preprocess_func(apply_func)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 120, in preprocess_func
    return cls._partition_class.preprocess_func(map_func)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py", line 257, in preprocess_func
    return DaskWrapper.put(func, hash=False, broadcast=True)
  File "/home/airflow/.local/lib/python3.9/site-packages/modin/core/execution/dask/common/engine_wrapper.py", line 98, in put
    return client.scatter(data, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/client.py", line 2441, in scatter
    return self.sync(
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 339, in sync
    return sync(
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 406, in sync
    raise exc.with_traceback(tb)
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 379, in f
    result = yield future
  File "/home/airflow/.local/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/client.py", line 2321, in _scatter
    await self.scheduler.scatter(
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 1154, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 944, in send_recv
    raise exc.with_traceback(tb)
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 770, in _handle_comm
    result = await result
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/scheduler.py", line 5454, in scatter
    keys, who_has, nbytes = await scatter_to_workers(
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils_comm.py", line 142, in scatter_to_workers
    out = await All(
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 237, in All
    result = await tasks.next()
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 1154, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 946, in send_recv
    raise Exception(response["exception_text"])
Exception: RecursionError('maximum recursion depth exceeded')

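One way to narrow this down, offered as a hedged sketch rather than a confirmed diagnosis: the traceback shows Modin's Dask backend passing the applied function to DaskWrapper.put(func, hash=False, broadcast=True), which calls client.scatter, and the RecursionError comes back from that scatter. So everything the function references has to survive serialization. The hypothetical helper check_serializable below tries to pickle an object (for example the model) and reports a RecursionError instead of raising it. It uses the standard-library pickle as a stand-in; distributed can also fall back to cloudpickle for functions, so a pass here does not guarantee a pass inside Dask.

```python
import pickle
import sys


def check_serializable(obj, protocol=pickle.HIGHEST_PROTOCOL):
    """Hypothetical helper: try to pickle ``obj`` and report the outcome.

    Returns an (ok, message) tuple instead of raising, so it can be run
    against a large object such as a model before handing it to Dask.
    """
    try:
        payload = pickle.dumps(obj, protocol=protocol)
    except RecursionError as exc:
        return False, (
            f"RecursionError while pickling "
            f"(recursion limit={sys.getrecursionlimit()}): {exc}"
        )
    except Exception as exc:  # e.g. unpicklable locks, sessions, handles
        return False, f"{type(exc).__name__}: {exc}"
    return True, f"pickled OK, {len(payload)} bytes"


if __name__ == "__main__":
    # A deliberately deep object graph (nested lists far deeper than the
    # recursion limit), standing in for a deeply nested model object.
    deep = []
    for _ in range(100_000):
        deep = [deep]

    print(check_serializable({"weights": [1.0, 2.0, 3.0]}))
    print(check_serializable(deep))
```

Running check_serializable(model) on the driver before calling apply would show whether the model alone already hits the recursion limit, independently of Modin and Dask.
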
Chris Dedels

Nov 28, 2022, 8:29:42 AM
to modin-dev
Another attempt at pasting the stacktrace so it can be read more easily: