# Problem data.
# jobs[j] is the ordered list of tasks of job j; a task is a list of
# alternatives; an alternative is a (processing_time, machine_id) pair.

# First task shared by every real job: same duration on any of machines 0-2.
_tunnel_pass = [(600, 0), (600, 1), (600, 2)]

jobs = [
    # Tunnel utilisation: single-task jobs, each pinned to one machine.
    [[(200, 1)]],
    [[(400, 2)]],
    # Job 1
    [list(_tunnel_pass), [(120, 3), (180, 4), (240, 5)], [(1350, 7)]],
    # Job 2
    [list(_tunnel_pass), [(779, 6)]],
    # Job 3
    [list(_tunnel_pass), [(1800, 3), (2160, 4), (2700, 5)], [(864, 8)]],
    # Job 4
    [list(_tunnel_pass), [(554, 6)]],
]
# Problem dimensions.
num_jobs = len(jobs)
all_jobs = range(num_jobs)
machines_count = 9
all_machines = range(machines_count)

model = cp_model.CpModel()

# Horizon: the sum, over every task of every job, of the duration of its
# slowest alternative. It is used as the upper bound of all time variables.
horizon = sum(
    max((alternative[0] for alternative in task), default=0)
    for job in jobs
    for task in job
)
# Containers filled while building the model and read by the code below.
intervals_per_resources = collections.defaultdict(list)  # machine_id -> intervals.
starts = {}  # (job_id, task_id) -> task start (variable, or int when fixed).
ends = {}  # (job_id, task_id) -> task end (variable, or int when fixed).
presences = {}  # (job_id, task_id, alt_id) -> literal selecting that alternative.
job_ends = []  # End of the last task of each job, in job order.

# Walk through every task of every job and create its variables and intervals.
for job_id in all_jobs:
    job = jobs[job_id]
    num_tasks = len(job)
    previous_end = None
    for task_id, task in enumerate(job):
        # Range of durations over the alternatives of this task.
        durations = [alternative[0] for alternative in task]
        min_duration = min(durations)
        max_duration = max(durations)
        num_alternatives = len(task)
        all_alternatives = range(num_alternatives)

        if num_tasks > 1:
            # Regular task: start, duration and end are decided by the solver.
            suffix_name = "_j%i_t%i" % (job_id, task_id)
            start = model.NewIntVar(0, horizon, "start" + suffix_name)
            duration = model.NewIntVar(
                min_duration, max_duration, "duration" + suffix_name
            )
            end = model.NewIntVar(0, horizon, "end" + suffix_name)
        else:
            # Artificial tunnel task (the only single-task jobs are the
            # artificial ones): fixed at time 0 with its given duration.
            suffix_name = "_j%i_t%i_off" % (job_id, task_id)
            start = 0
            duration = task[0][0]
            end = start + duration
        # Main interval of the task.
        interval = model.NewIntervalVar(
            start, duration, end, "interval" + suffix_name
        )

        # Keep the start and end so they can be read back from the solution.
        starts[(job_id, task_id)] = start
        ends[(job_id, task_id)] = end

        # A task cannot start before the previous task of its job has ended.
        if previous_end is not None:
            model.Add(start >= previous_end)
            # No waiting time between the first and the second task.
            if task_id == 1:
                model.Add(start == previous_end)
        previous_end = end

        if num_alternatives <= 1:
            # Single alternative: the main interval goes straight onto its
            # machine and the alternative is always selected.
            intervals_per_resources[task[0][1]].append(interval)
            presences[(job_id, task_id, 0)] = model.NewConstant(1)
            continue

        # Several alternatives: one optional interval per alternative.
        l_presences = []
        for alt_id, (l_duration, l_machine) in enumerate(task):
            alt_suffix = "_j%i_t%i_a%i" % (job_id, task_id, alt_id)
            l_presence = model.NewBoolVar("presence" + alt_suffix)
            l_start = model.NewIntVar(0, horizon, "start" + alt_suffix)
            l_end = model.NewIntVar(0, horizon, "end" + alt_suffix)
            l_interval = model.NewOptionalIntervalVar(
                l_start, l_duration, l_end, l_presence, "interval" + alt_suffix
            )
            l_presences.append(l_presence)

            # The selected alternative dictates the values of the main task.
            model.Add(start == l_start).OnlyEnforceIf(l_presence)
            model.Add(duration == l_duration).OnlyEnforceIf(l_presence)
            model.Add(end == l_end).OnlyEnforceIf(l_presence)

            # Register the optional interval on the machine it would use.
            intervals_per_resources[l_machine].append(l_interval)

            # Keep the presence literal so it can be read back from the solution.
            presences[(job_id, task_id, alt_id)] = l_presence

        # Exactly one alternative is selected.
        model.AddExactlyOne(l_presences)

    job_ends.append(previous_end)
# Machine capacity: the intervals registered on a machine must not overlap.
for machine_id in all_machines:
    intervals = intervals_per_resources[machine_id]
    if len(intervals) <= 1:
        # Zero or one interval can never overlap; no constraint is needed.
        continue
    model.AddNoOverlap(intervals)
# No waiting time in the tunnel (machines 0-2).
#
# For each tunnel machine, the first tasks assigned to it must form one chain
# without gaps: the first one starts at min_start_time and each following one
# starts exactly when its predecessor ends. This is modelled with one circuit
# per machine:
#   - node 0 is a dummy depot; node k (k >= 1) is the k-th first-task
#     alternative that may run on the machine;
#   - arc 0 -> k   : alternative k opens the chain (its job starts at
#     min_start_time);
#   - arc k -> 0   : alternative k closes the chain;
#   - arc k -> k2  : k2's job starts exactly when k's job ends;
#   - arc k -> k   : alternative k is not selected, so the node is skipped;
#   - arc 0 -> 0   : nothing runs on the machine at all.
no_idle_machine_range = range(3)
arcs = {machine_id: [] for machine_id in no_idle_machine_range}
min_start_time = 0

# Group every alternative of every job's first task by the tunnel machine it
# would use. All alternatives are scanned, not only the first one.
tunnel_candidates = {machine_id: [] for machine_id in no_idle_machine_range}
for job_id in all_jobs:
    for alt_id, alternative in enumerate(jobs[job_id][0]):
        machine = alternative[1]
        if machine in tunnel_candidates:
            tunnel_candidates[machine].append((job_id, alt_id))

for machine_id in no_idle_machine_range:
    candidates = tunnel_candidates[machine_id]
    if not candidates:
        # No first task can run on this machine: nothing to chain.
        continue
    machine_arcs = arcs[machine_id]

    # Depot self-loop: allows the circuit to be empty when no alternative on
    # this machine is selected.
    machine_arcs.append(
        (0, 0, model.NewBoolVar("machine %i is unused" % machine_id))
    )

    for node, (job_id, alt_id) in enumerate(candidates, start=1):
        # A task with several alternatives may be placed on another machine;
        # the self-loop, active when this alternative is absent, lets the
        # circuit skip the node. A single-alternative task is always present
        # (its presence is a constant), so its node gets no self-loop.
        if len(jobs[job_id][0]) > 1:
            presence = presences[(job_id, 0, alt_id)]
            machine_arcs.append((node, node, presence.Not()))

        # This alternative opens the chain on the machine.
        start_lit = model.NewBoolVar("%i_%i is first job" % (job_id, alt_id))
        machine_arcs.append((0, node, start_lit))
        model.Add(starts[(job_id, 0)] == min_start_time).OnlyEnforceIf(start_lit)

        # This alternative closes the chain on the machine.
        end_lit = model.NewBoolVar("%i_%i is last job" % (job_id, alt_id))
        machine_arcs.append((node, 0, end_lit))

        # Another job directly follows this one, without idle time.
        for node_2, (job_id_2, _) in enumerate(candidates, start=1):
            if job_id_2 == job_id:
                continue
            lit = model.NewBoolVar("%i follows %i_%i" % (job_id_2, job_id, alt_id))
            machine_arcs.append((node, node_2, lit))
            model.Add(starts[(job_id_2, 0)] == ends[(job_id, 0)]).OnlyEnforceIf(lit)

    # One circuit per machine, built from that machine's own arcs.
    model.AddCircuit(machine_arcs)
# Objective: minimise the makespan, defined as the latest job end.
makespan = model.NewIntVar(0, horizon, "makespan")
model.AddMaxEquality(makespan, job_ends)
model.Minimize(makespan)

# Solve with a time limit of 300 seconds, without enumerating all solutions;
# the solution printer is passed to the solver as its callback.
solver = cp_model.CpSolver()
solution_printer = SolutionPrinter()
solver.parameters.max_time_in_seconds = 300
solver.parameters.enumerate_all_solutions = False
status = solver.Solve(model, solution_printer)

# Report the final solver status.
status_name = solver.StatusName(status)
print(status_name)