Hi,
I love it so far: it required very little syntax change and delivered roughly a 50x speed-up over my original pure Python code (and I didn't have to dive into Cython)! The last major improvement I'd love to implement is HOGWILD! SGD,
which is essentially a very simple, non-thread-safe, parallelized variant of SGD:
parallel workers all access the model weights in shared memory, process randomly sampled training examples, and apply weight updates without any locking or synchronization.
Race conditions and odd results are possible, but if each training example is quite sparse and only updates a small fraction of the model weights
(as is the case with factorization machine models, which typically have tens of thousands of weights with only a handful updated by each training sample),
the authors report very little numerical error and near-linear parallel scaling.
Basically, all I want to do is apply Numba's `prange()` to the main training-sample loop:
I've tried this with all the available threading layers (tbb, omp, workqueue), and while I understand the NumbaParallelSafetyWarning messages that get generated, compilation fails with an assertion error (which I'll paste below).
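To make the pattern concrete, here's a minimal sketch of the lock-free update scheme I'm describing, written in pure Python with `threading` rather than Numba just to illustrate the idea (the function name, the sparse sample format, and all sizes are made up for illustration): every worker thread reads and writes the same shared weight list with no locks around the updates.

```python
import threading

def hogwild_sgd(samples, weights, lr=0.1, n_threads=4):
    """Lock-free SGD sketch: all worker threads update the shared
    `weights` list in place with no synchronization (HOGWILD-style).
    Each sample is (feature_indices, feature_values, target), so a
    sparse sample touches only a handful of weights."""
    def worker(chunk):
        for idxs, vals, target in chunk:
            # prediction reads only the sample's active weights
            pred = sum(weights[j] * v for j, v in zip(idxs, vals))
            err = pred - target
            for j, v in zip(idxs, vals):
                # unsynchronized write: races are possible but rare
                # when samples are sparse
                weights[j] -= lr * err * v

    # round-robin split of the samples across the workers
    chunks = [samples[t::n_threads] for t in range(n_threads)]
    threads = [threading.Thread(target=worker, args=(c,)) for c in chunks]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return weights
```

In Numba terms, the outer loop over `samples` is the loop I'd like to replace with `prange`, with the weight arrays living in shared memory across all threads.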
Being pretty unfamiliar with both Numba and low-level parallel processing, I thought it would be a good idea to ask:
1. Is non-thread-safe parallel processing even possible with Numba? Is there any way to opt in with some sort of "informed consent" from the program author via options/overrides?
2. If #1 is possible, how would I go about implementing it? What am I missing that's causing this error?
I'd greatly appreciate any help/advice someone far more experienced than myself could offer :). Also, if this isn't the right place for questions like this, sorry, and please let me know where I should post!
All the best,
Eric
---
================================================================================
======== Parallel Accelerator Optimizing: Internal parallel functions =========
================================================================================
No source available
Performing sequential lowering of loops...
--------------------------------------------------------------------------------
----------------------------- Before Optimisation ------------------------------
--------------------------------------------------------------------------------
------------------------------ After Optimisation ------------------------------
Parallel structure is already optimal.
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
---------------------------Loop invariant code motion---------------------------
Allocation hoisting:
No allocation hoisting found
Instruction hoisting:
No instruction hoisting found
--------------------------------------------------------------------------------
---------------------------------------------------------------------------
AssertionError Traceback (most recent call last)
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/errors.py in new_error_context(fmt_, *args, **kwargs)
800 try:
--> 801 yield
802 except NumbaError as e:
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/lowering.py in lower_block(self, block)
266 loc=self.loc, errcls_=defaulterrcls):
--> 267 self.lower_inst(inst)
268 self.post_block(block)
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/lowering.py in lower_inst(self, inst)
479 if isinstance(inst, _class):
--> 480 func(self, inst)
481 return
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/parfors/parfor_lowering.py in _lower_parfor_parallel(lowerer, parfor)
239 lowerer, parfor, typemap, typingctx, targetctx, flags, {},
--> 240 bool(alias_map), index_var_typ, parfor.races)
241 finally:
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/parfors/parfor_lowering.py in _create_gufunc_for_parfor_body(lowerer, parfor, typemap, typingctx, targetctx, flags, locals, has_aliases, index_var_typ, races)
933 warnings.warn(NumbaParallelSafetyWarning(msg, loc))
--> 934 replace_var_with_array(races, loop_body, typemap, lowerer.fndesc.calltypes)
935
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/parfors/parfor_lowering.py in replace_var_with_array(vars, loop_body, typemap, calltypes)
1363 def replace_var_with_array(vars, loop_body, typemap, calltypes):
-> 1364 replace_var_with_array_internal(vars, loop_body, typemap, calltypes)
1365 for v in vars:
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/parfors/parfor_lowering.py in replace_var_with_array_internal(vars, loop_body, typemap, calltypes)
1360 for label, block in loop_body.items():
-> 1361 block.body = replace_var_with_array_in_block(vars, block, typemap, calltypes)
1362
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/parfors/parfor_lowering.py in replace_var_with_array_in_block(vars, block, typemap, calltypes)
1346
-> 1347 setitem_node = ir.SetItem(inst.target, const_var, inst.value, inst.loc)
1348 calltypes[setitem_node] = signature(
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/ir.py in __init__(self, target, index, value, loc)
590 assert isinstance(index, Var)
--> 591 assert isinstance(value, Var)
592 assert isinstance(loc, Loc)
AssertionError:
During handling of the above exception, another exception occurred:
LoweringError Traceback (most recent call last)
<ipython-input-56-bf2ee917ede8> in <module>
----> 1 model.fit(interactions_train, sample_weight=sample_weight_train, epochs=5, verbose=True)
~/Repos/rankfm/rankfm/rankfm.py in fit(self, interactions, user_features, item_features, sample_weight, epochs, verbose)
261
262 self._reset_state()
--> 263 self.fit_partial(interactions, user_features, item_features, sample_weight, epochs, verbose)
264
265
~/Repos/rankfm/rankfm/rankfm.py in fit_partial(self, interactions, user_features, item_features, sample_weight, epochs, verbose)
302 self.v_i,
303 self.v_uf,
--> 304 self.v_if
305 )
306 self.w_i, self.w_if, self.v_u, self.v_i, self.v_uf, self.v_if = updated_weights
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/dispatcher.py in _compile_for_args(self, *args, **kws)
418 e.patch_message('\n'.join((str(e).rstrip(), help_msg)))
419 # ignore the FULL_TRACEBACKS config, this needs reporting!
--> 420 raise e
421
422 def inspect_llvm(self, signature=None):
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/dispatcher.py in _compile_for_args(self, *args, **kws)
351 argtypes.append(self.typeof_pyval(a))
352 try:
--> 353 return self.compile(tuple(argtypes))
354 except errors.ForceLiteralArg as e:
355 # Received request for compiler re-entry with the list of arguments
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler_lock.py in _acquire_compile_lock(*args, **kwargs)
30 def _acquire_compile_lock(*args, **kwargs):
31 with self:
---> 32 return func(*args, **kwargs)
33 return _acquire_compile_lock
34
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/dispatcher.py in compile(self, sig)
792 self._cache_misses[sig] += 1
793 try:
--> 794 cres = self._compiler.compile(args, return_type)
795 except errors.ForceLiteralArg as e:
796 def folded(args, kws):
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/dispatcher.py in compile(self, args, return_type)
75
76 def compile(self, args, return_type):
---> 77 status, retval = self._compile_cached(args, return_type)
78 if status:
79 return retval
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/dispatcher.py in _compile_cached(self, args, return_type)
89
90 try:
---> 91 retval = self._compile_core(args, return_type)
92 except errors.TypingError as e:
93 self._failed_cache[key] = e
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/dispatcher.py in _compile_core(self, args, return_type)
107 args=args, return_type=return_type,
108 flags=flags, locals=self.locals,
--> 109 pipeline_class=self.pipeline_class)
110 # Check typing error if object mode is used
111 if cres.typing_error is not None and not flags.enable_pyobject:
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler.py in compile_extra(typingctx, targetctx, func, args, return_type, flags, locals, library, pipeline_class)
566 pipeline = pipeline_class(typingctx, targetctx, library,
567 args, return_type, flags, locals)
--> 568 return pipeline.compile_extra(func)
569
570
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler.py in compile_extra(self, func)
337 self.state.lifted = ()
338 self.state.lifted_from = None
--> 339 return self._compile_bytecode()
340
341 def compile_ir(self, func_ir, lifted=(), lifted_from=None):
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler.py in _compile_bytecode(self)
399 """
400 assert self.state.func_ir is None
--> 401 return self._compile_core()
402
403 def _compile_ir(self):
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler.py in _compile_core(self)
379 self.state.status.fail_reason = e
380 if is_final_pipeline:
--> 381 raise e
382 else:
383 raise CompilerError("All available pipelines exhausted")
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler.py in _compile_core(self)
370 res = None
371 try:
--> 372 pm.run(self.state)
373 if self.state.cr is not None:
374 break
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler_machinery.py in run(self, state)
339 (self.pipeline_name, pass_desc)
340 patched_exception = self._patch_error(msg, e)
--> 341 raise patched_exception
342
343 def dependency_analysis(self):
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler_machinery.py in run(self, state)
330 pass_inst = _pass_registry.get(pss).pass_inst
331 if isinstance(pass_inst, CompilerPass):
--> 332 self._runPass(idx, pass_inst, state)
333 else:
334 raise BaseException("Legacy pass in use")
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler_lock.py in _acquire_compile_lock(*args, **kwargs)
30 def _acquire_compile_lock(*args, **kwargs):
31 with self:
---> 32 return func(*args, **kwargs)
33 return _acquire_compile_lock
34
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler_machinery.py in _runPass(self, index, pss, internal_state)
289 mutated |= check(pss.run_initialization, internal_state)
290 with SimpleTimer() as pass_time:
--> 291 mutated |= check(pss.run_pass, internal_state)
292 with SimpleTimer() as finalize_time:
293 mutated |= check(pss.run_finalizer, internal_state)
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler_machinery.py in check(func, compiler_state)
262
263 def check(func, compiler_state):
--> 264 mangled = func(compiler_state)
265 if mangled not in (True, False):
266 msg = ("CompilerPass implementations should return True/False. "
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/typed_passes.py in run_pass(self, state)
440
441 # TODO: Pull this out into the pipeline
--> 442 NativeLowering().run_pass(state)
443 lowered = state['cr']
444 signature = typing.signature(state.return_type, *state.args)
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/typed_passes.py in run_pass(self, state)
368 lower = lowering.Lower(targetctx, library, fndesc, interp,
369 metadata=metadata)
--> 370 lower.lower()
371 if not flags.no_cpython_wrapper:
372 lower.create_cpython_wrapper(flags.release_gil)
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/lowering.py in lower(self)
177 if self.generator_info is None:
178 self.genlower = None
--> 179 self.lower_normal_function(self.fndesc)
180 else:
181 self.genlower = self.GeneratorLower(self)
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/lowering.py in lower_normal_function(self, fndesc)
225 # Init argument values
226 self.extract_function_arguments()
--> 227 entry_block_tail = self.lower_function_body()
228
229 # Close tail of entry block
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/lowering.py in lower_function_body(self)
251 bb = self.blkmap[offset]
252 self.builder.position_at_end(bb)
--> 253 self.lower_block(block)
254 self.post_lower()
255 return entry_block_tail
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/lowering.py in lower_block(self, block)
265 with new_error_context('lowering "{inst}" at {loc}', inst=inst,
266 loc=self.loc, errcls_=defaulterrcls):
--> 267 self.lower_inst(inst)
268 self.post_block(block)
269
~/anaconda3/envs/rankfm/lib/python3.7/contextlib.py in __exit__(self, type, value, traceback)
128 value = type()
129 try:
--> 130 self.gen.throw(type, value, traceback)
131 except StopIteration as exc:
132 # Suppress StopIteration *unless* it's the same exception that
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/errors.py in new_error_context(fmt_, *args, **kwargs)
806 newerr = errcls(e).add_context(_format_msg(fmt_, args, kwargs))
807 tb = sys.exc_info()[2] if numba.core.config.FULL_TRACEBACKS else None
--> 808 reraise(type(newerr), newerr, tb)
809
810
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/utils.py in reraise(tp, value, tb)
79 if value.__traceback__ is not tb:
80 raise value.with_traceback(tb)
---> 81 raise value
82
83
LoweringError: Failed in nopython mode pipeline (step: nopython mode backend)
File "../../rankfm/numba_methods.py", line 130:
def _fit(loss, max_samples, interactions, sample_weight, user_items, item_idx, regularization, learning_rate, learning_schedule, learning_exponent, epochs, verbose, x_uf, x_if, w_i, w_if, v_u, v_i, v_uf, v_if):
<source elided>
for row in prange(n_interaction):
^
[1] During: lowering "id=31[LoopNest(index_variable = parfor_index.1283, range = (0, interactions_size0.849, 1))]{2610: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (203)>, 2612: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (203)>, 2640: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (206)>, 2652: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (206)>, 2714: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (209)>, 2716: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (209)>, 2774: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (211)>, 232: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (130)>, 236: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (142)>, 266: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (142)>, 2830: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (211)>, 1916: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (161)>, 1918: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (165)>, 2436: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (200)>, 2448: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (200)>, 1946: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (166)>, 1958: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (166)>, 2504: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (201)>, 2506: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (201)>, 2014: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (169)>, 494: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (146)>, 502: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py 
(148)>}Var(parfor_index.1283, numba_methods.py:130)" at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (130)