from typing import Callable, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from desdeo_mcdm.interactive.InteractiveMethod import InteractiveMethod
from desdeo_tools.interaction.request import BaseRequest, SimplePlotRequest
from desdeo_tools.scalarization.ASF import PointMethodASF
from desdeo_tools.scalarization.Scalarizer import DiscreteScalarizer
from desdeo_tools.solver.ScalarSolver import DiscreteMinimizer
[docs]class NautilusNavigatorRequest(BaseRequest):
"""Request to handle interactions with NAUTILUS Navigator. See the
NautilusNavigator class for further details.
"""
def __init__(
self,
ideal: np.ndarray,
nadir: np.ndarray,
reachable_lb: np.ndarray,
reachable_ub: np.ndarray,
reachable_idx: List[int],
step_number: int,
steps_remaining: int,
distance: float,
allowed_speeds: [int],
current_speed: int,
navigation_point: np.ndarray,
):
msg = (
# TODO: Be more specific...
"Please supply aspirations levels for each objective between "
"the upper and lower bounds as `reference_point`. Specify a "
"speed between 1-5 as `speed`. If going to a previous step is "
"desired, please set `go_to_previous` to True, otherwise it should "
"be False. "
"Lastly, if stopping is desired, `stop` should be True, "
"otherweise it should be set to False."
)
content = {
"message": msg,
"ideal": ideal,
"nadir": nadir,
"reachable_lb": reachable_lb,
"reachable_ub": reachable_ub,
"reachable_idx": reachable_idx,
"step_number": step_number,
"steps_remaining": steps_remaining,
"distance": distance,
"allowed_speeds": allowed_speeds,
"current_speed": current_speed,
"navigation_point": navigation_point,
}
super().__init__("reference_point_preference", "required", content=content)
[docs] @classmethod
def init_with_method(cls, method):
return cls(
method._ideal,
method._nadir,
method._reachable_lb,
method._reachable_ub,
method._reachable_idx,
method._step_number,
method._steps_remaining,
method._distance,
method._allowed_speeds,
method._current_speed,
method._navigation_point,
)
[docs] def validator(self, response: Dict) -> None:
if "reference_point" not in response:
raise NautilusNavigatorException("'reference_point' entry missing.")
if "speed" not in response:
raise NautilusNavigatorException("'speed' entry missing.")
if "go_to_previous" not in response:
raise NautilusNavigatorException("'go_to_previous' entry missing.")
if "stop" not in response:
raise NautilusNavigatorException("'stop' entry missing.")
ref_point = response["reference_point"]
try:
if np.any(ref_point < self._content["ideal"]) or np.any(ref_point > self._content["nadir"]):
raise NautilusNavigatorException(
f"The given reference point {ref_point} "
"must be between the ranges imposed by the ideal and nadir points."
)
except Exception as e:
raise NautilusNavigatorException(
f"An exception rose when validating the given reference point {ref_point}.\n"
f"Previous exception: {type(e)}: {str(e)}."
)
speed = response["speed"]
try:
if int(speed) not in self._content["allowed_speeds"]:
raise NautilusNavigatorException(f"Invalid speed: {speed}.")
except Exception as e:
raise NautilusNavigatorException(
f"An exception rose when validating the given speed {speed}.\n"
f"Previous exception: {type(e)}: {str(e)}."
)
if not type(response["go_to_previous"]) == bool:
raise (
f"Non boolean value {response['go_to_previous']} "
f"found for 'go_to_previous' when validating the response."
)
if not type(response["stop"]) == bool:
raise (f"Non boolean value {response['stop']} " f"found for 'go_to_previous' when validating the response.")
@BaseRequest.response.setter
def response(self, response: Dict):
self.validator(response)
self._response = response
[docs]class NautilusNavigatorException(Exception):
"""Raised when an exception related to NAUTILUS Navigator is encountered.
"""
pass
[docs]class NautilusNavigator(InteractiveMethod):
"""Implementations of the NAUTILUS Navigator algorithm.
"""
def __init__(
self,
pareto_front: np.ndarray,
ideal: np.ndarray,
nadir: np.ndarray,
objective_names: Optional[List[str]] = None,
minimize: Optional[List[int]] = None,
):
"""
Args:
pareto_front (np.ndarray): A two dimensional numpy array
representing a Pareto front with objective vectors on each of its
rows.
ideal (np.ndarray): The ideal objective vector of the problem
being represented by the Pareto front.
nadir (np.ndarray): The nadir objective vector of the problem
being represented by the Pareto front.
objective_names (Optional[List[str]], optional): Names of the
objectives. List must match the number of columns in
pareto_front. Defaults to 'f1', 'f2', 'f3', ...
minimize (Optional[List[int]], optional): Multipliers for each
objective. '-1' indicates maximization and '1' minimization.
Defaults to all objective values being minimized.
Raises:
NautilusNavigatorException: One or more dimension mismatches are
encountered among the supplies arguments.
"""
if not pareto_front.ndim == 2:
raise NautilusNavigatorException(
"The supplied Pareto front should be a two dimensional array. Found "
f" number of dimensions {pareto_front.ndim}."
)
if not ideal.shape[0] == pareto_front.shape[1]:
raise NautilusNavigatorException(
"The Pareto front must consist of objective vectors with the "
"same number of objectives as defined in the ideal and nadir "
"points."
)
if not ideal.shape == nadir.shape:
raise NautilusNavigatorException("The dimensions of the ideal and nadir point do not match.")
if objective_names:
if not len(objective_names) == ideal.shape[0]:
raise NautilusNavigatorException(
"The supplied objective names must have a leangth equal to " "the numbr of objectives."
)
self._objective_names = objective_names
else:
self._objective_names = [f"f{i+1}" for i in range(ideal.shape[0])]
if minimize:
if not len(objective_names) == ideal.shape[0]:
raise NautilusNavigatorException(
"The minimize list must have " "as many elements as there are objectives."
)
self._minimize = minimize
else:
self._minimize = [1 for _ in range(ideal.shape[0])]
self._ideal = ideal
self._nadir = nadir
# in objective space!
self._pareto_front = pareto_front
# bounds of the rechable region
self._reachable_ub = self._nadir
self._reachable_lb = self._ideal
# currently reachable solution as a list of indices of the Pareto front
self._reachable_idx = list(range(0, self._pareto_front.shape[0]))
# current iteration step number
self._step_number = 1
# iterations left
self._steps_remaining = 100
# L2 distance to the supplied Pareto front
self._distance = 0
self._allowed_speeds = [1, 2, 3, 4, 5]
self._current_speed = None
self._reference_point = None
self._navigation_point = self._nadir
self._projection_index = None
[docs] def start(self) -> NautilusNavigatorRequest:
"""Returns the first Request object to begin iterating.
Returns:
NautilusNavigatorRequest: The Request.
"""
return NautilusNavigatorRequest.init_with_method(self)
[docs] def iterate(self, request: NautilusNavigatorRequest) -> NautilusNavigatorRequest:
"""Perform the next logical step based on the response in the
Request.
"""
reqs = self.handle_request(request)
return reqs
[docs] def handle_request(self, request: NautilusNavigatorRequest) -> NautilusNavigatorRequest:
"""Handle the Request and its contents.
Args:
request (NautilusNavigatorRequest): A Request with a defined response.
Returns:
NautilusNavigatorRequest: Some of the contents of the response are invalid.
"""
preference_point = request.response["reference_point"]
speed = request.response["speed"]
go_to_previous = request.response["go_to_previous"]
stop = request.response["stop"]
if go_to_previous:
step_number = request.content["step_number"]
nav_point = request.content["navigation_point"]
lower_bounds = request.content["reachable_lb"]
upper_bounds = request.content["reachable_ub"]
reachable_idx = request.content["reachable_idx"]
distance = request.content["distance"]
steps_remaining = request.content["steps_remaining"]
return self.update(
preference_point,
speed,
go_to_previous,
stop,
step_number,
nav_point,
lower_bounds,
upper_bounds,
reachable_idx,
distance,
steps_remaining,
)
else:
return self.update(preference_point, speed, go_to_previous, stop,)
[docs] def update(
self,
ref_point: np.ndarray,
speed: int,
go_to_previous: bool,
stop: bool,
step_number: Optional[int] = None,
nav_point: Optional[np.ndarray] = None,
lower_bounds: Optional[np.ndarray] = None,
upper_bounds: Optional[np.ndarray] = None,
reachable_idx: Optional[List[int]] = None,
distance: Optional[float] = None,
steps_remaining: Optional[int] = None,
) -> Optional[NautilusNavigatorRequest]:
"""Update the inernal state of self.
Args:
ref_point (np.ndarray): A reference point given by a decision maker.
speed (int): An integer value between 1-5 indicating the navigation speed.
go_to_previous (bool): If True, the parameters indicate the state
of a previous state, and the request is handled accordingly.
stop (bool): If the navigation should stop. If True, self.update return None.
step_number (Optional[int], optional): Current step number, or
previous step number if go_to_previous is True. Defaults to None.
nav_point (Optional[np.ndarray], optional): The current
navigation point. Relevant if go_to_previous is True. Defaults to
None.
lower_bounds (Optional[np.ndarray], optional): Lower bounds of
the reachable objective vector valus. Relevant if go_to_previous
is True. Defaults to None.
upper_bounds (Optional[np.ndarray], optional): Upper bounds of
the reachable objective vector valus. Relevant if go_to_previous
is True. Defaults to None.
reachable_idx (Optional[List[int]], optional): Indices of the
reachable Pareto optimal solutions. Relevant if go_to_previous is
True. Defaults to None.
distance (Optional[float], optional): Distance to the Pareto
optimal front. Relevant if go_to_previous is True. Defaults to
None.
steps_remaining (Optional[int], optional): Remaining steps in the
navigation. Relevant if go_to_previous is True. Defaults to None.
Returns:
NautilusNavigatorRequest: Some of the given parameters are erraneous.
"""
# go to a previous state
if go_to_previous:
self._step_number = step_number
self._navigation_point = nav_point
self._reachable_lb = lower_bounds
self._reachable_ub = upper_bounds
self._reachable_idx = reachable_idx
self._distance = distance
self._steps_remaining = steps_remaining
return NautilusNavigatorRequest.init_with_method(self)
# compute a new navigation point closer to the Pareto front and the
# bounds of the reachable Pareto optimal region.
elif self._step_number == 1 or not np.allclose(ref_point, self._reference_point):
if self._step_number == 1:
self._current_speed = speed
proj_i = self.solve_nautilus_asf_problem(
self._pareto_front, self._reachable_idx, ref_point, self._ideal, self._nadir,
)
self._reference_point = ref_point
self._projection_index = proj_i
elif stop:
# new reference point given, also update speed
self._reference_point = ref_point
self._current_speed = speed
new_nav = self.calculate_navigation_point(
self._pareto_front[self._projection_index], self._navigation_point, self._steps_remaining,
)
self._navigation_point = new_nav
new_lb, new_ub = self.calculate_bounds(self._pareto_front[self._reachable_idx], self._navigation_point,)
self._reachable_lb = new_lb
self._reachable_ub = new_ub
new_dist = self.calculate_distance(
self._navigation_point, self._pareto_front[self._projection_index], self._nadir,
)
self._distance = new_dist
new_reachable = self.calculate_reachable_point_indices(
self._pareto_front, self._reachable_lb, self._reachable_ub,
)
self._reachable_idx = new_reachable
# If stop, do not update steps
if self._steps_remaining == 1:
# stop
return NautilusNavigatorRequest.init_with_method(self)
self._step_number += 1
self._steps_remaining -= 1
return NautilusNavigatorRequest.init_with_method(self)
[docs] def calculate_reachable_point_indices(
self, pareto_front: np.ndarray, lower_bounds: np.ndarray, upper_bounds: np.ndarray,
) -> List[int]:
"""Calculate the indices of the reachable Pareto optimal solutions
based on lower and upper bounds.
Returns:
List[int]: List of the indices of the reachable solutions.
"""
low_idx = np.all(pareto_front >= lower_bounds, axis=1)
up_idx = np.all(pareto_front <= upper_bounds, axis=1)
reachable_idx = np.argwhere(low_idx & up_idx).squeeze()
return reachable_idx
[docs] def solve_nautilus_asf_problem(
self, pareto_f: np.ndarray, subset_indices: [int], ref_point: np.ndarray, ideal: np.ndarray, nadir: np.ndarray,
) -> int:
"""Forms and solves the achievement scalarizing function to find the
closesto point on the Pareto optimal front to the given reference
point.
Args:
pareto_f (np.ndarray): The whole Pareto optimal front.
subset_indices ([type]): Indices of the currently reachable solutions.
ref_point (np.ndarray): The reference point indicating a decision
maker's preference.
ideal (np.ndarray): Ideal point.
nadir (np.ndarray): Nadir point.
Returns:
int: Index of the closest point according the minimized value of the ASF.
"""
asf = PointMethodASF(nadir, ideal)
scalarizer = DiscreteScalarizer(asf, {"reference_point": ref_point})
solver = DiscreteMinimizer(scalarizer)
tmp = np.copy(pareto_f)
mask = np.zeros(tmp.shape[0], dtype=bool)
mask[subset_indices] = True
tmp[~mask] = np.nan
res = solver.minimize(tmp)
return res
[docs] def calculate_navigation_point(
self, projection: np.ndarray, nav_point: np.ndarray, steps_remaining: int,
) -> np.ndarray:
"""Calculate a new navigation point based on the projection of the
preference point to the Pareto optimal front.
Args:
projection (np.ndarray): The point on the Pareto optimal front
closest to the preference point given by a decision maker.
nav_point (np.ndarray): The previous navigation point.
steps_remaining (int): How many steps are remaining in the navigation.
Returns:
np.ndarray: The new navigation point.
"""
new_nav_point = ((steps_remaining - 1) / steps_remaining) * nav_point + (1 / steps_remaining) * projection
return new_nav_point
[docs] def calculate_bounds(self, pareto_front: np.ndarray, nav_point: np.ndarray,) -> Tuple[np.ndarray, np.ndarray]:
"""Calculate the new bounds of the reachable points on the Pareto
optimal front from a navigation point.
Args:
pareto_front (np.ndarray): The Pareto optimal front.
nav_point (np.ndarray): The current navigation point.
Returns:
Tuple[np.ndarray, np.ndarray]: The lower and upper bounds.
"""
_pareto_front = np.atleast_2d(pareto_front)
new_lower_bounds = np.zeros(_pareto_front.shape[1])
new_upper_bounds = np.zeros(_pareto_front.shape[1])
# TODO: vectorize this loop
for r in range(_pareto_front.shape[1]):
mask = np.zeros(_pareto_front.shape[1], dtype=bool)
mask[r] = True
subject_to = _pareto_front[:, ~mask].reshape((_pareto_front.shape[0], _pareto_front.shape[1] - 1))
con_mask = np.all(subject_to <= nav_point[~mask], axis=1)
if _pareto_front[con_mask, mask].size != 0:
min_val = np.min(_pareto_front[con_mask, mask])
max_val = np.max(_pareto_front[con_mask, mask])
else:
min_val = self._reachable_lb[r]
max_val = self._reachable_ub[r]
new_lower_bounds[r] = min_val
new_upper_bounds[r] = max_val
return new_lower_bounds, new_upper_bounds
[docs] def calculate_distance(self, nav_point: np.ndarray, projection: np.ndarray, nadir: np.ndarray) -> float:
"""Calculate the distance to the Pareto optimal front from a
navigation point. The distance is calculated to the supplied
projection which is assumed to lay on the front.
Args:
nav_point (np.ndarray): The navigation point.
projection (np.ndarray): The point of the Pareto optimal front the distance is calculated to.
nadir (np.ndarray): The nadir point of the Pareto optimal set.
Returns:
float: The distance.
"""
nom = np.linalg.norm(nav_point - nadir)
denom = np.linalg.norm(projection - nadir)
dist = (nom / denom) * 100
return dist
if __name__ == "__main__":
# front = np.array([[1, 2, 3], [2, 3, 4], [2, 2, 3], [3, 2, 1]], dtype=float)
# ideal = np.zeros(3)
# nadir = np.ones(3) * 5
f1 = np.linspace(1, 100, 50)
f2 = f1[::-1] ** 2
front = np.stack((f1, f2)).T
ideal = np.min(front, axis=0)
nadir = np.max(front, axis=0)
method = NautilusNavigator((front), ideal, nadir)
req = method.start()
print(req.content["reachable_lb"])
print(req.content["navigation_point"])
print(req.content["reachable_ub"])
response = {
"reference_point": np.array([50, 6000]),
"speed": 5,
"go_to_previous": False,
"stop": False,
}
req.response = response
req = method.iterate(req)
req.response = response
req1 = req
import time
while req.content["steps_remaining"] > 1:
time.sleep(1 / req.content["current_speed"])
req = method.iterate(req)
req.response = response
print(req.content["steps_remaining"])
print(req.content["reachable_lb"])
print(req.content["navigation_point"])
print(req.content["reachable_ub"])
req1.response["go_to_previous"] = True
req = method.iterate(req1)
req.response = response
req.response["go_to_previous"] = False
while req.content["steps_remaining"] > 1:
time.sleep(1 / req.content["current_speed"])
req = method.iterate(req)
req.response = response
print(req.content["steps_remaining"])
print(req.content["reachable_lb"])
print(req.content["navigation_point"])
print(req.content["reachable_ub"])
print(req)