Is there any way to make a custom collector work within an aiohttp application (or any other way to convert a JSON object to Prometheus metrics in an aiohttp application)?
I'm trying to implement a custom collector according to
https://github.com/prometheus/client_python#custom-collectors, fetching the data via
`await session.get(...)`. The only solution I have found is to change the
`def collect()` method to
`async def collect()` in the custom collector class, override the registry methods that use synchronous loops with asynchronous ones, and also re-implement the
`generate_latest` function.
The overrides:
```python
class CustomRegistry(CollectorRegistry):
    """Collector registry whose collectors may be asynchronous.

    Overrides ``collect``, ``register`` and ``_get_names`` as coroutines /
    async generators so that collectors whose ``collect`` (or ``describe``)
    is an async generator can be registered and scraped from an event loop.
    Collectors with ordinary synchronous ``collect``/``describe`` methods are
    accepted as well.
    """

    @staticmethod
    async def _iter_metrics(source):
        """Yield every item of *source*, be it a sync or an async iterable."""
        if hasattr(source, '__aiter__'):
            async for metric in source:
                yield metric
        else:
            for metric in source:
                yield metric

    async def collect(self):
        """Yields metrics from the collectors in the registry."""
        target_info = None
        # Snapshot the collectors under the lock; iterate outside of it so
        # the lock is never held while a collector is suspended on I/O.
        with self._lock:
            collectors = copy.copy(self._collector_to_names)
            if self._target_info:
                target_info = self._target_info_metric()
        if target_info:
            yield target_info
        for collector in collectors:
            async for metric in self._iter_metrics(collector.collect()):
                yield metric

    async def register(self, collector):
        """Add a collector to the registry.

        Raises:
            ValueError: if a timeseries name produced by *collector* is
                already registered.
        """
        # Resolve the names BEFORE taking the lock. ``self._lock`` is a
        # synchronous lock (it is used with a plain ``with``); holding it
        # across an ``await`` would block the event-loop thread as soon as
        # another coroutine tried to acquire it while this one is suspended.
        names = await self._get_names(collector)
        with self._lock:
            duplicates = set(self._names_to_collectors).intersection(names)
            if duplicates:
                raise ValueError(
                    'Duplicated timeseries in CollectorRegistry: {0}'.format(
                        duplicates))
            for name in names:
                self._names_to_collectors[name] = collector
            self._collector_to_names[collector] = names

    async def _get_names(self, collector):
        """Get names of timeseries the collector produces.

        Returns an empty list when the collector has no ``describe`` method
        and auto-describe is disabled.
        """
        # If there's a describe function, use it.
        desc_func = getattr(collector, 'describe', None)
        # Otherwise, if auto describe is enabled use the collect function.
        if not desc_func and self._auto_describe:
            desc_func = collector.collect
        if not desc_func:
            return []

        # Sample-name suffixes each metric type expands to.
        type_suffixes = {
            'counter': ['_total', '_created'],
            'summary': ['', '_sum', '_count', '_created'],
            'histogram': ['_bucket', '_sum', '_count', '_created'],
            'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
            'info': ['_info'],
        }
        result = []
        async for metric in self._iter_metrics(desc_func()):
            for suffix in type_suffixes.get(metric.type, ['']):
                result.append(metric.name + suffix)
        return result
def sample_line(line):
    """Render one sample as a single line of the Prometheus text format.

    ``line`` must expose ``name``, ``labels`` (a mapping of label name to
    string value), ``value`` and ``timestamp`` (``None`` when absent).
    Returns the formatted line, terminated by a newline.
    """
    labelstr = ''
    if line.labels:
        pairs = []
        for key, raw in sorted(line.labels.items()):
            # Backslash is escaped first so the escapes added afterwards
            # are not themselves doubled.
            escaped = raw.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')
            pairs.append('{0}="{1}"'.format(key, escaped))
        labelstr = '{{{0}}}'.format(','.join(pairs))

    if line.timestamp is None:
        timestamp = ''
    else:
        # Convert to milliseconds.
        millis = int(float(line.timestamp) * 1000)
        timestamp = ' {0:d}'.format(millis)

    return '{0}{1} {2}{3}\n'.format(
        line.name, labelstr, floatToGoString(line.value), timestamp)
async def generate_latest(registry):
    """Returns the metrics from the registry in latest text format as a string.

    ``registry.collect()`` must be an async iterable of metric objects that
    expose ``name``, ``type``, ``documentation`` and ``samples``. The
    rendered text is returned UTF-8 encoded.

    Any exception raised while rendering a metric is re-raised with the
    offending metric appended to its ``args``.
    """
    output = []
    async for metric in registry.collect():
        try:
            mname = metric.name
            mtype = metric.type
            # Munging from OpenMetrics into Prometheus format.
            if mtype == 'counter':
                mname = mname + '_total'
            elif mtype == 'info':
                mname = mname + '_info'
                mtype = 'gauge'
            elif mtype == 'stateset':
                mtype = 'gauge'
            elif mtype == 'gaugehistogram':
                # A gauge histogram is really a gauge,
                # but this captures the structure better.
                mtype = 'histogram'
            elif mtype == 'unknown':
                mtype = 'untyped'

            output.append('# HELP {0} {1}\n'.format(
                mname, metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
            output.append('# TYPE {0} {1}\n'.format(mname, mtype))

            om_samples = {}
            for s in metric.samples:
                for suffix in ['_created', '_gsum', '_gcount']:
                    if s.name == metric.name + suffix:
                        # OpenMetrics specific sample, put in a gauge at the end.
                        om_samples.setdefault(suffix, []).append(sample_line(s))
                        break
                else:
                    # No OpenMetrics suffix matched: a regular sample.
                    output.append(sample_line(s))
        except Exception as exception:
            # Attach the metric being rendered to ease debugging.
            exception.args = (exception.args or ('',)) + (metric,)
            raise

        # Emit the OpenMetrics-only samples as separate gauges after the
        # metric's own samples.
        for suffix, lines in sorted(om_samples.items()):
            output.append('# HELP {0}{1} {2}\n'.format(
                metric.name, suffix, metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
            output.append('# TYPE {0}{1} gauge\n'.format(
                metric.name, suffix))
            output.extend(lines)
    return ''.join(output).encode('utf-8')
```
The custom collector class:
```python
class CustomCollector(object):
    """Collector that turns a JSON metrics payload fetched over HTTP into
    metric families.

    ``app`` must provide the HTTP client session under ``'session'`` and the
    URL to scrape under ``'metrics_url'``.
    """

    def __init__(self, app):
        self.session = app['session']
        self.metrics_url = app['metrics_url']

    async def collect(self):
        """Fetch the JSON payload and yield one metric family per entry.

        Entries whose ``kind`` is not ``Gauge``, ``Timer`` or ``Counter`` are
        skipped.
        """
        # ``async with`` releases the response (and its connection) once the
        # body has been read, even if decoding fails.
        async with self.session.get(self.metrics_url) as resp:
            payload = await resp.json()

        for name, v in payload['metrics'].items():
            # Normalise the source name: '.' and '-' become '_', and each run
            # of capitals not at the start gets a '_' prefix before the
            # whole name is lower-cased.
            name = name.replace('.', '_')
            name = name.replace('-', '_')
            name = re.sub('(?!^)([A-Z]+)', r'_\1', name).lower()

            kind = v['kind']
            if kind == 'Gauge':
                family = GaugeMetricFamily(name, name, labels=[])
            elif kind == 'Timer':
                family = SummaryMetricFamily(name, name, labels=[])
            elif kind == 'Counter':
                family = CounterMetricFamily(name, name, labels=[])
            else:
                # Unknown kind: skip it rather than fail on an unbound
                # family or re-use the one built for the previous entry.
                continue

            for series in v['values']:
                labels = {tag['key']: tag['value'] for tag in series['tags']}
                # Only the first data point of each series is exported.
                point = series['values'][0]
                # NOTE(review): samples are added under the bare family name;
                # counter/summary samples conventionally carry a suffix such
                # as '_total' -- confirm this is what the scraper expects.
                # NOTE(review): the unit of point['t'] is not visible here --
                # confirm it matches what add_sample expects.
                family.add_sample(name, labels, point['v'], timestamp=point['t'])
            yield family
```