shuttles = ["S1", "S2"]
# Define a single common depot
start_points = ["H", "I"]
common_end_point = "F"
common_charging_station_base_name = "G"
M = 10800 # Max time in seconds (120 minutes * 60 seconds/minute)
UNREACHABLE = 99999
# Original stops for travel time lookup
stops = ["A", "B", "C", "D", "E", "F", "G", "H", "I"]
travel_time_orig = {
("A", "A"): 0, ("A", "B"): 420, ("A", "C"): 300, ("A", "D"): 420, ("A", "E"): 300, ("A", "F"): 0, ("A", "G"): 300, ("A", "H"): 300, ("A", "I"): 300,
("B", "A"): 420, ("B", "B"): 0, ("B", "C"): 300, ("B", "D"): 600, ("B", "E"): 300, ("B", "F"): 420, ("B", "G"): 420, ("B", "H"): 660, ("B", "I"): 660,
("C", "A"): 300, ("C", "B"): 300, ("C", "C"): 0, ("C", "D"): 300, ("C", "E"): 420, ("C", "F"): 300, ("C", "G"): 300, ("C", "H"): 420, ("C", "I"): 600,
("D", "A"): 420, ("D", "B"): 600, ("D", "C"): 300, ("D", "D"): 0, ("D", "E"): 660, ("D", "F"): 420, ("D", "G"): 420, ("D", "H"): 300, ("D", "I"): 660,
("E", "A"): 300, ("E", "B"): 300, ("E", "C"): 420, ("E", "D"): 660, ("E", "E"): 0, ("E", "F"): 300, ("E", "G"): 300, ("E", "H"): 600, ("E", "I"): 420,
("F", "A"): 0, ("F", "B"): 420, ("F", "C"): 300, ("F", "D"): 420, ("F", "E"): 300, ("F", "F"): 0, ("F", "G"): 0, ("F", "H"): 300, ("F", "I"): 300,
("G", "A"): 0, ("G", "B"): 420, ("G", "C"): 300, ("G", "D"): 420, ("G", "E"): 300, ("G", "F"): 0, ("G", "G"): 0, ("G", "H"): 300, ("G", "I"): 300,
("H", "A"): 300, ("H", "B"): 660, ("H", "C"): 420, ("H", "D"): 300, ("H", "E"): 600, ("H", "F"): 300, ("H", "G"): 300, ("H", "H"): 0, ("H", "I"): 420,
("I", "A"): 0, ("I", "B"): 660, ("I", "C"): 600, ("I", "D"): 660, ("I", "E"): 420, ("I", "F"): 300, ("I", "G"): 300, ("I", "H"): 420, ("I", "I"): 0
}
parcel_df = pd.DataFrame({
'parcel_id': [ 0, 1, 2, 3, 4, 5, 6],
'origin_stop': ['A', 'C', 'A', 'B', 'E', 'C', 'D'],
'destination_stop': ['C', 'E', 'B', 'A', 'C', 'D', 'E'],
'pickup_request_time': [ 0, 1500, 0, 100, 0, 0, 0],
'latest_dropoff_time': [ M, 2500, M, M, M, M, M]
})
# Add parcel sizes
parcel_sizes = [0.25, 0.5, 0.75, 1.0]
parcel_df['size'] = [random.choice(parcel_sizes) for _ in range(len(parcel_df))]
# parcel_df = parcel_df[parcel_df['parcel_id'] < 3]
SHUTTLE_CAPACITY = 2.5 # in cft
num_parcels = len(parcel_df)
num_shuttles = len(shuttles)
# === Step 2: Dummy Nodes Creation for Single Depot ===
# OR-Tools uses integer indices internally. We'll map dummy nodes to these.
physical_stop_to_base_idx = {stop: i for i, stop in enumerate(stops)}
base_idx_to_physical_stop = {i: stop for i, stop in enumerate(stops)}
all_dummy_nodes_list = []
ortools_idx_to_dummy_node = {}
dummy_node_to_ortools_idx = {}
current_ortools_idx = 0
parcel_ortools_node_map = {}
start_point_dummy_nodes = []
start_point_ortools_indices = []
charging_station_dummy_nodes = []
charging_station_ortools_indices = []
appearance_counter = defaultdict(int)
# Create the start points FIRST
for start_point in start_points:
appearance_counter[start_point] += 1
dummy_node = f"{start_point}{appearance_counter[start_point]}"
all_dummy_nodes_list.append(dummy_node)
dummy_node_to_ortools_idx[dummy_node] = current_ortools_idx
ortools_idx_to_dummy_node[current_ortools_idx] = dummy_node
start_point_dummy_nodes.append(dummy_node)
start_point_ortools_indices.append(current_ortools_idx)
current_ortools_idx += 1
# Create the common end point
appearance_counter[common_end_point] += 1
common_end_point_dummy_node = f"{common_end_point}{appearance_counter[common_end_point]}"
all_dummy_nodes_list.append(common_end_point_dummy_node)
dummy_node_to_ortools_idx[common_end_point_dummy_node] = current_ortools_idx
ortools_idx_to_dummy_node[current_ortools_idx] = common_end_point_dummy_node
end_point_ortools_idx = current_ortools_idx
current_ortools_idx += 1
# All shuttles start and end at this single depot
starts_list = start_point_ortools_indices
ends_list = [end_point_ortools_idx] * num_shuttles
# Create separate charging nodes for each shuttle
for i in range(num_shuttles*2):
appearance_counter[common_charging_station_base_name] += 1
charge_dummy_node = f"{common_charging_station_base_name}{appearance_counter[common_charging_station_base_name]}"
all_dummy_nodes_list.append(charge_dummy_node)
dummy_node_to_ortools_idx[charge_dummy_node] = current_ortools_idx
ortools_idx_to_dummy_node[current_ortools_idx] = charge_dummy_node
charging_station_ortools_indices.append(current_ortools_idx)
charging_station_dummy_nodes.append(charge_dummy_node)
current_ortools_idx += 1
# Create dummy pickup/dropoff nodes for each parcel
for i, row in parcel_df.iterrows():
o, d = row['origin_stop'], row['destination_stop']
appearance_counter[o] += 1
pickup_dummy_node = f"{o}{appearance_counter[o]}"
all_dummy_nodes_list.append(pickup_dummy_node)
dummy_node_to_ortools_idx[pickup_dummy_node] = current_ortools_idx
ortools_idx_to_dummy_node[current_ortools_idx] = pickup_dummy_node
pickup_ortools_idx = current_ortools_idx
current_ortools_idx += 1
appearance_counter[d] += 1
dropoff_dummy_node = f"{d}{appearance_counter[d]}"
all_dummy_nodes_list.append(dropoff_dummy_node)
dummy_node_to_ortools_idx[dropoff_dummy_node] = current_ortools_idx
ortools_idx_to_dummy_node[current_ortools_idx] = dropoff_dummy_node
dropoff_ortools_idx = current_ortools_idx
current_ortools_idx += 1
parcel_ortools_node_map[i] = {
'pickup_node': pickup_dummy_node,
'dropoff_node': dropoff_dummy_node,
'pickup_ortools_idx': pickup_ortools_idx,
'dropoff_ortools_idx': dropoff_ortools_idx
}
# Now we have all nodes defined. Total number of OR-Tools nodes:
num_ortools_nodes = len(all_dummy_nodes_list)
print(f"DEBUG: all_dummy_nodes_list: {all_dummy_nodes_list}")
print(f"DEBUG: num_ortools_nodes: {num_ortools_nodes}")
print(f"DEBUG: start_point_dummy_node: {start_point_dummy_nodes} (OR-Tools index: {start_point_ortools_indices})")
print(f"DEBUG: end_point_dummy_node: {common_end_point_dummy_node} (OR-Tools index: {end_point_ortools_idx})")
print(f"DEBUG: charging_station_dummy_node base names: {charging_station_dummy_nodes} (OR-Tools indices: {charging_station_ortools_indices})")
# === Step 3: Travel Time Matrix Between Dummy Nodes ===
ortools_travel_time_matrix = [[0 for _ in range(num_ortools_nodes)] for _ in range(num_ortools_nodes)]
for i_ortools_idx in range(num_ortools_nodes):
for j_ortools_idx in range(num_ortools_nodes):
dummy_node_i = ortools_idx_to_dummy_node[i_ortools_idx]
dummy_node_j = ortools_idx_to_dummy_node[j_ortools_idx]
base_i = ''.join(filter(str.isalpha, dummy_node_i))
base_j = ''.join(filter(str.isalpha, dummy_node_j))
travel_time = travel_time_orig.get((base_i, base_j), UNREACHABLE)
if travel_time == 0:
travel_time = 1 # OR-Tools prefers non-zero travel times for time dimension
ortools_travel_time_matrix[i_ortools_idx][j_ortools_idx] = travel_time
# === Step 4: OR-Tools Data Model Creation and Population (Consolidated) ===
data = {}
data['time_matrix'] = ortools_travel_time_matrix
data['num_vehicles'] = num_shuttles
data['starts'] = starts_list
data['ends'] = ends_list
data['pickups_deliveries'] = []
data['demands'] = [0] * num_ortools_nodes
data['time_windows'] = [(0, M)] * num_ortools_nodes # default 0 to M for all nodes
data['vehicle_capacities'] = [int(SHUTTLE_CAPACITY * 100)] * num_shuttles
data['service_times'] = [0] * num_ortools_nodes
# Charging related data
data['charging_station_ortools_idx'] = charging_station_ortools_indices
data['charging_station_node'] = charging_station_dummy_nodes
data['energy_consumption_per_travel_time_unit'] = 2
data['energy_consumption_per_wait_serve_time_unit'] = 1
data['battery_capacity'] = 2500
data['recharge_time'] = 600
data['recharge_penalty'] = 10**9
# Set service time for all charging station nodes
for charge_idx in data['charging_station_ortools_idx']:
data['service_times'][charge_idx] = data['recharge_time']
# Populate demands and time windows once here
for p_id, row in parcel_df.iterrows():
pickup_ortools_idx = parcel_ortools_node_map[p_id]['pickup_ortools_idx']
dropoff_ortools_idx = parcel_ortools_node_map[p_id]['dropoff_ortools_idx']
parcel_size = row['size']
data['pickups_deliveries'].append([pickup_ortools_idx, dropoff_ortools_idx]) # Keep this for consistency, though AddPickupAndDelivery might handle internally
data['demands'][pickup_ortools_idx] = int(parcel_size * 100)
data['demands'][dropoff_ortools_idx] = -int(parcel_size * 100)
data['time_windows'][pickup_ortools_idx] = (
int(row['pickup_request_time']),
int(row['latest_dropoff_time'])
)
data['time_windows'][dropoff_ortools_idx] = (
int(row['pickup_request_time']),
int(row['latest_dropoff_time'])
)
# Set service time for pickup and dropoff nodes to 10 seconds
data['service_times'][pickup_ortools_idx] = 10
data['service_times'][dropoff_ortools_idx] = 10
# Initialize Routing Index Manager
manager = pywrapcp.RoutingIndexManager(
num_ortools_nodes,
data['num_vehicles'],
data['starts'],
data['ends']
)
# Create Routing Model
routing = pywrapcp.RoutingModel(manager)
# Create and register a transit callback (for travel time)
def transit_callback(from_index, to_index):
"""Returns the travel time between the two nodes."""
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
return data['time_matrix'][from_node][to_node] + data['service_times'][from_node]
transit_callback_index = routing.RegisterTransitCallback(transit_callback)
# 1. Impose Time Windows
# Add Time Dimension
time = 'Time'
routing.AddDimension(
transit_callback_index,
M, # allow waiting time
M, # maximum time per vehicle
False, # Don't force start cumul to zero (start time already handled by 0 for depots)
time
)
time_dimension = routing.GetDimensionOrDie(time)
# Apply time windows to all valid OR-Tools internal indices
print("Applying time windows:")
num_internal_nodes = len(start_point_ortools_indices) + len(charging_station_ortools_indices) \
+ len(parcel_ortools_node_map)*2 + num_shuttles # +num_shuttles to account for the end nodes
for internal_idx in range(num_internal_nodes):
try:
ext_idx = manager.IndexToNode(internal_idx)
if ext_idx not in ortools_idx_to_dummy_node:
print(f" ⚠️ Skipping internal index {internal_idx} → external {ext_idx}: Not in dummy node map.")
continue
node_name = ortools_idx_to_dummy_node.get(ext_idx, f"Unknown({ext_idx})")
if ext_idx in start_point_ortools_indices or ext_idx in charging_station_ortools_indices or ext_idx == end_point_ortools_idx:
time_dimension.CumulVar(internal_idx).SetRange(0, M)
print(f" ✅ Set time window for start/end/charging node {node_name} (internal={internal_idx}, external={ext_idx}) to (0, {M})")
else:
tw = data['time_windows'][ext_idx]
time_dimension.CumulVar(internal_idx).SetRange(tw[0], tw[1])
print(f" ✅ Set time window for {node_name} (internal={internal_idx}, external={ext_idx}) to {tw}")
except Exception as e:
print(f" ❌ Skipping internal index {internal_idx}: {e}")
# 2. Impose Capacity Constraints
# Add capacity dimension
def demand_callback(from_index):
"""Returns the demand of the node."""
from_node = manager.IndexToNode(from_index)
return data['demands'][from_node]
demand_callback_index = routing.RegisterUnaryTransitCallback(demand_callback)
capacity = 'Capacity'
routing.AddDimension(
demand_callback_index,
0, # null capacity slack
max(data['vehicle_capacities']), # vehicle maximum capacity
True, # start cumul to zero
capacity
)
capacity_dimension = routing.GetDimensionOrDie(capacity)
# Set vehicle capacities
for i in range(data['num_vehicles']):
vehicle_capacity = data['vehicle_capacities'][i]
start_index = routing.Start(i) # This is the internal index for vehicle i's start node
end_index = routing.End(i)
# Ensure vehicle starts empty
capacity_dimension.CumulVar(start_index).SetRange(0, 0)
# Ensure vehicle returns empty
capacity_dimension.CumulVar(end_index).SetRange(0, 0)
# 3. Impose Energy Dimension
def battery_level_delta_callback(from_index, to_index):
"""Returns the change in battery level (negative for consumption)."""
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
travel_time = data['time_matrix'][from_node][to_node]
service_time_at_from_node = data['service_times'][from_node]
# Energy consumed during travel
energy_consumed_travel = travel_time * data['energy_consumption_per_travel_time_unit']
# Energy consumed during service/waiting at the 'from' node
energy_consumed_service = service_time_at_from_node * data['energy_consumption_per_wait_serve_time_unit']
if to_node in data['charging_station_ortools_idx']:
total_energy_consumed_on_arc = (energy_consumed_travel + energy_consumed_service) - data['battery_capacity']
else:
total_energy_consumed_on_arc = energy_consumed_travel + energy_consumed_service
# The dimension tracks current battery level, so consumption is negative
return -total_energy_consumed_on_arc
battery_level_delta_callback_index = routing.RegisterTransitCallback(battery_level_delta_callback)
energy_level = 'EnergyLevel'
routing.AddDimension(
battery_level_delta_callback_index,
0, # Minimum battery level allowed (cannot go below 0)
data['battery_capacity'], # Maximum battery level (full charge)
False, # Do not force start cumul to zero (we set specific start value below)
energy_level
)
energy_dimension = routing.GetDimensionOrDie(energy_level)
# Set initial battery level for all vehicles (at their start nodes)
for i in range(data['num_vehicles']):
start_index = routing.Start(i)
# Vehicles start with full battery
energy_dimension.CumulVar(start_index).SetRange(int(data['battery_capacity']/2), int(data['battery_capacity']/2))
# Enforce full battery level at the charging station *if* visited
for charging_ortools_idx in data['charging_station_ortools_idx']:
charging_node_internal_idx = manager.NodeToIndex(charging_ortools_idx)
if charging_node_internal_idx != -1:
# If a vehicle visits the charging node, its battery level must be full upon arrival
# This acts as the "recharge" mechanism
energy_dimension.CumulVar(charging_node_internal_idx).SetRange(0, data['battery_capacity'])
else:
print(f"Warning: Charging station node {charging_ortools_idx} could not be mapped to an internal OR-Tools index. Energy constraint for charging not fully applied.")
# 4. Arc Cost Evaluator (combining travel time and charging penalty)
def total_arc_cost_callback(from_index, to_index):
"""Returns the total cost for an arc, including travel time and potential charging penalty."""
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
travel_service_cost = data['time_matrix'][from_node][to_node] + data['service_times'][from_node]
return travel_service_cost
total_arc_cost_callback_index = routing.RegisterTransitCallback(total_arc_cost_callback)
routing.SetArcCostEvaluatorOfAllVehicles(total_arc_cost_callback_index)
MISSED_REQUEST_PENALTY = 10**12
# Add Pickup and Delivery (P&D) constraints and time/vehicle constraints
for p_id, row in parcel_df.iterrows():
pickup_ortools_idx = parcel_ortools_node_map[p_id]['pickup_ortools_idx']
dropoff_ortools_idx = parcel_ortools_node_map[p_id]['dropoff_ortools_idx']
# IMPORTANT: Use manager.NodeToIndex() to get the internal OR-Tools index
pickup_internal_idx = manager.NodeToIndex(pickup_ortools_idx)
dropoff_internal_idx = manager.NodeToIndex(dropoff_ortools_idx)
# CRUCIAL: Add pickup and delivery constraint (ensures same vehicle, correct order)
routing.AddPickupAndDelivery(pickup_internal_idx, dropoff_internal_idx)
# Add disjunctions for the pickup node to allow it to be skipped with a penalty
routing.AddDisjunction(
[pickup_internal_idx],
MISSED_REQUEST_PENALTY
)
routing.AddDisjunction(
[dropoff_internal_idx],
MISSED_REQUEST_PENALTY
)
# Explicitly enforce same vehicle for pickup and delivery (added in previous interaction)
routing.solver().Add(
routing.VehicleVar(pickup_internal_idx) == routing.VehicleVar(dropoff_internal_idx)
)
# Enforce pickup time before dropoff time
pickup_time_var = time_dimension.CumulVar(pickup_internal_idx)
dropoff_time_var = time_dimension.CumulVar(dropoff_internal_idx)
routing.solver().Add(pickup_time_var <= dropoff_time_var)
# Set up specific charging node rules
solver = routing.solver()
for charging_idx in data['charging_station_ortools_idx']:
# Get the OR-Tools internal index for charging station
internal_idx = manager.NodeToIndex(charging_idx)
if internal_idx != -1:
routing.AddDisjunction([internal_idx], 0)
# Configure solver parameters
search_parameters = pywrapcp.DefaultRoutingSearchParameters()
search_parameters.first_solution_strategy = (
routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC)
search_parameters.local_search_metaheuristic = (
routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH)
search_parameters.time_limit.FromSeconds(60)
# search_parameters.log_search = True # Enable logging for debugging
# search_parameters.solution_limit = 1
# Solve the problem
solution = routing.SolveWithParameters(search_parameters)