Prometheus in a Python multiprocessing app (not using Flask/Django, etc.)


George -

Jan 22, 2020, 2:30:14 AM
to Prometheus Users
Hi guys

Battling to get this working, some assistance would be appreciated.

I'm watching a directory; when files land, I kick off independent multiprocessing jobs to load the files in parallel (code below).

I'm using some counters and a summary, but they're not incrementing. I assume the problem is that the multiprocessing happens in a separate memory space where the workers can't access my counter objects.

Neither FILE_GESTER_TIME nor FILE_GESTER_LINE_COUNT is incrementing when I hit the server on port 8000.

Please advise how I need to change the code to make this work.

Thanks.

G

# ...
import multiprocessing
from os.path import join

from prometheus_client import start_http_server, Summary, Counter

# Prometheus metrics
FILE_GESTER_TIME       = Summary('BSA_file_gester_worker', 'Time spent loading a file')
FILE_GESTER_LINE_COUNT = Counter('BSA_file_gester_line_count', 'Running counter of lines loaded')

@FILE_GESTER_TIME.time()
def worker(filename, config_params):    # pooled process to attack each database async

    # --- Do work ---
    # Open the text file
    file_loc = join(receive_dir, filename)
    with open(file_loc, "r") as f:
        for line in f:
            # --- Do some more work ---

            FILE_GESTER_LINE_COUNT.inc()
        # end for line in file
# end worker

def main():

    start_http_server(8000)

    results = []
    pool = multiprocessing.Pool(multiprocessing.cpu_count())

    for file in files:
        results.append(pool.apply_async(worker, (file, config_params)))

    pool.close()
    pool.join()     # Block here until all workers are done




Brian Candler

Jan 22, 2020, 4:22:52 AM
to Prometheus Users
On Wednesday, 22 January 2020 07:30:14 UTC, George - wrote:
I'm using some counters and a summary, but they're not incrementing. I assume the problem is that the multiprocessing happens in a separate memory space where the workers can't access my counter objects.


You are correct: multiprocessing.Pool forks separate child processes to do the work, and they cannot modify the memory space of the parent. Some alternatives:

- There is a ThreadPool you can use instead, depending on whether that's useful in your application (i.e. whether you're I/O-bound or CPU-bound; the Python GIL means that threads aren't useful when you're CPU-bound). See the sketch after this list.

- You can have each worker expose its own metrics (e.g. on a different port) and scrape them all - but your Prometheus server would have to know how to discover the workers.

- You can communicate counters from your children back to the parent - but you'd have to code that yourself.

- You can run statsd_exporter and have the workers send updates to statsd. I'd say that's the simplest solution.
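
For the first option, a minimal sketch. multiprocessing.pool.ThreadPool has the same interface as Pool, so if the work is I/O-bound the client's default in-process metrics keep working unchanged (the metric name here is illustrative):

# Threads share the parent's memory, so the default prometheus_client
# registry sees every increment. Assumes the work is I/O-bound.
from multiprocessing.pool import ThreadPool

from prometheus_client import Counter, start_http_server

LINE_COUNT = Counter('demo_line_count', 'Lines processed')  # illustrative name

def worker(n):
    for _ in range(n):
        LINE_COUNT.inc()    # visible to the exporter, no IPC needed

if __name__ == '__main__':
    start_http_server(8000)     # exporter thread in this one process
    pool = ThreadPool(4)        # drop-in replacement for Pool(4)
    for n in (10, 20, 30, 40):
        pool.apply_async(worker, (n,))
    pool.close()
    pool.join()                 # in a real app, keep running so /metrics stays up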

Brian Brazil

Jan 22, 2020, 4:29:24 AM
to George -, Prometheus Users
On Wed, 22 Jan 2020 at 07:30, George - <geor...@gmail.com> wrote:
Hi guys

Battling to get this working, some assistance would be appreciated.

I'm watching a directory; when files land, I kick off independent multiprocessing jobs to load the files in parallel (code below).

I'm using some counters and a summary, but they're not incrementing. I assume the problem is that the multiprocessing happens in a separate memory space where the workers can't access my counter objects.

Neither FILE_GESTER_TIME nor FILE_GESTER_LINE_COUNT is incrementing when I hit the server on port 8000.

Please advise how I need to change the code to make this work.

Have you looked at the multiprocess support of the Python client? It's designed for cases like these.

Brian
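
For reference, a minimal sketch of what that multiprocess mode looks like, based on the prometheus_client documentation (the directory path is illustrative; it must exist and be empty when the app starts):

# The env var must be set before prometheus_client is imported, because the
# client chooses its value implementation at import time.
import os
os.environ['prometheus_multiproc_dir'] = '/tmp/prom'   # illustrative path

from prometheus_client import CollectorRegistry, multiprocess, start_http_server

registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)   # merges per-process sample files
start_http_server(8000, registry=registry)     # one endpoint, aggregated view

Each process then writes its samples to mmapped files under that directory, and the collector merges them at scrape time.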
 
George

Jan 22, 2020, 8:57:50 AM
to Brian Brazil, Prometheus Users
Hi Brian

This isn't really a multiprocessing problem; as far as I can determine it's a Prometheus-specific requirement, unless someone has an example showing me otherwise.

G

George

Jan 22, 2020, 9:00:36 AM
to Brian Candler, Prometheus Users
Hmm, lots of maybes...

I'd like to see what Prometheus users normally do, as this is really a Prometheus-specific requirement. The first option (different ports) isn't workable: the server would start an unknown number of HTTP endpoints to be scraped, on ports determined by an unknown number of worker processes, and Prometheus would somehow have to tie it all together. It's going to be very messy.

G


Brian Candler

Jan 22, 2020, 3:26:59 PM
to Prometheus Users
Brian B is right: I'd completely forgotten about the multiprocess support built into the Python client.


It's used, for example, by Netbox.

George

Jan 23, 2020, 1:08:00 AM
to Brian Candler, Prometheus Users
Hi Brian

OK, so I've seen this note/link before... I'll just call myself stupid, no need for someone else to confirm it: I've not been able to map what's there onto my example and get it working.

That app module: is that my worker module, or a module I leave as-is in the code? I use Summary and Counter variables with their built-in functions, time and count, as per the Prometheus design.

If anyone is willing to help me with a more complete example, annotating my code to show how this maps onto it, that would really be appreciated.

G


George -

Jan 25, 2020, 6:19:57 AM
to Prometheus Users
Busy trying to make this work. The example refers to gunicorn as a web server: is that to expose the metrics, or is it because the example assumes a Flask-style app that already has a web interface?

What I'm writing is all back-end data processing, structured as a multiprocess parallel app; the only reason for a web interface is exposing the Prometheus metrics.

The example also talks about placing the child_exit hook in a config file, and refers to a prometheus_multiproc_dir that needs to be set. Where?

G

I've created the code below as a simple test harness to try and get this working.


import logging, sys, time
import random

from datetime import datetime

from prometheus_client import start_http_server, Summary, Counter, multiprocess

import multiprocessing


# Prometheus metrics
FILE_GESTER_TIME       = Summary('BSA_file_gester_eft_worker', 'Time spent loading eft files')
FILE_GESTER_LINE_COUNT = Counter('BSA_file_gester_eft_line_count', 'Running counter of eft lines loaded')


@FILE_GESTER_TIME.time()
def worker(random_count):

    counter = 0
    while counter < random_count:

        FILE_GESTER_LINE_COUNT.inc()

        counter += 1
        print(random_count, ' ', counter)

        time.sleep(1/5)

    return

# end worker


def main():

    start_time = datetime.now()
    print('* Start Time :', start_time)
    print('')

    try:

        # Start up the server to expose the metrics.
        start_http_server(8010)

        # Step 1: Init multiprocessing.Pool()
        pool = multiprocessing.Pool(4)

        x = 0
        while x < 4:

            # This works, but it ends up completing serially, one worker after the other
            #worker(random.randint(10, 120))

            # This does not increment the counters; it is supposed to place each worker
            # in its own workspace so that all 4 run in parallel
            pool.apply_async(worker, (random.randint(10, 120), ))

            x += 1

        pool.close()
        pool.join()     # Block here until all workers are done

    except KeyboardInterrupt:
        print('Shutting Down Unexpectedly')
        sys.stdout.flush()
        sys.exit(-1)

    except Exception as ex:
        print("Failed to create worker {err}".format(err=ex))

    finally:

        print('')
        print('Shutting Down')
        finish_time = datetime.now()
        run_time = finish_time - start_time
        print('* Total Run Time :', run_time)

# end main()


if __name__ == '__main__':

    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )

    main()

# end __name__ == '__main__'
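
A note on the child_exit question: gunicorn only comes into it when the app itself is a web app; its child_exit hook is simply a convenient place to call multiprocess.mark_process_dead() for the worker that died. In a standalone app the parent can do the same thing itself. A sketch, assuming prometheus_client's documented mark_process_dead API and using a bare Process for clarity (the argument is illustrative):

import multiprocessing

from prometheus_client import multiprocess

p = multiprocessing.Process(target=worker, args=(42,))   # the worker defined above
p.start()
p.join()                                # wait for the child to exit
multiprocess.mark_process_dead(p.pid)   # clean up its per-process files

This cleanup mainly matters for gauges in the live multiprocess modes; counters and summaries persist across process exits by design.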









George -

Feb 5, 2020, 5:36:33 AM
to Prometheus Users

Answering my own question.

Working...


import logging, sys, time, os
import random, multiprocessing

from datetime import datetime

# prometheus_multiproc_dir must point at an existing, writable directory and
# must be set BEFORE prometheus_client is imported, since the client picks its
# multiprocess value implementation at import time.
os.environ["prometheus_multiproc_dir"] = "/tmp/prom"

from prometheus_client import start_http_server, multiprocess, CollectorRegistry, Summary, Counter
#from prometheus_client import generate_latest, Gauge

# Prometheus metrics
FILE_GESTER_TIME            = Summary('BSA_file_gester_eft_worker', 'Time spent loading eft files & # loaded')
FILE_GESTER_LINE_COUNT      = Counter('BSA_file_gester_eft_line_count', 'Running counter of eft lines loaded')


@FILE_GESTER_TIME.time()
def worker(random_count):

    counter = 0
    while counter < random_count:

        FILE_GESTER_LINE_COUNT.inc()

        counter += 1
        print(random_count, ' ', counter)

        time.sleep(1/5)

    return

# end worker


def main():

    start_time = datetime.now()
    print('* Start Time               :', start_time)
    print('')

    try:

        # Aggregate the per-process metric files and expose them on one server.
        registry = CollectorRegistry()
        multiprocess.MultiProcessCollector(registry)
        start_http_server(8010, registry=registry)
        print(os.environ["prometheus_multiproc_dir"])

        # Step 1: Init multiprocessing.Pool()
        pool = multiprocessing.Pool(4)

        x = 0
        while x < 4:

            # Serial version, for comparison: workers complete one after the other
            #worker(random.randint(10, 120))

            # With multiprocess mode enabled, each pooled worker writes its metrics
            # to prometheus_multiproc_dir, so all 4 run in parallel and still count
            pool.apply_async(worker, (random.randint(10, 120), ))

            x += 1

        pool.close()
        pool.join()     # Block here until all workers are done

    except KeyboardInterrupt:
        print('Shutting Down Unexpectedly')
        sys.stdout.flush()
        sys.exit(-1)

    except Exception as ex:
        print("Failed to create worker {err}".format(err=ex))

    finally:

        print('')
        print('Shutting Down')
        finish_time = datetime.now()
        run_time    = finish_time - start_time
        print('* Total Run Time         :', run_time)

# end main()


if __name__ == '__main__':

    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
        )

    main()

# end __name__ == '__main__'

#### And that's it... :)
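
For anyone landing here later, the two changes that made the difference: the prometheus_multiproc_dir environment variable, which makes every process write its samples to mmapped files in that directory instead of process-local memory, and the CollectorRegistry with a MultiProcessCollector attached, so the HTTP server on :8010 serves the aggregated view across all the pool workers at scrape time.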