Below and attached is my first cut at a Python multiprocessing module parallel implementation of the Differential Evolution genetic algorithm. The code is explicitly in the public domain.
While this does run in parallel processes, I do not see 100% CPU utilization for the child processes on my 4-core test server. I suspect this is due to locking on the multiprocessing.Array objects, where the entire array is being locked at each access rather than only the requested elements in the array.
Note the "coefficient polishing on" option's use of scipy.optimize.fmin at each new "best solution" during iteration of the genetic algorithm, this vastly speeds up the GA runs.
I did not cross-post to the numpy discussion list, not sure if I should.
James Phillips
zunzun AT
zunzun.com http://zunzun.com---------------------------------------
# testParallelDESolver.py
#
# Placed into the public domain 1 August, 2009
# James R. Phillips
# 2548 Vera Cruz Drive
# Birmingham, AL 35235 USA
# email:
zun...@zunzun.com#
http://zunzun.com
import ParallelDESolver, numpy, time
class TestSolver(ParallelDESolver.ParallelDESolver):
#some test data
xData = numpy.array([5.357, 9.861, 5.457, 5.936, 6.161, 6.731])
yData = numpy.array([0.376, 7.104, 0.489, 1.049, 1.327, 2.077])
def externalEnergyFunction(self, trial):
# inverse exponential with offset, y = a * exp(b/x) + c
predicted = trial[0] * numpy.exp(trial[1] / self.xData) + trial[2]
# sum of squared error
error = predicted - self.yData
return numpy.sum(error*error)
if __name__ == '__main__':
print
print ' Running test generating new random numbers and coefficient polisher off, estimated worst case time'
solver = TestSolver(4, 600, 600, -10, 10, "Rand2Exp", 0.7, 0.6, 0.01, False, False)
tStart = time.time()
atSolution, generations, bestEnergy, bestSolution = solver.Solve()
print ' ', time.time() - tStart, 'seconds elapsed. atSolution =', atSolution, ':', generations, 'generations passed, bestEnergy =', bestEnergy
print ' coefficients:',
for i in bestSolution:
print i,
print
print
print ' Running test generating new random numbers and coefficient polisher on, estimated medium case time'
solver = TestSolver(4, 600, 600, -10, 10, "Rand2Exp", 0.7, 0.6, 0.01, False, True)
tStart = time.time()
atSolution, generations, bestEnergy, bestSolution = solver.Solve()
print ' ', time.time() - tStart, 'seconds elapsed. atSolution =', atSolution, ':', generations, 'generations passed, bestEnergy =', bestEnergy
print ' coefficients:',
for i in bestSolution:
print i,
print
print
print ' Running test recycling random numbers and coefficient polisher on, estimated best case time'
solver = TestSolver(4, 600, 600, -10, 10, "Rand2Exp", 0.7, 0.6, 0.01, True, True)
tStart = time.time()
atSolution, generations, bestEnergy, bestSolution = solver.Solve()
print ' ', time.time() - tStart, 'seconds elapsed. atSolution =', atSolution, ':', generations, 'generations passed, bestEnergy =', bestEnergy
print ' coefficients:',
for i in bestSolution:
print i,
print
print
-------------------------------------------
# ParallelDESolver.py
#
# Placed into the public domain 1 August, 2009
# James R. Phillips
# 2548 Vera Cruz Drive
# Birmingham, AL 35235 USA
# email:
zun...@zunzun.com#
http://zunzun.com
import sys, os, numpy, random, scipy.optimize, multiprocessing
def parallelProcessPoolInitializer(in_sharedMemoryPopulation, in_sharedMemoryPopulationEnergies, in_sharedMemoryBestEnergy, in_sharedMemoryBestSolution, in_sharedMemoryAtSolution, in_sharedMemoryGeneration, in_solver):
global sharedMemoryPopulation
global sharedMemoryPopulationEnergies
global sharedMemoryBestEnergy
global sharedMemoryBestSolution
global sharedMemoryAtSolution
global sharedMemoryGeneration
global solver
sharedMemoryPopulation = in_sharedMemoryPopulation
sharedMemoryPopulationEnergies = in_sharedMemoryPopulationEnergies
sharedMemoryBestEnergy = in_sharedMemoryBestEnergy
sharedMemoryBestSolution = in_sharedMemoryBestSolution
sharedMemoryAtSolution = in_sharedMemoryAtSolution
sharedMemoryGeneration = in_sharedMemoryGeneration
solver = in_solver
def parallelProcessFunction(candidate):
global sharedMemoryPopulation
global sharedMemoryPopulationEnergies
global sharedMemoryBestEnergy
global sharedMemoryBestSolution
global sharedMemoryAtSolution
global sharedMemoryGeneration
global solver
if sharedMemoryAtSolution.value != 0: # has some other process found a solution?
return
eval('solver.' + solver.deStrategy + '(candidate)')
trialEnergy, atSolution = solver.EnergyFunction(solver.trialSolution)
if solver.polishTheBestTrials == True and trialEnergy < sharedMemoryBestEnergy.value and sharedMemoryGeneration.value > 0: # not the first generation
# try to polish these new coefficients a bit.
solver.trialSolution = scipy.optimize.fmin(solver.externalEnergyFunction, solver.trialSolution, disp = 0) # don't print warning messages to stdout
trialEnergy, atSolution = solver.EnergyFunction(solver.trialSolution) # recalc with polished coefficients
if trialEnergy < sharedMemoryPopulationEnergies[candidate]: # setting two shared memory objects, need to prevent contention here
sharedMemoryPopulationEnergies[candidate] = trialEnergy
for i in range(len(solver.trialSolution)):
sharedMemoryPopulation[candidate * solver.parameterCount + i] = solver.trialSolution[i]
# If at an all-time low, save to "best"
if trialEnergy < sharedMemoryBestEnergy.value: # setting two shared memory objects, need to prevent contention here
#print 'New best energy in generation', sharedMemoryGeneration.value, 'os.getpid() =', os.getpid(), 'new best energy', trialEnergy
sharedMemoryBestEnergy.value = trialEnergy
for i in range(len(solver.trialSolution)):
sharedMemoryBestSolution[i] = solver.trialSolution[i]
# if we've reached a sufficient solution, let the other processes know
if atSolution == True:
#print os.getpid(), 'At solution.'
sharedMemoryAtSolution.value = 1
class ParallelDESolver:
def __init__(self, parameterCount, populationSize, maxGenerations, minInitialValue, maxInitialValue, deStrategy, diffScale, crossoverProb, cutoffEnergy, useClassRandomNumberMethods, polishTheBestTrials):
self.polishTheBestTrials = polishTheBestTrials # see the Solve method where this flag is used
self.maxGenerations = maxGenerations
self.parameterCount = parameterCount
self.populationSize = populationSize
self.cutoffEnergy = cutoffEnergy
self.minInitialValue = minInitialValue
self.maxInitialValue = maxInitialValue
self.deStrategy = deStrategy # deStrategy is the name of the DE function to use
self.useClassRandomNumberMethods = useClassRandomNumberMethods
self.scale = diffScale
self.crossOverProbability = crossoverProb
self.generation = 1 # for testing, actual value set on the Solve() method
if True == self.useClassRandomNumberMethods:
self.SetupClassRandomNumberMethods(3)
def Solve(self):
# a random initial population in shared memory
sharedMemoryPopulation = multiprocessing.Array('d', [0.0] * (self.populationSize * self.parameterCount))
for i in range(self.populationSize * self.parameterCount):
sharedMemoryPopulation[i] = random.uniform(self.minInitialValue, self.maxInitialValue)
# initial population assumed to have high energies
sharedMemoryPopulationEnergies = multiprocessing.Array('d', [1.0E300] * self.populationSize)
# a shared memory "best energy" for internal comparisons
sharedMemoryBestEnergy = multiprocessing.Value('d', 1.0E300)
# best solution, will be copied into by worker processes
sharedMemoryBestSolution = multiprocessing.Array('d', [0.0] * self.parameterCount)
# if solution has been reached, we are done
sharedMemoryAtSolution = multiprocessing.Value('i', 0)
# allows check for "generation 0"
sharedMemoryGeneration = multiprocessing.Value('i', 0)
initialArguments = [sharedMemoryPopulation, sharedMemoryPopulationEnergies, sharedMemoryBestEnergy, sharedMemoryBestSolution, sharedMemoryAtSolution, sharedMemoryGeneration, self]
processPool = multiprocessing.Pool(initializer=parallelProcessPoolInitializer, initargs=initialArguments)
for sharedMemoryGeneration.value in range(self.maxGenerations):
#print 'new gen'
if sharedMemoryAtSolution.value != 0:
break
processPool.map(parallelProcessFunction, range(self.populationSize))
return sharedMemoryAtSolution.value, sharedMemoryGeneration.value, sharedMemoryBestEnergy.value, sharedMemoryBestSolution
def SetupClassRandomNumberMethods(self, in_RandomSeed):
#print 'Generating random numbers for recycling'
numpy.random.seed(in_RandomSeed) # this yields same results each time Solve() is run
self.nonStandardRandomCount = self.populationSize * self.parameterCount * 3 + 1 # the "+1" makes an odd number of array items
if self.nonStandardRandomCount < 523: # set a minimum number of random numbers
self.nonStandardRandomCount = 523
self.ArrayOfRandomIntegersBetweenZeroAndParameterCount = numpy.random.random_integers(0, self.parameterCount-1, size=(self.nonStandardRandomCount))
self.ArrayOfRandomRandomFloatBetweenZeroAndOne = numpy.random.uniform(size=(self.nonStandardRandomCount))
self.ArrayOfRandomIntegersBetweenZeroAndPopulationSize = numpy.random.random_integers(0, self.populationSize-1, size=(self.nonStandardRandomCount))
self.randCounter1 = 0
self.randCounter2 = 0
self.randCounter3 = 0
def GetClassRandomIntegerBetweenZeroAndParameterCount(self):
self.randCounter1 += 1
if self.randCounter1 >= self.nonStandardRandomCount:
self.randCounter1 = 0
return self.ArrayOfRandomIntegersBetweenZeroAndParameterCount[self.randCounter1]
def GetClassRandomFloatBetweenZeroAndOne(self):
self.randCounter2 += 1
if self.randCounter2 >= self.nonStandardRandomCount:
self.randCounter2 = 0
return self.ArrayOfRandomRandomFloatBetweenZeroAndOne[self.randCounter2]
def GetClassRandomIntegerBetweenZeroAndPopulationSize(self):
self.randCounter3 += 1
if self.randCounter3 >= self.nonStandardRandomCount:
self.randCounter3 = 0
return self.ArrayOfRandomIntegersBetweenZeroAndPopulationSize[self.randCounter3]
# this class might normally be subclassed and this method overridden, or the
# externalEnergyFunction set and this method used directly
def EnergyFunction(self, trial):
try:
energy = self.externalEnergyFunction(trial)
except ArithmeticError:
energy = 1.0E300 # high energies for arithmetic exceptions
except FloatingPointError:
energy = 1.0E300 # high energies for floating point exceptions
# we will be "done" if the energy is less than or equal to the cutoff energy
if energy <= self.cutoffEnergy:
return energy, True
else:
return energy, False
def Best1Exp(self, candidate):
r1,r2,r3,r4,r5 = self.SelectSamples(candidate, 1,1,0,0,0)
if True == self.useClassRandomNumberMethods:
n = self.GetClassRandomIntegerBetweenZeroAndParameterCount()
else:
n = random.randint(0, self.parameterCount-1)
self.trialSolution = sharedMemoryPopulation[candidate * self.parameterCount : candidate * self.parameterCount + self.parameterCount]
i = 0
while(1):
if True == self.useClassRandomNumberMethods:
k = self.GetClassRandomFloatBetweenZeroAndOne()
else:
k = random.uniform(0.0, 1.0)
if k >= self.crossOverProbability or i == self.parameterCount:
break
self.trialSolution[n] = self.bestSolution[n] + self.scale * (sharedMemoryPopulation[r1 * self.parameterCount + n] - sharedMemoryPopulation[r2 * self.parameterCount + n])
n = (n + 1) % self.parameterCount
i += 1
def Rand1Exp(self, candidate):
r1,r2,r3,r4,r5 = self.SelectSamples(candidate, 1,1,1,0,0)
if True == self.useClassRandomNumberMethods:
n = self.GetClassRandomIntegerBetweenZeroAndParameterCount()
else:
n = random.randint(0, self.parameterCount-1)
self.trialSolution = sharedMemoryPopulation[candidate * self.parameterCount : candidate * self.parameterCount + self.parameterCount]
i = 0
while(1):
if True == self.useClassRandomNumberMethods:
k = self.GetClassRandomFloatBetweenZeroAndOne()
else:
k = random.uniform(0.0, 1.0)
if k >= self.crossOverProbability or i == self.parameterCount:
break
self.trialSolution[n] = sharedMemoryPopulation[r1 * self.parameterCount + n] + self.scale * (sharedMemoryPopulation[r2 * self.parameterCount + n] - sharedMemoryPopulation[r3 * self.parameterCount + n])
n = (n + 1) % self.parameterCount
i += 1
def RandToBest1Exp(self, candidate):
r1,r2,r3,r4,r5 = self.SelectSamples(candidate, 1,1,0,0,0)
if True == self.useClassRandomNumberMethods:
n = self.GetClassRandomIntegerBetweenZeroAndParameterCount()
else:
n = random.randint(0, self.parameterCount-1)
self.trialSolution = sharedMemoryPopulation[candidate * self.parameterCount : candidate * self.parameterCount + self.parameterCount]
i = 0
while(1):
if True == self.useClassRandomNumberMethods:
k = self.GetClassRandomFloatBetweenZeroAndOne()
else:
k = random.uniform(0.0, 1.0)
if k >= self.crossOverProbability or i == self.parameterCount:
break
self.trialSolution[n] += self.scale * (self.bestSolution[n] - self.trialSolution[n]) + self.scale * (sharedMemoryPopulation[r1 * self.parameterCount + n] - sharedMemoryPopulation[r2 * self.parameterCount + n])
n = (n + 1) % self.parameterCount
i += 1
def Best2Exp(self, candidate):
r1,r2,r3,r4,r5 = self.SelectSamples(candidate, 1,1,1,1,0)
if True == self.useClassRandomNumberMethods:
n = self.GetClassRandomIntegerBetweenZeroAndParameterCount()
else:
n = random.randint(0, self.parameterCount-1)
self.trialSolution = sharedMemoryPopulation[candidate * self.parameterCount : candidate * self.parameterCount + self.parameterCount]
i = 0
while(1):
if True == self.useClassRandomNumberMethods:
k = self.GetClassRandomFloatBetweenZeroAndOne()
else:
k = random.uniform(0.0, 1.0)
if k >= self.crossOverProbability or i == self.parameterCount:
break
self.trialSolution[n] = self.bestSolution[n] + self.scale * (sharedMemoryPopulation[r1 * self.parameterCount + n] + sharedMemoryPopulation[r2 * self.parameterCount + n] - sharedMemoryPopulation[r3 * self.parameterCount + n] - sharedMemoryPopulation[r4 * self.parameterCount + n])
n = (n + 1) % self.parameterCount
i += 1
def Rand2Exp(self, candidate):
r1,r2,r3,r4,r5 = self.SelectSamples(candidate, 1,1,1,1,1)
if True == self.useClassRandomNumberMethods:
n = self.GetClassRandomIntegerBetweenZeroAndParameterCount()
else:
n = random.randint(0, self.parameterCount-1)
self.trialSolution = sharedMemoryPopulation[candidate * self.parameterCount : candidate * self.parameterCount + self.parameterCount]
i = 0
while(1):
if True == self.useClassRandomNumberMethods:
k = self.GetClassRandomFloatBetweenZeroAndOne()
else:
k = random.uniform(0.0, 1.0)
if k >= self.crossOverProbability or i == self.parameterCount:
break
self.trialSolution[n] = sharedMemoryPopulation[r1 * self.parameterCount + n] + self.scale * (sharedMemoryPopulation[r2 * self.parameterCount + n] + sharedMemoryPopulation[r3 * self.parameterCount + n] - sharedMemoryPopulation[r4 * self.parameterCount + n] - sharedMemoryPopulation[r5 * self.parameterCount + n])
n = (n + 1) % self.parameterCount
i += 1
def Best1Bin(self, candidate):
r1,r2,r3,r4,r5 = self.SelectSamples(candidate, 1,1,0,0,0)
if True == self.useClassRandomNumberMethods:
n = self.GetClassRandomIntegerBetweenZeroAndParameterCount()
else:
n = random.randint(0, self.parameterCount-1)
self.trialSolution = sharedMemoryPopulation[candidate * self.parameterCount : candidate * self.parameterCount + self.parameterCount]
i = 0
while(1):
if True == self.useClassRandomNumberMethods:
k = self.GetClassRandomFloatBetweenZeroAndOne()
else:
k = random.uniform(0.0, 1.0)
if k >= self.crossOverProbability or i == self.parameterCount:
break
self.trialSolution[n] = self.bestSolution[n] + self.scale * (sharedMemoryPopulation[r1 * self.parameterCount + n] - sharedMemoryPopulation[r2 * self.parameterCount + n])
n = (n + 1) % self.parameterCount
i += 1
def Rand1Bin(self, candidate):
r1,r2,r3,r4,r5 = self.SelectSamples(candidate, 1,1,1,0,0)
if True == self.useClassRandomNumberMethods:
n = self.GetClassRandomIntegerBetweenZeroAndParameterCount()
else:
n = random.randint(0, self.parameterCount-1)
self.trialSolution = sharedMemoryPopulation[candidate * self.parameterCount : candidate * self.parameterCount + self.parameterCount]
i = 0
while(1):
if True == self.useClassRandomNumberMethods:
k = self.GetClassRandomFloatBetweenZeroAndOne()
else:
k = random.uniform(0.0, 1.0)
if k >= self.crossOverProbability or i == self.parameterCount:
break
self.trialSolution[n] = sharedMemoryPopulation[r1 * self.parameterCount + n] + self.scale * (sharedMemoryPopulation[r2 * self.parameterCount + n] - sharedMemoryPopulation[r3 * self.parameterCount + n])
n = (n + 1) % self.parameterCount
i += 1
def RandToBest1Bin(self, candidate):
r1,r2,r3,r4,r5 = self.SelectSamples(candidate, 1,1,0,0,0)
if True == self.useClassRandomNumberMethods:
n = self.GetClassRandomIntegerBetweenZeroAndParameterCount()
else:
n = random.randint(0, self.parameterCount-1)
self.trialSolution = sharedMemoryPopulation[candidate * self.parameterCount : candidate * self.parameterCount + self.parameterCount]
i = 0
while(1):
if True == self.useClassRandomNumberMethods:
k = self.GetClassRandomFloatBetweenZeroAndOne()
else:
k = random.uniform(0.0, 1.0)
if k >= self.crossOverProbability or i == self.parameterCount:
break
self.trialSolution[n] += self.scale * (self.bestSolution[n] - self.trialSolution[n]) + self.scale * (sharedMemoryPopulation[r1 * self.parameterCount + n] - sharedMemoryPopulation[r2 * self.parameterCount + n])
n = (n + 1) % self.parameterCount
i += 1
def Best2Bin(self, candidate):
r1,r2,r3,r4,r5 = self.SelectSamples(candidate, 1,1,1,1,0)
if True == self.useClassRandomNumberMethods:
n = self.GetClassRandomIntegerBetweenZeroAndParameterCount()
else:
n = random.randint(0, self.parameterCount-1)
self.trialSolution = sharedMemoryPopulation[candidate * self.parameterCount : candidate * self.parameterCount + self.parameterCount]
i = 0
while(1):
if True == self.useClassRandomNumberMethods:
k = self.GetClassRandomFloatBetweenZeroAndOne()
else:
k = random.uniform(0.0, 1.0)
if k >= self.crossOverProbability or i == self.parameterCount:
break
self.trialSolution[n] = self.bestSolution[n] + self.scale * (sharedMemoryPopulation[r1 * self.parameterCount + n] + sharedMemoryPopulation[r2 * self.parameterCount + n] - sharedMemoryPopulation[r3 * self.parameterCount + n] - sharedMemoryPopulation[r4 * self.parameterCount + n])
n = (n + 1) % self.parameterCount
i += 1
def Rand2Bin(self, candidate):
r1,r2,r3,r4,r5 = self.SelectSamples(candidate, 1,1,1,1,1)
if True == self.useClassRandomNumberMethods:
n = self.GetClassRandomIntegerBetweenZeroAndParameterCount()
else:
n = random.randint(0, self.parameterCount-1)
self.trialSolution = sharedMemoryPopulation[candidate * self.parameterCount : candidate * self.parameterCount + self.parameterCount]
i = 0
while(1):
if True == self.useClassRandomNumberMethods:
k = self.GetClassRandomFloatBetweenZeroAndOne()
else:
k = random.uniform(0.0, 1.0)
if k >= self.crossOverProbability or i == self.parameterCount:
break
self.trialSolution[n] = sharedMemoryPopulation[r1 * self.parameterCount + n] + self.scale * (sharedMemoryPopulation[r2 * self.parameterCount + n] + sharedMemoryPopulation[r3 * self.parameterCount + n] - sharedMemoryPopulation[r4 * self.parameterCount + n] - sharedMemoryPopulation[r5 * self.parameterCount + n])
n = (n + 1) % self.parameterCount
i += 1
def SelectSamples(self, candidate, r1, r2, r3, r4, r5):
if r1:
while(1):
if True == self.useClassRandomNumberMethods:
r1 = self.GetClassRandomIntegerBetweenZeroAndPopulationSize()
else:
r1 = random.randint(0, self.populationSize-1)
if r1 != candidate:
break
if r2:
while(1):
if True == self.useClassRandomNumberMethods:
r2 = self.GetClassRandomIntegerBetweenZeroAndPopulationSize()
else:
r2 = random.randint(0, self.populationSize-1)
if r2 != candidate and r2 != r1:
break
if r3:
while(1):
if True == self.useClassRandomNumberMethods:
r3 = self.GetClassRandomIntegerBetweenZeroAndPopulationSize()
else:
r3 = random.randint(0, self.populationSize-1)
if r3 != candidate and r3 != r1 and r3 != r2:
break
if r4:
while(1):
if True == self.useClassRandomNumberMethods:
r4 = self.GetClassRandomIntegerBetweenZeroAndPopulationSize()
else:
r4 = random.randint(0, self.populationSize-1)
if r4 != candidate and r4 != r1 and r4 != r2 and r4 != r3:
break
if r5:
while(1):
if True == self.useClassRandomNumberMethods:
r5 = self.GetClassRandomIntegerBetweenZeroAndPopulationSize()
else:
r5 = random.randint(0, self.populationSize-1)
if r5 != candidate and r5 != r1 and r5 != r2 and r5 != r3 and r5 != r4:
break
return r1, r2, r3, r4, r5