import random
import time
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
class MetricsCollector:
def __init__(self, endpoint):
self.registry = CollectorRegistry()
self.gauges = {}
self.endpoint = endpoint
def add_metric(self, name):
gauge = Gauge(name, 'Description of the metric', registry=self.registry)
self.gauges[name] = gauge
def set_metric_value(self, name, value):
gauge = self.gauges.get(name)
if gauge is not None:
gauge.set(value)
def collect_metrics(self):
while True:
for name, gauge in self.gauges.items():
# Generate random value using random library
value = random.random()
self.set_metric_value(name, value)
# Push metrics to the Prometheus endpoint
push_to_gateway(self.endpoint, job='my_job', registry=self.registry)
time.sleep(5) # Collect metrics every 5 seconds
# Example usage
if __name__ == '__main__':
# Add metrics
collector.add_metric('metric1')
collector.add_metric('metric2')
# Start collecting and pushing metrics
collector.collect_metrics()