# Source code for summit.strategies.base
import warnings
from summit.domain import *
from summit.utils.dataset import DataSet
import numpy as np
import pandas as pd
from abc import ABC, abstractmethod, abstractclassmethod
from typing import List, Type, Tuple
import json
# Public API of this module: transforms, the strategy base class, and the
# experimental-design container.
__all__ = [
    "Transform",
    "Strategy",
    "Design",
    "MultitoSingleObjective",
    "LogSpaceObjectives",
    "Chimera",
]
class Transform:
    """Pre/post-processing of data for strategies

    Parameters
    ----------
    domain: :class:`~summit.domain.Domain`
        A domain that is being used in the strategy

    Notes
    ------
    This class can be overridden to create custom transformations as necessary.
    """

    def __init__(self, domain, **kwargs):
        # Keep a private copy so strategies can mutate the transformed domain
        # (e.g. replace objectives) without touching the user's domain.
        self.transform_domain = domain.copy()
        self.domain = domain

    def transform_inputs_outputs(self, ds: DataSet, **kwargs):
        """Transform of data into inputs and outputs for a strategy

        Parameters
        ----------
        ds: `DataSet`
            Dataset with columns corresponding to the inputs and objectives of the domain.
        copy: bool, optional
            Copy the dataset internally. Defaults to True.
        min_max_scale_inputs: bool, optional
            Scale continuous inputs to their bounds. In the case of descriptors,
            scale to the minimum and maximum of each column in the descriptor set.
        min_max_scale_outputs: bool, optional
            Scale continuous outputs to their bounds.
        standardize_inputs : bool, optional
            Standardize all input continuous variables. Default is False.
        standardize_outputs : bool, optional
            Standardize all output continuous variables. Default is False.
        categorical_method : str, optional
            The method for transforming categorical variables. Either None,
            "one-hot", "descriptors" or "mixed". Descriptors must be included in
            the categorical variables for the latter.

        Returns
        -------
        inputs, outputs
            Datasets with the input and output datasets

        Raises
        ------
        ValueError
            If min-max scaling and standardization are both requested.
        DomainError
            If a variable has an unsupported type or is missing from the dataset.
        """
        from sklearn.preprocessing import OneHotEncoder

        copy = kwargs.get("copy", True)
        categorical_method = kwargs.get("categorical_method")
        min_max_scale_inputs = kwargs.get("min_max_scale_inputs", False)
        min_max_scale_outputs = kwargs.get("min_max_scale_outputs", False)
        standardize_inputs = kwargs.get("standardize_inputs", False)
        standardize_outputs = kwargs.get("standardize_outputs", False)

        # Min-max scaling and standardization are mutually exclusive.
        if min_max_scale_inputs and standardize_inputs:
            raise ValueError(
                "Cannot use MinMax scaling and standard scaling simulataneously on the inputs."
            )
        if min_max_scale_outputs and standardize_outputs:
            raise ValueError(
                "Cannot use MinMax scaling and standard scaling simulataneously on the outputs."
            )

        data_columns = ds.data_columns
        new_ds = ds.copy() if copy else ds

        # Determine input and output columns in dataset.
        input_columns = []
        output_columns = []
        # Statistics and encoders recorded here are required by un_transform.
        self.input_means, self.input_stds = {}, {}
        self.output_means, self.output_stds = {}, {}
        self.encoders = {}
        for variable in self.domain.input_variables:
            if isinstance(variable, CategoricalVariable) and (
                categorical_method == "descriptors"
                or (categorical_method == "mixed" and variable.ds is not None)
            ):
                # Add descriptors to the dataset
                var_descriptor_names = variable.ds.data_columns
                new_ds = new_ds.merge(
                    variable.ds, left_on=variable.name, right_index=True, how="left"
                )

                # Make the original categorical column a metadata column
                original_categorical = new_ds[variable.name].copy()
                new_ds = new_ds.drop((variable.name, "DATA"), axis=1)
                new_ds[variable.name, "METADATA"] = original_categorical

                # Normalize descriptors between 0 and 1 using the bounds of the
                # full descriptor set (not just the observed data).
                if min_max_scale_inputs:
                    for descriptor in var_descriptor_names:
                        var_max = variable.ds[descriptor].max()
                        var_min = variable.ds[descriptor].min()
                        new_ds[descriptor, "DATA"] = (
                            new_ds[descriptor] - var_min
                        ) / (var_max - var_min)

                # Add descriptors data columns to inputs
                input_columns.extend(var_descriptor_names)
            elif isinstance(variable, CategoricalVariable) and (
                categorical_method == "one-hot"
                or (categorical_method == "mixed" and variable.ds is None)
            ):
                # Create one-hot encoding columns & insert to DataSet
                enc = OneHotEncoder(categories=[variable.levels])
                values = np.atleast_2d(new_ds[variable.name].to_numpy()).T
                one_hot_values = enc.fit_transform(values).toarray()
                for loc, l in enumerate(variable.levels):
                    column_name = f"{variable.name}_{l}"
                    new_ds[column_name, "DATA"] = one_hot_values[:, loc]
                    input_columns.append(column_name)
                # Keep the fitted encoder so un_transform can invert it.
                self.encoders[variable.name] = enc

                # Drop old categorical column, then write as metadata
                new_ds = new_ds.drop(variable.name, axis=1, level=0)
                new_ds[variable.name, "METADATA"] = values
            elif (
                isinstance(variable, CategoricalVariable)
                and categorical_method is None
            ):
                # Pass the raw categorical column straight through.
                input_columns.append(variable.name)
            elif isinstance(variable, ContinuousVariable):
                if standardize_inputs:
                    values, mean, std = self.standardize_column(
                        new_ds[variable.name].astype(float)
                    )
                    self.input_means[variable.name] = mean
                    self.input_stds[variable.name] = std
                    new_ds[variable.name, "DATA"] = values
                elif min_max_scale_inputs:
                    var_min, var_max = variable.bounds[0], variable.bounds[1]
                    new_ds[variable.name, "DATA"] = (
                        new_ds[variable.name] - var_min
                    ) / (var_max - var_min)
                input_columns.append(variable.name)
            else:
                raise DomainError(
                    f"Variable {variable.name} is not a continuous or categorical variable."
                )

        for variable in self.domain.output_variables:
            if variable.name in data_columns and variable.is_objective:
                if isinstance(variable, CategoricalVariable):
                    raise DomainError(
                        "Output variables cannot be categorical variables currently."
                    )
                if standardize_outputs:
                    values, mean, std = self.standardize_column(
                        new_ds[variable.name].astype(float)
                    )
                    self.output_means[variable.name] = mean
                    self.output_stds[variable.name] = std
                    new_ds[variable.name, "DATA"] = values
                elif min_max_scale_outputs:
                    var_min, var_max = variable.bounds[0], variable.bounds[1]
                    new_ds[variable.name, "DATA"] = (
                        new_ds[variable.name] - var_min
                    ) / (var_max - var_min)
                output_columns.append(variable.name)
                # Ensure continuous variables are floats
                new_ds[variable.name] = new_ds[variable.name].astype(float)
            else:
                raise DomainError(f"Variable {variable.name} is not in the dataset.")

        # BUG FIX: output_columns is always a list, so the original
        # `if output_columns is None` check could never fire. Test emptiness.
        if len(output_columns) == 0:
            raise DomainError(
                "No output columns in the domain. Add at least one output column for optimisation."
            )

        # Return the inputs and outputs as separate datasets
        return new_ds[input_columns].copy(), new_ds[output_columns].copy()

    def un_transform(self, ds, **kwargs):
        """Transform data back into its original representation
        after strategy is finished

        Parameters
        ----------
        ds: `DataSet`
            Dataset with columns corresponding to the inputs and objectives of the domain.
        min_max_scale_inputs: bool, optional
            Unscale continuous inputs from their bounds. Default is False.
        min_max_scale_outputs: bool, optional
            Unscale continuous outputs from their bounds. Default is False.
        standardize_inputs : bool, optional
            Unstandardize all input continuous variables. Default is False.
        standardize_outputs : bool, optional
            Unstandardize all output continuous variables. Default is False.
        categorical_method : str or None, optional
            The method used when transforming categorical variables. Either
            "one-hot", "descriptors", "mixed" or None. Default is None.

        Notes
        -----
        Override this class to achieve custom untransformations
        """
        categorical_method = kwargs.get("categorical_method")
        min_max_scale_inputs = kwargs.get("min_max_scale_inputs", False)
        min_max_scale_outputs = kwargs.get("min_max_scale_outputs", False)
        standardize_inputs = kwargs.get("standardize_inputs", False)
        standardize_outputs = kwargs.get("standardize_outputs", False)
        if min_max_scale_inputs and standardize_inputs:
            raise ValueError(
                "Cannot use MinMax scaling and standard scaling simulataneously on the inputs."
            )
        if min_max_scale_outputs and standardize_outputs:
            raise ValueError(
                "Cannot use MinMax scaling and standard scaling simulataneously on the outputs."
            )
        data_columns = ds.data_columns

        # Determine input and output columns in dataset
        new_ds = ds.copy()
        for i, variable in enumerate(self.domain.input_variables):
            # Categorical variables with descriptors
            if isinstance(variable, CategoricalVariable) and (
                categorical_method == "descriptors"
                or (categorical_method == "mixed" and variable.ds is not None)
            ):
                var_descriptor_names = variable.ds.data_columns
                # Unnormalize descriptors from [0, 1] back to original ranges
                if min_max_scale_inputs:
                    for descriptor in var_descriptor_names:
                        var_max = variable.ds[descriptor].max()
                        var_min = variable.ds[descriptor].min()
                        new_ds[descriptor, "DATA"] = (
                            new_ds[descriptor] * (var_max - var_min) + var_min
                        )
                var_descriptor_conditions = new_ds[var_descriptor_names]
                var_descriptor_orig_data = np.asarray(
                    [
                        variable.ds.loc[[level], :].values[0].tolist()
                        for level in variable.ds.index
                    ]
                )
                var_categorical_transformed = []
                # Map each suggested descriptor vector back to the level with
                # the closest descriptors (minimum Euclidean distance).
                for _, dc in var_descriptor_conditions.iterrows():
                    eucl_distance_squ = np.sum(
                        np.square(
                            np.subtract(
                                var_descriptor_orig_data,
                                dc.to_numpy(),
                            )
                        ),
                        axis=1,
                    )
                    # Choose closest point
                    cat_level_index = np.where(
                        eucl_distance_squ == np.min(eucl_distance_squ)
                    )[0][0]
                    # Find the matching name of the categorical variable
                    cat_level = variable.ds.index[cat_level_index]
                    var_categorical_transformed.append(cat_level)
                new_ds.insert(
                    loc=i, column=variable.name, value=var_categorical_transformed
                )
                # Demote the descriptors columns to metadata
                for var in var_descriptor_names:
                    descriptor = new_ds[var].copy()
                    new_ds = new_ds.drop(columns=var, level=0)
                    new_ds[var, "METADATA"] = descriptor
            # Categorical variables using one-hot encoding
            elif isinstance(variable, CategoricalVariable) and (
                categorical_method == "one-hot"
                or (categorical_method == "mixed" and variable.ds is None)
            ):
                # Invert the one-hot encoding using the encoder fitted in
                # transform_inputs_outputs.
                enc = self.encoders[variable.name]
                one_hot_names = [f"{variable.name}_{l}" for l in variable.levels]
                one_hot = new_ds[one_hot_names].to_numpy()
                values = enc.inverse_transform(one_hot)
                # Add to dataset and drop one-hot encoding
                new_ds = new_ds.drop(one_hot_names, axis=1, level=0)
                new_ds[variable.name, "DATA"] = values
            # Plain categorical variables need no inverse transform
            elif isinstance(variable, CategoricalVariable):
                pass
            # Continuous variables
            elif isinstance(variable, ContinuousVariable):
                if standardize_inputs:
                    mean = self.input_means[variable.name]
                    std = self.input_stds[variable.name]
                    values = new_ds[variable.name]
                    new_ds[variable.name, "DATA"] = values * std + mean
                elif min_max_scale_inputs:
                    var_min, var_max = variable.bounds[0], variable.bounds[1]
                    new_ds[variable.name, "DATA"] = (
                        new_ds[variable.name] * (var_max - var_min) + var_min
                    )
                new_ds[variable.name, "DATA"] = new_ds[variable.name].astype(float)
            else:
                raise DomainError(f"Variable {variable.name} is not in the dataset.")

        for variable in self.domain.output_variables:
            if variable.name in data_columns and variable.is_objective:
                if standardize_outputs:
                    mean = self.output_means[variable.name]
                    std = self.output_stds[variable.name]
                    values = new_ds[variable.name]
                    new_ds[variable.name, "DATA"] = values * std + mean
                # BUG FIX: the original checked min_max_scale_inputs here,
                # so min-max scaled *outputs* were never un-scaled.
                elif min_max_scale_outputs:
                    var_min, var_max = variable.bounds[0], variable.bounds[1]
                    new_ds[variable.name, "DATA"] = (
                        new_ds[variable.name] * (var_max - var_min) + var_min
                    )
        return new_ds

    def to_dict(self, **kwargs):
        """Output a dictionary representation of the transform"""
        # kwargs are the subclass-specific constructor parameters; they are
        # round-tripped through from_dict.
        return dict(
            transform_domain=self.transform_domain.to_dict(),
            name=self.__class__.__name__,
            domain=self.domain.to_dict(),
            transform_params=kwargs,
        )

    @classmethod
    def from_dict(cls, d):
        """Reconstruct a transform from the output of :meth:`to_dict`."""
        t = cls(Domain.from_dict(d["domain"]), **d["transform_params"])
        t.transform_domain = Domain.from_dict(d["transform_domain"])
        return t

    @staticmethod
    def standardize_column(X):
        """Standardize a column to zero mean and unit variance.

        Returns the scaled values along with the mean and (floored) standard
        deviation so the transform can be inverted later.
        """
        X = X.to_numpy()
        mean, std = X.mean(), X.std()
        # Floor the standard deviation to avoid division by ~zero for
        # (near-)constant columns.
        std = std if std > 1e-5 else 1e-5
        scaled = (X - mean) / std
        return scaled, mean, std
def set_column_types(ds, column_names, column_type):
    # NOTE(review): this function builds a lookup table but never uses it,
    # reads none of its parameters, and returns nothing — it looks truncated
    # or incomplete in this copy. Confirm against the upstream summit source.
    type_map = {0: "DATA", 1: "METADATA"}
def map_index_level(index, mapper, level=0):
    """
    Returns a new Index or MultiIndex, with the level values being mapped.
    """
    assert isinstance(index, pd.Index)
    if isinstance(index, pd.MultiIndex):
        # Map only the requested level and rebuild the MultiIndex around it.
        mapped_level = index.levels[level].map(mapper)
        return index.set_levels(mapped_level, level=level)
    # Single level index: only level 0 makes sense.
    assert level == 0
    return index.map(mapper)
def transform_from_dict(d):
    """Create a transform from its dictionary representation.

    Parameters
    ----------
    d : dict
        Output of a transform's ``to_dict`` method; ``d["name"]`` selects
        the transform class.

    Raises
    ------
    ValueError
        If ``d["name"]`` is not a known transform.
    """
    name = d["name"]
    if name == "MultitoSingleObjective":
        return MultitoSingleObjective.from_dict(d)
    elif name == "LogSpaceObjectives":
        return LogSpaceObjectives.from_dict(d)
    elif name == "Chimera":
        return Chimera.from_dict(d)
    elif name == "Transform":
        return Transform.from_dict(d)
    # BUG FIX: the original fell off the end and returned None for unknown
    # names, deferring the failure to an opaque AttributeError downstream.
    raise ValueError(f"Unknown transform type: {name}")
class MultitoSingleObjective(Transform):
    """Transform a multiobjective problem into a single objective problem

    The scalar objective is computed by evaluating ``expression`` over the
    objective columns of the dataset.

    Parameters
    ----------
    domain: :class:`~summit.domain.Domain`
        A domain that is being used in the strategy
    expression: str
        An expression in terms of variable names used to
        convert the multiobjective problem into a single
        objective problem
    maximize: bool, optional
        Whether the scalar objective should be maximized. Default is True.

    Raises
    ------
    ValueError
        If domain does not have at least two objectives

    Examples
    ----------
    >>> from summit.domain import *
    >>> from summit.strategies import SNOBFIT, MultitoSingleObjective
    >>> from summit.utils.dataset import DataSet
    >>> # Create domain
    >>> domain = Domain()
    >>> domain += ContinuousVariable(name="temperature",description="reaction temperature in celsius", bounds=[50, 100])
    >>> domain += ContinuousVariable(name="flowrate_a", description="flow of reactant a in mL/min", bounds=[0.1, 0.5])
    >>> domain += ContinuousVariable(name="flowrate_b", description="flow of reactant b in mL/min", bounds=[0.1, 0.5])
    >>> domain += ContinuousVariable(name="yield_", description="", bounds=[0, 100], is_objective=True, maximize=True)
    >>> domain += ContinuousVariable(name="de",description="diastereomeric excess",bounds=[0, 100],is_objective=True,maximize=True)
    >>> # Previous reactions
    >>> columns = [v.name for v in domain.variables]
    >>> values = {("temperature", "DATA"): 60,("flowrate_a", "DATA"): 0.5,("flowrate_b", "DATA"): 0.5,("yield_", "DATA"): 50,("de", "DATA"): 90}
    >>> previous_results = DataSet([values], columns=columns)
    >>> # Multiobjective transform
    >>> transform = MultitoSingleObjective(domain, expression="(yield_+de)/2", maximize=True)
    >>> strategy = SNOBFIT(domain, transform=transform)
    >>> next_experiments = strategy.suggest_experiments(5, previous_results)
    """

    def __init__(self, domain: Domain, expression: str, maximize=True):
        super().__init__(domain)
        objectives = [v for v in self.transform_domain.variables if v.is_objective]
        num_objectives = len(objectives)
        if num_objectives <= 1:
            raise ValueError(
                f"Domain must have at least two objectives; it currently has {num_objectives} objectives."
            )
        self.expression = expression
        self.maximize = maximize
        # Swap the original objectives in the transform domain for a single
        # scalar objective described by the expression.
        # TODO: maybe there should be an option to define the bounds (important for DRO)
        for objective in objectives:
            position = self.transform_domain.variables.index(objective)
            self.transform_domain.variables.pop(position)
        self.transform_domain += ContinuousVariable(
            "scalar_objective",
            description=expression,
            bounds=[0, 1],
            is_objective=True,
            maximize=maximize,
        )

    def transform_inputs_outputs(self, ds, **kwargs):
        """Transform data into inputs and outputs for a strategy.

        This scalarises the objectives using the configured expression.

        Parameters
        ----------
        ds: `DataSet`
            Dataset with columns corresponding to the inputs and objectives of the domain.
        copy: bool, optional
            Copy the dataset internally. Defaults to True.

        Returns
        -------
        inputs, outputs
            Datasets with the input and (scalarised) output data
        """
        inputs, outputs = super().transform_inputs_outputs(ds, **kwargs)
        scalarized = outputs.eval(self.expression, resolvers=[outputs])
        return inputs, DataSet(scalarized, columns=["scalar_objective"])

    def to_dict(self):
        """Output a dictionary representation of the transform"""
        return super().to_dict(expression=self.expression, maximize=self.maximize)
class LogSpaceObjectives(Transform):
    """Log transform objectives

    Parameters
    ----------
    domain: :class:`~summit.domain.Domain`
        A domain that is being used in the strategy

    Raises
    ------
    ValueError
        When the domain has no objectives.

    Examples
    ----------
    >>> from summit.domain import *
    >>> from summit.strategies import SNOBFIT, MultitoSingleObjective
    >>> from summit.utils.dataset import DataSet
    >>> # Create domain
    >>> domain = Domain()
    >>> domain += ContinuousVariable(name="temperature",description="reaction temperature in celsius", bounds=[50, 100])
    >>> domain += ContinuousVariable(name="flowrate_a", description="flow of reactant a in mL/min", bounds=[0.1, 0.5])
    >>> domain += ContinuousVariable(name="flowrate_b", description="flow of reactant b in mL/min", bounds=[0.1, 0.5])
    >>> domain += ContinuousVariable(name="yield_", description="", bounds=[0, 100], is_objective=True, maximize=True)
    >>> domain += ContinuousVariable(name="de",description="diastereomeric excess",bounds=[0, 100],is_objective=True,maximize=True)
    >>> # Previous reactions
    >>> columns = [v.name for v in domain.variables]
    >>> values = {("temperature", "DATA"): 60,("flowrate_a", "DATA"): 0.5,("flowrate_b", "DATA"): 0.5,("yield_", "DATA"): 50,("de", "DATA"): 90}
    >>> previous_results = DataSet([values], columns=columns)
    >>> # Multiobjective transform
    >>> transform = LogSpaceObjectives(domain)
    >>> strategy = SNOBFIT(domain, transform=transform)
    >>> next_experiments = strategy.suggest_experiments(5, previous_results)
    """

    def __init__(self, domain: Domain):
        super().__init__(domain)
        warnings.warn(
            "This class will be deprecated in a future version of summit.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.to_transform = [v.name for v in self.transform_domain.output_variables]
        # Rename each objective with a "log_" prefix and move its bounds
        # into log space in the transformed domain.
        for name in self.to_transform:
            variable = self.transform_domain[name]
            if variable is None:
                raise ValueError(f"Variable {name} not found in domain.")
            variable.name = "log_" + variable.name
            variable._lower_bound = np.log(variable.bounds[0])
            variable._upper_bound = np.log(variable.bounds[1])

    def transform_inputs_outputs(self, ds, **kwargs):
        """Transform data into inputs and outputs for a strategy.

        This applies a log transform to the objectives (outputs).

        Parameters
        ----------
        ds: `DataSet`
            Dataset with columns corresponding to the inputs and objectives of the domain.
        copy: bool, optional
            Copy the dataset internally. Defaults to True.

        Returns
        -------
        inputs, outputs
            Datasets with the input and output data
        """
        for name in self.to_transform:
            ds.loc[:, ("log_" + name, "DATA")] = np.log(ds[name].astype(float).values)
        return super().transform_inputs_outputs(ds, **kwargs)

    def un_transform(self, ds: DataSet, **kwargs):
        """Untransform objectives from log space

        Parameters
        ----------
        ds: `DataSet`
            Dataset with columns corresponding to the inputs and objectives of the domain.
        """
        for name in self.to_transform:
            log_name = "log_" + name
            # Only invert objectives that are actually present in the dataset.
            if log_name in ds.data_columns:
                ds.loc[:, (name, "DATA")] = np.exp(ds[log_name].astype(float).values)
        return super().un_transform(ds, **kwargs)
class LogTransform(Transform):
    """Log transform a chosen set of continuous variables.

    Parameters
    ----------
    domain: :class:`~summit.domain.Domain`
        A domain that is being used in the strategy
    to_transform: list of str
        Names of the variables to move into log space.
    """

    def __init__(self, domain: Domain, to_transform: List[str]):
        super().__init__(domain)
        self.to_transform = to_transform
        # Rename each selected variable with a "log_" prefix and move its
        # bounds into log space in the transformed domain.
        for name in self.to_transform:
            variable = self.transform_domain[name]
            if variable is None:
                raise ValueError(f"Variable {name} not found in domain.")
            variable.name = "log_" + variable.name
            variable._lower_bound = np.log(variable.bounds[0])
            variable._upper_bound = np.log(variable.bounds[1])

    def transform_inputs_outputs(self, ds, **kwargs):
        """Transform data into inputs and outputs for a strategy.

        This applies a log transform to the selected variables.

        Parameters
        ----------
        ds: `DataSet`
            Dataset with columns corresponding to the inputs and objectives of the domain.

        Returns
        -------
        inputs, outputs
            Datasets with the input and output data
        """
        for name in self.to_transform:
            ds.loc[:, ("log_" + name, "DATA")] = np.log(ds[name].astype(float).values)
        return super().transform_inputs_outputs(ds, **kwargs)

    def un_transform(self, ds: DataSet, **kwargs):
        """Untransform the selected variables from log space.

        Parameters
        ----------
        ds: `DataSet`
            Dataset with columns corresponding to the inputs and objectives of the domain.
        """
        for name in self.to_transform:
            log_name = "log_" + name
            # Only invert variables actually present in the dataset.
            if log_name in ds.data_columns:
                ds.loc[:, (name, "DATA")] = np.exp(ds[log_name].astype(float).values)
        return super().un_transform(ds, **kwargs)
class Chimera(Transform):
    """Scalarise a multiobjective problem using Chimera.

    Chimera is a hiearchical multiobjective scalarasation function.
    You set the parameter `loss_tolerances` to weight the importance
    of each objective.

    Parameters
    ----------
    domain : :class:`~sumit.domain.Domain`
        A domain for that is being used in the strategy
    hierarchy : dict
        Dictionary with keys as the names of the objectives and values as dictionaries
        with the keys "hierarchy" and "tolerance" for the ranking and tolerance, respectively, on each objective.
        The hierachy is indexed from zero (i.e., 0, 1, 2, etc.) with zero being the highest priority objective.
        A smaller tolerance means that the objective will be weighted more, while a
        larger tolerance indicates that the objective will be weighted less. The tolerance must
        be between zero and one.
    softness : float, optional
        Smoothing parameter. Defaults to 1e-3 as recommended by Häse et al.
        Larger values result in a more smooth objective while smaller values
        will give a disjointed objective.
    absolutes : array-like, optional
        Default is zeros.

    Examples
    --------
    >>> from summit.domain import *
    >>> from summit.strategies import SNOBFIT, MultitoSingleObjective
    >>> from summit.utils.dataset import DataSet
    >>> # Create domain
    >>> domain = Domain()
    >>> domain += ContinuousVariable(name="temperature",description="reaction temperature in celsius", bounds=[50, 100])
    >>> domain += ContinuousVariable(name="flowrate_a", description="flow of reactant a in mL/min", bounds=[0.1, 0.5])
    >>> domain += ContinuousVariable(name="flowrate_b", description="flow of reactant b in mL/min", bounds=[0.1, 0.5])
    >>> domain += ContinuousVariable(name="yield_", description="", bounds=[0, 100], is_objective=True, maximize=True)
    >>> domain += ContinuousVariable(name="de",description="diastereomeric excess",bounds=[0, 100],is_objective=True,maximize=True)
    >>> # Previous reactions
    >>> columns = [v.name for v in domain.variables]
    >>> values = {("temperature", "DATA"): 60,("flowrate_a", "DATA"): 0.5,("flowrate_b", "DATA"): 0.5,("yield_", "DATA"): 50,("de", "DATA"): 90}
    >>> previous_results = DataSet([values], columns=columns)
    >>> # Multiobjective transform
    >>> hierarchy = {"yield_": {"hierarchy": 0, "tolerance": 0.5}, "de": {"hierarchy": 1, "tolerance": 1.0}}
    >>> transform = Chimera(domain, hierarchy=hierarchy)
    >>> strategy = SNOBFIT(domain, transform=transform)
    >>> next_experiments = strategy.suggest_experiments(5, previous_results)

    Notes
    ------
    The original paper on Chimera can be found at [1]_.
    This code is based on the code for Griffyn [2]_, which can be found on Github_.
    Chimera turns problems into minimisation problems. This is done automatically by reading the type
    of objective from the domain.

    .. _Github: https://github.com/aspuru-guzik-group/gryffin/blob/d7443bf374e5d1fee2424cb49f5008ce4248d432/src/gryffin/observation_processor/chimera.py

    References
    ----------
    .. [1] Häse, F., Roch, L. M., & Aspuru-Guzik, A. "Chimera: enabling hierarchy based multi-objective
       optimization for self-driving laboratories." Chemical Science, 2018, 9,7642-7655
    .. [2] Häse, F., Roch, L.M. and Aspuru-Guzik, A., 2020. Gryffin: An algorithm for Bayesian
       optimization for categorical variables informed by physical intuition with applications to chemistry.
       arXiv preprint arXiv:2003.12127.
    """

    def __init__(self, domain: Domain, hierarchy: dict, softness=1e-3, absolutes=None):
        super().__init__(domain)
        # Sort objectives
        # {'y_0': {'hiearchy': 0, 'tolerance': 0.2}}
        objectives = self.transform_domain.output_variables
        self.hierarchy = hierarchy
        # Arrays indexed by hierarchy position (0 = highest-priority objective)
        self.tolerances = np.zeros_like(objectives)
        self.directions = np.zeros_like(objectives)
        self.ordered_objective_names = len(objectives) * [""]
        for name, v in hierarchy.items():
            h = v["hierarchy"]
            self.ordered_objective_names[h] = name
            self.tolerances[h] = v["tolerance"]
            # Chimera minimises, so maximised objectives are negated (-1)
            self.directions[h] = -1 if self.domain[name].maximize else 1
        # Pop objectives from transform domain
        for v in objectives:
            i = self.transform_domain.variables.index(v)
            self.transform_domain.variables.pop(i)
        # Add chimera objective to transform domain
        self.transform_domain += ContinuousVariable(
            "chimera",
            "chimeras scalarized objectived",
            bounds=[0, 1],
            is_objective=True,
            maximize=False,
        )
        # Set chimera parameters; default absolutes are NaN so that
        # _shift_objectives falls back to them only when tolerances yield NaN.
        self.absolutes = absolutes
        if self.absolutes is None:
            self.absolutes = np.zeros(len(self.tolerances)) + np.nan
        self.softness = softness

    def transform_inputs_outputs(self, ds, copy=True, **kwargs):
        """Transform of data into inputs and outputs for a strategy.

        This scalarises the objectives using Chimera.

        Parameters
        ----------
        ds: `DataSet`
            Dataset with columns corresponding to the inputs and objectives of the domain.
        copy: bool, optional
            Copy the dataset internally. Defaults to True.

        Returns
        -------
        inputs, outputs
            Datasets with the input and output datasets
        """
        # Get inputs and outputs
        inputs, outputs = super().transform_inputs_outputs(ds, copy=copy, **kwargs)
        # Scalarize using Chimera
        outputs_arr = outputs[self.ordered_objective_names].to_numpy()
        outputs_arr = (
            outputs_arr * self.directions
        )  # Change maximization to minimization
        scalarized_array = self._scalarize(outputs_arr)
        # Write scalarized objective back to DataSet
        outputs = DataSet(scalarized_array, columns=["chimera"])
        return inputs, outputs

    def _scalarize(self, raw_objs):
        # Pipeline: rescale to [0, 1] -> shift by hierarchy -> merge into one
        # scalar objective per row.
        res_objs, res_abs = self._rescale(raw_objs)
        shifted_objs, abs_tols = self._shift_objectives(res_objs, res_abs)
        scalarized_obj = self._scalarize_objs(shifted_objs, abs_tols)
        return scalarized_obj

    def _scalarize_objs(self, shifted_objs, abs_tols):
        # Walk the hierarchy from lowest to highest priority, blending each
        # objective in via the (soft) step function around its tolerance.
        scalar_obj = shifted_objs[-1].copy()
        for index in range(0, len(shifted_objs) - 1)[::-1]:
            scalar_obj *= self._step(-shifted_objs[index] + abs_tols[index])
            scalar_obj += (
                self._step(shifted_objs[index] - abs_tols[index]) * shifted_objs[index]
            )
        return scalar_obj.transpose()

    def _soft_step(self, value):
        # Logistic sigmoid with slope controlled by self.softness.
        arg = -value / self.softness
        return 1.0 / (1.0 + np.exp(arg))

    def _hard_step(self, value):
        # NOTE(review): the np.empty allocation is immediately overwritten by
        # np.where and is dead code.
        result = np.empty(len(value))
        result = np.where(value > 0.0, 1.0, 0.0)
        return result

    def _step(self, value):
        # Use an exact step for very small softness, a sigmoid otherwise.
        if self.softness < 1e-5:
            return self._hard_step(value)
        else:
            return self._soft_step(value)

    def _rescale(self, raw_objs):
        """Min-Max scale objectives and absolutes by between 0 and 1"""
        res_objs = np.empty(raw_objs.shape)
        res_abs = np.empty(self.absolutes.shape)
        for index in range(raw_objs.shape[1]):
            min_objs, max_objs = (
                np.amin(raw_objs[:, index]),
                np.amax(raw_objs[:, index]),
            )
            if min_objs < max_objs:
                res_abs[index] = (self.absolutes[index] - min_objs) / (
                    max_objs - min_objs
                )
                res_objs[:, index] = (raw_objs[:, index] - min_objs) / (
                    max_objs - min_objs
                )
            else:
                # Constant column: only shift, to avoid division by zero.
                res_abs[index] = self.absolutes[index] - min_objs
                res_objs[:, index] = raw_objs[:, index] - min_objs
        return res_objs, res_abs

    def _shift_objectives(self, objs, res_abs):
        # Shift each objective so the hierarchy forms a cascade; `domain`
        # tracks the row indices still within tolerance of all higher-priority
        # objectives.
        transposed_objs = objs.transpose()
        shapes = transposed_objs.shape
        shifted_objs = np.empty((shapes[0] + 1, shapes[1]))
        mins, maxs, tols = [], [], []
        domain = np.arange(shapes[1])
        shift = 0
        for obj_index, obj in enumerate(transposed_objs):
            # get absolute tolerances
            minimum = np.amin(obj[domain])
            maximum = np.amax(obj[domain])
            mins.append(minimum)
            maxs.append(maximum)
            tolerance = minimum + self.tolerances[obj_index] * (maximum - minimum)
            if np.isnan(tolerance):
                tolerance = res_abs[obj_index]
            # adjust region of interest
            interest = np.where(obj[domain] < tolerance)[0]
            if len(interest) > 0:
                domain = domain[interest]
            # apply shift
            tols.append(tolerance + shift)
            shifted_objs[obj_index] = transposed_objs[obj_index] + shift
            # compute new shift
            if obj_index < len(transposed_objs) - 1:
                shift -= np.amax(transposed_objs[obj_index + 1][domain]) - tolerance
            else:
                # Last objective wraps around to the first one.
                shift -= np.amax(transposed_objs[0][domain]) - tolerance
                shifted_objs[obj_index + 1] = transposed_objs[0] + shift
        return shifted_objs, tols

    def to_dict(self):
        """Output a dictionary representation of the transform."""
        transform_params = dict(
            hierarchy=self.hierarchy,
            softness=self.softness,
            absolutes=self.absolutes.tolist(),
        )
        return super().to_dict(**transform_params)

    @classmethod
    def from_dict(cls, d):
        """Reconstruct a Chimera transform, restoring absolutes as an array."""
        absolutes = d["transform_params"]["absolutes"]
        d["transform_params"]["absolutes"] = np.array(absolutes)
        return super().from_dict(d)
class Strategy(ABC):
    """Base class for strategies

    Parameters
    ----------
    domain : `summit.domain.Domain`
        A summit domain containing variables and constraints
    transform : `summit.strategies.base.Transform`, optional
        A transform class (i.e, not the object itself). By default
        no transformation will be done the input variables or
        objectives.
    """

    def __init__(self, domain: Domain, transform: Transform = None, **kwargs):
        # Fall back to the identity transform when none is supplied.
        if transform is None:
            transform = Transform(domain)
        elif not isinstance(transform, Transform):
            raise TypeError("transform must be a Transform class")
        self.transform = transform
        # Strategies operate on the transformed domain, not the original one.
        self.domain = self.transform.transform_domain

    @abstractmethod
    def suggest_experiments(self):
        raise NotImplementedError(
            "Strategies should inhereit this class and impelemnt suggest_experiments"
        )

    @abstractmethod
    def reset(self):
        pass

    def to_dict(self, **strategy_params):
        """Convert strategy to a dictionary"""
        # You can pass in as keyword arguments any custom parameters
        # for a strategy, which will be stored under the key strategy_params.
        return dict(
            name=str(self.__class__.__name__),
            transform=self.transform.to_dict(),
            strategy_params=strategy_params,
        )

    @classmethod
    def from_dict(cls, d):
        """Create a strategy from a dictionary"""
        transform = transform_from_dict(d["transform"])
        return cls(domain=transform.domain, transform=transform, **d["strategy_params"])

    def save(self, filename):
        """Save a strategy to a JSON file"""
        with open(filename, "w") as f:
            json.dump(self.to_dict(), f)

    @classmethod
    def load(cls, filename):
        """Load a strategy from a JSON file"""
        with open(filename, "r") as f:
            d = json.load(f)
        return cls.from_dict(d)
class Design:
    """Representation of an experimental design

    Parameters
    ----------
    domain: summit.domain.Domain
        The domain of the design
    num_samples: int
        Number of samples in the design
    design_type: str
        The name of the design type
    exclude: list, optional
        Names of variables to exclude from the design.
        Defaults to an empty list (no exclusions).

    Examples
    --------
    >>> from summit.domain import Domain, ContinuousVariable
    >>> domain = Domain()
    >>> domain += ContinuousVariable('temperature','reaction temperature', [1, 100])
    >>> initial_design = Design(domain, 10, 'example_design')
    >>> initial_design.add_variable('temperature', np.array([[100, 120, 150]]))
    """

    def __init__(self, domain: Domain, num_samples, design_type: str, exclude=None):
        self._variable_names = [variable.name for variable in domain.variables]
        # One placeholder slot per domain variable; filled in by add_variable.
        self._indices = domain.num_variables() * [0]
        self._values = domain.num_variables() * [0]
        self.num_samples = num_samples
        self.design_type = design_type
        # Bug fix: the default was a mutable list ([]), which is shared
        # across every Design created without an explicit exclude argument.
        self.exclude = [] if exclude is None else exclude
        self._domain = domain

    def add_variable(
        self, variable_name: str, values: np.ndarray, indices: np.ndarray = None
    ):
        """Add a variable to a design

        Parameters
        ----------
        variable_name: str
            Name of the variable to be added. Must already be in the domain.
        values: numpy.ndarray
            Values of the design points in the variable.
            Should be an nxd array, where n is the number of samples and
            d is the number of dimensions of the variable.
        indices: numpy.ndarray, optional
            Indices of the design points in the variable

        Raises
        ------
        ValueError
            If indices or values are not a two-dimensional array.
        """
        variable_index = self._get_variable_index(variable_name)
        if values.ndim < 2:
            raise ValueError("Values must be 2 dimensional. Use np.atleast_2d.")
        if indices is not None:
            if indices.ndim < 2:
                raise ValueError("Indices must be 2 dimensional. Use np.atleast_2d.")
            self._indices[variable_index] = indices
        self._values[variable_index] = values

    def get_indices(self, variable_name: str) -> np.ndarray:
        """Get indices of design points

        Parameters
        ----------
        variable_name: str
            Get the indices for a specific variable name.

        Returns
        -------
        indices: numpy.ndarray
            Indices of the design points

        Raises
        ------
        ValueError
            If the variable name is not in the list of variables
        """
        variable_index = self._get_variable_index(variable_name)
        return self._indices[variable_index]

    def get_values(self, variable_name: str = None) -> np.ndarray:
        """Get values of design points

        Parameters
        ----------
        variable_name: str, optional
            Get only the values for a specific variable name.

        Returns
        -------
        values: numpy.ndarray
            Values of the design points

        Raises
        ------
        ValueError
            If the variable name is not in the list of variables
        """
        if variable_name is not None:
            variable_index = self._get_variable_index(variable_name)
            return self._values[variable_index].T
        return np.concatenate(self._values, axis=0).T

    def to_dataset(self) -> DataSet:
        """Get design as a DataSet

        Returns
        -------
        ds: summit.utils.dataset.DataSet

        Raises
        ------
        TypeError
            If an input variable is neither continuous nor categorical.
        """
        df = pd.DataFrame([])
        for i, variable in enumerate(self._domain.input_variables):
            if isinstance(variable, ContinuousVariable):
                values = self.get_values(variable.name)[:, 0]
            elif isinstance(variable, CategoricalVariable):
                # Renamed the comprehension variable so it no longer shadows
                # the enumerate index `i`.
                values = [
                    variable.levels[idx]
                    for idx in self.get_indices(variable.name)[:, 0]
                ]
            else:
                # Bug fix: previously this case fell through silently, leaving
                # `values` unbound (or stale from the previous iteration).
                raise TypeError(
                    f"Unsupported variable type: {type(variable).__name__}"
                )
            df.insert(i, variable.name, values)
        return DataSet.from_df(df)

    def _get_variable_index(self, variable_name: str) -> int:
        """Return the internal index for a variable, raising if unknown."""
        if variable_name not in self._variable_names:
            raise ValueError(f"Variable {variable_name} not in domain.")
        return self._variable_names.index(variable_name)

    def _repr_html_(self):
        # Bug fix: Design defines no `to_frame` method, so the old call
        # (self.to_frame().to_html()) always raised AttributeError.
        # NOTE(review): assumes DataSet exposes pandas' to_html — confirm.
        return self.to_dataset().to_html()
class DesignCoverage:
    """Summary statistics describing how well a design covers a search space.

    Parameters
    ----------
    mean, std_dev, median, max, min: float, optional
        Statistics of the distances between candidate points and their
        nearest design point.
    """

    # Names of the exposed statistics, in reporting order.
    properties = ["mean", "std_dev", "median", "max", "min"]

    def __init__(self, mean=None, std_dev=None, median=None, max=None, min=None):
        self._mean = mean
        self._std_dev = std_dev
        self._median = median
        self._max = max
        self._min = min

    @property
    def mean(self):
        """Mean nearest-design-point distance."""
        return self._mean

    @property
    def std_dev(self):
        """Standard deviation of the distances."""
        return self._std_dev

    @property
    def median(self):
        """Median of the distances."""
        return self._median

    @property
    def max(self):
        """Maximum of the distances."""
        return self._max

    @property
    def min(self):
        """Minimum of the distances."""
        return self._min

    def __repr__(self):
        body = ", ".join(
            f"{name}:{getattr(self, name)}" for name in self.properties
        )
        return f"DesignCoverage({body})"

    def get_dict(self):
        """Return the statistics as a name -> value dictionary."""
        return {name: getattr(self, name) for name in self.properties}

    def get_array(self):
        """Return the statistics as a list, ordered like ``properties``."""
        return [getattr(self, name) for name in self.properties]

    @staticmethod
    def average_coverages(coverages):
        """Average multiple design coverages

        Arguments:
            coverages: a list of `DesignCoverage` objects.
        """
        # Check that the argument is a list of coverages.
        for coverage in coverages:
            assert isinstance(coverage, DesignCoverage)
        averaged = {
            name: np.average([getattr(coverage, name) for coverage in coverages])
            for name in DesignCoverage.properties
        }
        return DesignCoverage(**averaged)
def _closest_point_indices(design_points, candidate_matrix, unique=False):
    """Return the indices of the closest candidate point to each design point.

    Parameters
    ----------
    design_points: array-like
        Iterable of design points (rows).
    candidate_matrix: numpy.ndarray
        Matrix of candidate points, one per row.
    unique: bool, optional
        If True, each candidate is used at most once. Defaults to False.

    Returns
    -------
    numpy.ndarray
        Column vector (n x 1) of indices into ``candidate_matrix``.
    """
    if unique:
        mask = np.ones(candidate_matrix.shape[0], dtype=bool)
        indices = []
        for design_point in design_points:
            masked_candidates = candidate_matrix[mask, :]
            point_index = _closest_point_index(design_point, masked_candidates)
            # Bug fix: the old value-based lookup
            # np.where(candidate_matrix == masked_candidates[point_index, :])[0][0]
            # compared every element of the matrix and took the first row with
            # ANY matching element, returning the wrong row whenever values
            # repeat. Map the masked-view index back positionally instead.
            actual_index = np.where(mask)[0][point_index]
            indices.append(actual_index)
            mask[actual_index] = False
    else:
        indices = [
            _closest_point_index(design_point, candidate_matrix)
            for design_point in design_points
        ]
    indices = np.array(indices)
    return np.atleast_2d(indices).T
def _closest_point_index(design_point, candidate_matrix):
    """Index of the candidate row nearest (Euclidean) to ``design_point``."""
    return np.argmin(
        np.atleast_2d(_design_distances(design_point, candidate_matrix))
    )
def _design_distances(design_point, candidate_matrix):
"""Return the distances between a design_point and all candidates"""
diff = design_point - candidate_matrix
squared = np.power(diff, 2)
summed = np.sum(squared, axis=1)
root_square = np.sqrt(summed)
return root_square