I want to include worker capacity in my model. What I mean by that is that I want to take the shifts of the employees and their respective qualifications into account.
Not every worker has the same qualifications: some can only operate a few machines, others all of them.
And of course no worker is there all the time: each week they work an assigned shift, and there are also sick days and vacation days.
For testing purposes, I use the simplified model below.
The real model is significantly more complex: e.g. some jobs have alternatives and could be produced on more than one machine; the duration of a job is not fixed and can vary, because the intervals span over break times;
deadlines; earliest possible start times; dependencies between different jobs; setup times that depend on the preceding task and so on.
This simplified model assigns exactly one worker per task. What is missing is the possibility to assign more than one worker per task. The workers should not work on a task simultaneously but successively, taking turns if needed.
For example, task 7 is scheduled on machine 2 from 5 to 12. Then worker A is assigned from time 5 to 10 and worker B from time 11 to 12 (maybe because worker A's shift ends at time 10).
For each point in the duration of a task, exactly one worker always has to be assigned.
However, it should be preferred that one worker operates the task from start to finish, or at least until their shift ends. There should be no switch to another worker if the first one is still available.
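To make the handover rule concrete, here is a minimal sketch in plain Python (no solver involved) that checks a proposed split of one task and counts the switches I would like to minimise. The function name `check_handover` and the inclusive `(worker, first_time, last_time)` tuples are only my assumptions for illustration; they mirror the task 7 example above.

```python
def check_handover(task_start, task_end, pieces):
    """Validate a worker split for one task and return the number of switches.

    pieces: list of (worker, first_time, last_time), both ends inclusive.
    Raises ValueError unless every time point of the task has exactly one worker.
    """
    owner = {}
    for worker, first, last in pieces:
        for t in range(first, last + 1):
            if not task_start <= t <= task_end:
                raise ValueError(f"time {t} lies outside the task")
            if t in owner:
                raise ValueError(f"time {t} has more than one worker")
            owner[t] = worker
    sequence = []
    for t in range(task_start, task_end + 1):
        if t not in owner:
            raise ValueError(f"time {t} has no worker")
        sequence.append(owner[t])
    # A switch is any change of worker between consecutive time points.
    return sum(1 for a, b in zip(sequence, sequence[1:]) if a != b)


# Task 7 from the example: worker A covers 5..10, worker B covers 11..12.
print(check_handover(5, 12, [("A", 5, 10), ("B", 11, 12)]))  # prints 1
```

A schedule where one worker covers the whole task would return 0, which is the preferred outcome whenever that worker's shift allows it.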
If all this could be implemented, I would also like it to be possible for one worker to operate up to 3 machines simultaneously (of course only machines that the worker is qualified to operate).
The goal, besides getting everything done in time, should be that as many machines as possible are running.
E.g. in the night shifts there are fewer workers, so it is possible that some machines cannot be operated during that time because there isn't enough worker capacity.
(Because I implemented varying durations that span over breaks, maybe I also have to allow some tasks to be extended by the length of a shift, because no one is available to operate the machine the task is planned on.)
I'm stumped on how to do that. Any ideas or suggestions?
First I thought about adding a third index to worker_for_job[(worker_id, job_id)], so something like worker_for_job[(worker_id, job_id, time_t)] for time_t in range(duration).
But because my durations vary and are not constants (they are IntVars as well), I cannot do anything like that.
Here is the code:
"""Jobshop with maintenance tasks using the CP-SAT solver."""
import collections
from typing import Sequence
from absl import app
from ortools.sat.python import cp_model


class SolutionPrinter(cp_model.CpSolverSolutionCallback):
    """Print intermediate solutions."""

    def __init__(self):
        cp_model.CpSolverSolutionCallback.__init__(self)
        self.__solution_count = 0

    def on_solution_callback(self):
        """Called at each new solution."""
        print(
            "Solution %i, time = %f s, objective = %i"
            % (self.__solution_count, self.wall_time, self.objective_value)
        )
        self.__solution_count += 1


def jobshop_with_maintenance():
    """Solves a jobshop with maintenance on one machine."""
    # Create the model.
    model = cp_model.CpModel()
    n_workers = 3
    '''
    jobs_data = [  # task = (machine_id, processing_time).
        [(0, 3), (1, 2), (2, 2)],  # Job0
        [(0, 2), (2, 1), (1, 4)],  # Job1
        [(1, 4), (2, 3)],  # Job2
    ]
    '''
    jobs_data = [  # task = (machine_id, processing_time).
        [(0, 3)],  # Job0
        [(0, 2)],  # Job1
        [(1, 4)],  # Job2
        [(1, 6)],  # Job3
        [(1, 5)],  # Job4
        [(2, 2)],  # Job5
        [(2, 2)],  # Job6
        [(2, 7)],  # Job7
    ]
    all_workers = range(n_workers)
    all_jobs = range(len(jobs_data))
    # Boolean variable for each combination of worker and job.
    # If True, the worker works on this job.
    worker_for_job = {}
    for worker_id in all_workers:
        for job_id in all_jobs:
            worker_for_job[(worker_id, job_id)] = model.NewBoolVar(f'worker_{worker_id}_on_job_{job_id}')
    # For each job, sum(worker_for_job) == 1 ensures that exactly one worker is assigned.
    for job_id in all_jobs:
        model.Add(sum([worker_for_job[(worker_id, job_id)] for worker_id in all_workers]) == 1)
    machines_count = 1 + max(task[0] for job in jobs_data for task in job)
    all_machines = range(machines_count)
    # Computes horizon dynamically as the sum of all durations.
    horizon = sum(task[1] for job in jobs_data for task in job)
    # Named tuple to store information about created variables.
    task_type = collections.namedtuple("Task", "start end interval")
    # Named tuple to manipulate solution information.
    assigned_task_type = collections.namedtuple(
        "assigned_task_type", "start job index duration worker"
    )
    # Creates job intervals and add to the corresponding machine lists.
    # One optional interval per (task, worker); it is present only if that
    # worker is assigned to the job.
    all_tasks = {}
    machine_to_intervals = collections.defaultdict(list)
    starts = {}
    ends = {}
    intervals = {}
    for job_id, job in enumerate(jobs_data):
        for task_id, task in enumerate(job):
            for worker_id in all_workers:
                machine = task[0]
                duration = task[1]
                suffix = "_job%i_task%i_worker%i" % (job_id, task_id, worker_id)
                start_var = model.new_int_var(0, horizon, "start" + suffix)
                starts[job_id, task_id, worker_id] = start_var
                end_var = model.new_int_var(0, horizon, "end" + suffix)
                ends[job_id, task_id, worker_id] = end_var
                interval_var = model.new_optional_interval_var(
                    start_var, duration, end_var, worker_for_job[(worker_id, job_id)], "interval" + suffix
                )
                intervals[job_id, task_id, worker_id] = interval_var
                all_tasks[job_id, task_id, worker_id] = task_type(
                    start=start_var, end=end_var, interval=interval_var
                )
                machine_to_intervals[machine].append(interval_var)
    # Add maintenance interval (machine 0 is not available on time {4, 5, 6, 7}).
    machine_to_intervals[0].append(model.new_interval_var(4, 4, 8, "weekend_0"))
    # Create and add disjunctive constraints.
    for machine in all_machines:
        model.add_no_overlap(machine_to_intervals[machine])
    # Precedences inside a job.
    # (not really needed here, only having one task per job here)
    for job_id, job in enumerate(jobs_data):
        for task_id in range(len(job) - 1):
            for worker_id in all_workers:
                model.add(
                    all_tasks[job_id, task_id + 1, worker_id].start >= all_tasks[job_id, task_id, worker_id].end
                ).OnlyEnforceIf(worker_for_job[(worker_id, job_id)])
    # The tasks for each worker should not overlap.
    for worker_id in all_workers:
        model.add_no_overlap([
            intervals[job_id, task_id, worker_id]
            for (job_id, job) in enumerate(jobs_data)
            for task_id in range(len(job))
        ])
    # worker no. 2 is not qualified to operate machine 2:
    jobs_on_machine_2 = [job_id for (job_id, job) in enumerate(jobs_data) if job[0][0] == 2]
    model.add(sum(worker_for_job[2, job_id] for job_id in jobs_on_machine_2) == 0)
    # worker no. 1 is not available from time 3 to 5
    intervals_w1 = [intervals[job_id, task_id, 1] for (job_id, job) in enumerate(jobs_data) for task_id in range(len(job))]
    intervals_w1.append(model.new_interval_var(3, 2, 5, "not_available_w1"))
    model.add_no_overlap(intervals_w1)
    # worker no. 2 is not available from time 5 to 9
    intervals_w2 = [intervals[job_id, task_id, 2] for (job_id, job) in enumerate(jobs_data) for task_id in range(len(job))]
    intervals_w2.append(model.new_interval_var(5, 4, 9, "not_available_w2"))
    model.add_no_overlap(intervals_w2)
    # Makespan objective.
    obj_var = model.new_int_var(0, horizon, "makespan")
    model.add_max_equality(
        obj_var,
        [all_tasks[job_id, len(job) - 1, worker_id].end for (job_id, job) in enumerate(jobs_data) for worker_id in all_workers],
    )
    model.minimize(obj_var)
    # Solve model.
    solver = cp_model.CpSolver()
    solution_printer = SolutionPrinter()
    status = solver.solve(model, solution_printer)
    # Output solution.
    if status == cp_model.OPTIMAL:
        # Create one list of assigned tasks per machine.
        assigned_jobs = collections.defaultdict(list)
        for job_id, job in enumerate(jobs_data):
            for task_id, task in enumerate(job):
                for worker_id in all_workers:
                    machine = task[0]
                    #print(solver.value(worker_for_job[(worker_id,job_id)]))
                    assigned_jobs[machine].append(
                        assigned_task_type(
                            start=solver.value(all_tasks[job_id, task_id, worker_id].start),
                            job=job_id,
                            index=task_id,
                            duration=task[1],
                            worker=worker_id,
                        )
                    )
        # Create per machine output lines.
        print('\n')
        for machine in all_machines:
            print("Machine " + str(machine) + ": ")
            assigned_jobs[machine].sort()
            job_print = ''
            time_print = ''
            for assigned_task in assigned_jobs[machine]:
                # Only print the interval of the worker who was actually assigned.
                if solver.value(worker_for_job[(assigned_task.worker, assigned_task.job)]) == 1:
                    name = "job_j%i_t%i_w%i" % (assigned_task.job, assigned_task.index, assigned_task.worker)
                    #print(name)
                    job_print += ' '
                    job_print += name
                    start = assigned_task.start
                    duration = assigned_task.duration
                    sol_tmp = "[%i,%i]" % (start, start + duration)
                    #print(sol_tmp)
                    time_print += ' '
                    time_print += sol_tmp
            print(job_print)
            print(time_print)
        # Finally print the solution found.
        print("\nOptimal Schedule Length: %i" % solver.objective_value)
        print("Statistics")
        print(" - conflicts : %i" % solver.num_conflicts)
        print(" - branches : %i" % solver.num_branches)
        print(" - wall time : %f s" % solver.wall_time)
        '''
        for k in starts.keys():
            if solver.value(worker_for_job[k[2], k[0]]) == 1:
                print(f'worker: {k[2]}, job: {k[0]}, task: {k[1]}')
                print(solver.value(starts[k]))
                print(solver.value(ends[k]))
        '''


jobshop_with_maintenance()
And this is the output I get at the moment:
Solution 0, time = 0.007418 s, objective = 17
Solution 1, time = 0.010563 s, objective = 16
Solution 2, time = 0.012950 s, objective = 15
Machine 0:
job_j1_t0_w2 job_j0_t0_w2
[0,2] [9,12]
Machine 1:
job_j3_t0_w0 job_j4_t0_w0 job_j2_t0_w0
[0,6] [6,11] [11,15]
Machine 2:
job_j5_t0_w1 job_j7_t0_w1 job_j6_t0_w1
[0,2] [5,12] [12,14]
Optimal Schedule Length: 15
Statistics
- conflicts : 22
- branches : 294
- wall time : 0.017204 s