from typing import Dict, List, Optional, Tuple, Union
import numpy as np
from desdeo_tools.interaction.request import BaseRequest
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances_argmin_min
from desdeo_mcdm.interactive.InteractiveMethod import InteractiveMethod
[docs]class ENautilusException(Exception):
"""Raised when an exception related to ENautilus is encountered.
"""
pass
[docs]class ENautilusInitialRequest(BaseRequest):
""" A request class to handle the initial preferences.
"""
def __init__(self, ideal: np.ndarray, nadir: np.ndarray):
msg = (
"Please specify the number of iterations as 'n_iterations' to be carried out, and how many intermediate "
"points to show as 'n_points'."
)
content = {
"message": msg,
"ideal": ideal,
"nadir": nadir,
}
super().__init__("reference_point_preference", "required", content=content)
[docs] def validator(self, response: Dict) -> None:
if "n_iterations" not in response:
raise ENautilusException("'n_iterations' entry missing")
if "n_points" not in response:
raise ENautilusException("'n_points' entry missing")
n_iterations = response["n_iterations"]
if not isinstance(n_iterations, int) or int(n_iterations) < 1:
raise ENautilusException("'n_iterations' must be a positive integer greater than zero")
n_points = response["n_points"]
if not isinstance(n_points, int) or int(n_points) < 1:
raise ENautilusException("'n_points' must be a positive integer greater than zero")
[docs] @classmethod
def init_with_method(cls, method):
return cls(method._ideal, method._nadir)
@BaseRequest.response.setter
def response(self, response: Dict):
self.validator(response)
self._response = response
[docs]class ENautilusRequest(BaseRequest):
"""A request class to handle the intermediate requests.
"""
def __init__(
self,
ideal: np.ndarray,
nadir: np.ndarray,
points: np.ndarray,
lower_bounds: np.ndarray,
upper_bounds: np.ndarray,
distances: np.ndarray,
minimize: List[int],
):
self._max_index = len(np.squeeze(points))
msg = "Please select the most preferred point by index as 'preferred_point_index'"
content = {
"message": msg,
"ideal": ideal,
"nadir": nadir,
"points": points,
"lower_bounds": lower_bounds,
"upper_bounds": upper_bounds,
"distances": distances,
"minimize": minimize,
}
super().__init__("reference_point_preference", "required", content=content)
[docs] def validator(self, response: Dict) -> None:
if "preferred_point_index" not in response:
raise ENautilusException("'preferred_point' not specified")
pref_point_index = response["preferred_point_index"]
if pref_point_index < 0 or pref_point_index > self._max_index:
raise ENautilusException("The given index is out of bounds.")
@BaseRequest.response.setter
def response(self, response: Dict):
self.validator(response)
self._response = response
[docs]class ENautilusStopRequest(BaseRequest):
"""A request class to handle termination.
"""
def __init__(self, preferred_point: np.ndarray):
msg = "Most preferred solution found."
content = {"message": msg, "solution": preferred_point}
super().__init__("print", "no_interaction", content=content)
[docs]class ENautilus(InteractiveMethod):
def __init__(
self,
pareto_front: np.ndarray,
ideal: np.ndarray,
nadir: np.ndarray,
objective_names: Optional[List[str]] = None,
minimize: Optional[List[int]] = None,
):
"""
Args:
pareto_front (np.ndarray): A two dimensional numpy array
representing a Pareto front with objective vectors on each of its
rows.
ideal (np.ndarray): The ideal objective vector of the problem
being represented by the Pareto front.
nadir (np.ndarray): The nadir objective vector of the problem
being represented by the Pareto front.
objective_names (Optional[List[str]], optional): Names of the
objectives. List must match the number of columns in
pareto_front. Defaults to 'f1', 'f2', 'f3', ...
minimize (Optional[List[int]], optional): Multipliers for each
objective. '-1' indicates maximization and '1' minimization.
Defaults to all objective values being minimized.
Raises:
ENavigatorException: One or more dimension mismatches are
encountered among the supplies arguments.
"""
if not pareto_front.ndim == 2:
raise ENautilusException(
"The supplied Pareto front should be a two dimensional array. Found "
f" number of dimensions {pareto_front.ndim}."
)
if not ideal.shape[0] == pareto_front.shape[1]:
raise ENautilusException(
"The Pareto front must consist of objective vectors with the "
"same number of objectives as defined in the ideal and nadir "
"points."
)
if not ideal.shape == nadir.shape:
raise ENautilusException("The dimensions of the ideal and nadir point do not match.")
if objective_names:
if not len(objective_names) == ideal.shape[0]:
raise ENautilusException(
"The supplied objective names must have a leangth equal to " "the numbr of objectives."
)
self._objective_names = objective_names
else:
self._objective_names = [f"f{i+1}" for i in range(ideal.shape[0])]
if minimize:
if not len(objective_names) == ideal.shape[0]:
raise ENautilusException("The minimize list must have " "as many elements as there are objectives.")
self._minimize = minimize
else:
self._minimize = [1 for _ in range(ideal.shape[0])]
self._ideal = ideal
self._nadir = nadir
# in objective space!
self._pareto_front = pareto_front
# bounds of the rechable region
self._reachable_ub = self._nadir
self._reachable_lb = self._ideal
# currently reachable solution as a list of indices of the Pareto front
self._reachable_idx = list(range(0, self._pareto_front.shape[0]))
# current iteration step number
self._step_number = 1
self._distance = None
self._preferred_point = None
self._projection_index = None
self._n_iterations = None
self._n_points = None
self._n_iterations_left = None
[docs] def start(self) -> ENautilusInitialRequest:
return ENautilusInitialRequest.init_with_method(self)
[docs] def iterate(
self, request: Union[ENautilusInitialRequest, ENautilusRequest]
) -> Union[ENautilusRequest, ENautilusStopRequest]:
"""Perform the next logical iteratino step based on the given request type.
"""
if type(request) is ENautilusInitialRequest:
return self.handle_initial_request(request)
elif type(request) is ENautilusRequest:
return self.handle_request(request)
else:
# if stop request, do nothing
return request
[docs] def handle_initial_request(self, request: ENautilusInitialRequest) -> ENautilusRequest:
"""Handles the initial request by parsing the response appropiately.
"""
self._n_iterations = request.response["n_iterations"]
self._n_points = request.response["n_points"]
self._n_iterations_left = self._n_iterations
# self._intermediate_points = np.repeat(np.atleast_2d(self._nadir), self._n_points, axis=0)
self._preferred_point = self._nadir
zbars = self.calculate_representative_points(self._pareto_front, self._reachable_idx, self._n_points)
zs = self.calculate_intermediate_points(self._preferred_point, zbars, self._n_iterations_left)
new_lower_bounds, new_upper_bounds = self.calculate_bounds(self._pareto_front, zs)
distances = self.calculate_distances(zs, zbars, self._nadir)
return ENautilusRequest(
self._ideal, self._nadir, zs, new_lower_bounds, new_upper_bounds, distances, self._minimize
)
[docs] def handle_request(self, request: ENautilusRequest) -> Union[ENautilusRequest, ENautilusStopRequest]:
"""Handles the intermediate requests.
"""
preferred_point_index = request.response["preferred_point_index"]
self._preferred_point = request.content["points"][preferred_point_index]
if self._n_iterations_left <= 1:
self._n_iterations_left = 0
return ENautilusStopRequest(self._preferred_point)
self._reachable_lb = request.content["lower_bounds"][preferred_point_index]
self._reachable_ub = request.content["upper_bounds"][preferred_point_index]
self._distance = request.content["distances"][preferred_point_index]
self._reachable_idx = self.calculate_reachable_point_indices(
self._pareto_front, self._reachable_lb, self._reachable_ub
)
# increment and decrement iterations
self._n_iterations_left -= 1
self._step_number += 1
# Start again
zbars = self.calculate_representative_points(self._pareto_front, self._reachable_idx, self._n_points)
zs = self.calculate_intermediate_points(self._preferred_point, zbars, self._n_iterations_left)
new_lower_bounds, new_upper_bounds = self.calculate_bounds(self._pareto_front, zs)
distances = self.calculate_distances(zs, zbars, self._nadir)
return ENautilusRequest(
self._ideal, self._nadir, zs, new_lower_bounds, new_upper_bounds, distances, self._minimize
)
[docs] def calculate_representative_points(
self, pareto_front: np.ndarray, subset_indices: List[int], n_points: int
) -> np.ndarray:
"""Calcualtes the most representative points on the Pareto front. The points are clustered using k-means.
Args:
pareto_front (np.ndarray): The Pareto front.
subset_indices (List[int]): A list of indices representing the
subset of the points on the Pareto front for which the
representative points should be calculated.
n_points (int): The number of representative points to be calculated.
Returns:
np.ndarray: A 2D array of the most representative points. If the
subset of Pareto efficient points is less than n_points, returns
the subset of the Pareto front.
"""
if len(np.atleast_1d(subset_indices)) > n_points:
kmeans = KMeans(n_clusters=n_points)
kmeans.fit(pareto_front[subset_indices])
closest, _ = pairwise_distances_argmin_min(kmeans.cluster_centers_, pareto_front[subset_indices])
zbars = pareto_front[subset_indices][closest]
else:
zbars = pareto_front[subset_indices]
return zbars
[docs] def calculate_bounds(
self, pareto_front: np.ndarray, intermediate_points: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
"""Calculate the new bounds of the reachable points on the Pareto
optimal front from each of the intermediate points.
Args:
pareto_front (np.ndarray): The Pareto optimal front.
intermediate_points (np.ndarray): The current intermedaite points as a 2D array.
Returns:
Tuple[np.ndarray, np.ndarray]: The lower and upper bounds for each of the intermediate points.
"""
_pareto_front = np.atleast_2d(pareto_front)
n_points = np.atleast_2d(intermediate_points).shape[0]
new_lower_bounds = np.zeros((n_points, _pareto_front.shape[1]))
new_upper_bounds = np.zeros((n_points, _pareto_front.shape[1]))
for i, point in enumerate(np.atleast_2d(intermediate_points)):
# TODO: vectorize this loop
for r in range(_pareto_front.shape[1]):
mask = np.zeros(_pareto_front.shape[1], dtype=bool)
mask[r] = True
subject_to = _pareto_front[:, ~mask].reshape((_pareto_front.shape[0], _pareto_front.shape[1] - 1))
con_mask = np.all(subject_to <= point[~mask], axis=1)
min_val = np.min(_pareto_front[con_mask, mask])
max_val = np.max(_pareto_front[con_mask, mask])
new_lower_bounds[i, r] = min_val
new_upper_bounds[i, r] = max_val
return new_lower_bounds, new_upper_bounds
[docs] def calculate_distances(self, intermediate_points: np.ndarray, zbars: np.ndarray, nadir: np.ndarray) -> np.ndarray:
distances = np.linalg.norm(np.atleast_2d(intermediate_points) - nadir, axis=1) / np.linalg.norm(
np.atleast_2d(zbars) - nadir, axis=1
)
"""Calculates the distance to the Pareto front for each intermediate
point given utilizing representative points representing the
intermediate points.
Args:
intermediate_points (np.ndarray): The intermediate points, 2D array.
zbars (np.ndarray): The representative points corresponding to the intermediate points, 2D array.
nadir (np.ndarray): The nadir point, 1D array.
Returns:
np.ndarray: The distances calculated for each intermediate point to the Pareto front.
"""
return distances * 100
[docs] def calculate_reachable_point_indices(
self, pareto_front: np.ndarray, lower_bounds: np.ndarray, upper_bounds: np.ndarray,
) -> List[int]:
"""Calculate the indices of the reachable Pareto optimal solutions
based on lower and upper bounds.
Returns:
List[int]: List of the indices of the reachable solutions.
"""
low_idx = np.all(pareto_front >= lower_bounds, axis=1)
up_idx = np.all(pareto_front <= upper_bounds, axis=1)
reachable_idx = np.argwhere(low_idx & up_idx).squeeze()
return reachable_idx
if __name__ == "__main__":
# front = np.array([[1, 2, 3], [2, 3, 4], [2, 2, 3], [3, 2, 1]], dtype=float)
# ideal = np.zeros(3)
# nadir = np.ones(3) * 5
f1 = np.linspace(1, 100, 50)
f2 = f1[::-1] ** 2
front = np.stack((f1, f2)).T
ideal = np.min(front, axis=0)
nadir = np.max(front, axis=0)
method = ENautilus((front), ideal, nadir)
req = method.start()
n_iterations = 11
n_points = 4
req.response = {
"n_iterations": n_iterations,
"n_points": n_points,
}
req = method.iterate(req)
req.response = {"preferred_point_index": 0}
while method._n_iterations_left > 1:
print(method._n_iterations_left)
req = method.iterate(req)
print(req.content["points"])
req.response = {"preferred_point_index": 0}
print(method._n_iterations_left)
req = method.iterate(req)
print(method._n_iterations_left)
print(method._distance)
print(req.content["solution"])