Batch processing


Ruben Proano

Jan 11, 2022, 2:22:31 PM
to python-simpy
Hi, 

Can someone suggest how to handle a batch processing system? Imagine you have a set of jobs arriving randomly to be processed by a single server. The server does not process one job at a time following a FIFO policy. Instead, the server waits until there are enough jobs in the queue to process a batch of them simultaneously.

This is a situation happening with Covid test processing. Samples arrive at a processing facility, where they test a group of a hundred samples at once. If the batch tests negative, another batch of a hundred samples is processed, but if a batch tests positive, each swab in the batch is processed individually.

I have not found in simpy any method that allows a requested resource to decrease the queue by a batch size. It seems that a requested resource, if available, always decreases a queue by one unit.

Thank you in advance for your suggestions. 

Ruben

Andre...@gmx.net

Jan 11, 2022, 4:19:17 PM
to python-simpy
Hi!

Try using a Container instead of a Resource. Other than that, a Resource is basically "a list" (I remember you can access it through `r.items`?) with some additional information, and I think you can safely remove the items as a batch from this list.

Best of luck!

Michael Allen

Jan 11, 2022, 5:13:25 PM
to Ruben Proano, python-simpy
Hi Ruben,

In case it is of interest - I did modelling (using SimPy) for one of the large 'Lighthouse' Covid screening labs in the UK as they were setting it up, to help plan people/machine resources. That has varying batch sizes throughout the process.

And you can run it by clicking on the 'launch Binder' button - though it is slow on Binder as only 1 CPU is available, and the code is written to run multiple trials across multiple CPU cores. Binder will also probably take some time to initialise as it will need to build a new virtual machine as the code hasn't been run for a long time.

Mike

--
You received this message because you are subscribed to the Google Groups "python-simpy" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-simpy...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/python-simpy/b7c159c3-4a35-40a7-9601-b472c5123fc9n%40googlegroups.com.

Omogbai Oleghe

Jan 12, 2022, 6:30:13 AM
to Michael Allen, Ruben Proano, python-simpy

Hi Ruben,

 

Below is a simple proof of concept, where 100 samples are treated as a single batch (single unit).

I would advise using OOP to create a more complex model in which the batch can be further split into 100 individual units if found positive, with each unit having its own processing time and other unique attributes.


import simpy
import numpy as np

# running tallies of samples that have entered and left the system
total_in_append = []
total_out_append = []


def samples(env):
    # a new batch of 100 samples arrives every 3 to 6 time units
    while True:
        env.process(test_1(env))
        total_in_append.append(100)
        yield env.timeout(np.random.uniform(3, 6))
        samples_in_system = sum(total_in_append) - sum(total_out_append)
        entered_system = sum(total_in_append)
        exited_system = sum(total_out_append)
        print(entered_system, exited_system, samples_in_system)


def test_1(env):
    # 20% of batches test positive and take much longer to process
    test_positive = np.random.choice(a=['Y', 'N'], p=[0.2, 0.8])
    testers_request = testers.request()
    yield testers_request
    if test_positive == 'N':
        yield env.timeout(5)
        total_out_append.append(100)
    else:
        yield env.timeout(80)
        # the batch leaves the system here too, so count it as out, not in
        total_out_append.append(100)
    testers.release(testers_request)


env = simpy.rt.RealtimeEnvironment(factor=0.02, strict=False)
testers = simpy.Resource(env, capacity=1)
env.process(samples(env))
env.run(until=2000)


Michael Gibbs

Jan 13, 2022, 11:58:17 PM
to python-simpy
I modeled the tester as a process as opposed to a resource. The basic flow is that samples arrive and a "batcher" groups the samples into batches of 100 and puts each batch in a testing queue. The tester then pulls from the testing queue and does the testing. I can have more than one tester if I want, and can even have different types of testers. For extra credit, I added a timeout on the batcher so that if it is taking too long to get to 100 samples, it sends the short batch when the timeout expires.

"""
Simulates the batching of samples and then testing each batch

Programmer Michael R. Gibbs
"""

import simpy
import random

class Sample():
    """
    Simple sample to be tested

    id: unique sample id
    is_positive: will test positive (True,False), set when sample is created
    """

    next_id = 1  # next unique id for a sample, class variable

    def __init__(self):
        """
        creates the sample, setting whether it is positive for what we are looking for
        """

        self.id = Sample.next_id
        Sample.next_id += 1

        # about 0.5% of samples are positive
        self.is_positive = random.random() < 0.005

class Batcher():
    """
    Batches samples into batches of 100
    when batch is full, sends to testing queue
    also has a timeout if filling the batch takes too long, send a short batch
    """

    def __init__(self, env, test_queue):

        self.env = env

        self.test_queue = test_queue

        self.batch = []

        # starts the timeout timer
        self.timeout = self.env.process(self.batch_timeout())

    def batch_sample(self, sample):
        """
        add sample to batch, checks if batch is ready to be sent to test queue
        """

        print(f'{self.env.now} sample {sample.id} has arrived')
        self.batch.append(sample)

        if len(self.batch) >= 100:
            print(f'{self.env.now} batch has 100 samples')

            # since a new batch will be started, also restart the timeout timer
            self.timeout.interrupt()

            self.test_batch()
            self.timeout = self.env.process(self.batch_timeout())

    def batch_timeout(self):
        """
        send the batch to testing if it is taking too long to get 100 samples
        """
        try:
            yield self.env.timeout(99)
            print(f'{self.env.now} batch timeout')
            self.test_batch()
            self.timeout = self.env.process(self.batch_timeout())
        except simpy.Interrupt:
            # got interrupted, must have hit 100 samples before the timer expired
            print(f'{self.env.now} batcher reset timeout')

    def test_batch(self):
        """
        sends the batch to the test queue and start a new batch
        """

        print(f'{self.env.now} sending batch of {len(self.batch)} samples to be tested')
        self.test_queue.put(self.batch)
        self.batch = []


def process_test_queue(id, env, test_queue):
    """
    simulates getting a batch from the testing queue and testing a composite sample from the batch
    If the composite sample is negative we are done
    If the composite sample is positive, test all samples
    """

    while True:

        batch = yield test_queue.get()
        print(f'{env.now} tester {id} has started testing a batch')

        # get positive count
        positive_cnt = 0
        for sample in batch:
            if sample.is_positive:
                positive_cnt += 1

        # sim testing the composite batch sample
        yield env.timeout(0.01)
        print(f'{env.now} tester {id} has finished testing composite batch sample')

        if positive_cnt > 0:
            # have at least one positive, sim the testing of all the samples
            print(f'{env.now} tester {id} batch has a positive, testing each sample')
            yield env.timeout(1)

        print(f'{env.now} tester {id} has finished testing, batch has {positive_cnt} positives')

def generate_samples(env, batcher):
    """
    generates all the samples and sends them to the batcher
    """

    while True:
        # randomised arrival times; random.triangular takes (low, high, mode)
        yield env.timeout(random.triangular(0.35, 1.5, 1))

        sample = Sample()
        batcher.batch_sample(sample)


# create and start the model
env = simpy.Environment()
test_queue = simpy.Store(env)
batcher = Batcher(env,test_queue)
env.process(generate_samples(env,batcher))
env.process(process_test_queue(1,env,test_queue))

# uncomment if you want to have a second tester
#env.process(process_test_queue(2,env,test_queue))

env.run(1000)


Ruben Proano

Jan 16, 2022, 7:51:55 PM
to Michael Gibbs, python-simpy
Fantastic! Thank you very much, Michael.


