def optimize(jobs_data, setup_times):
model = cp_model.CpModel()
machines_count = len(setup_times.keys())
all_machines = range(machines_count)
all_jobs = range(len(jobs_data))
# Computes horizon dynamically as the sum of all durations and setup times.
horizon = 0
for job in jobs_data:
max_job_duration = max(duration for _, duration in job)
horizon += max_job_duration
for machine in setup_times.keys():
for j1 in setup_times[machine].keys():
for j2 in setup_times[machine][j1].keys():
horizon += setup_times[machine][j1][j2]
presences = {}
# Creates job intervals and add to the corresponding machine lists.
starts = {} # indexed by job_id
ends = []
machine_to_intervals = collections.defaultdict(list)
for job_id in all_jobs:
job = jobs_data[job_id]
min_duration = min(duration for _, duration in job)
max_duration = max(duration for _, duration in job)
num_configs = len(job)
suffix = "_%i" % job_id
start_var = model.NewIntVar(0, horizon, "start" + suffix)
end_var = model.NewIntVar(0, horizon, "end" + suffix)
duration = model.NewIntVar(min_duration, max_duration, "duration" + suffix)
interval_var = model.NewIntervalVar(start_var, duration, end_var, "interval" + suffix)
starts[job_id] = start_var
# create alternative intervals for each configuration
if num_configs > 1:
l_presences = []
for config_id, config in enumerate(job):
alt_suffix = "_j%i_a%i" % (job_id, config_id)
l_presence = model.NewBoolVar("presence" + alt_suffix)
l_start = model.NewIntVar(0, horizon, "start" + alt_suffix)
l_duration = config[1]
l_end = model.NewIntVar(0, horizon, "end" + alt_suffix)
l_interval = model.NewOptionalIntervalVar(
l_start, l_duration, l_end, l_presence, "interval" + alt_suffix
)
l_presences.append(l_presence)
# Link primary variable with alternative variable
model.Add(start_var == l_start).OnlyEnforceIf(l_presence)
model.Add(duration == l_duration).OnlyEnforceIf(l_presence)
model.Add(end_var == l_end).OnlyEnforceIf(l_presence)
# Add local interval to right machine list
machine = config[0]
machine_to_intervals[machine].append(l_interval)
# Store presences for the solution
presences[(job_id, config_id)] = l_presence
# Add constraint that only one of the alternative intervals can be active
model.Add(sum(l_presences) == 1)
else:
machine_to_intervals[job[0][0]].append(interval_var)
presences[(job_id, 0)] = model.NewConstant(1)
ends.append(end_var)
# Create and add disjunctive constraints.
for machine in all_machines:
job_starts = []
job_ends = []
job_indices = []
job_config_ids = []
for job_id in all_jobs:
job = jobs_data[job_id]
for config_id, config in enumerate(job):
if config[0] == machine:
job_starts.append(starts[job_id])
job_ends.append(ends[job_id])
job_indices.append(job_id)
job_config_ids.append(config_id)
intervals = machine_to_intervals[machine]
if len(intervals) > 1:
model.AddNoOverlap(intervals)
arcs = []
for j1 in range(len(job_starts)):
# Initial arc from the dummy node (0) to a task.
start_lit = model.NewBoolVar("%i is first job" % j1)
arcs.append((0, j1 + 1, start_lit))
# Final arc from an arc to the dummy node.
arcs.append((j1 + 1, 0, model.NewBoolVar("%i is last job" % j1)))
for j2 in range(len(job_starts)):
if j1 == j2:
continue
lit = model.NewBoolVar("%i follows %i" % (j2, j1))
arcs.append((j1 + 1, j2 + 1, lit))
tmp = [
lit,
presences[(job_indices[j1], job_config_ids[j1])],
presences[(job_indices[j2], job_config_ids[j2])]
]
min_distance = setup_times[machine][job_indices[j1]][job_indices[j2]]
model.add(starts[job_indices[j2]] >= ends[job_indices[j1]] + min_distance).OnlyEnforceIf(tmp)
model.AddCircuit(arcs)
# Makespan objective.
obj_var = model.NewIntVar(0, horizon, "makespan")
model.AddMaxEquality(obj_var, ends)
model.Minimize(obj_var)
# Solve model.
solver = cp_model.CpSolver()
solution_printer = SolutionPrinter()
status = solver.Solve(model, solution_printer)
if status == cp_model.OPTIMAL:
# Print out makespan.
print("Optimal Schedule Length: %i" % solver.ObjectiveValue())
print()