Hello,
Can I impose a memory limit on a task executed by a green thread,
so that if the task exceeds the limit, an exception is raised from within the task to stop it?
I do not want to restart, revoke, or kill the workers (green threads).
I know it is difficult to determine the memory used by each green thread individually, since all of them share a single process.
I have sample code that reports, at every log call, how much memory each green thread has consumed while two green threads perform the same task. How can I achieve the same with Celery? Here is the code:
from gevent import monkey, Greenlet, sleep

# Patch the standard library before anything else imports it.
monkey.patch_all()

import os
from datetime import datetime

import psutil
# Memory (MiB) accumulated per task over its finished time slices, keyed by
# task_id; updated by call_back_func when a task greenlet is switched out.
memory_usage = {}
# Process RSS (MiB) sampled when a task greenlet was last switched in.
# (A module-level `global` statement is a no-op, so none is needed here;
# functions that rebind this name declare `global start_memory` themselves.)
start_memory = 0
def log(msg):
    """Print *msg* to stdout, prefixed by the current local timestamp and ':'."""
    timestamp = datetime.now()
    line = "{}:{}".format(timestamp, msg)
    print(line)
def log_memory(task_id, limit_mib=None):
    """Log the memory attributed to *task_id* and optionally enforce a limit.

    The estimate is the usage already accumulated for the task in
    ``memory_usage`` plus the change in process RSS since ``start_memory``
    was sampled. ``start_memory`` is reset by ``call_back_func`` whenever a
    task greenlet is switched in. The figure is therefore only meaningful when
    this is called from the task's own, currently running greenlet. RSS is
    process-wide, so it remains an approximation.

    Args:
        task_id: Key of the task in ``memory_usage``.
        limit_mib: Optional limit in MiB. ``None`` (the default) disables
            the check.

    Returns:
        The estimated memory usage of the task in MiB.

    Raises:
        MemoryError: If *limit_mib* is given and the estimate exceeds it.
    """
    # A task the trace callback has not registered yet has no accumulated
    # usage; default to 0 instead of raising KeyError.
    task_mem_usage = memory_usage.get(task_id, 0) + (get_memory() - start_memory)
    log("memory used by {} is {} MiB".format(task_id, task_mem_usage))
    if limit_mib is not None and task_mem_usage > limit_mib:
        # Raised in the calling greenlet, so it unwinds only the task that
        # called this function, not the other greenlets.
        raise MemoryError(
            "task {} exceeded memory limit: {} MiB > {} MiB".format(
                task_id, task_mem_usage, limit_mib
            )
        )
    return task_mem_usage
def get_memory():
    """Return the resident set size (RSS) of this process, in MiB."""
    bytes_per_mib = 1024 ** 2
    this_process = psutil.Process(os.getpid())
    rss_bytes = this_process.memory_info().rss
    return rss_bytes / bytes_per_mib
def taskFunction(task_id, list_t: "list | None" = None,
                 size_in_mb: int = 100, delay: float = 5):
    """Demo task: allocate about *size_in_mb* MiB, pause, and log memory.

    Args:
        task_id: Identifier used in log lines and ``memory_usage`` lookups.
        list_t: List that ``1`` is appended to on each call (the caller may
            share one list between tasks). When omitted, a fresh list is
            used. The previous ``[]`` default was a single list shared
            across all calls.
        size_in_mb: Approximate amount of memory to allocate (default 100).
        delay: Seconds to ``sleep`` after allocating (default 5).

    Returns:
        The allocated list.
    """
    if list_t is None:
        list_t = []
    log_memory(task_id)
    list_t.append(1)
    size_in_bytes = size_in_mb * 1024 * 1024
    # One list slot per 8 bytes: on a 64-bit CPython each slot is an 8-byte
    # pointer (all to the same int 0), so this is roughly size_in_mb MiB.
    data = [0] * (size_in_bytes // 8)
    log("task_id: {} Process Memory: {}".format(task_id, get_memory()))
    # `sleep` is gevent's (imported from gevent), so this yields to other
    # greenlets while holding `data`.
    sleep(delay)
    log_memory(task_id)
    return data
def wrapper_function(task_id, list_t):
    """Run ``taskFunction`` for *task_id* and discard its result.

    ``call_back_func`` treats a greenlet as a task only when its run
    function is this function, so task greenlets should be created as
    ``Greenlet(wrapper_function, task_id, list_t)``.
    """
    taskFunction(task_id=task_id, list_t=list_t)
    return None
def _is_task_greenlet(glet):
    """Return True if *glet* is a gevent Greenlet whose run function is wrapper_function."""
    # Identity comparison; the original compared repr() strings, which only
    # matched for the very same function object anyway.
    return isinstance(glet, Greenlet) and getattr(glet, "_run", None) is wrapper_function


def call_back_func(event, args):
    """greenlet trace hook that attributes RSS changes to task greenlets.

    On every transfer of control it performs two steps:

    * It adds the RSS change since ``start_memory`` to the accumulated usage
      of the task being switched *out of* (``origin``).
    * It re-samples ``start_memory`` when switching *into* a task
      (``target``), registering the task in ``memory_usage`` with 0 the
      first time it is seen.

    Only greenlets whose run function is ``wrapper_function`` count as
    tasks; the task id is their first positional argument.

    Args:
        event: Trace event name; ``"switch"`` and ``"throw"`` are handled.
        args: ``(origin, target)`` tuple of greenlets.
    """
    global start_memory
    # "throw" (an exception delivered into a greenlet, e.g. on kill/timeout)
    # also transfers control from origin to target, so account for it too.
    if event not in ("switch", "throw"):
        return
    origin, target = args
    if _is_task_greenlet(origin):
        task_id = origin.args[0]
        # .get: the task may have started before this hook was installed.
        memory_usage[task_id] = memory_usage.get(task_id, 0) + (get_memory() - start_memory)
    if _is_task_greenlet(target):
        task_id = target.args[0]
        start_memory = get_memory()
        memory_usage.setdefault(task_id, 0)
if __name__ == '__main__':
    list_t = []
    g = Greenlet(wrapper_function, "1", list_t)
    g2 = Greenlet(wrapper_function, "2", list_t)
    # greenlet's settrace/gettrace act on the whole current thread, not on
    # the greenlet they are called through. Install the hook once, before
    # any task starts, and restore the previous hook afterwards.
    trace = g.gettrace()
    g.settrace(call_back_func)
    try:
        g.start()
        g2.start()
        g.join()
        g2.join()
    finally:
        g.settrace(trace)
Thanks in advance!