"""
Basic production line simulation with some bells and whistles
Machines have break downs
Upstream process routes to next step
sort of uses a publish/subscribe model to route work
using blocking queues (modeled with simpy.Store)
Collects lots of statistics
Based on simpy machine example
Instead of having one big control process with a
huge end to end processing decision tree, each processing step
has its own process object will logic just for that process.
This makes adding new process and updates much more managable
especially for a large complex simulation
Programmer Michael R Gibbs
"""
import random
import simpy
RANDOM_SEED = 42
PT_MEAN = 10.0 # Avg. processing time in minutes
PT_SIGMA = 2.0 # Sigma of processing time
MTTF = 300.0 # Mean time to failure in minutes
BREAK_MEAN = 1 / MTTF # Param. for expovariate distribution
REPAIR_TIME = 30.0 # Time it takes to repair a machine in minutes
JOB_DURATION = 30.0 # Duration of other jobs in minutes
POOL_1_MACHINES = 10 # Number of machines in the machine resource pool 1
POOL_2_MACHINES = 2 # Number of machines in the machine resource pool 2
WEEKS = 4 # Simulation time in weeks
SIM_TIME = WEEKS * 7 * 24 * 60 # Simulation time in minutes
def time_per_part():
"""Return actual processing time for a concrete part."""
return random.normalvariate(PT_MEAN, PT_SIGMA)
def time_to_failure():
"""Return time until next failure for a machine."""
return random.expovariate(BREAK_MEAN)
class Machine(object):
"""A machine produces parts and my get broken every now and then.
If it breaks, it requests a *repairman* and continues the production
after the it is repaired.
A machine has a *name* and a numberof *parts_made* thus far.
"""
def __init__(self, env, name, repairman):
self.env = env
self.broken = False
self.repairman = repairman
# trach by step
self.part_cnt = {}
self.part_time = {}
self.breakdown_cnt = 0
self.breakdown_time = 0
# Start "working" and "break_machine" processes for this machine.
self.process = None
env.process(self.break_machine())
def working(self, step_name):
"""
Do a step to produce a part.
While making a part, the machine may break multiple times.
Request a repairman when this happens.
"""
# need this wrapper to store the process to be interupted
self.process = self.env.process(self._work(step_name))
yield self.process
self.process = None
def _work(self, step_name):
"""
this is where the real work is done
"""
# Start the processing step
proc_time = time_per_part()
done_in = proc_time
while done_in:
try:
# Working on the part
start = self.env.now
yield self.env.timeout(done_in)
done_in = 0 # Set to 0 to exit while loop.
except simpy.Interrupt:
self.broken = True
done_in -= self.env.now - start # How much time left?
# Request a repairman. This will preempt its "other_job".
with self.repairman.request(priority=1) as req:
yield req
yield self.env.timeout(REPAIR_TIME)
self.breakdown_cnt += 1
self.breakdown_time += REPAIR_TIME
self.broken = False
# Part is done.
self.part_cnt[step_name] = self.part_cnt.get(step_name, 0) + 1
self.part_time[step_name] = self.part_time.get(step_name, 0) + proc_time
def break_machine(self):
"""Break the machine every now and then."""
while True:
yield self.env.timeout(time_to_failure())
if (not self.broken) and (self.process is not None):
# Only break the machine if it is currently working.
self.process.interrupt()
class Product():
"""
Quick class to rep the product
can add more stat collection logic
"""
next_id = 1
def __init__(self):
self.next_id += 1
class Process_Start():
"""
Start process that is first step in making a part
Tries to keep all machines in resouce pool busy
The resource pool is shared with another process
Uses queues to route part to next process in a async way
"""
# collect stats for process
part_cnt = 0 # number of parts made
m_wait = 0 # wait for a machine
p_time = 0 # processing time
q_wait = 0 # wait when next queue is full
def __init__(self, env, m_pool, p2_queue, p3_queue, step_name):
self.env = env
self.m_pool = m_pool
self.p2_queue = p2_queue
self.p3_queue = p3_queue
self.step_name = step_name
# strat creating parts
self.env.process(self.work_start())
def work_start(self):
"""
Grabs machines as they become available
and start making a part
Note that the work process is kicked off async
and does not wait for it to finish (no yield)
before it tryies to grab the next machine
"""
while True:
# start making a part whenever a machine becomes available
start = self.env.now
machine = yield self.m_pool.get()
self.m_wait += env.now - start
self.env.process(self._do_work(machine))
def _do_work(self, machine):
# do processing and create a port
start = self.env.now
yield self.env.process(machine.working(self.step_name))
self.m_pool.put(machine)
self.p_time += self.env.now - start
product = Product()
self.part_cnt += 1
# route part to next machine
# blocks if next queue is full
# could add logic to look ahead so if one
# next process queue is full, make part for
# the other next process
start = env.now
next_p = random.randint(2,3)
if next_p == 2:
yield self.p2_queue.put(product)
else:
yield self.p3_queue.put(product)
self.q_wait += self.env.now - start
class Process_Next():
"""
Start process that is next step in making a part
Starts when upstream process puts a part in this process's queue
once part arrives still need to get a machine from resouce pool
note that the resource pool may be shared by other processes
"""
# collect stats for process
part_cnt = 0 # number of parts made
m_wait = 0 # wait for a machine
p_time = 0 # processing time
q_wait = 0 # wait time when input queue is empty
def __init__(self, env, m_pool, my_queue, step_name):
self.env = env
self.m_pool = m_pool
self.my_queue = my_queue
self.step_name = step_name
# start processing parts routed to process
self.env.process(self.work_start())
def work_start(self):
"""
wait for a part to show up in queue and start process
Note that the work process is kicked off async
and does not wait for it to finish (no yield)
before it tryies to grab the next part from this process's queue
"""
while True:
# start making a part whenever we have both a part and a machine
start = self.env.now
part = yield self.my_queue.get()
self.q_wait += env.now - start
# get a machine
start = self.env.now
machine = yield self.m_pool.get()
self.m_wait += self.env.now - start
# do process, do not wait before getting next part (no yield)
self.env.process(self._do_work(part, machine))
def _do_work(self, part, machine):
# do processing and work on a port
start = self.env.now
yield self.env.process(machine.working(self.step_name))
self.m_pool.put(machine)
self.p_time += self.env.now - start
self.part_cnt += 1
# part is done
# if there are more steps
# see Process_start on how to route
def other_jobs(env, repairman):
"""The repairman's other (unimportant) job."""
while True:
# Start a new job
done_in = JOB_DURATION
while done_in:
# Retry the job until it is done.
# It's priority is lower than that of machine repairs.
with repairman.request(priority=2) as req:
yield req
try:
start = env.now
yield env.timeout(done_in)
done_in = 0
except simpy.Interrupt:
done_in -= env.now - start
# Setup and start the simulation
print('Machine shop')
random.seed(RANDOM_SEED) # This helps reproducing the results
# Create an environment and start the setup process
env = simpy.Environment()
repairman = simpy.PreemptiveResource(env, capacity=1)
# build the machine resource pools
resource_pool_1 = store = simpy.Store(env)
resource_pool_1.items = [Machine(env, 'Machine %d' % (i + 1), repairman)
for i in range(POOL_1_MACHINES)]
resource_pool_2 = store = simpy.Store(env)
resource_pool_2.items = [Machine(env, 'Machine %d' % (i + 1), repairman)
for i in range(POOL_1_MACHINES, POOL_1_MACHINES + POOL_2_MACHINES)]
# master list of machines
machines = resource_pool_1.items + resource_pool_2.items
# queues for routing work
step_2_q = simpy.Store(env, capacity=10)
step_3_q = simpy.Store(env, capacity=10)
# processes auto start when created
process_step_1 = Process_Start(env, resource_pool_1, step_2_q, step_3_q, 'step 1')
process_step_2 = Process_Next(env, resource_pool_1, step_2_q, 'step 2')
process_step_3 = Process_Next(env, resource_pool_2, step_3_q, 'step 3')
env.process(other_jobs(env, repairman))
# Execute!
env.run(until=SIM_TIME)
# Analyis/results
print('Machine shop results after %s weeks' % WEEKS)
print('Machiness')
for machine in machines:
for process_step in machine.part_cnt:
print(f' step: {process_step}')
print(f' parts: count: {machine.part_cnt[process_step]} time:{machine.part_time[process_step]}')
print(f' breakdown: count: {machine.breakdown_cnt} time:{machine.breakdown_time}')
print()
print('Processes')
for p in [process_step_1, process_step_2, process_step_3]:
print(f'Process: {p.step_name}')
print(f' parts: {p.part_cnt} , processing time: {p.p_time:.2f} , machine wait: {p.m_wait:.2f} , queue wait: {p.q_wait:.2f}')