Multiprocessing in Kivy causes infinite 100% cpu usage

360 views
Skip to first unread message

Stuart Bradley

unread,
Dec 3, 2013, 5:22:10 PM12/3/13
to kivy-...@googlegroups.com

I'm using the following multicore script:

import networkx as nx
import time
import multiprocessing as mp

    def work(item):
        pths = []
        for path in nx.all_simple_paths(G, item, endNode,cutoff=maxLen-1):
            path.insert(0,startNode)
            pths.append(path)
        return pths

    def init_worker(Gr, st,ed, cl):
        global G, startNode, endNode, maxLen

        print "process initializing", mp.current_process()
        G, startNode, endNode, maxLen = Gr, st, ed, cl
        G.remove_node(startNode)

    def multi_core(graph, sourceChem, sinkChem, maxLen, minLen):
        paths = []
        startNeighbours = graph[sourceChem].keys()
        p = mp.Pool(initializer=init_worker, initargs=(graph,sourceChem, sinkChem, 9), processes=4)
        paths = p.map(work, startNeighbours, chunksize=(len(startNeighbours)/mp.cpu_count()))
        counter = 0
        for i in paths:
            if len(i) != 0:
                for j in i:
                    paths.append(j)
        p.close()
        p.join()
        return paths

Inside a Kivy GUI script to do the heavy lifting. However, whenever I run it the cores boot up fine but they run at 100% forever.

I've checked __name__ == "__main__" and my computer doesn't spawn infinite processes, it just seems to go crazy on however many cores I give it, without actually doing any work.

Is there a problem with my script? Or this a Kivy specific issue?

Edit: It appears Kivy pumps out the following output everytime a core is initialised:

Process initializing <Process(PoolWorker-3, started daemon)>
[INFO              ] Kivy v1.7.2
[INFO              ] [Logger      ] Record log in C:\Users\Stuart\.kivy\logs\kiv
y_13-12-04_137.txt
[INFO              ] [Factory     ] 144 symbols loaded
[DEBUG             ] [Cache       ] register <kv.lang> with limit=None, timeout=
Nones
[DEBUG             ] [Cache       ] register <kv.image> with limit=None, timeout
=60s
[DEBUG             ] [Cache       ] register <kv.atlas> with limit=None, timeout
=Nones
[INFO              ] [Image       ] Providers: img_tex, img_dds, img_pygame, img
_pil, img_gif
[DEBUG             ] [Cache       ] register <kv.texture> with limit=1000, timeo
ut=60s
[DEBUG             ] [Cache       ] register <kv.shader> with limit=1000, timeou
t=3600s
[INFO              ] [Text        ] Provider: pygame
[DEBUG             ] [Cache       ] register <kv.loader> with limit=500, timeout
=60s
[INFO              ] [Loader      ] using a thread pool of 2 workers
[DEBUG             ] [Cache       ] register <textinput.label> with limit=None,
timeout=60.0s
[DEBUG             ] [Cache       ] register <textinput.width> with limit=None,
timeout=60.0s
process initializing <Process(PoolWorker-4, started daemon)>

I had thought having the multicore script outside of Kivy would stop any erratic behaviour, but perhaps not. The program is set up as such:

Kivy main script <-> PathExplorer.py (backend) <-> multicorePath.py (multicore script).

Thoughts?

Cheers,

Stuart Bradley


Stuart Bradley

unread,
Dec 3, 2013, 8:41:33 PM12/3/13
to kivy-...@googlegroups.com
Somewhat of an update in progress. I've tested the multicore script outside of Kivy and it runs fine, so I suspect there is an issue with the two modules interfacing.

Kivy main.py:

"""
Kivy Tests
Stuart Bradley
19/11/2013


Building a front-end
is something I hate to do
but I do it for bio
"""



import kivy
kivy
.require('1.7.2')


from kivy.app import App
from kivy.uix.label import Label
from kivy.uix.gridlayout import GridLayout
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.textinput import TextInput
from kivy.uix.slider import Slider
from kivy.uix.checkbox import CheckBox
from kivy.config import Config
from kivy.uix.button import Button
from kivy.adapters.listadapter import ListAdapter
from kivy.uix.listview import ListItemButton, ListView
from kivy.uix.popup import Popup


from functools import partial
import networkx as nx
from PathExplorerGraph import PathGraph
from threading import Thread


class PathExplorer(GridLayout):
 
def __init__(self, **kwargs):
 
super(PathExplorer, self).__init__(**kwargs)
 
self.cols = 2


 
self.graph = PathGraph("RESTKegg.graphml")
 
 
self.source = TextInput(multiline=False)
 
self.list_adapter_Source = ListAdapter(data=[], cls=ListItemButton, selection_mode='single', allow_empty_selection=True)
 
self.list_adapter_Source.bind(on_selection_change=partial(self.on_Suggest_change, self.source))
 
self.sourceSuggest = ListView(adapter=self.list_adapter_Source)
 
self.source.bind(text=partial(self.on_text_change, self.list_adapter_Source, self.graph))
 
 
self.sink = TextInput(multiline=False)
 
self.list_adapter_Sink = ListAdapter(data=[], cls=ListItemButton, selection_mode='single', allow_empty_selection=True)
 
self.list_adapter_Sink.bind(on_selection_change=partial(self.on_Suggest_change, self.sink))
 
self.sinkSuggest = ListView(adapter=self.list_adapter_Sink)
 
self.sink.bind(text=partial(self.on_text_change, self.list_adapter_Sink, self.graph))
 
 
self.minLen = Label(text='Minimum Length:')
 
self.minimumLen = Slider(min=2, max=10, value=2)
 
self.minimumLen.bind(value=partial(self.on_minimumLen_change, self.minLen))
 
 
self.maxLen = Label(text='Maximum Length:')
 
self.maximumLen = Slider(min=2, max=10, value=10)
 
self.maximumLen.bind(value=partial(self.on_maximumLen_change, self.maxLen))
 
 
self.includeNeighbours = CheckBox()
 
 
self.buildNoExp = Button(text='Add to Graph')
 
self.buildNoExp.bind(state=partial(self.add_graph, self.source, self.sink, self.minimumLen, self.maximumLen, self.includeNeighbours, self.graph))
 
 
self.buildExp = Button(text='Add and Export Graph')
 
self.buildExp.bind(state=partial(self.add_graph, self.source, self.sink, self.minimumLen, self.maximumLen, self.includeNeighbours, self.graph))
 
 
self.add_widget(Label(text='Start Chemical:'))
 
self.add_widget(self.source)
 
self.add_widget(Label(text='Start Chemical Suggestions:'))
 
self.add_widget(self.sourceSuggest)
 
self.add_widget(Label(text='End Chemical:'))
 
self.add_widget(self.sink)
 
self.add_widget(Label(text='End Chemical Suggestions:'))
 
self.add_widget(self.sinkSuggest)
 
self.add_widget(self.minLen)
 
self.add_widget(self.minimumLen)
 
self.add_widget(self.maxLen)
 
self.add_widget(self.maximumLen)
 
self.add_widget(Label(text='Include Neighbouring Nodes:'))
 
self.add_widget(self.includeNeighbours)
 
self.add_widget(self.buildNoExp)
 
self.add_widget(self.buildExp)


 
def on_minimumLen_change(self, label, slider, value):
 label
.text = 'Maximum Length [Value: ' + str(int(value)) + ']:'


 
def on_maximumLen_change(self, label, slider, value):
 label
.text = 'Maximum Length [Value: ' + str(int(value)) + ']:'


 
def on_Suggest_change(self, textBox, item):
 
try:
 textBox
.text = item.selection[0].text
 
except IndexError:
 
return


 
def on_text_change(self, ListAdapter, graph, textBox, text):
 
if len(text) > 0:
 
ListAdapter.data = graph.listPopulator(text)
 
if graph.isInGraph(text):
 textBox
.background_color = [0, 255,0,1]
 
else:
 textBox
.background_color = [255, 0, 0, 1]
 
else:
 textBox
.background_color = [1, 1, 1, 1]


 
def add_graph(self, source, sink, minLen, maxLen, checkbox, graph, button, buttonState):
 
if buttonState is 'down':
 sourceChem
= graph.getID(source.text)
 sinkChem
= graph.getID(sink.text)


 
if (sinkChem == 'empty') or (sourceChem == 'empty') or (sinkChem == '') or (sourceChem == ''):
 
self.popup_display(self, 'Error', 'One of your chemicals is not in the database.')
 
return


 pausePopup
= Popup(content=Label(text='Please wait.'), title="Process running", auto_dismiss=False)
 pausePopup
.open()


 
ReturnStrings = ""
 
if button == self.buildNoExp:
 
ReturnStrings = graph.graphBuilder(sourceChem, sinkChem, minLen.value, maxLen.value, checkbox.active, True)
 
elif button == self.buildExp:
 
ReturnStrings = graph.graphBuilder(sourceChem, sinkChem, minLen.value, maxLen.value, checkbox.active, False)
 
 
if ReturnStrings != "":
 pausePopup
.dismiss()
 
if ReturnStrings.startswith("No"):
 
self.popup_display(self, 'Success', ReturnStrings)
 
return
 
elif ReturnStrings.startswith("'memorizedPaths'"):
 
self.popup_display(self, 'Error', ReturnStrings)
 
return
 
elif ReturnStrings.startswith("Export"):
 
self.popup_display(self, 'Success', ReturnStrings)
 
elif ReturnStrings.startswith("Addition"):
 
self.popup_display(self, 'Success', ReturnStrings)
 
 
def popup_display(self, widget, titleString, message):
 btnclose
= Button(text='Ok', size_hint_y=None, height=50)
 l
= Label(text=message, valign='top', padding_y=-10)
 l
.bind(size=lambda s, w: s.setter('text_size')(s, w))


 content
= BoxLayout(orientation='vertical')
 content
.add_widget(l)
 content
.add_widget(btnclose)
 popup
= Popup(content=content, title=titleString, size_hint=(None, None), size=(300, 300), auto_dismiss=False)
 btnclose
.bind(on_release=popup.dismiss)
 
 popup
.open()


class PathExplorerApp(App):
 
def build(self):
 
Config.set('graphics', 'width', '660')
 
Config.set('graphics', 'height', '330')
 
return PathExplorer()


if __name__ == '__main__':
 
PathExplorerApp().run()

PathExplorerGraph.py:

"""
PathExporter backend
Stuart Bradley
20/11/2013
"""



import networkx as nx
import re
import difflib
import os
import csv
import multiprocessing as mp
import multiSearch


class PathGraph:
 __slots__
= ['FullG', 'PathG', 'SingleG']
 
def __init__(self, graph):
 
self.FullG = nx.read_graphml(graph)
 
self.PathG = nx.read_graphml(graph)
 
self.SingleG = nx.Graph(self.FullG)
 
 
def listPopulator(self, text):
 matches
= []
 
if (re.match("[A-Z]\d+",text)) is not None:
 nodeNames
= nx.get_node_attributes(self.FullG,'displayName')
 possibles
= difflib.get_close_matches(text, nodeNames.keys(), 20, 0.1)
 
for i in possibles:
 matches
.append(i)
 
else:
 nodeNames
= nx.get_node_attributes(self.FullG,'displayName')
 possibles
= difflib.get_close_matches(text, nodeNames.values(), 20, 0.1)
 
for i in possibles:
 matches
.append(i)
 
return matches


 
def isInGraph(self, text):
 nodeNames
= nx.get_node_attributes(self.FullG,'displayName')
 
if (re.match("[A-Z]\d+",text)) is not None:
 
for name in nodeNames.keys():
 
if name == text:
 
return True
 
else:
 
for name in nodeNames.values():
 
if name == text:
 
return True
 
return False


 
def getID(self, text):
 
if len(text) == 0:
 
return ""
 nodeNames
= nx.get_node_attributes(self.FullG,'displayName')
 
for k,v in nodeNames.iteritems():
 
if v == text or k == text:
 
return str(k)
 
return "empty"


 
def pathSearch(self, hasSource, hasSink, sourceChem, sinkChem, minLength, maxLength):
    paths
= []


   
#base case
   
if hasSource == False and hasSink == False:
       
return paths
   
   
if hasSource == True and hasSink == True:
     
if maxLength < 8:
     
for item in nx.all_simple_paths(self.SingleG, sourceChem, sinkChem, cutoff=maxLength-1):
     
if len(item) >= minLength:
     paths
.append(item)
     
return paths
     
else:
     paths
= multiSearch.multi_core(self.SingleG, sourceChem, sinkChem, maxLength)
 
return paths
 
#If Outdegree is used, saves children
 
def createOutDegree(self, paths):
   
for path in paths:
       
for node in path:
           
for child in self.PathG.neighbors_iter(node):
               
self.PathG.node[child]['Keep'] = 'Yes'
               
self.PathG.node[child]['Source/Sink/Path'] = 'None'
               
self.edgeAttributeChanger(node, child, 'Keep', 'Yes')
               
self.edgeAttributeChanger(node, child, 'Source/Sink/Path', 'None')
   
return self.PathG


 
#Find nodes to Keep
 
def keepNodes(self, pathnumTot, paths):
    prev
= None
    pathnum
= pathnumTot
   
for path in paths:
        pathnum
+= 1
       
for node in path:
           
self.PathG.node[node]['Keep'] = 'Yes'
           
if prev is not None:
             
self.edgeAttributeChanger(prev, node, 'Keep', 'Yes')
             
self.edgeAttributeChanger(prev, node, 'In Path', ('Path: ' + str(pathnum)))
                prev
= node
           
else:
                prev
= node
        prev
= None
   
return self.PathG, pathnum
   
 
#Delete components lacking 'Keep' atrribute
 
def removeNonKeep(self):
   
for a,b,k in self.PathG.edges(keys=True):
       
if 'Keep' not in self.PathG.edge[a][b][k]:
           
self.PathG.remove_edge(a,b,key=k)
   
for node in self.PathG.nodes():
       
if 'Keep' not in self.PathG.node[node]:
           
self.PathG.remove_node(node)
   
return self.PathG
   
 
#Removes 'Keep' attribute from components
 
def removeKeepAttribute(self):
   
for a,b,k in self.PathG.edges(keys=True):
     
try:
     
del self.PathG.edge[a][b][k]['Keep']
     
except KeyError:
     
pass
   
for node in self.PathG.nodes():
     
try:
     
del self.PathG.node[node]['Keep']
     
except KeyError:
     
pass
   
return self.PathG


 
#Create source/sink/path attribute  
 
def pathAttributes(self, paths, sourceChem, sinkChem):
   
for path in paths:
       
for node in path:
           
if sourceChem == node:
               
self.PathG.node[node]['Source/Sink/Path'] = 'Source'
           
elif sinkChem == node:
               
self.PathG.node[node]['Source/Sink/Path'] = 'Sink'
           
else:
               
self.PathG.node[node]['Source/Sink/Path'] = 'Path'
   
return self.PathG  


 
def edgeAttributeChanger(self, fr, to, attribute, value):
 
for i in range(0,100):
 
try:
 
self.PathG.edge[fr][to][i][attribute] = value
 
except Exception as e:
 
break
 
for i in range(0,100):
 
try:
 
self.PathG.edge[to][fr][i][attribute] = value
 
except Exception as e:
 
break


 
def graphBuilder(self, sourceChem, sinkChem, minLength, maxLength, wantsOutDegree, singleFile):
 hasSource
= True
 hasSink
= True
 pathnumTot
= 0
 fileDict
= {}


 
if len(sourceChem) == 0:
 hasSource
= False
 
self.PathG.add_node("noSource", StringName ='noSource')
 
if len(sinkChem) == 0:
 hasSink
= False
 
self.PathG.add_node("noSink", StringName ='noSink')


 paths
= self.pathSearch(hasSource, hasSink, sourceChem, sinkChem, minLength, maxLength)


 
if isinstance(paths, basestring):
 
return paths


 
if len(paths) == 0:
 
return "No paths."


 
if singleFile == False:
 
if wantsOutDegree == True:
 
self.PathG = self.createOutDegree(paths)
 
self.PathG, pathnumTot = self.keepNodes(pathnumTot, paths)
       
 
self.PathG = self.removeNonKeep()
 
self.PathG = self.removeKeepAttribute()
 
self.PathG = self.pathAttributes(paths, sourceChem, sinkChem)
           
 nx
.write_graphml(self.PathG, str(self.PathG.node[sourceChem]['StringName']) +str(self.PathG.node[sinkChem]['StringName'])+".graphml")
       
 fileName
=  "A Graph file: '" + (str(self.PathG.node[sourceChem]['StringName']) + str(self.PathG.node[sinkChem]['StringName'])+".graphml") + "', has been created at: " + str(os.getcwd())
 numberOfPaths
= "Number of paths found: " + str(len(paths))
 graphName
= (str(self.PathG.node[sourceChem]['StringName']) + str(self.PathG.node[sinkChem]['StringName'])+".graphml")
 
self.PathG = self.FullG


 
return "Export: \n \n" + numberOfPaths + "\n \n" + fileName
 
else:
 
if wantsOutDegree == True:
 
self.PathG = self.createOutDegree(paths)
 
self.PathG, pathnumTot = self.keepNodes(pathnumTot, paths)
     
 
self.PathG = self.removeNonKeep()
 
self.PathG = self.removeKeepAttribute()
 
self.PathG = self.pathAttributes(paths, sourceChem, sinkChem)


 
return "Addition: \n \n" + "Number of paths found: " + str(len(paths))

Current multiSearch.py:

"""
Networkx 1.9.1 _all_simple_paths benchmark
Stuart Bradley
2/12/2013
"""



import networkx as nx
import time
import multiprocessing as mp


def work(item):
    pths
= []

   
for path in nx.all_simple_paths(G, item, end,cutoff=currLen-1):
    path
.insert(0,start)
    pths
.append(path)
   
return pths


def init_worker(Grp, st, ed, cl):
   
global start, end, currLen
   
global G


   
print "process initializing", mp.current_process()
    start
, end , currLen = st, ed, cl
    G
= Grp
    G
.remove_node(start)


def multi_core(graph, start, end, mlen):
    startNeighbours
= graph[start].keys()
    p
= mp.Pool(initializer=init_worker, initargs=(graph, start,end, mlen))

    paths
= p.map(work, startNeighbours, chunksize=(len(startNeighbours)/mp.cpu_count()))
    counter
= 0
   
for i in paths:
       
if len(i) != 0:
           
for j in i:

                counter
+=1
   
print counter
    p
.close()
    p
.join()
   
return paths



This is my first time with Kivy and up until now I havent had a problem, so hopefully someone with more experience can help!

Cheers,
Stuart Bradley

Stuart Bradley

unread,
Dec 4, 2013, 5:25:35 PM12/4/13
to kivy-...@googlegroups.com
If I run it in packaged mode on Win 8.1 64-bit I get an error instead of infinite CPU usage:

[ERROR             ] [Core        ] option --multiprocessing-fork not recognized


[INFO              ] [Base        ] Leaving application in progress...
 
Traceback (most recent call last):
   
File "<string>", line 157, in <module>
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\kivy.ap
p"
, line 600, in run
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\kivy.ba
se"
, line 454, in runTouchApp
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\kivy.co
re.window.window_pygame"
, line 325, in mainloop
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\kivy.co
re.window.window_pygame"
, line 231, in _mainloop
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\kivy.ba
se"
, line 297, in idle
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\kivy.ba
se"
, line 284, in dispatch_input
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\kivy.ba
se"
, line 209, in post_dispatch_input
   
File "_event.pyx", line 285, in kivy._event.EventDispatcher.dispatch (kivy\_e
vent
.c:4203)
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\kivy.co
re.window"
, line 630, in on_motion
   
File "_event.pyx", line 285, in kivy._event.EventDispatcher.dispatch (kivy\_e
vent
.c:4203)
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\kivy.co
re.window"
, line 642, in on_touch_down
   
File "_event.pyx", line 285, in kivy._event.EventDispatcher.dispatch (kivy\_e
vent
.c:4203)
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\kivy.ui
x.widget"
, line 266, in on_touch_down
   
File "_event.pyx", line 285, in kivy._event.EventDispatcher.dispatch (kivy\_e
vent
.c:4203)
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\kivy.ui
x.button"
, line 123, in on_touch_down
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\kivy.ui
x.button"
, line 107, in _do_press
   
File "properties.pyx", line 321, in kivy.properties.Property.__set__ (kivy\pr
operties
.c:3471)
   
File "properties.pyx", line 353, in kivy.properties.Property.set (kivy\proper
ties
.c:3929)
   
File "properties.pyx", line 407, in kivy.properties.Property.dispatch (kivy\p
roperties
.c:4498)
   
File "<string>", line 122, in add_graph
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\PathExp
lorerGraph"
, line 193, in graphBuilder
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\PathExp
lorerGraph"
, line 90, in pathSearch
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\multiSe
arch"
, line 29, in multi_core
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\multipr
ocessing"
, line 232, in Pool
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\multipr
ocessing.pool"
, line 136, in __init__
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\multipr
ocessing.pool"
, line 199, in _repopulate_pool
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\multipr
ocessing.process"
, line 130, in start
   
File "C:\Kivy\PyInstaller-2.1\Backtrack\build\Backtrack\out00-PYZ.pyz\multipr
ocessing.forking"
, line 274, in __init__
 
IOError: [Errno 22] Invalid argument

beyond

unread,
Aug 20, 2014, 1:16:51 PM8/20/14
to kivy-...@googlegroups.com
Hi, were you able to figure out the solution. I am hitting the same issue when using multiprocessing.  Thanks in advance!
Reply all
Reply to author
Forward
0 new messages