Parallelise with thread-local reduction variable?

105 views
Skip to first unread message

Buzz B

unread,
Oct 13, 2023, 3:06:10 PM10/13/23
to cython-users
Hello,

Apologies for the verbose message, but I've been working on this problem for days and I am at my limit of understanding.

Task
Essentially, I am trying to write a filtering function in cython as the python equivalent (below) is not viable due to the time-cost given the immense number of rows (over 100,000,000 rows and then multiply that by the number of datasets) that I'm working with:

df.query(
    f"{Z1} < Z < {Z2} and {T1} < T < {T2} and {M1} < M < {M2}",
    engine="numexpr",
)[["T", "I", "M"]]


I have a working cython equivalent:
import numpy as np
import pandas as pd
cimport cython
cimport numpy as cnp
cnp.import_array()

ctypedef cnp.npy_uint32 uint32_t

cpdef filtering(
    uint32_t[::1] I,
    double[::1] Z,
    double[::1] M,
    double[::1] T,
    double Z1,
    double Z2,
    double T1,
    double T2,
    double M1,
    double M2
):

    cdef double[::1] T_out = np.empty_like(T)
    cdef uint32_t[::1] I_out = np.empty_like(I)
    cdef double[::1] M_out = np.empty_like(M)

    cdef Py_ssize_t i, n, out_idx
    cdef cython.bint in_range = 0
    out_idx = 0
    n = Z.shape[0]

    for i in range(n):
        in_range = (
            (Z1 < Z[i]) & (Z[i] < Z2) &
            (T1 < T[i]) & (T[i] < T2) &
            (M1 < M[i]) & (M[i] < M2)
        )
        if in_range:
            T_out[out_idx] = T[i]
            I_out[out_idx] = I[i]
            M_out[out_idx] = M[i]
            out_idx += 1
   
    T_out_arr = np.asarray(T_out[:out_idx])
    I_out_arr = np.asarray(I_out[:out_idx])
    M_out_arr = np.asarray(M_out[:out_idx])

    return pd.DataFrame({
        "T": T_out_arr,
        "I": I_out_arr,
        "M": M_out_arr
    })

I have also executed this nested in a for p in range(Z1.shape[0]) loop.

Problem
I'd now like to parallelise it. Since the bottleneck is the querying of the data, I would like to parallelise the inner loop as this contains far more elements than the outer loop will iterate over.

After many iterations that failed to even compile, due to errors including:
  • Cannot read reduction variable in loop body
  • local variable 'out_idx' referenced before assignment
I resorted to parallelising the outer loop and have a version that does compile, but the output is incorrect. Converting the return result to a DataFrame (pd.DataFrame(result, columns=["T", "I", "M"])), I can see that it appears as though the number of elements in each sub-array (the inner loop) are correct, but all the values for each p of the outer loop are the same.

My failed attempt:
import numpy as np
from cython.parallel import prange
cimport cython
cimport numpy as cnp
cnp.import_array()

ctypedef cnp.npy_uint32 uint32_t

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
cdef void filter_inner(
    uint32_t[::1] I,
    double[::1] Z,
    double[::1] M,
    double[::1] T,
    double Z1,
    double Z2,
    double T1,
    double T2,
    double M1,
    double M2,
    double[::1] T_out,
    uint32_t[::1] I_out,
    double[::1] M_out,
    object[:, :] output_nested,
    Py_ssize_t n,
    Py_ssize_t p
) nogil:
   
    cdef Py_ssize_t i, out_idx
    cdef cython.bint in_range = 0
    out_idx = 0

    for i in range(n):
        in_range = (
            (Z1 < Z[i]) & (Z[i] < Z2) &
            (T1 < T[i]) & (T[i] < T2) &
            (M1 < M[i]) & (M[i] < M2)
        )
        if in_range:
            T_out[out_idx] = T[i]
            I_out[out_idx] = I[i]
            M_out[out_idx] = M[i]
            out_idx += 1
   
    # I don't know if this hack works, but it allows use of python objects
    # and compiles
    with gil:
        output_nested[p, 0] = T_out[:out_idx]
        output_nested[p, 1] = I_out[:out_idx]
        output_nested[p, 2] = M_out[:out_idx]


@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
cpdef filter_outer(
    uint32_t[::1] I,
    double[::1] Z,
    double[::1] M,
    double[::1] T,
    double Z1,
    double Z2,
    double T1,
    double T2,
    double M1,
    double M2
):

    cdef Py_ssize_t p, r, n
    r = Z1.shape[0]
    n = Z.shape[0]
   
    cdef double[::1] T_out = np.empty_like(T)
    cdef uint32_t[::1] I_out = np.empty_like(I)
    cdef double[::1] M_out = np.empty_like(M)
    cdef object[:, :] output_nested = np.empty((r, 3), dtype=object)

    for p in prange(r, nogil=True, schedule="static"):
        filter_inner(
            I,
            Z,
            M,
            T,
            Z1[p],
            Z2[p],
            T1[p],
            T2[p],
            M1[p],
            M2[p],
            T_out,
            I_out,
            M_out,
            output_nested,
            n,
            p
        )

    return np.asarray(output_nested)

I reckon that I might be able to use threadid() to somehow index out_idx as an array and store into thread-local versions of it (although I'm still surprised that I managed to get past the initial compile errors regarding the use of out_idx), but have no idea how to implement it.

I truly appreciate any help offered, and if possible could code be provided showing how to parallelise the inner loop, or at this point I'd be grateful for even a toy example that is applicable.

Thanks again,
Buzz

da-woods

unread,
Oct 15, 2023, 3:44:14 PM10/15/23
to cython...@googlegroups.com
Your threadid idea is probably the way to go. You want to make your output memoryviews 2D with the second dimension being `omp_get_num_threads()` elements long.

The tricky bit (as I think you've found) is stopping Cython identifying `out_idx` as a reduction variable. What I'd do here is make `out_idx` a 1D array of `omp_get_num_threads()` and have each thread increment `out_idx[threadid()]` instead.

There might be other tricks you could use though. I suspect if you hid the `+=` from Cython it would make `out_idx` a regular thread-local variable. So you could make a cdef inline function

```
cdef inline int add_one(int x) noexcept nogil:
    return x+1
```

and replace the `out_idx += 1` with `out_idx = add_one(out_idx)`. I suspect this would work, but I'd want to inspect the C code myself to convince myself that `out_idx` was genuinely thread-local.
--

---
You received this message because you are subscribed to the Google Groups "cython-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cython-users...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cython-users/f15321cf-936d-4903-a634-47a62268d113n%40googlegroups.com.


Jérôme Kieffer

unread,
Oct 16, 2023, 1:59:26 AM10/16/23
to cython...@googlegroups.com
On Sun, 15 Oct 2023 20:44:07 +0100
da-woods <dw-...@d-woods.co.uk> wrote:

> Your threadid idea is probably the way to go. You want to make your
> output memoryviews 2D with the second dimension being
> `omp_get_num_threads()` elements long.
>
> The tricky bit (as I think you've found) is stopping Cython identifying
> `out_idx` as a reduction variable. What I'd do here is make `out_idx` a
> 1D array of `omp_get_num_threads()` and have each thread increment
> `out_idx[threadid()]` instead.
>
> There might be other tricks you could use though. I suspect if you hid
> the `+=` from Cython it would make `out_idx` a regular thread-local
> variable. So you could make a cdef inline function
>
> ```
> cdef inline int add_one(int x) noexcept nogil:
>     return x+1
> ```
>
> and replace the `out_idx += 1` with `out_idx = add_one(out_idx)`. I
> suspect this would work, but I'd want to inspect the C code myself to
> convince myself that `out_idx` was genuinely thread-local.

I usually do:
`out_idx = out_idx+1`

https://youtu.be/QSlo_Nyzeig?t=881

This has been working for a decade, no idea if "+=" is faster.
Cheers,

--
Jerome

da-woods

unread,
Oct 16, 2023, 3:02:52 AM10/16/23
to cython...@googlegroups.com
On 16/10/2023 06:59, Jérôme Kieffer wrote:
I usually do:
`out_idx = out_idx+1`

https://youtu.be/QSlo_Nyzeig?t=881

This has been working for a decade, no idea if "+=" is faster.
Cheers,

Good to know - that is a simpler suggestion!  (I'd be very surprised if there was any difference in speed once it's been through the C compiler)

Buzz B

unread,
Oct 17, 2023, 11:48:36 PM10/17/23
to cython-users
Thank you all for your responses. I have been iteratively working on implementing and testing them.

@Jerome Kieffer
  • Unfortunately, this did not work for me:
    • `out_idx = out_idx+1`


@D Woods
  • The 2D memoryviews has got me close to the expected output: my prior attempt resulted in the nested arrays having the same values for each `p`, and after your suggestion the arrays for each `p` matched that of the expected output for all apart from 1 iteration - this I can't understand.
    • Could it be that once a given thread is used again (and subsequently indexed) it uses the same values?

    • Expected output:
expected.png
    • 2D memoryview output:
memoryview2D.png

  • Performing a sanity check by changing `prange` to `range`, also requires that `T_out`, `I_out`, and `M_out` are instantiated as empty arrays within the loop, i.e. with each iteration of `p`
    • However, I am unable to implement this in a `nogil` section since the arrays are python objects. I tried to apply the cython documentation to my scenario like so:
```
# create objects
T_out = <double[:, ::1]> malloc(sizeof(double) * n * num_threads)  # did not compile
cdef double *T_out = <double *> malloc(sizeof(double) * n * num_threads)  # used within the cdef inner function
# assign
T_out[out_idx[tid] * num_threads + tid] = T[i]  # no idea if this is correct syntax to achieve intent, but executing caused a fatal error:
#> The Kernel crashed while executing code in the current cell or a previous cell.
```

  • After a brief attempt at creating a 3D memoryview `[n, num_threads, p]` this produced a MemoryError: Unable to allocate, so that is not viable.

Future directions
  • As good practice, I'd like to know how to properly create a memoryview to an array that would work in a `nogil` section and then assign to a slice of it?
  • How can I correctly create the desired output?
  • Failing this, how could I implement `searchsorted()` to perform a binary search instead and take the indices of the sorted arrays (e.g. `T.sort_values()`), intersect them, and then use the remaining indices to get the values of the original arrays?

I appreciate your help,
Buzz

Jérôme Kieffer

unread,
Oct 18, 2023, 12:54:50 PM10/18/23
to cython...@googlegroups.com
On Tue, 17 Oct 2023 15:57:46 -0700 (PDT)
Buzz B <Buzz.B...@outlook.com> wrote:

> Thank you all for your responses. I have been iteratively working on
> implementing and testing them.
>
> *@Jerome Kieffer*
>
> - Unfortunately, this did not work for me:
> - `out_idx = out_idx+1`

Hi, I had a better look at your code and it cannot work they way you want ...

You are calculating the output position in the loop and this is a problem

You have 2 options:
- allocate one output slab per thread, apparently you tried and you are
lacking memory because you have too many threads.
- One can use atomic add but this is not available in cython. I use
preferably opencl when I encounter this kind of situations...
Interfaced with pyopencl, the code is not that much more complex than cython.

Cheers,

Jerome
Reply all
Reply to author
Forward
0 new messages