Modified:
/trunk/ecspy/migrators.py
=======================================
--- /trunk/ecspy/migrators.py Thu Dec 10 19:12:27 2009
+++ /trunk/ecspy/migrators.py Sun Jan 8 22:24:35 2012
@@ -25,6 +25,9 @@
along with this program. If not, see
<http://www.gnu.org/licenses/>.
"""
+import multiprocessing
+import Queue
+
def default_migration(random, population, args):
"""Do nothing.
@@ -33,3 +36,59 @@
"""
return population
+
+
+class MultiprocessingMigrator(object):
+    """Migrate among processes on the same machine.
+
+    This callable class allows individuals to migrate from one process
+    to another on the same machine. It maintains a queue of migrants
+    whose maximum length can be fixed via the ``max_migrants``
+    parameter in the constructor. If the number of migrants in the queue
+    reaches this value, new migrants are not added until earlier ones
+    are consumed. The unreliability of a multiprocessing environment
+    makes it difficult to provide guarantees. However, migrants are
+    theoretically added and consumed at the same rate, so this value
+    should determine the "freshness" of individuals, where smaller
+    queue sizes provide more recency.
+
+    An optional keyword argument in ``args`` requires the migrant to be
+    evaluated by the current EC before being inserted into the population.
+    This can be important when different populations use different
+    evaluation functions and you need to be able to compare "apples with
+    apples".
+
+    Optional keyword arguments in args:
+
+    - *evaluate_migrant* -- should new migrants be evaluated before
+      adding them to the population (default: False)
+
+    """
+    def __init__(self, max_migrants=1):
+        """Create the migrant queue and the lock that guards each exchange.
+
+        Arguments:
+
+        - *max_migrants* -- maximum number of migrants the queue may hold
+          at once (default: 1)
+
+        """
+        self.max_migrants = max_migrants
+        self.migrants = multiprocessing.Queue(self.max_migrants)
+        self.lock = multiprocessing.Lock()
+        # Instances do not get a __name__ automatically the way functions
+        # do; presumably the framework reports migrators by name.
+        self.__name__ = self.__class__.__name__
+
+    def __call__(self, random, population, args):
+        """Exchange one randomly chosen individual with a queued migrant.
+
+        A random member of ``population`` is selected. If a migrant is
+        waiting in the queue it replaces that member (after optional
+        re-evaluation); in either case the selected member is offered to
+        the queue, and silently dropped if the queue is full. An empty
+        population is returned unchanged.
+
+        Arguments:
+
+        - *random* -- the random number generator object
+        - *population* -- the population of individuals (modified in place)
+        - *args* -- a dictionary of keyword arguments; ``args["_ec"]`` is
+          only read when *evaluate_migrant* is true
+
+        Returns the (possibly modified) ``population``.
+
+        """
+        evaluate_migrant = args.setdefault('evaluate_migrant', False)
+        if len(population) == 0:
+            # Nothing can be exchanged, and random.randint(0, -1) would
+            # raise ValueError on the empty range.
+            return population
+        with self.lock:
+            migrant_index = random.randint(0, len(population) - 1)
+            old_migrant = population[migrant_index]
+            # Keep the try body down to the queue access so that an
+            # exception raised by the evaluator is never mistaken for an
+            # empty queue.
+            try:
+                migrant = self.migrants.get(block=False)
+            except Queue.Empty:
+                pass
+            else:
+                if evaluate_migrant:
+                    fit = args["_ec"].evaluator([migrant.candidate], args)
+                    migrant.fitness = fit[0]
+                    args["_ec"].num_evaluations += 1
+                population[migrant_index] = migrant
+            # Best effort: when the queue is already full the outgoing
+            # individual is simply not exported this time.
+            try:
+                self.migrants.put(old_migrant, block=False)
+            except Queue.Full:
+                pass
+        return population
+
+
+