I know it's been over a year, but if you could share some insight on how you figured it out, please let us know.
I'm facing the same thing at the company I work for: we want to gather metrics from within Celery tasks, such as timing and error counts. We are planning to use the Pushgateway to gather metrics from the Celery tasks, since they are short-lived compared to a running daemon. To keep the key-value pairs separate, we're going to add a label to each metric with the name of the job. I think the pushadd_to_gateway() function provided by prometheus_client takes a job argument that we can set. Perhaps this will prevent Celery workers from overwriting each other's metrics?
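For what it's worth, here is a minimal sketch of that idea, not a tested production setup. As far as I understand the Pushgateway, pushes that share the same grouping key (the job plus any extra grouping labels) replace each other, so giving each worker its own group should keep them apart. The metric names, the 'celery_tasks' job name and the 'worker' grouping label are made up for illustration; 'localhost:9091' is just the address used elsewhere in this thread.

```python
import time
from contextlib import contextmanager

from prometheus_client import (
    CollectorRegistry, Counter, Histogram, pushadd_to_gateway,
)

# A dedicated registry, so only these metrics are pushed.
registry = CollectorRegistry()

# Hypothetical metric names; pick whatever fits your naming scheme.
TASK_ERRORS = Counter(
    'celery_task_errors', 'Task runs that raised an exception',
    ['task'], registry=registry,
)
TASK_SECONDS = Histogram(
    'celery_task_duration_seconds', 'Wall-clock time per task run',
    ['task'], registry=registry,
)


@contextmanager
def track(task_name):
    """Time the wrapped block and count it as an error if it raises."""
    start = time.perf_counter()
    try:
        yield
    except Exception:
        TASK_ERRORS.labels(task=task_name).inc()
        raise
    finally:
        TASK_SECONDS.labels(task=task_name).observe(time.perf_counter() - start)


def push(worker, gateway='localhost:9091', **kwargs):
    """Push the registry into a group identified by (job, worker).

    'celery_tasks' and the 'worker' label are assumptions for this sketch.
    Extra keyword arguments (e.g. timeout, handler) go to pushadd_to_gateway.
    """
    pushadd_to_gateway(
        gateway, job='celery_tasks', registry=registry,
        grouping_key={'worker': worker}, **kwargs,
    )
```

Inside a task you would wrap the body in `with track('my_task'):` and call `push(...)` with a per-worker identifier afterwards. Note that pushadd_to_gateway() only replaces metrics with the same name within the group, while push_to_gateway() replaces the whole group.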
from functools import lru_cache

from billiard import current_process
from prometheus_client import Counter, CollectorRegistry, push_to_gateway

# if you also have batch Celery tasks, use a separate registry for those
registry = CollectorRegistry()

# each metric should be defined referring to the above registry, like this:
count_retries = Counter(
    'retries', 'Processing retries', ['task'], registry=registry,
)


@lru_cache(maxsize=2)
def worker_id():
    """
    current_process() doesn't have an "index" attribute at compile time;
    use a function to return it at runtime, and cache the response since
    it doesn't change for a given worker
    """
    try:
        index = current_process().index
    except AttributeError:
        # For management commands, which also don't have "index" defined
        index = 0
    return 'worker-%d' % index


def push_stats():
    """
    Call this function at the end of each Celery task;
    if you have batch tasks, have another function to push those to the batch registry
    """
    # push our stats identified by our worker_id
    push_to_gateway('localhost:9091', job=worker_id(), registry=registry)
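
Since push_stats() has to be called at the end of every task, it is easy to forget one. Below is a small sketch of a decorator that does the call for you. `with_push` is a hypothetical helper, not part of Celery or prometheus_client, and swallowing push errors is my own assumption about what you'd want when the gateway is down.

```python
import functools
import logging

logger = logging.getLogger(__name__)


def with_push(push):
    """Build a decorator that calls `push()` once the wrapped function ends.

    `push` is any zero-argument callable, e.g. the push_stats function above.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            finally:
                # Runs whether the task succeeded or raised.
                try:
                    push()
                except Exception:
                    # Assumption: a gateway outage should not turn a
                    # finished task into a failed one, so only log it.
                    logger.exception('pushing metrics failed')
        return wrapper
    return decorator
```

With Celery you would presumably stack it under the task decorator, i.e. `@app.task` first and `@with_push(push_stats)` directly above the function, so the push happens inside the worker process that ran the task.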