examples/notebook/contrib/permutation_flow_shop.ipynb
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
First, you must install the ortools package in this colab.
%pip install ortools
This model implements the permutation flow shop problem (PFSP).
In the PFSP, a set of jobs has to be processed on a set of machines. Each job must be processed on each machine in sequence and all jobs have to be processed in the same order on every machine. The objective is to minimize the makespan.
from typing import Sequence
from dataclasses import dataclass
from itertools import product
import numpy as np
from ortools.sat.colab import flags
from ortools.sat.python import cp_model
# Text-format solver parameters handed to the CP-SAT solver.
_PARAMS = flags.define_string(
    "params", "num_search_workers:16", "Sat solver parameters."
)

# Wall-clock budget for the solve, in seconds.
_TIME_LIMIT = flags.define_float(
    "time_limit", 60.0, "Time limit in seconds. Default is 60s."
)

# Toggles the solver's search log.
_LOG = flags.define_boolean(
    "log", False, "Whether to log the solver output."
)
@dataclass
class TaskType:
    """Bundle of the CP-SAT variables that describe a single task.

    Groups the start variable, the end variable, and the interval variable
    of one task so they can be stored and passed around together.
    """

    start: cp_model.IntVar
    end: cp_model.IntVar
    interval: cp_model.IntervalVar
def permutation_flow_shop(
    processing_times: np.ndarray,
    time_limit: float,
    log: bool,
    params: str
) -> None:
    """
    Solves the given permutation flow shop problem instance with OR-Tools.

    Builds a CP-SAT model that minimizes the makespan, solves it, and prints
    the solver status. When a solution was found, the makespan and the job
    permutation (1-based job numbers, ordered by start time on the first
    machine) are printed as well.

    Parameters
    ----------
    processing_times
        An n-by-m matrix of processing times of the jobs on the machines.
    time_limit
        The time limit in seconds given to the solver.
    log
        Whether to log the solver output.
    params
        Sat solver parameters in text format. Ignored when empty. The
        ``log`` and ``time_limit`` arguments are applied after parsing and
        therefore override the corresponding entries.
    """
    m = cp_model.CpModel()
    num_jobs, num_machines = processing_times.shape

    # The sum of all processing times is used as the upper bound of every
    # start, end, and makespan variable.
    horizon = processing_times.sum()

    # Create interval variables for all tasks (each job/machine pair).
    tasks = {}
    for job, machine in product(range(num_jobs), range(num_machines)):
        start = m.new_int_var(0, horizon, "")
        end = m.new_int_var(0, horizon, "")
        duration = processing_times[job][machine]
        interval = m.new_interval_var(start, duration, end, "")
        tasks[job, machine] = TaskType(start, end, interval)

    # No overlap for all job intervals on the same machine.
    for machine in range(num_machines):
        intervals = [tasks[job, machine].interval for job in range(num_jobs)]
        m.add_no_overlap(intervals)

    # Add precedence constraints between tasks of the same job.
    for job, machine in product(range(num_jobs), range(num_machines - 1)):
        pred = tasks[job, machine]
        succ = tasks[job, machine + 1]
        m.add(pred.end <= succ.start)

    # Create arcs for the circuit constraint. Node 0 is a dummy node; job
    # ``i`` is node ``i + 1``.
    arcs = []
    for idx1 in range(num_jobs):
        arcs.append((0, idx1 + 1, m.new_bool_var("start")))
        arcs.append((idx1 + 1, 0, m.new_bool_var("end")))

    lits = {}
    for idx1, idx2 in product(range(num_jobs), repeat=2):
        if idx1 != idx2:
            lit = m.new_bool_var(f"{idx1} -> {idx2}")
            lits[idx1, idx2] = lit
            arcs.append((idx1 + 1, idx2 + 1, lit))

    m.add_circuit(arcs)

    # Enforce that the permutation of jobs is the same on all machines.
    # Since all machines share the same arc literals, if the literal i -> j
    # is True, job i ends before job j starts on every machine.
    for machine in range(num_machines):
        for (idx1, idx2), lit in lits.items():
            before = tasks[idx1, machine]
            after = tasks[idx2, machine]
            m.add(before.end <= after.start).only_enforce_if(lit)

    # Set minimizing makespan as objective: the makespan is the latest end
    # time on the last machine.
    obj_var = m.new_int_var(0, horizon, "makespan")
    completion_times = [
        tasks[(job, num_machines - 1)].end for job in range(num_jobs)
    ]
    m.add_max_equality(obj_var, completion_times)
    m.minimize(obj_var)

    solver = cp_model.CpSolver()
    if params:
        solver.parameters.parse_text_format(params)
    # Applied after parsing so the explicit arguments win over ``params``.
    solver.parameters.log_search_progress = log
    solver.parameters.max_time_in_seconds = time_limit

    status_code = solver.solve(m)
    print(f"Status: {solver.status_name(status_code)}")

    # The objective value and variable values are only meaningful when the
    # solver actually found a solution.
    if status_code in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        print(f"Makespan: {solver.objective_value}")
        start_times = [
            solver.value(tasks[job, 0].start) for job in range(num_jobs)
        ]
        solution = np.argsort(start_times) + 1
        print(f"Solution: {solution}")
def main(argv: Sequence[str]) -> None:
    """Builds the benchmark instance and runs the solver on it.

    ``argv`` is accepted but not used; the solver settings are read from the
    module-level flags.
    """
    # VRF_10_5_2 instance from http://soa.iti.es/problem-instances.
    # Optimal makespan is 698.
    instance = np.array(
        [
            [79, 67, 10, 48, 52],
            [40, 40, 57, 21, 54],
            [48, 93, 49, 11, 79],
            [16, 23, 19, 2, 38],
            [38, 90, 57, 73, 3],
            [76, 13, 99, 98, 55],
            [73, 85, 40, 20, 85],
            [34, 6, 27, 53, 21],
            [38, 6, 35, 28, 44],
            [32, 11, 11, 34, 27],
        ]
    )

    permutation_flow_shop(
        processing_times=instance,
        time_limit=_TIME_LIMIT.value,
        log=_LOG.value,
        params=_PARAMS.value,
    )
# Entry point: hands `main` to `app.run`.
# NOTE(review): `app` is not imported anywhere in the visible source, so this
# line would raise NameError as written — presumably absl's `app`; confirm.
app.run(main)