Batch formation


ragu pathi

Dec 4, 2023, 1:31:26 AM
to python-simpy
Dear all,

In my model, the semiconductor manufacturing system consists of three workstations: a diffusion workstation (WS1), an ion implantation workstation (WS2), and a lithography workstation (WS3). The workstations have the following numbers of machines:
WS1 = 2, WS2 = 2, and WS3 = 3. The system produces 3 types of products. The process route consists of 6 processing steps, as follows: job creation - step 1 (WS1) - step 2 (WS2) - step 3 (WS3) - step 4 (WS2) - step 5 (WS1) - step 6 (WS3) - job disposal.

WS1 is a batch processor: it starts processing whenever 3 jobs of the same processing step and the same product type have arrived, and it must wait until such a batch forms. Once formed, the batch is processed as a unit. After processing, the batch is unbatched and each job goes through the remaining steps individually. WS2 and WS3 are discrete processors (no batch needs to be formed).

Please help me with forming jobs into batches and unbatching them. I also want to trace the work-in-process (WIP) at each workstation and the cycle time of each job.

Thank you. 

Michael Gibbs

Dec 4, 2023, 12:07:46 PM
to python-simpy
Sounds like you need a couple of dicts where the key is the product type and the values are lists. When a product arrives, put it in the right list, and when the list reaches a length of 3, send the list to the workstation to be processed. Part of the workstation's post-processing would be to pop the products out of the list and send them to the next processing step.
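
Here is a minimal, simulation-free sketch of that dict-of-lists idea (all the names here are illustrative, not from any library):

# batch jobs per product type in plain Python (hypothetical names)
BATCH_SIZE = 3
batches = {}  # key: product type, value: list of jobs waiting to batch

def send_to_next_step(job):
    # stand-in for dispatching a job to its next processing step
    print(f"job {job} moves on to the next step")

def process_batch(b_list):
    # stand-in for the workstation; post-processing pops the jobs
    # back out of the list and sends each one on individually
    while b_list:
        send_to_next_step(b_list.pop())

def on_arrival(job, prod_type):
    # put the arriving job in the right list for its product type
    b_list = batches.setdefault(prod_type, [])
    b_list.append(job)
    if len(b_list) >= BATCH_SIZE:
        # full batch: start a fresh list, then process the full one
        batches[prod_type] = []
        process_batch(b_list)

for i in range(7):
    on_arrival(i, "P1")  # jobs 0-2 and 3-5 form batches; job 6 waits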

Michael Gibbs

Dec 7, 2023, 7:22:33 PM
to python-simpy
Here is how I would do it.
I made two class templates for the two types of processing (batched and unbatched) and then used the templates to generate each step. One of the parameters when generating a step is the next step's process function, so I can chain everything together.

"""
Quick example on how to batch and unbatch parts for process

Programmer: Michael R. Gibbs
12/7/2023
"""

import simpy
import random

# set the capacity (machine count) of each workstation resource
WS1_cnt = 2
WS2_cnt = 2
WS3_cnt = 3

# list of product types
prod_list = ["P1", "P2", "P3"]

class Job():
    """
    quick class to track jobs
    """
    next_id = 1

    def __init__(self, prod_type):
        self.prod_type = prod_type

        self.id = self.__class__.next_id
        self.__class__.next_id += 1

# Wrap steps in a class to bundle parameters

class Batch_step():
    """
    batches jobs based on their product type
    When a batch for a product type gets to batch_size,
    the batch is processed.
    After the batch is processed, each job in the batch is sent to the next step

    next_step is a function that takes a job as its parameter

    delay_func is a function with no parameters that returns the
    number of time units needed to do the step
    """
   
    def __init__(self, env, step_name, batch_size, ws, next_step, delay_func):
        self.env = env
        self.step_name = step_name
        self.batch_size = batch_size
        self.ws = ws
        self.next_step = next_step
        self.delay_func = delay_func

        # init an empty list for each prod type to track batching
        self.job_q = {k:list() for k in prod_list}

    def do_step(self, job):
        """
        Takes a job and adds it to a batch based on its product type
        If the batch is >= batch_size, then the batch is processed
        After processing, each job in the batch is sent to the next step
        """

        print(f"{self.env.now:.2f} job {job.id} of type {job.prod_type} has arrived at {self.step_name}")

        # add job to batch
        b_list = self.job_q[job.prod_type]
        b_list.append(job)

        # check if batch is big enough to be processed
        if len(b_list) >= self.batch_size:
            # start an empty list for the next batch
            self.job_q[job.prod_type] = list()

            # seize a machine and process the whole batch at once
            with self.ws.request() as req:
                # wait for one of the workstation's machines to be free
                yield req
                print(f"{self.env.now:.2f} jobs {[bjob.id for bjob in b_list]} of type {b_list[0].prod_type} have seized a resource {self.step_name}")
                yield self.env.timeout(self.delay_func())

            print(f"{self.env.now:.2f} jobs {[bjob.id for bjob in b_list]} of type {b_list[0].prod_type} have released a resource {self.step_name}")
           
            for bjob in b_list:
                # unpack the batch and start the next step for each job
                # note no yield here, so the jobs move on independently
                self.env.process(self.next_step(bjob))

class Step():
    """
    Take a job,
    Seize resource,
    Process job,
    Release resource,
    Send job to next step
    """
   
    def __init__(self, env, step_name, ws, next_step, delay_func):
        self.env = env
        self.step_name = step_name
        self.ws = ws
        self.next_step = next_step
        self.delay_func = delay_func

    def do_step(self, job):
        """
        Take a job,
        Seize resource,
        Process job,
        Release resource,
        Send job to next step
        """

        print(f"{self.env.now:.2f} job {job.id} of type {job.prod_type} has arrived at {self.step_name}")

        with self.ws.request() as req:
            # wait for one of the workstation's machines to be free
            yield req
            print(f"{self.env.now:.2f} job {job.id} of type {job.prod_type} has seized a resource {self.step_name}")
            yield self.env.timeout(self.delay_func())

        print(f"{self.env.now:.2f} job {job.id} of type {job.prod_type} has released a resource {self.step_name}")

        # start next step
        if self.next_step is not None:
            self.env.process(self.next_step(job))

def gen_jobs(env, prod_type, next_step, delay_func):
    """
    Generate jobs of a product type and send
    them to a step to be processed
    """
    while True:
        yield env.timeout(delay_func())

        job = Job(prod_type)

        # note no yield here
        env.process(next_step(job))

def job_end_stub(env, job):
    """
    Logs the end of a job
    """
    yield env.timeout(0)
    print(f"{env.now:.2f} job {job.id} of type {job.prod_type} is done")

# wraps the job end to look like a step (steps only have one parameter)
job_end = lambda job: job_end_stub(env, job)

# boot up
env = simpy.Environment()

# the resources
ws1 = simpy.Resource(env, WS1_cnt)
ws2 = simpy.Resource(env, WS2_cnt)
ws3 = simpy.Resource(env, WS3_cnt)

# create steps and chain them together
# note I use lambdas to create no-parameter random delay functions
# this allows me to use any random function without changing do_step code
# also note how the next step is a parameter, which makes changing the process order
# or adding a process easier.
step_6 = Step(env, "step 6", ws3, job_end, lambda: random.triangular(1, 3, 2))
step_5 = Batch_step(env, "step 5", 3, ws1, step_6.do_step, lambda: random.triangular(1, 3, 2))
step_4 = Step(env, "step 4", ws2, step_5.do_step, lambda: random.triangular(1, 3, 2))
step_3 = Step(env, "step 3", ws3, step_4.do_step, lambda: random.triangular(1, 3, 2))
step_2 = Step(env, "step 2", ws2, step_3.do_step, lambda: random.triangular(1, 3, 2))
step_1 = Batch_step(env, "step 1", 3, ws1, step_2.do_step, lambda: random.triangular(1, 3, 2))

# start the job generators, one for each product type
for prod_type in prod_list:
    env.process(gen_jobs(env, prod_type, step_1.do_step, lambda: random.triangular(1, 3, 2)))

env.run(100)

print("--- done ---")
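
For the WIP and cycle-time tracking part of the question, which the example above does not cover, one possible approach (a sketch with assumed names, not part of the code above) is to stamp each job with its creation time, record env.now minus that stamp when the job finishes, and keep a per-workstation counter that goes up on arrival and down on release:

"""
Sketch: cycle time via creation timestamps, WIP via a counter.
All names (job, gen, wip, cycle_times) are illustrative.
"""
import simpy
import random

cycle_times = []   # one entry per finished job
wip = {"WS1": 0}   # jobs currently at (or queued for) the workstation

def job(env, ws, ws_name):
    start_time = env.now        # creation timestamp for cycle time
    wip[ws_name] += 1           # job enters the workstation's WIP
    print(f"{env.now:.2f} WIP at {ws_name} is now {wip[ws_name]}")
    with ws.request() as req:
        yield req
        yield env.timeout(random.triangular(1, 3, 2))
    wip[ws_name] -= 1           # job leaves the workstation's WIP
    cycle_times.append(env.now - start_time)

def gen(env, ws):
    for _ in range(10):
        yield env.timeout(random.expovariate(1.0))
        env.process(job(env, ws, "WS1"))

env = simpy.Environment()
ws1 = simpy.Resource(env, 2)
env.process(gen(env, ws1))
env.run(100)
print(f"avg cycle time: {sum(cycle_times) / len(cycle_times):.2f}")

The same stamping would fit the full example by setting self.start_time = env.now in Job.__init__ and reading it in job_end_stub, with one counter per workstation incremented and decremented inside do_step.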

