from math import log, floor
import random
import warnings
import numpy as np
__all__ = ["pareto_efficient", "hypervolume"]
[docs]def pareto_efficient(data, maximize=True):
"""
Find the pareto-efficient points
Parameters
---------
data: array-like
An (n_points, n_data) array
maximize: bool, optional
Whether the problem is a maximization or minimization problem.
Defaults to maximization (i.e,. True)
Returns
-------
data, indices:
data is an array with the pareto front values
indices is an array with the indices of the pareto points in the original data array
"""
original_data = data
indices = np.arange(data.shape[0])
n_points = data.shape[0]
next_point_index = 0 # Next index in the indices array to search for
while next_point_index < len(data):
if maximize:
nondominated_point_mask = np.any(data > data[next_point_index], axis=1)
else:
nondominated_point_mask = np.any(data < data[next_point_index], axis=1)
nondominated_point_mask[next_point_index] = True
indices = indices[nondominated_point_mask] # Remove dominated points
data = data[nondominated_point_mask]
next_point_index = np.sum(nondominated_point_mask[:next_point_index]) + 1
return data, indices
[docs]def hypervolume(pointset, ref):
"""Compute the absolute hypervolume of a *pointset* according to the
reference point *ref*.
"""
ref = np.array(ref)
# Remove points above reference
for i in range(pointset.shape[1]):
indices = np.where(pointset[:, i] < ref[i])[0]
pointset = pointset[indices, :]
if len(pointset) == 0:
hv = 0
else:
hyper = _HyperVolume(ref)
hv = hyper.compute(pointset)
return hv
class _HyperVolume:
"""
This code is copied from the GA library DEAP.
Hypervolume computation based on variant 3 of the algorithm in the paper:
C. M. Fonseca, L. Paquete, and M. Lopez-Ibanez. An improved dimension-sweep
algorithm for the hypervolume indicator. In IEEE Congress on Evolutionary
Computation, pages 1157-1163, Vancouver, Canada, July 2006.
Minimization is implicitly assumed here!
"""
def __init__(self, referencePoint):
"""Constructor."""
self.referencePoint = referencePoint
self.list = []
def compute(self, front):
"""Returns the hypervolume that is dominated by a non-dominated front.
Before the HV computation, front and reference point are translated, so
that the reference point is [0, ..., 0].
"""
def weaklyDominates(point, other):
for i in range(len(point)):
if point[i] > other[i]:
return False
return True
relevantPoints = []
referencePoint = self.referencePoint
dimensions = len(referencePoint)
#######
# fmder: Here it is assumed that every point dominates the reference point
# for point in front:
# # only consider points that dominate the reference point
# if weaklyDominates(point, referencePoint):
# relevantPoints.append(point)
relevantPoints = front
# fmder
#######
if any(referencePoint):
# shift points so that referencePoint == [0, ..., 0]
# this way the reference point doesn't have to be explicitly used
# in the HV computation
#######
# fmder: Assume relevantPoints are numpy array
# for j in range(len(relevantPoints)):
# relevantPoints[j] = [relevantPoints[j][i] - referencePoint[i] for i in range(dimensions)]
relevantPoints -= referencePoint
# fmder
#######
self.preProcess(relevantPoints)
bounds = [-1.0e308] * dimensions
hyperVolume = self.hvRecursive(dimensions - 1, len(relevantPoints), bounds)
return hyperVolume
def hvRecursive(self, dimIndex, length, bounds):
"""Recursive call to hypervolume calculation.
In contrast to the paper, the code assumes that the reference point
is [0, ..., 0]. This allows the avoidance of a few operations.
"""
hvol = 0.0
sentinel = self.list.sentinel
if length == 0:
return hvol
elif dimIndex == 0:
# special case: only one dimension
# why using hypervolume at all?
return -sentinel.next[0].cargo[0]
elif dimIndex == 1:
# special case: two dimensions, end recursion
q = sentinel.next[1]
h = q.cargo[0]
p = q.next[1]
while p is not sentinel:
pCargo = p.cargo
hvol += h * (q.cargo[1] - pCargo[1])
if pCargo[0] < h:
h = pCargo[0]
q = p
p = q.next[1]
hvol += h * q.cargo[1]
return hvol
else:
remove = self.list.remove
reinsert = self.list.reinsert
hvRecursive = self.hvRecursive
p = sentinel
q = p.prev[dimIndex]
while q.cargo is not None:
if q.ignore < dimIndex:
q.ignore = 0
q = q.prev[dimIndex]
q = p.prev[dimIndex]
while length > 1 and (
q.cargo[dimIndex] > bounds[dimIndex]
or q.prev[dimIndex].cargo[dimIndex] >= bounds[dimIndex]
):
p = q
remove(p, dimIndex, bounds)
q = p.prev[dimIndex]
length -= 1
qArea = q.area
qCargo = q.cargo
qPrevDimIndex = q.prev[dimIndex]
if length > 1:
hvol = qPrevDimIndex.volume[dimIndex] + qPrevDimIndex.area[dimIndex] * (
qCargo[dimIndex] - qPrevDimIndex.cargo[dimIndex]
)
else:
qArea[0] = 1
qArea[1 : dimIndex + 1] = [
qArea[i] * -qCargo[i] for i in range(dimIndex)
]
q.volume[dimIndex] = hvol
if q.ignore >= dimIndex:
qArea[dimIndex] = qPrevDimIndex.area[dimIndex]
else:
qArea[dimIndex] = hvRecursive(dimIndex - 1, length, bounds)
if qArea[dimIndex] <= qPrevDimIndex.area[dimIndex]:
q.ignore = dimIndex
while p is not sentinel:
pCargoDimIndex = p.cargo[dimIndex]
hvol += q.area[dimIndex] * (pCargoDimIndex - q.cargo[dimIndex])
bounds[dimIndex] = pCargoDimIndex
reinsert(p, dimIndex, bounds)
length += 1
q = p
p = p.next[dimIndex]
q.volume[dimIndex] = hvol
if q.ignore >= dimIndex:
q.area[dimIndex] = q.prev[dimIndex].area[dimIndex]
else:
q.area[dimIndex] = hvRecursive(dimIndex - 1, length, bounds)
if q.area[dimIndex] <= q.prev[dimIndex].area[dimIndex]:
q.ignore = dimIndex
hvol -= q.area[dimIndex] * q.cargo[dimIndex]
return hvol
def preProcess(self, front):
"""Sets up the list data structure needed for calculation."""
dimensions = len(self.referencePoint)
nodeList = _MultiList(dimensions)
nodes = [_MultiList.Node(dimensions, point) for point in front]
for i in range(dimensions):
self.sortByDimension(nodes, i)
nodeList.extend(nodes, i)
self.list = nodeList
def sortByDimension(self, nodes, i):
"""Sorts the list of nodes by the i-th value of the contained points."""
# build a list of tuples of (point[i], node)
decorated = [(node.cargo[i], node) for node in nodes]
# sort by this value
decorated.sort()
# write back to original list
nodes[:] = [node for (_, node) in decorated]
class _MultiList:
"""A special data structure needed by FonsecaHyperVolume.
It consists of several doubly linked lists that share common nodes. So,
every node has multiple predecessors and successors, one in every list.
"""
class Node:
def __init__(self, numberLists, cargo=None):
self.cargo = cargo
self.next = [None] * numberLists
self.prev = [None] * numberLists
self.ignore = 0
self.area = [0.0] * numberLists
self.volume = [0.0] * numberLists
def __str__(self):
return str(self.cargo)
def __lt__(self, other):
return all(self.cargo < other.cargo)
def __init__(self, numberLists):
"""Constructor.
Builds 'numberLists' doubly linked lists.
"""
self.numberLists = numberLists
self.sentinel = _MultiList.Node(numberLists)
self.sentinel.next = [self.sentinel] * numberLists
self.sentinel.prev = [self.sentinel] * numberLists
def __str__(self):
strings = []
for i in range(self.numberLists):
currentList = []
node = self.sentinel.next[i]
while node != self.sentinel:
currentList.append(str(node))
node = node.next[i]
strings.append(str(currentList))
stringRepr = ""
for string in strings:
stringRepr += string + "\n"
return stringRepr
def __len__(self):
"""Returns the number of lists that are included in this _MultiList."""
return self.numberLists
def getLength(self, i):
"""Returns the length of the i-th list."""
length = 0
sentinel = self.sentinel
node = sentinel.next[i]
while node != sentinel:
length += 1
node = node.next[i]
return length
def append(self, node, index):
"""Appends a node to the end of the list at the given index."""
lastButOne = self.sentinel.prev[index]
node.next[index] = self.sentinel
node.prev[index] = lastButOne
# set the last element as the new one
self.sentinel.prev[index] = node
lastButOne.next[index] = node
def extend(self, nodes, index):
"""Extends the list at the given index with the nodes."""
sentinel = self.sentinel
for node in nodes:
lastButOne = sentinel.prev[index]
node.next[index] = sentinel
node.prev[index] = lastButOne
# set the last element as the new one
sentinel.prev[index] = node
lastButOne.next[index] = node
def remove(self, node, index, bounds):
"""Removes and returns 'node' from all lists in [0, 'index'[."""
for i in range(index):
predecessor = node.prev[i]
successor = node.next[i]
predecessor.next[i] = successor
successor.prev[i] = predecessor
if bounds[i] > node.cargo[i]:
bounds[i] = node.cargo[i]
return node
def reinsert(self, node, index, bounds):
"""
Inserts 'node' at the position it had in all lists in [0, 'index'[
before it was removed. This method assumes that the next and previous
nodes of the node that is reinserted are in the list.
"""
for i in range(index):
node.prev[i].next[i] = node
node.next[i].prev[i] = node
if bounds[i] > node.cargo[i]:
bounds[i] = node.cargo[i]