from extradata import get_data2
import collections
from multiprocessing import connection
from ortools.sat.python import cp_model
# Field names shared by every task record; all default to None so callers
# can populate only the attributes relevant to their context.
fields = ["start", "end", "duration",
          "interval", "machine", "job_id", "name", "presence"]
ORTuple = collections.namedtuple(
    "ORTuple", fields, defaults=[None] * len(fields))
class SolutionPrinter(cp_model.CpSolverSolutionCallback):
    """Print intermediate solutions."""

    def __init__(self):
        super().__init__()
        self.__num_solutions = 0

    def on_solution_callback(self):
        """Called at each new solution."""
        count = self.__num_solutions
        print(
            "Solution %i, time = %f s, objective = %i"
            % (count, self.wall_time, self.objective_value)
        )
        self.__num_solutions = count + 1
def create_vars(model, jobs, horizon, flow):
    """Create the main (per-task) variables and intervals for every job.

    Args:
        model: the cp_model.CpModel being built.
        jobs: list of jobs; each job is a list of tasks, each task a list
            of ``(duration, machine)`` alternatives.
        horizon: upper bound for every start/end variable.
        flow: when truthy, chain consecutive tasks of a job with equality
            (``start == previous_end``) instead of an inequality.

    Returns:
        dict mapping ``(job_id, task_id)`` to an ORTuple holding the task's
        start/end/duration/interval variables.
    """
    tasks = {}
    for job_id, job in enumerate(jobs):
        previous_end = None
        for task_id, task in enumerate(job):
            # Duration bounds across all alternatives of this task.
            alt_durations = [alternative[0] for alternative in task]
            min_duration = min(alt_durations)
            max_duration = max(alt_durations)

            # Create main interval for the task.
            suffix_name = "_j%i_t%i" % (job_id, task_id)
            start = model.new_int_var(0, horizon, "start" + suffix_name)
            duration = model.new_int_var(
                min_duration, max_duration, "duration" + suffix_name
            )
            end = model.new_int_var(0, horizon, "end" + suffix_name)
            interval = model.new_interval_var(
                start, duration, end, "interval" + suffix_name
            )

            # Add precedence with previous task in the same job.
            if previous_end is not None:
                if flow:  # was `flow == True` (PEP8 E712); semantics unchanged
                    model.add(start == previous_end)  # This one doesn't work
                else:
                    # NOTE(review): classic job-shop precedence would be
                    # ``start >= previous_end``; the ``<=`` below is kept
                    # deliberately (see original inline comments) -- confirm.
                    model.add(start <= previous_end)  # This one works
            previous_end = end

            # Store the variables for the solution.
            tasks[(job_id, task_id)] = ORTuple(
                name=suffix_name,
                start=start,
                end=end,
                interval=interval,
                duration=duration,
                job_id=job_id,
            )
    return tasks
def create_alternative_intervals(model, jobs, tuples, horizon):
    """Create optional per-machine intervals for each task alternative.

    For every task with more than one ``(duration, machine)`` alternative,
    builds one optional interval per alternative, links it to the task's
    main variables (created by ``create_vars``) when its presence literal
    is true, and forces exactly one alternative to be selected.  Tasks with
    a single alternative reuse the main interval with a constant-true
    presence.

    Returns:
        (task_per_resources, presences): ``task_per_resources`` maps a
        machine id to the list of ORTuples scheduled on it; ``presences``
        maps ``(job_id, task_id, alt_id)`` to the presence literal.
    """
    task_per_resources = collections.defaultdict(list)
    presences = {}
    for job_id, job in enumerate(jobs):
        num_tasks = len(job)
        for task_id in range(num_tasks):
            task = job[task_id]
            num_alternatives = len(task)
            all_alternatives = range(num_alternatives)
            # Create alternative intervals.
            if num_alternatives > 1:
                l_presences = []
                for alt_id in all_alternatives:
                    alt_suffix = "_j%i_t%i_a%i" % (job_id, task_id, alt_id)
                    l_presence = model.new_bool_var("presence" + alt_suffix)
                    l_start = model.new_int_var(
                        0, horizon, "start" + alt_suffix)
                    # Alternative duration is a fixed constant, not a var.
                    l_duration = task[alt_id][0]
                    l_end = model.new_int_var(0, horizon, "end" + alt_suffix)
                    l_interval = model.new_optional_interval_var(
                        l_start, l_duration, l_end, l_presence, "interval" + alt_suffix
                    )
                    l_presences.append(l_presence)
                    l_task = ORTuple(
                        start=l_start,
                        end=l_end,
                        interval=l_interval,
                        duration=l_duration,
                        machine=task[alt_id][1],
                        presence=l_presence,
                        job_id=job_id,
                    )
                    reference_task = tuples[(job_id, task_id)]
                    # Link the primary/global variables with the local ones.
                    model.add(reference_task.start ==
                              l_start).only_enforce_if(l_presence)
                    model.add(reference_task.duration == l_duration).only_enforce_if(
                        l_presence)
                    model.add(reference_task.end ==
                              l_end).only_enforce_if(l_presence)
                    # Add the local interval to the right machine.
                    task_per_resources[task[alt_id][1]].append(l_task)
                    # Store the presences for the solution.
                    presences[(job_id, task_id, alt_id)] = l_presence
                # Select exactly one presence variable.
                model.add_exactly_one(l_presences)
            else:
                # Single alternative: the main interval is always present.
                presence = model.new_constant(1)
                reference_task = tuples[(job_id, task_id)]
                reference_task = reference_task._replace(presence=presence)
                task_per_resources[task[0][1]].append(
                    reference_task)
                presences[(job_id, task_id, 0)] = reference_task.presence
    return task_per_resources, presences
def setup_between_jobs(or_prev, or_next):
    """Return the setup time between two tasks: 10 units per job-id step."""
    gap = or_next.job_id - or_prev.job_id
    return 10 * abs(gap)
def setup_times(model, task_per_resources):
    """Add one circuit constraint per machine for sequence-dependent setups.

    For each machine, builds the arcs of a TSP-like circuit over the tasks
    assigned to it (node 0 is a dummy start/end node).  A self-arc
    ``u -> u`` is active exactly when the task is absent, which removes the
    task from the circuit.  When a ``u -> v`` arc is selected, both tasks
    must be present and ``v`` must start at least ``setup_between_jobs``
    time units after ``u`` ends.
    """
    for rc, tasks in task_per_resources.items():
        arcs = []
        for u, or_prev in enumerate(tasks, 1):
            name = f"0->{u}"
            # Initial arc from the dummy node (0) to a task.
            arcs.append((0, u, model.new_bool_var(name)))
            # Final arc from an arc to the dummy node.
            name = f"{u}->0"
            arcs.append((u, 0, model.new_bool_var(name)))
            # Self-arc: an absent task is skipped by the circuit.
            arcs.append((u, u, or_prev.presence.Not()))
            for v, or_next in enumerate(tasks, 1):
                if u == v:
                    continue
                min_distance = setup_between_jobs(or_prev, or_next)
                link = model.new_bool_var(f"{rc}:{u}->{v}")
                arcs.append((u, v, link))
                # Fix: use the snake_case CP-SAT API consistently with the
                # rest of the file (was the legacy `model.AddImplication`).
                model.add_implication(link, or_prev.presence)
                model.add_implication(link, or_next.presence)
                # We add the referenced task to link the literal with the
                # start and end times of the two tasks.
                model.add(
                    or_next.start >= or_prev.end + min_distance
                ).only_enforce_if(link)
        # Allow the empty circuit when no task runs on this machine.
        arcs.append((0, 0, model.new_bool_var("0->0")))
        model.add_circuit(arcs)
def flexible_jobshop(i=1):
    """solve a small flexible jobshop problem.

    Builds the model (main task variables, optional alternatives,
    no-overlap per machine, circuit-based setup times), minimizes the
    makespan, dumps the model proto to a file, solves, and prints the
    selected alternative for every task.

    Args:
        i: unused here; presumably kept for compatibility with callers.
    """
    # Data part.
    flow = False
    jobs = get_data2()
    num_jobs = len(jobs)
    all_jobs = range(num_jobs)
    # Model the flexible jobshop problem.
    model = cp_model.CpModel()
    # Naive horizon: sum of the longest alternative of every task.
    horizon = 0
    for job in jobs:
        for task in job:
            max_task_duration = 0
            for alternative in task:
                max_task_duration = max(max_task_duration, alternative[0])
            horizon += max_task_duration
    # Inflate the horizon; presumably to leave room for setup times -- confirm.
    horizon *= 10
    print("Horizon = %i" % horizon)
    # Global storage of variables.
    tuples = create_vars(model, jobs, horizon, flow)
    # Scan the jobs and create the relevant variables and intervals.
    task_per_resources, presences = create_alternative_intervals(
        model, jobs, tuples, horizon)
    # Create machines constraints.
    for machine_id in task_per_resources:
        intervals = [task.interval for task in task_per_resources[machine_id]]
        if len(intervals) > 1:
            model.add_no_overlap(intervals)
    # Sequence-dependent setup times via one circuit per machine.
    setup_times(model, task_per_resources)
    # Makespan objective
    makespan = model.new_int_var(0, horizon, "makespan")
    model.add_max_equality(makespan, [tup.end for tup in tuples.values()])
    model.minimize(makespan)
    # Solve model.
    solver = cp_model.CpSolver()
    solution_printer = SolutionPrinter()
    proto_file = "proto_file_flow.dat" if flow else "proto_file.dat"
    print('Writing proto to %s' % proto_file)
    with open(proto_file, 'w') as text_file:
        text_file.write(str(model))
    print('Solving')
    status = solver.solve(model, solution_printer)
    # Print final solution.
    for job_id in all_jobs:
        print("Job %i:" % job_id)
        for task_id in range(len(jobs[job_id])):
            try:
                start_value = solver.value(tuples[(job_id, task_id)].start)
            except IndexError:
                # Best-effort: skip tasks with no stored variables.
                continue
            machine = -1
            duration = -1
            selected = -1
            # Find which alternative was selected for this task.
            for alt_id in range(len(jobs[job_id][task_id])):
                try:
                    if solver.value(presences[(job_id, task_id, alt_id)]):
                        duration = jobs[job_id][task_id][alt_id][0]
                        machine = jobs[job_id][task_id][alt_id][1]
                        selected = alt_id
                except KeyError:
                    # Best-effort: missing presence keys are ignored.
                    continue
            print(
                " task_%i_%i starts at %i (alt %i, machine %i, duration %i)"
                % (job_id, task_id, start_value, selected, machine, duration)
            )
    # NOTE(review): objective_value is only meaningful for OPTIMAL/FEASIBLE
    # statuses; this prints unconditionally -- confirm data always solves.
    print("solve status: %s" % solver.status_name(status))
    print("Optimal objective value: %i" % solver.objective_value)
    print("Statistics")
    print(" - conflicts : %i" % solver.num_conflicts)
    print(" - branches : %i" % solver.num_branches)
    print(" - wall time : %f s" % solver.wall_time)
# Script entry point.
if __name__ == "__main__":
    flexible_jobshop()