I need to call Cython code at a very high rate in Windows, with low jitter. Using regular threads and delays doesn't work, for reasons I won't go into here. I've had good luck so far with the Windows multimedia timer using Ctypes, but I can't get the same thing working in Cython.
This code works:
# #####################################
# ctypes_timer.py:
from ctypes import WINFUNCTYPE, windll, c_uint, c_ulong
from ctypes.wintypes import UINT, DWORD
timeproc = WINFUNCTYPE(None, c_uint, c_uint, DWORD, DWORD, DWORD)
timeSetEvent = windll.winmm.timeSetEvent
timeKillEvent = windll.winmm.timeKillEvent
class HighPrecisionPeriodicTimer:
def __init__(self, interval, callback, resolution=0):
self.__uDelay = UINT(interval)
self.__uResolution = UINT(resolution)
self.__dwUser = c_ulong(0)
self.__fuEvent = c_uint(True)
self.__id = None
self.__callback_func = timeproc(self.__callback)
self._is_running = False
self.interval = interval
self.callback = callback
def __del__(self):
try:
self.stop()
except:
pass
def start(self):
if not self._is_running:
self._is_running = True
self.__id = timeSetEvent(self.__uDelay,
self.__uResolution,
self.__callback_func,
self.__dwUser,
self.__fuEvent)
def stop(self):
if self._is_running:
timeKillEvent(self.__id)
self._is_running = False
def __callback(self, uTimerID, uMsg, dwUser, dw1, dw2):
if self._is_running:
self.callback()
-----------------------------------
This code doesn't work, I assume I've done something wrong:
# #####################################
# setup.py:
from setuptools import setup
from setuptools.extension import Extension
from Cython.Build import cythonize
from Cython.Distutils import build_ext
cython_timer = Extension(name='cython_timer', sources=["./cython_timer.pyx"], libraries=["Winmm"])
setup(name='cython_timer',
ext_modules=cythonize([cython_timer]),
cmdclass={'build_ext': build_ext},
zip_safe=False)
# #####################################
# cython_timer.pyx:
ctypedef uint32_t MMRESULT
ctypedef uint32_t UINT
ctypedef uint32_t* DWORD_PTR
cdef extern from "Windows.h":
ctypedef void ( __stdcall *LPTIMECALLBACK)(
UINT uTimerID,
UINT uMsg,
DWORD_PTR dwUser,
DWORD_PTR dw1,
DWORD_PTR dw2
)
cdef MMRESULT timeSetEvent(
UINT uDelay,
UINT uResolution,
LPTIMECALLBACK lpTimeProc,
DWORD_PTR dwUser,
UINT fuEvent
)
cdef MMRESULT timeKillEvent(
UINT uTimerID
)
cdef class HighPrecisionPeriodicTimer:
cdef bint _is_running
cdef uint32_t _interval
cdef uint32_t _resolution
cdef uint32_t _id
cdef uint32_t* _dwUser
cdef object _callback
def __init__(self, interval, callback, resolution=0):
if not callable(callback):
raise TypeError('callback must be callable')
self._is_running = False
self._interval = interval
self._resolution = resolution
self._callback = callback
def __del__(self):
try:
self._stop()
except:
pass
cdef _start(self):
if not self._is_running:
self._id = timeSetEvent(self._interval, # uDelay
self._resolution, # uResolution
<LPTIMECALLBACK>self._callback_wrapper, # callback_func
self._dwUser, # dwUser
1) # fuEvent, TIME_PERIODIC=1
self._is_running = True
cdef _stop(self):
if self._is_running:
timeKillEvent(self._id)
self._is_running = False
def start(self):
self._start()
def stop(self):
self._stop()
cdef public void _callback_wrapper(self, uint32_t uTimerID, uint32_t uMsg, uint32_t* dwUser, uint32_t* dw1, uint32_t* dw2):
if self._is_running:
self._callback()
# #####################################
# test_timer.py:
import time
from ctypes_timer import HighPrecisionPeriodicTimer
# from cython_timer import HighPrecisionPeriodicTimer
def c():
print(time.perf_counter())
t = HighPrecisionPeriodicTimer(interval=10, callback=c)
t.start()
time.sleep(1)
t.stop()
The Ctypes version works fine. When I run the version I tried to convert to Cython, it crashes when start() is called. I'm sure it's something that I'm not doing right, but I'm stuck at this point. What am I missing?