custom collector in aiohttp application

190 views
Skip to first unread message

Alexander

unread,
Oct 11, 2020, 6:14:34 PM10/11/20
to Prometheus Users
Is there any way to make a custom collector work within an aiohttp application (or any other way to convert a JSON object to Prometheus metrics in an aiohttp application)?

I'm trying to implement a custom collector according to https://github.com/prometheus/client_python#custom-collectors while fetching the data via `await session.get(...)`. The only solution I have found is to change the `def collect()` method to `async def collect()` in the custom collector class, override the registry methods so that their synchronous loops become asynchronous ones, and re-implement the `generate_latest` function.

the overrides:
```
class CustomRegistry(CollectorRegistry):
    """Registry whose collectors expose ``collect()`` as an async generator.

    ``collect`` and ``register`` are coroutine-based here: ``collect`` must be
    consumed with ``async for`` and ``register`` must be awaited by the caller.
    Collectors (and their optional ``describe``) must return async iterables,
    because they are consumed with ``async for``.
    """

    async def collect(self):
        """Yields metrics from the collectors in the registry."""
        collectors = None
        ti = None
        # Only the snapshot is taken under the lock; the (possibly slow)
        # awaiting of each collector happens after it has been released.
        with self._lock:
            collectors = copy.copy(self._collector_to_names)
            if self._target_info:
                ti = self._target_info_metric()
        if ti:
            yield ti
        for collector in collectors:
            async for metric in collector.collect():
                yield metric

    async def register(self, collector):
        """Add a collector to the registry.

        Raises:
            ValueError: if any timeseries name produced by ``collector`` is
                already registered.
        """
        # Resolve the names BEFORE taking the lock. ``self._lock`` comes from
        # CollectorRegistry and is not an asyncio lock, so it must never be
        # held across an ``await``: another coroutine reaching
        # ``with self._lock`` would block the event-loop thread while this
        # one is suspended, and neither could ever make progress.
        names = await self._get_names(collector)
        with self._lock:
            duplicates = set(self._names_to_collectors).intersection(names)
            if duplicates:
                raise ValueError(
                    'Duplicated timeseries in CollectorRegistry: {0}'.format(
                        duplicates))
            for name in names:
                self._names_to_collectors[name] = collector
            self._collector_to_names[collector] = names

    async def _get_names(self, collector):
        """Get names of timeseries the collector produces.

        Uses ``collector.describe`` when the collector has one; otherwise
        falls back to ``collector.collect`` if auto describe is enabled.
        Returns an empty list when neither is available.
        """
        # If there's a describe function, use it.
        desc_func = getattr(collector, 'describe', None)
        # Otherwise, if auto describe is enabled use the collect function.
        if not desc_func and self._auto_describe:
            desc_func = collector.collect

        if not desc_func:
            return []

        result = []
        # Sample-name suffixes per metric type; any other type contributes
        # just its bare name.
        type_suffixes = {
            'counter': ['_total', '_created'],
            'summary': ['', '_sum', '_count', '_created'],
            'histogram': ['_bucket', '_sum', '_count', '_created'],
            'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
            'info': ['_info'],
        }
        async for metric in desc_func():
            for suffix in type_suffixes.get(metric.type, ['']):
                result.append(metric.name + suffix)
        return result


def sample_line(line):
    """Format one sample as a single newline-terminated text line.

    ``line`` must provide ``name``, ``labels``, ``value`` and ``timestamp``
    attributes. Labels are emitted sorted by key inside ``{...}`` with
    backslash, newline and double-quote escaped in the values; the braces are
    omitted entirely when there are no labels. A timestamp that is not
    ``None`` is multiplied by 1000 and appended as an integer.
    """
    label_block = ''
    if line.labels:
        pairs = []
        for key, raw in sorted(line.labels.items()):
            # Backslash goes first so the escapes added for newline and
            # quote are not escaped a second time.
            escaped = raw.replace('\\', r'\\')
            escaped = escaped.replace('\n', r'\n')
            escaped = escaped.replace('"', r'\"')
            pairs.append('{0}="{1}"'.format(key, escaped))
        label_block = '{{{0}}}'.format(','.join(pairs))

    if line.timestamp is None:
        ts_suffix = ''
    else:
        # Convert to milliseconds.
        millis = int(float(line.timestamp) * 1000)
        ts_suffix = ' {0:d}'.format(millis)

    return '{0}{1} {2}{3}\n'.format(
        line.name, label_block, floatToGoString(line.value), ts_suffix)


async def generate_latest(registry):
    """Render all metrics of ``registry`` in the latest text format.

    ``registry.collect()`` is consumed with ``async for``. Each metric gets a
    ``# HELP`` and a ``# TYPE`` line followed by its samples; samples named
    ``<metric>_created``, ``<metric>_gsum`` or ``<metric>_gcount`` are held
    back and emitted afterwards as separate gauges.

    Returns the whole exposition as UTF-8 encoded bytes. An exception raised
    while rendering a metric is re-raised with that metric appended to its
    ``args``.
    """
    def escape_doc(text):
        # Backslash first, then newline, so the added escapes stay intact.
        return text.replace('\\', r'\\').replace('\n', r'\n')

    # Munging from OpenMetrics into Prometheus format.
    name_suffix_by_type = {'counter': '_total', 'info': '_info'}
    text_type_by_type = {
        'info': 'gauge',
        'stateset': 'gauge',
        # A gauge histogram is really a gauge,
        # but this captures the structure better.
        'gaugehistogram': 'histogram',
        'unknown': 'untyped',
    }
    om_suffixes = ('_created', '_gsum', '_gcount')

    chunks = []
    async for metric in registry.collect():
        try:
            exposed_name = metric.name
            exposed_type = metric.type
            if exposed_type in name_suffix_by_type:
                exposed_name = exposed_name + name_suffix_by_type[exposed_type]
            exposed_type = text_type_by_type.get(exposed_type, exposed_type)

            chunks.append('# HELP {0} {1}\n'.format(
                exposed_name, escape_doc(metric.documentation)))
            chunks.append('# TYPE {0} {1}\n'.format(exposed_name, exposed_type))

            deferred = {}
            for sample in metric.samples:
                om_suffix = next(
                    (sfx for sfx in om_suffixes
                     if sample.name == metric.name + sfx),
                    None)
                rendered = sample_line(sample)
                if om_suffix is None:
                    chunks.append(rendered)
                else:
                    # OpenMetrics specific sample, put in a gauge at the end.
                    deferred.setdefault(om_suffix, []).append(rendered)
        except Exception as exception:
            exception.args = (exception.args or ('',)) + (metric,)
            raise

        for om_suffix, rendered_lines in sorted(deferred.items()):
            chunks.append('# HELP {0}{1} {2}\n'.format(
                metric.name, om_suffix, escape_doc(metric.documentation)))
            chunks.append('# TYPE {0}{1} gauge\n'.format(metric.name, om_suffix))
            chunks.extend(rendered_lines)
    return ''.join(chunks).encode('utf-8')
```

the custom collector class:
```
class CustomCollector(object):
    """Collector that turns a JSON metrics document fetched over HTTP into
    Prometheus metric families.

    The document is read as ``payload['metrics']``, a mapping of metric name
    to an entry with a ``'kind'`` (``'Gauge'``, ``'Timer'`` or ``'Counter'``)
    and a ``'values'`` list. Each item of that list carries ``'tags'`` (a
    list of ``{'key': ..., 'value': ...}``) and its own ``'values'`` list, of
    which only the first point's ``'v'`` and ``'t'`` are used.
    """

    def __init__(self, app):
        """Take the HTTP session and the metrics URL from ``app``."""
        self.session = app['session']
        self.metrics_url = app['metrics_url']

    @staticmethod
    def _metric_name(raw_name):
        """Normalise a source metric name: '.' and '-' become '_', every run
        of capitals not at the start gets a '_' prefix, then lower-case."""
        name = raw_name.replace('.', '_').replace('-', '_')
        return re.sub('(?!^)([A-Z]+)', r'_\1', name).lower()

    async def collect(self):
        """Fetch the metrics document and yield one metric family per entry.

        Entries whose ``'kind'`` is not recognised are skipped.
        """
        # Read the body inside ``async with`` so the response is released
        # even when decoding fails, and before any family is yielded.
        async with self.session.get(self.metrics_url) as resp:
            payload = await resp.json()

        for raw_name, v in payload['metrics'].items():
            name = self._metric_name(raw_name)
            kind = v['kind']
            if kind == 'Gauge':
                m = GaugeMetricFamily(name, name, labels=[])
            elif kind == 'Timer':
                m = SummaryMetricFamily(name, name, labels=[])
            elif kind == 'Counter':
                m = CounterMetricFamily(name, name, labels=[])
            else:
                # Without this branch ``m`` would be unbound on the first
                # entry, or still refer to the previous entry's family,
                # which would then receive these samples and be yielded a
                # second time.
                continue
            for i in v['values']:
                tags = i['tags']
                labels = {tag['key']: tag['value'] for tag in tags}
                value = i['values'][0]['v']
                timestamp = i['values'][0]['t']
                # NOTE(review): every kind is sampled under the bare metric
                # name; confirm this is the intended sample name for Counter
                # and Timer families.
                m.add_sample(name, labels, value, timestamp=timestamp)
            yield m
```

Stéphane

unread,
Jan 31, 2024, 9:48:50 AM1/31/24
to Prometheus Users
Hi,

I'm interested in this question.
Do you have any experience to share?

I'm currently using aioprometheus, but the metric exposure is blocking:
https://github.com/claws/aioprometheus/issues/98

Looking at the official client, it seems that they added async ASGI metric exposure. But I don't know if their metrics update will then be blocking.
I didn't try it yet.

And there is also this prometheus-async, I asked a question there too:
Reply all
Reply to author
Forward
0 new messages