"""
Quick example of how to batch and unbatch parts for processing
Programmer: Michael R. Gibbs
12/7/2023
"""
import simpy
import random
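# optional: fix the random seed for reproducible runs, e.g. random.seed(42)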
# set the capacity of each resource
WS1_cnt=2
WS2_cnt=2
WS3_cnt=3
# list of prod types
prod_list = ["P1", "P2", "P3"]
class Job():
"""
quick class to track jobs
"""
next_id = 1
def __init__(self, prod_type):
self.prod_type = prod_type
self.id = self.__class__.next_id
self.__class__.next_id +=1
# Wrap steps in a class to bundle parameters
class Batch_step():
"""
batches jobs based on their product type
When a batch for a product type gets to batch_size,
the batch is processed.
After the batch is processed, each job in the batch is sent to the next step
next_step is a function that takes a job as its paramter
delay_func is function with no paramters that return the
number of time units to do the step
"""
def __init__(self, env, step_name, batch_size, ws, next_step, delay_func):
self.env = env
self.step_name = step_name
self.batch_size = batch_size
self.ws = ws
self.next_step = next_step
self.delay_func = delay_func
        # init an empty list for each prod type to track batching
self.job_q = {k:list() for k in prod_list}
def do_step(self, job):
"""
Takes a job and adds it to a batch based on its product type
If the batch is >= batch_size, then the batch is processed
        After processing, each job in the batch is sent to the next step
"""
print(f"{self.env.now:.2f} job {job.id} of type {job.prod_type} has arrived at {self.step_name}")
# add job to batch
b_list = self.job_q[job.prod_type]
b_list.append(job)
# check if batch is big enough to be processed
if len(b_list) >= self.batch_size:
# start next batch
self.job_q[job.prod_type] = list()
            # seize and process
            with self.ws.request() as req:
                # wait for the workstation to be free
                yield req
                print(f"{self.env.now:.2f} jobs {[bjob.id for bjob in b_list]} of type {b_list[0].prod_type} have seized a resource {self.step_name}")
                yield self.env.timeout(self.delay_func())
                print(f"{self.env.now:.2f} jobs {[bjob.id for bjob in b_list]} of type {b_list[0].prod_type} have released a resource {self.step_name}")
            # unpack the batch and start the next step for each job
            # note no yield here; the jobs move on independently
            for b_job in b_list:
                self.env.process(self.next_step(b_job))
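# note: a partial batch (fewer than batch_size jobs of a product type) just waits for
# more arrivals; there is no timeout flush, so leftover jobs may still be queued when the run ends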
class Step():
"""
Take a job,
Seize resource,
Process job,
Release resource,
Send job to next step
"""
def __init__(self, env, step_name, ws, next_step, delay_func):
self.env = env
self.step_name = step_name
self.ws = ws
self.next_step = next_step
self.delay_func = delay_func
def do_step(self, job):
"""
Take a job,
Seize resource,
Process job,
Release resource,
Send job to next step
"""
print(f"{self.env.now:.2f} job {job.id} of type {job.prod_type} has arrived at {self.step_name}")
        with self.ws.request() as req:
            # wait for the workstation to be free (released automatically when the with block exits)
            yield req
            print(f"{self.env.now:.2f} job {job.id} of type {job.prod_type} has seized a resource {self.step_name}")
            yield self.env.timeout(self.delay_func())
            print(f"{self.env.now:.2f} job {job.id} of type {job.prod_type} has released a resource {self.step_name}")
# start next step
if self.next_step is not None:
self.env.process(self.next_step(job))
def gen_jobs(env, prod_type, next_step, delay_func):
"""
Generate jobs of a product type and send
them to a step to be processed
"""
while True:
yield env.timeout(delay_func())
job = Job(prod_type)
        # note no yield here; the generator does not wait for the job to finish
env.process(next_step(job))
def job_end_stub(env, job):
"""
Logs the end of a job
"""
    yield env.timeout(0)  # zero delay; the yield just makes this function a simpy process
print(f"{env.now:.2f} job {job.id} of type {job.prod_type} is done")
# wraps the job end to look like a step (steps only have one parameter)
job_end = lambda job: job_end_stub(env, job)
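# (the lambda only looks up the global env when it is called, so defining it
# before env is created below still works)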
# boot up
env = simpy.Environment()
# the resources
ws1 = simpy.Resource(env, WS1_cnt)
ws2 = simpy.Resource(env, WS2_cnt)
ws3 = simpy.Resource(env, WS3_cnt)
# create steps and chain them together
# note I use lambdas to create no-parameter random delay functions
# this lets me use any random function without changing the do_step code
# also note how the next step is a parameter, which makes changing the process order
# or adding a process easier.
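# for example (illustrative only, not part of the model below), any zero-argument
# callable would work as a delay, e.g. lambda: random.expovariate(0.5)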
step_6 = Step(env, "step 6", ws3, job_end, lambda: random.triangular(1, 3, 2))
step_5 = Batch_step(env, "step 5", 3, ws1, step_6.do_step, lambda: random.triangular(1, 3, 2))
step_4 = Step(env, "step 4", ws2, step_5.do_step, lambda: random.triangular(1, 3, 2))
step_3 = Step(env, "step 3", ws3, step_4.do_step, lambda: random.triangular(1, 3, 2))
step_2 = Step(env, "step 2", ws2, step_3.do_step, lambda: random.triangular(1, 3, 2))
step_1 = Batch_step(env, "step 1", 3, ws1, step_2.do_step, lambda: random.triangular(1, 3, 2))
# start the job generators, one for each product type
for prod_type in prod_list:
env.process(gen_jobs(env, prod_type, step_1.do_step, lambda :random.triangular(1,3,2)))
env.run(100)
print("--- done ---")