Ability to Implement Non-Thread-Safe PRANGE() Functions?

Eric Lundquist

May 30, 2020, 10:09:19 PM
to Numba Public Discussion - Public
Hi,

I'm currently using Numba==0.49.1 for a collaborative filtering factorization machine library I've written: https://github.com/etlundquist/rankfm. 

I love it so far: it required very few syntax changes and gave roughly a 50x speed-up over my original pure-Python code (and I didn't have to dive into Cython)! The last major improvement I'd love to implement is something called HOGWILD SGD, which is essentially a very simple, non-thread-safe, parallelized variant of SGD.

Parallel workers all have access to the model weights in shared memory, process randomly sampled training examples, and apply weight updates without locking or synchronization. Race conditions and odd results are possible, but when each training example is sparse and updates only a small fraction of the model weights (as in factorization machine models, which typically have tens of thousands of weights with only a handful updated by each training sample), the HOGWILD authors report very little numerical error and near-linear parallel scaling.
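To put rough numbers on the sparsity argument, here is a small worked calculation. It is a sketch under stated assumptions: it assumes a BPR-style update in which one (user, positive item, negative item) sample touches one user factor row, two item factor rows, and two item biases, and it ignores the user/item feature weights (w_if, v_uf, v_if). The function name and the model sizes are hypothetical illustration values, not figures from rankfm.

```python
def weights_touched_per_sample(n_users, n_items, n_factors):
    """Return (touched, total, fraction) for one hypothetical BPR-style update.

    Assumed parameter layout (illustrative only, feature weights ignored):
      w_i: item biases,  shape (n_items,)
      v_u: user factors, shape (n_users, n_factors)
      v_i: item factors, shape (n_items, n_factors)
    One (u, i, j) sample updates w_i[i], w_i[j], v_u[u, :], v_i[i, :], v_i[j, :].
    """
    total = n_items + n_users * n_factors + n_items * n_factors
    touched = 2 + 3 * n_factors
    return touched, total, touched / total


touched, total, fraction = weights_touched_per_sample(n_users=10_000, n_items=5_000, n_factors=32)
print(touched, total, f"{fraction:.4%}")  # prints: 98 485000 0.0202%
```

With these made-up sizes a single sample writes to roughly 0.02% of the weights, so two samples processed at the same time rarely touch the same rows, which is the regime in which lock-free HOGWILD updates are expected to behave well.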

Basically, all I want to do is apply Numba's prange() function to the main training-sample loop:

I've tried this with all the available threading layers (tbb, omp, workqueue), and while I understand the NumbaParallelSafetyWarning messages that get generated, compilation fails with an AssertionError, which Numba re-raises as a LoweringError (full output pasted below).
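For reference, here is a minimal sketch of a HOGWILD-style epoch written with prange. It is not the rankfm code: it assumes a plain BPR matrix factorization model (user factors v_u, item factors v_i, no biases or side features), and the function and argument names are hypothetical. As far as I know, Numba does not add any locking around array writes inside prange, so concurrent updates to v_u and v_i are unsynchronized, which is the HOGWILD behavior. In the traceback below, the failure occurs in replace_var_with_array(races, ...), right after the NumbaParallelSafetyWarning is issued, i.e. in the code path that handles variables Numba flagged as racy. As an assumption about a possible workaround (not a confirmed fix), this sketch therefore first assigns every temporary inside the loop body, carries only one value across iterations (a `+=` reduction), and pre-samples negative items outside the parallel loop. If Numba is not installed, it falls back to plain Python so it still runs, serially.

```python
import math

import numpy as np

try:
    from numba import njit, prange
except ImportError:
    # Fallback so the sketch still runs (serially) when Numba is not installed.
    prange = range

    def njit(*args, **kwargs):
        def decorator(func):
            return func
        return decorator


@njit(parallel=True)
def hogwild_bpr_epoch(users, pos_items, neg_items, v_u, v_i, learning_rate, regularization):
    """One HOGWILD-style BPR epoch over pre-sampled (user, pos, neg) triples.

    v_u and v_i are updated in place by all threads without any locking.
    Every temporary is first assigned inside the prange body, so it is private
    to each iteration; total_loss is only updated with +=, which Numba treats
    as a parallel reduction. Returns the mean BPR loss for the epoch.
    """
    n_samples = users.shape[0]
    n_factors = v_u.shape[1]
    total_loss = 0.0
    for row in prange(n_samples):
        u = users[row]
        i = pos_items[row]
        j = neg_items[row]

        # x_uij = <v_u[u], v_i[i]> - <v_u[u], v_i[j]>
        x_uij = 0.0
        for f in range(n_factors):
            x_uij += v_u[u, f] * (v_i[i, f] - v_i[j, f])

        # BPR loss -log(sigmoid(x)) and its gradient factor sigmoid(-x)
        total_loss += math.log1p(math.exp(-x_uij))
        g = 1.0 / (1.0 + math.exp(x_uij))

        # Lock-free SGD step on the three rows this sample touches
        for f in range(n_factors):
            vu = v_u[u, f]
            vi = v_i[i, f]
            vj = v_i[j, f]
            v_u[u, f] += learning_rate * (g * (vi - vj) - regularization * vu)
            v_i[i, f] += learning_rate * (g * vu - regularization * vi)
            v_i[j, f] += learning_rate * (-g * vu - regularization * vj)
    return total_loss / n_samples


# Example usage with random data (sizes are arbitrary)
rng = np.random.default_rng(0)
n_users, n_items, n_factors, n_samples = 200, 100, 16, 5_000
v_u = 0.1 * rng.standard_normal((n_users, n_factors))
v_i = 0.1 * rng.standard_normal((n_items, n_factors))
users = rng.integers(0, n_users, n_samples)
pos_items = rng.integers(0, n_items, n_samples)
# Offset by 1..n_items-1 so each negative item differs from its positive item
neg_items = (pos_items + rng.integers(1, n_items, n_samples)) % n_items
for epoch in range(3):
    loss = hogwild_bpr_epoch(users, pos_items, neg_items, v_u, v_i, 0.05, 0.01)
    print(f"epoch {epoch}: mean BPR loss {loss:.4f}")
```

Because the updates race, results can differ slightly from run to run and from a serial run; whether that is acceptable is exactly the HOGWILD trade-off. Pre-sampling the negatives keeps random number generation out of the parallel region, which keeps the sketch simpler and easier to reproduce.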
Being pretty unfamiliar with both Numba and low-level parallel processing, I thought it would be a good idea to ask:

1. Is non-thread-safe parallel processing even possible with Numba? Is there any way to do this with some sort of "informed consent" of the program author via options/overrides?
2. If #1 is possible, how would I go about implementing this? What am I missing that's causing this error?

I'd greatly appreciate any help or advice someone far more experienced than myself could offer :). Also, if this isn't the right place for questions like this, sorry, and please let me know where I should post!

All the best, Eric

---


================================================================================
======== Parallel Accelerator Optimizing: Internal parallel functions  =========
================================================================================
No source available
Performing sequential lowering of loops...
--------------------------------------------------------------------------------
----------------------------- Before Optimisation ------------------------------
--------------------------------------------------------------------------------
------------------------------ After Optimisation ------------------------------
Parallel structure is already optimal.
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
 
---------------------------Loop invariant code motion---------------------------
Allocation hoisting:
No allocation hoisting found

Instruction hoisting:
No instruction hoisting found
--------------------------------------------------------------------------------
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/errors.py in new_error_context(fmt_, *args, **kwargs)
    800     try:
--> 801         yield
    802     except NumbaError as e:

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/lowering.py in lower_block(self, block)
    266                                    loc=self.loc, errcls_=defaulterrcls):
--> 267                 self.lower_inst(inst)
    268         self.post_block(block)

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/lowering.py in lower_inst(self, inst)
    479                 if isinstance(inst, _class):
--> 480                     func(self, inst)
    481                     return

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/parfors/parfor_lowering.py in _lower_parfor_parallel(lowerer, parfor)
    239             lowerer, parfor, typemap, typingctx, targetctx, flags, {},
--> 240             bool(alias_map), index_var_typ, parfor.races)
    241     finally:

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/parfors/parfor_lowering.py in _create_gufunc_for_parfor_body(lowerer, parfor, typemap, typingctx, targetctx, flags, locals, has_aliases, index_var_typ, races)
    933         warnings.warn(NumbaParallelSafetyWarning(msg, loc))
--> 934     replace_var_with_array(races, loop_body, typemap, lowerer.fndesc.calltypes)
    935 

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/parfors/parfor_lowering.py in replace_var_with_array(vars, loop_body, typemap, calltypes)
   1363 def replace_var_with_array(vars, loop_body, typemap, calltypes):
-> 1364     replace_var_with_array_internal(vars, loop_body, typemap, calltypes)
   1365     for v in vars:

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/parfors/parfor_lowering.py in replace_var_with_array_internal(vars, loop_body, typemap, calltypes)
   1360     for label, block in loop_body.items():
-> 1361         block.body = replace_var_with_array_in_block(vars, block, typemap, calltypes)
   1362 

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/parfors/parfor_lowering.py in replace_var_with_array_in_block(vars, block, typemap, calltypes)
   1346 
-> 1347             setitem_node = ir.SetItem(inst.target, const_var, inst.value, inst.loc)
   1348             calltypes[setitem_node] = signature(

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/ir.py in __init__(self, target, index, value, loc)
    590         assert isinstance(index, Var)
--> 591         assert isinstance(value, Var)
    592         assert isinstance(loc, Loc)

AssertionError: 

During handling of the above exception, another exception occurred:

LoweringError                             Traceback (most recent call last)
<ipython-input-56-bf2ee917ede8> in <module>
----> 1 model.fit(interactions_train, sample_weight=sample_weight_train, epochs=5, verbose=True)

~/Repos/rankfm/rankfm/rankfm.py in fit(self, interactions, user_features, item_features, sample_weight, epochs, verbose)
    261 
    262         self._reset_state()
--> 263         self.fit_partial(interactions, user_features, item_features, sample_weight, epochs, verbose)
    264 
    265 

~/Repos/rankfm/rankfm/rankfm.py in fit_partial(self, interactions, user_features, item_features, sample_weight, epochs, verbose)
    302             self.v_i,
    303             self.v_uf,
--> 304             self.v_if
    305         )
    306         self.w_i, self.w_if, self.v_u, self.v_i, self.v_uf, self.v_if = updated_weights

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/dispatcher.py in _compile_for_args(self, *args, **kws)
    418                     e.patch_message('\n'.join((str(e).rstrip(), help_msg)))
    419             # ignore the FULL_TRACEBACKS config, this needs reporting!
--> 420             raise e
    421 
    422     def inspect_llvm(self, signature=None):

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/dispatcher.py in _compile_for_args(self, *args, **kws)
    351                 argtypes.append(self.typeof_pyval(a))
    352         try:
--> 353             return self.compile(tuple(argtypes))
    354         except errors.ForceLiteralArg as e:
    355             # Received request for compiler re-entry with the list of arguments

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler_lock.py in _acquire_compile_lock(*args, **kwargs)
     30         def _acquire_compile_lock(*args, **kwargs):
     31             with self:
---> 32                 return func(*args, **kwargs)
     33         return _acquire_compile_lock
     34 

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/dispatcher.py in compile(self, sig)
    792             self._cache_misses[sig] += 1
    793             try:
--> 794                 cres = self._compiler.compile(args, return_type)
    795             except errors.ForceLiteralArg as e:
    796                 def folded(args, kws):

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/dispatcher.py in compile(self, args, return_type)
     75 
     76     def compile(self, args, return_type):
---> 77         status, retval = self._compile_cached(args, return_type)
     78         if status:
     79             return retval

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/dispatcher.py in _compile_cached(self, args, return_type)
     89 
     90         try:
---> 91             retval = self._compile_core(args, return_type)
     92         except errors.TypingError as e:
     93             self._failed_cache[key] = e

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/dispatcher.py in _compile_core(self, args, return_type)
    107                                       args=args, return_type=return_type,
    108                                       flags=flags, locals=self.locals,
--> 109                                       pipeline_class=self.pipeline_class)
    110         # Check typing error if object mode is used
    111         if cres.typing_error is not None and not flags.enable_pyobject:

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler.py in compile_extra(typingctx, targetctx, func, args, return_type, flags, locals, library, pipeline_class)
    566     pipeline = pipeline_class(typingctx, targetctx, library,
    567                               args, return_type, flags, locals)
--> 568     return pipeline.compile_extra(func)
    569 
    570 

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler.py in compile_extra(self, func)
    337         self.state.lifted = ()
    338         self.state.lifted_from = None
--> 339         return self._compile_bytecode()
    340 
    341     def compile_ir(self, func_ir, lifted=(), lifted_from=None):

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler.py in _compile_bytecode(self)
    399         """
    400         assert self.state.func_ir is None
--> 401         return self._compile_core()
    402 
    403     def _compile_ir(self):

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler.py in _compile_core(self)
    379                 self.state.status.fail_reason = e
    380                 if is_final_pipeline:
--> 381                     raise e
    382         else:
    383             raise CompilerError("All available pipelines exhausted")

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler.py in _compile_core(self)
    370             res = None
    371             try:
--> 372                 pm.run(self.state)
    373                 if self.state.cr is not None:
    374                     break

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler_machinery.py in run(self, state)
    339                     (self.pipeline_name, pass_desc)
    340                 patched_exception = self._patch_error(msg, e)
--> 341                 raise patched_exception
    342 
    343     def dependency_analysis(self):

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler_machinery.py in run(self, state)
    330                 pass_inst = _pass_registry.get(pss).pass_inst
    331                 if isinstance(pass_inst, CompilerPass):
--> 332                     self._runPass(idx, pass_inst, state)
    333                 else:
    334                     raise BaseException("Legacy pass in use")

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler_lock.py in _acquire_compile_lock(*args, **kwargs)
     30         def _acquire_compile_lock(*args, **kwargs):
     31             with self:
---> 32                 return func(*args, **kwargs)
     33         return _acquire_compile_lock
     34 

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler_machinery.py in _runPass(self, index, pss, internal_state)
    289             mutated |= check(pss.run_initialization, internal_state)
    290         with SimpleTimer() as pass_time:
--> 291             mutated |= check(pss.run_pass, internal_state)
    292         with SimpleTimer() as finalize_time:
    293             mutated |= check(pss.run_finalizer, internal_state)

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/compiler_machinery.py in check(func, compiler_state)
    262 
    263         def check(func, compiler_state):
--> 264             mangled = func(compiler_state)
    265             if mangled not in (True, False):
    266                 msg = ("CompilerPass implementations should return True/False. "

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/typed_passes.py in run_pass(self, state)
    440 
    441         # TODO: Pull this out into the pipeline
--> 442         NativeLowering().run_pass(state)
    443         lowered = state['cr']
    444         signature = typing.signature(state.return_type, *state.args)

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/typed_passes.py in run_pass(self, state)
    368                 lower = lowering.Lower(targetctx, library, fndesc, interp,
    369                                        metadata=metadata)
--> 370                 lower.lower()
    371                 if not flags.no_cpython_wrapper:
    372                     lower.create_cpython_wrapper(flags.release_gil)

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/lowering.py in lower(self)
    177         if self.generator_info is None:
    178             self.genlower = None
--> 179             self.lower_normal_function(self.fndesc)
    180         else:
    181             self.genlower = self.GeneratorLower(self)

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/lowering.py in lower_normal_function(self, fndesc)
    225         # Init argument values
    226         self.extract_function_arguments()
--> 227         entry_block_tail = self.lower_function_body()
    228 
    229         # Close tail of entry block

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/lowering.py in lower_function_body(self)
    251             bb = self.blkmap[offset]
    252             self.builder.position_at_end(bb)
--> 253             self.lower_block(block)
    254         self.post_lower()
    255         return entry_block_tail

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/lowering.py in lower_block(self, block)
    265             with new_error_context('lowering "{inst}" at {loc}', inst=inst,
    266                                    loc=self.loc, errcls_=defaulterrcls):
--> 267                 self.lower_inst(inst)
    268         self.post_block(block)
    269 

~/anaconda3/envs/rankfm/lib/python3.7/contextlib.py in __exit__(self, type, value, traceback)
    128                 value = type()
    129             try:
--> 130                 self.gen.throw(type, value, traceback)
    131             except StopIteration as exc:
    132                 # Suppress StopIteration *unless* it's the same exception that

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/errors.py in new_error_context(fmt_, *args, **kwargs)
    806         newerr = errcls(e).add_context(_format_msg(fmt_, args, kwargs))
    807         tb = sys.exc_info()[2] if numba.core.config.FULL_TRACEBACKS else None
--> 808         reraise(type(newerr), newerr, tb)
    809 
    810 

~/anaconda3/envs/rankfm/lib/python3.7/site-packages/numba/core/utils.py in reraise(tp, value, tb)
     79     if value.__traceback__ is not tb:
     80         raise value.with_traceback(tb)
---> 81     raise value
     82 
     83 

LoweringError: Failed in nopython mode pipeline (step: nopython mode backend)


File "../../rankfm/numba_methods.py", line 130:
def _fit(loss, max_samples, interactions, sample_weight, user_items, item_idx, regularization, learning_rate, learning_schedule, learning_exponent, epochs, verbose, x_uf, x_if, w_i, w_if, v_u, v_i, v_uf, v_if):
    <source elided>

        for row in prange(n_interaction):
        ^

[1] During: lowering "id=31[LoopNest(index_variable = parfor_index.1283, range = (0, interactions_size0.849, 1))]{2610: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (203)>, 2612: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (203)>, 2640: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (206)>, 2652: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (206)>, 2714: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (209)>, 2716: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (209)>, 2774: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (211)>, 232: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (130)>, 236: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (142)>, 266: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (142)>, 2830: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (211)>, 1916: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (161)>, 1918: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (165)>, 2436: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (200)>, 2448: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (200)>, 1946: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (166)>, 1958: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (166)>, 2504: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (201)>, 2506: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (201)>, 2014: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (169)>, 494: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (146)>, 502: <ir.Block at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (148)>}Var(parfor_index.1283, numba_methods.py:130)" at /Users/ericlundquist/Repos/rankfm/rankfm/numba_methods.py (130)