Enabling tracing and setting a callback in a task definition for a gevent pool

29 views
Skip to first unread message

karthik jain

unread,
Nov 24, 2023, 7:55:03 AM11/24/23
to celery-users
Hello,
Can I impose a memory limit on a task being executed by a green thread, such that if the task goes beyond the limit,
I can raise an exception from within the task to stop it?
I do not want to restart/revoke/kill the workers (green threads).
I know it is difficult to determine the memory used by each green thread individually.
I have some sample code that logs, at each logging call, how much memory each green thread has consumed while two green threads perform the same task. How can I achieve the same with Celery? The sample code is below:

from gevent import monkey, Greenlet, sleep
monkey.patch_all()
import os, psutil
from datetime import datetime

# Accumulated memory delta in MiB per task, keyed by task id (the first
# positional argument of a task greenlet). Updated by call_back_func.
memory_usage = {}
# Process RSS in MiB sampled when a task greenlet was last switched in.
# call_back_func rebinds it through its own ``global`` statement; a
# module-level ``global`` declaration has no effect, so none is used here.
start_memory = 0

def log(msg):
    """Print *msg* to stdout, prefixed with the current local timestamp and a colon."""
    stamp = datetime.now()
    print(f"{stamp}:{msg}")

def log_memory(task_id):
    """Log the memory (MiB) attributed so far to the task *task_id*.

    The figure is the delta already accumulated in ``memory_usage[task_id]``
    plus the growth of process RSS since ``start_memory`` was last sampled.
    When the switch tracer is installed, that is the moment this task was
    last switched in.

    Raises:
        KeyError: if *task_id* has not been registered in ``memory_usage``.
    """
    accumulated = memory_usage[task_id]
    since_switch_in = get_memory() - start_memory
    log("memory used by {} is {} MiB".format(task_id, accumulated + since_switch_in))

def get_memory():
    """Return the resident set size (RSS) of the current process, in MiB."""
    mebibyte = 1024 * 1024
    current = psutil.Process(os.getpid())
    return current.memory_info().rss / mebibyte

def taskFunction(task_id, list_t=None, *, size_in_mb=100):
    """Simulate a memory-hungry task running inside a greenlet.

    Logs the memory attributed to *task_id*, appends ``1`` to *list_t*,
    allocates a list of roughly *size_in_mb* MiB, logs process RSS, yields
    for 5 seconds via gevent's ``sleep``, then logs the attributed memory
    again.

    Args:
        task_id: Key under which the switch tracer records this task's memory.
        list_t: List shared between tasks; ``1`` is appended to it. When
            omitted, a fresh list is created per call instead of reusing a
            mutable default shared by every call.
        size_in_mb: Approximate size of the allocation, in MiB (keyword-only).

    Returns:
        The allocated list.
    """
    if list_t is None:
        list_t = []
    log_memory(task_id)
    list_t.append(1)
    size_in_bytes = size_in_mb * 1024 * 1024
    # One list slot per 8 bytes: assumes 8-byte pointers (64-bit CPython), so
    # the list's slot array alone is about size_in_bytes; every slot refers
    # to the same int 0.
    data = [0] * (size_in_bytes // 8)
    log("task_id: {} Process Memory: {}".format(task_id, get_memory()))
    sleep(5)
    log_memory(task_id)
    return data

def wrapper_function(task_id, list_t):
    """Greenlet entry point: run ``taskFunction`` and discard its return value.

    ``call_back_func`` recognises task greenlets by checking that their run
    callable is this function. It reads the task id from the greenlet's
    first positional argument.
    """
    taskFunction(task_id, list_t)

def _is_task_greenlet(glet):
    """Return True if *glet* is a gevent Greenlet whose run callable is wrapper_function."""
    # Identity check: the original compared repr() strings, which is slower
    # and only indirectly tests for the same function object.
    return isinstance(glet, Greenlet) and getattr(glet, "_run", None) is wrapper_function


def call_back_func(event, args):
    """greenlet trace callback that attributes RSS growth to task greenlets.

    A task greenlet is one for which ``_is_task_greenlet`` is true. Its task
    id is its first positional argument.

    - When a task greenlet is switched out, the RSS growth since it was
      switched in is added to ``memory_usage[task_id]``.
    - When a task greenlet is switched in, ``start_memory`` is re-sampled and
      the task is registered in ``memory_usage`` with 0 if it is new.

    Because ``get_memory`` reports whole-process RSS, the attribution is an
    approximation.

    Args:
        event: Trace event name reported by greenlet.
        args: ``(origin, target)`` tuple for "switch" and "throw" events.
    """
    global start_memory
    # "throw" also transfers control from origin to target (delivering an
    # exception into target). Ignoring it would leave start_memory stale for
    # the resumed task.
    if event not in ("switch", "throw"):
        return
    origin, target = args
    leaving = _is_task_greenlet(origin)
    entering = _is_task_greenlet(target)
    if not (leaving or entering):
        return
    # Sample RSS once so the credit to origin and the baseline for target
    # come from the same reading.
    now = get_memory()
    if leaving:
        task_id = origin.args[0]
        # A task that was already running before the tracer was installed has
        # no baseline. Skip it instead of raising KeyError inside the tracer.
        if task_id in memory_usage:
            memory_usage[task_id] += now - start_memory
    if entering:
        start_memory = now
        memory_usage.setdefault(target.args[0], 0)

if __name__ == '__main__':
    list_t = []
    g = Greenlet(wrapper_function, "1", list_t)
    g2 = Greenlet(wrapper_function, "2", list_t)
    # Remember the tracer that was active before ours so it can be restored.
    # NOTE(review): greenlet's trace hook is not tied to one greenlet; calling
    # it through ``g`` is kept from the original — confirm against the
    # installed greenlet/gevent versions.
    trace = g.gettrace()
    # Install the tracer before either task starts, so that every task is
    # registered in memory_usage on its first switch-in.
    g.settrace(call_back_func)
    try:
        g.start()
        g2.start()
        g.join()
        g2.join()
    finally:
        g.settrace(trace)

Thanks in advance!
Reply all
Reply to author
Forward
0 new messages