Source code for summit.benchmarks.experimental_emulator

from summit.utils.dataset import DataSet
from summit.domain import *
from summit.experiment import Experiment
from summit import get_summit_config_path
from summit.utils import jsonify_dict, unjsonify_dict

import torch
import torch.nn.functional as F
from skorch import NeuralNetRegressor
from skorch.utils import to_device

from sklearn.compose import ColumnTransformer, TransformedTargetRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder, FunctionTransformer
from sklearn.model_selection import (
    train_test_split,
    cross_validate,
    GridSearchCV,
    ParameterGrid,
)
from sklearn.model_selection._search import BaseSearchCV, _check_param_grid
from sklearn.base import BaseEstimator, RegressorMixin, is_classifier, clone
from sklearn.model_selection._split import check_cv
from sklearn.model_selection._validation import (
    _fit_and_score,
    _score,
    _aggregate_score_dicts,
)
from sklearn.metrics import r2_score
from sklearn.utils.validation import (
    _deprecate_positional_args,
    indexable,
    check_is_fitted,
    _check_fit_params,
)
from sklearn.utils import check_array, _safe_indexing
from sklearn.utils.fixes import delayed
from sklearn.metrics._scorer import _check_multimetric_scoring

from tqdm.auto import tqdm
from joblib import Parallel
import pathlib
import numpy as np
from numpy.random import default_rng
import pandas as pd
from copy import deepcopy
from itertools import product
from collections import defaultdict
from copy import deepcopy
import pkg_resources
import time
import json
import types
import warnings

__all__ = [
    "ExperimentalEmulator",
    "ANNRegressor",
    "get_bnn",
    "RegressorRegistry",
    "registry",
    "get_pretrained_reizman_suzuki_emulator",
    "get_pretrained_baumgartner_cc_emulator",
    "ReizmanSuzukiEmulator",
    "BaumgartnerCrossCouplingEmulator",
]


[docs]class ExperimentalEmulator(Experiment): """Experimental Emulator Train a machine learning model based on experimental data. The model acts a benchmark for testing optimisation strategies. Parameters ---------- model_name : str Name of the model, ideally with no spaces domain : :class:`~summit.domain.Domain` The domain of the emulator dataset : :class:`~summit.dataset.Dataset`, optional Dataset used for training/validation regressor : :classs:`torch.nn.Module`, optional Pytorch LightningModule class. Defaults to the ANNRegressor output_variable_names : str or list, optional The names of the variables that should be trained by the predictor. Defaults to all objectives in the domain. clip : bool or list Whether to clip predictions to the limits of the objectives in the domain. True (default) means clipping is activated for all outputs and False means it is not activated at all. A list of specific outputs to clip can also be passed. """ def __init__(self, model_name, domain, **kwargs): super().__init__(domain, **kwargs) self.model_name = model_name # Data self.ds = kwargs.get("dataset") if self.ds is not None: self.n_features = self._caclulate_input_dimensions(self.domain) self.n_examples = self.ds.shape[0] self.output_variable_names = kwargs.get( "output_variable_names", [v.name for v in self.domain.output_variables], ) # Create the regressor self.regressor = kwargs.get("regressor", ANNRegressor) self.predictors = kwargs.get("predictors") self.clip = kwargs.get("clip", True) def _run(self, conditions, **kwargs): input_columns = [v.name for v in self.domain.input_variables] X = conditions[input_columns].to_numpy() if X.shape[0] == len(input_columns): X = X[np.newaxis, :] X = pd.DataFrame(X, columns=input_columns) y_pred, y_pred_std = self._predict(X) return_std = kwargs.get("return_std", False) for i, name in enumerate(self.output_variable_names): if type(conditions) == pd.Series: y = y_pred[0, i] y_std = y_pred_std[0, i] else: y = y_pred[:, i] y_std = y_pred_std[:, i] conditions.at[(name, "DATA")] = y if return_std: conditions.at[(f"{name}_std", "METADATA")] = y_std return conditions, {} def _predict(self, X, **kwargs): """Get a prediction Parameters ---------- X : pd.DataFrame A pandas dataframe with inputs to the predictor Returns ------- mean, std Numpy arrays with the average and standard deviation of the ensemble """ y_pred = np.array( [estimator.predict(X, **kwargs) for estimator in self.predictors] ) if self.clip: for i, v in enumerate(self.domain.output_variables): if type(self.clip) == list: if v.name not in self.clip: continue y_pred[:, :, i] = np.clip(y_pred[:, :, i], v.lower_bound, v.upper_bound) return y_pred.mean(axis=0), y_pred.std(axis=0) def train(self, **kwargs): """Train the model on the dataset Parameters --------- test_size : float, optional The size of the test as a fraction of the total dataset. Defaults to 0.1. cv_folds : int, optional The number of cross validation folds. Defaults to 5. max_epochs : int, optional The max number of epochs for each CV fold. Defaults to 100. scoring : str or list, optional A list of scoring functions or names of them. Defaults to R2 and MSE. See here for more https://scikit-learn.org/stable/modules/model_evaluation.html#scoring-parameter regressor_kwargs : dict, optional You can pass extra arguments to the regressor here. callbacks : None, "disable" or list of Callbacks Skorch callbacks passed to skorch.net. See: https://skorch.readthedocs.io/en/latest/net.html verbose : int 0 for no logging, 1 for logging Notes ------ If predictor was set in the initialization, it will not be overwritten. Returns ------- A dictionary containing the results of the training. """ if self.ds is None: raise ValueError("Dataset is required for training.") # Create predictor predictor = self._create_predictor( self.regressor, self.domain, self.n_features, self.n_examples, output_variable_names=self.output_variable_names, **kwargs, ) # Get data input_columns = [v.name for v in self.domain.input_variables] X = self.ds[input_columns].to_numpy() y = self.ds[self.output_variable_names].to_numpy().astype(float) # Sklearn columntransformer expects a pandas dataframe not a dataset X = pd.DataFrame(X, columns=input_columns) # Train-test split test_size = kwargs.get("test_size", 0.1) random_state = kwargs.get("random_state") self.X_train, self.X_test, self.y_train, self.y_test = train_test_split( X, y, test_size=test_size, random_state=random_state ) y_train, y_test = ( torch.tensor(self.y_train).float(), torch.tensor(self.y_test).float(), ) # Training scoring = kwargs.get("scoring", ["r2", "neg_root_mean_squared_error"]) folds = kwargs.get("cv_folds", 5) search_params = kwargs.get("search_params", {}) # Run grid search if requested if search_params: self.logger.info("Starting grid search.") gs = ProgressGridSearchCV( predictor, search_params, refit="r2", cv=folds, scoring=scoring ) gs.fit(self.X_train, y_train) best_params = gs.best_params_ params = {} for param in search_params.keys(): params[param] = best_params[param] predictor.set_params(**params) # Run final training using cross validation initializing = kwargs.get("initializing", False) if not initializing: self.logger.info("Starting training.") res = cross_validate( predictor, self.X_train, y_train, scoring=scoring, cv=folds, return_estimator=True, ) self.predictors = res.pop("estimator") # Rename from test to validation for name in scoring: scores = res.pop(f"test_{name}") res[f"val_{name}"] = scores return res def test(self, **kwargs): scoring = kwargs.get("scoring", ["r2", "neg_root_mean_squared_error"]) scores_list = [] for predictor in self.predictors: if callable(scoring): scorers = scoring elif scoring is None or isinstance(scoring, str): scorers = check_scoring(predictor, scoring) else: scorers = _check_multimetric_scoring(predictor, scoring) scores_list.append(_score(predictor, self.X_test, self.y_test, scorers)) scores_dict = _aggregate_score_dicts(scores_list) for name in scoring: scores = scores_dict.pop(name) scores_dict[f"test_{name}"] = scores return scores_dict @classmethod def _create_predictor( cls, regressor, domain, input_dimensions, num_examples, output_variable_names, **kwargs, ): # Preprocessors output_variable_names = kwargs.get( "output_variable_names", [v.name for v in domain.output_variables] ) X_preprocessor = cls._create_input_preprocessor(domain) y_preprocessor = cls._create_output_preprocessor(output_variable_names) # Create network regressor_kwargs = kwargs.get("regressor_kwargs", {}) regressor_kwargs.update( dict( module__input_dim=input_dimensions, module__output_dim=len(output_variable_names), module__n_examples=num_examples, ) ) verbose = kwargs.get("verbose", 0) net = NeuralNetRegressor( regressor, train_split=None, max_epochs=kwargs.get("max_epochs", 100), callbacks=kwargs.get("callbacks"), verbose=verbose, **regressor_kwargs, ) # Create predictor # TODO: also create an inverse function ds_to_tensor = FunctionTransformer(numpy_to_tensor, check_inverse=False) pipe = Pipeline( steps=[ ("preprocessor", X_preprocessor), ("dst", ds_to_tensor), ("net", net), ] ) # output_pipeline = Pipeline( # steps=[("scaler", StandardScaler()), ("dst", ds_to_tensor)] # ) return UpdatedTransformedTargetRegressor( regressor=pipe, transformer=StandardScaler(), check_inverse=False ) @staticmethod def _caclulate_input_dimensions(domain): num_dimensions = 0 for v in domain.input_variables: if v.variable_type == "continuous": num_dimensions += 1 elif v.variable_type == "categorical": num_dimensions += len(v.levels) return num_dimensions @staticmethod def _create_input_preprocessor(domain): """Create feature preprocessors """ transformers = [] # Numeric transforms numeric_features = [ v.name for v in domain.input_variables if v.variable_type == "continuous" ] if len(numeric_features) > 0: transformers.append(("num", StandardScaler(), numeric_features)) # Categorical transforms categorical_features = [ v.name for v in domain.input_variables if v.variable_type == "categorical" ] categories = [ v.levels for v in domain.input_variables if v.variable_type == "categorical" ] if len(categorical_features) > 0: transformers.append( ("cat", OneHotEncoder(categories=categories), categorical_features) ) # Create preprocessor if len(numeric_features) == 0 and len(categorical_features) > 0: raise DomainError( "With only categorical features, you can do a simple lookup." ) elif len(numeric_features) > 0 or len(categorical_features) > 0: preprocessor = ColumnTransformer(transformers=transformers) else: raise DomainError( "No continuous or categorical features were found in the dataset." ) return preprocessor @staticmethod def _create_output_preprocessor(output_variable_names): """"Create target preprocessors""" transformers = [ ("scale", StandardScaler(), output_variable_names), ("dst", FunctionTransformer(numpy_to_tensor), output_variable_names), ] return ColumnTransformer(transformers=transformers) def to_dict(self, **experiment_params): """Convert emulator parameters to dictionary Notes ------ This does not save the weights and biases of the regressor. You need to use save_regressor method. """ # Predictors predictors = [ self._create_predictor_dict(predictor) for predictor in self.predictors ] # Update experiment_params experiment_params.update( { "model_name": self.model_name, "regressor_name": str(self.regressor.__name__), "n_features": self.n_features, "n_examples": self.n_examples, "output_variable_names": self.output_variable_names, "predictors": predictors, } ) return super().to_dict(**experiment_params) @staticmethod def _create_predictor_dict(predictor): num = predictor.regressor_.named_steps.preprocessor.named_transformers_.num cat = predictor.regressor_.named_steps.preprocessor.named_transformers_.cat input_preprocessor = { # Numerical "num": { "mean_": num.mean_, "var_": num.var_, "scale_": num.scale_, "n_samples_seen_": num.n_samples_seen_, } # Categorical is automatic from the domain } out = predictor.transformer_ output_preprocessor = { "mean_": out.mean_, "var_": out.var_, "scale_": out.scale_, "n_samples_seen_": out.n_samples_seen_, } return jsonify_dict( { "input_preprocessor": input_preprocessor, "output_preprocessor": output_preprocessor, } ) @classmethod def from_dict(cls, d, **kwargs): """Create ExperimentalEmulator from a dictionary Notes ----- This does not load the regressor weights and biases. After calling from_dict, call load_regressor to load the weights and biases. """ params = d["experiment_params"] domain = Domain.from_dict(d["domain"]) # Load regressor regressor = registry[params["regressor_name"]] d["experiment_params"]["regressor"] = regressor # Load predictors predictors_params = params["predictors"] predictors = [ cls._create_predictor( regressor, domain, params["n_features"], params["n_examples"], output_variable_names=params["output_variable_names"], ) for predictor_params in predictors_params ] d["experiment_params"]["predictor"] = predictors # Dataset dataset = kwargs.get("dataset") d["experiment_params"]["dataset"] = dataset # Instantiate the class exp = super().from_dict(d) # Set runtime parameters exp.n_features = params["n_features"] exp.n_examples = params["n_examples"] # One round of training to initialize all variables if exp.ds is None: exp.ds = generate_data(domain, params["n_features"] + 1) exp.train(max_epochs=1, verbose=0, initializing=True) # Set parameters on predictors for predictor, predictor_params in zip(exp.predictors, predictors_params): exp.set_predictor_params(predictor, unjsonify_dict(predictor_params)) return exp @staticmethod def set_predictor_params(predictor, predictor_params): # Input transforms num = predictor.regressor_.named_steps.preprocessor.named_transformers_.num cat = predictor.regressor_.named_steps.preprocessor.named_transformers_.cat input_preprocessor = RecursiveNamespace( **predictor_params["input_preprocessor"] ) num.mean_ = input_preprocessor.num.mean_ num.var_ = input_preprocessor.num.var_ num.scale_ = input_preprocessor.num.scale_ num.n_samples_seen_ = input_preprocessor.num.n_samples_seen_ # Output transforms out = predictor.transformer_ output_preprocessor = RecursiveNamespace( **predictor_params["output_preprocessor"] ) out.mean_ = output_preprocessor.mean_ out.var_ = output_preprocessor.var_ out.scale_ = output_preprocessor.scale_ out.n_samples_seen_ = output_preprocessor.n_samples_seen_ def save_regressor(self, save_dir): """Save the weights and biases of the regressor to disk Parameters ---------- save_dir : str or pathlib.Path The directory used for saving emulator files. """ save_dir = pathlib.Path(save_dir) if self.predictors is None: raise ValueError( "No predictors available. First, run training using the train method." ) for i, predictor in enumerate(self.predictors): predictor.regressor_.named_steps.net.save_params( f_params=save_dir / f"{self.model_name}_predictor_{i}.pt" ) def load_regressor(self, save_dir): """Load the weights and biases of the regressor from disk Parameters ---------- save_dir : str or pathlib.Path The directory used for saving emulator files. """ save_dir = pathlib.Path(save_dir) for i, predictor in enumerate(self.predictors): net = predictor.regressor_.named_steps.net net.initialize() net.load_params(f_params=save_dir / f"{self.model_name}_predictor_{i}.pt") def save(self, save_dir): """Save all the essential parameters of the ExperimentalEmulator to disk Parameters ---------- save_dir : str or pathlib.Path The directory used for saving emulator files. """ save_dir = pathlib.Path(save_dir) if not save_dir.exists(): save_dir.mkdir() with open(save_dir / f"{self.model_name}.json", "w") as f: json.dump(self.to_dict(), f) self.save_regressor(save_dir) @classmethod def load(cls, model_name, save_dir, **kwargs): """Load all the essential parameters of the ExperimentalEmulator to disk Parameters ---------- save_dir : str or pathlib.Path The directory from which to load emulator files. """ save_dir = pathlib.Path(save_dir) with open(save_dir / f"{model_name}.json", "r") as f: d = json.load(f) exp = ExperimentalEmulator.from_dict(d, **kwargs) exp.load_regressor(save_dir) return exp def parity_plot(self, **kwargs): """Produce a parity plot based for the trained model using matplotlib Parameters --------- output_variable_names : str or list, optional The output variables to plot. Defaults to all. include_test : bool, optional Include the performance of the model on the test set. Defaults to False. train_color : str, optional Hex string for the train points. Defaults to "#6f3666" test_color : str, optional Hex string for the train points. Defaults to "#3c328c" """ import matplotlib.pyplot as plt include_test = kwargs.get("include_test", False) train_color = kwargs.get("train_color", "#6f3666") test_color = kwargs.get("test_color", "#3c328c") clip = kwargs.get("clip") vars = kwargs.get("output_variable_names", self.output_variable_names) if type(vars) == str: vars = [vars] fig, axes = plt.subplots(1, len(vars)) if len(vars) > 1: fig.subplots_adjust(wspace=0.2) if type(axes) != np.ndarray: axes = np.array([axes]) # Do predictions with torch.no_grad(): y_train_pred, y_train_pred_std = self._predict(self.X_train) if include_test: y_test_pred, y_train_pred_std = self._predict(self.X_test) plots = 0 for i, v in enumerate(self.output_variable_names): if v in vars: if include_test: kwargs = dict( y_test=self.y_test[:, i], y_test_pred=y_test_pred[:, i] ) else: kwargs = {} make_parity_plot( self.y_train[:, i], y_train_pred[:, i], ax=axes[plots], train_color=train_color, test_color=test_color, title=v, **kwargs, ) plots += 1 return fig, axes
def generate_data(domain, n_examples, random_state=None): data = {} random = default_rng(random_state) for v in domain.input_variables: if v.variable_type == "continuous": data[v.name] = random.normal(size=n_examples) elif v.variable_type == "categorical": data[v.name] = random.choice(v.levels, size=n_examples) for v in domain.output_variables: if v.variable_type == "continuous": data[v.name] = random.normal(size=n_examples) return pd.DataFrame(data) def make_parity_plot( y_train, y_train_pred, y_test=None, y_test_pred=None, ax=None, train_color="#6f3666", test_color="#3c328c", title=None, ): import matplotlib.pyplot as plt import matplotlib.patches as mpatches if ax is None: fig, ax = plt.subplots(1) ax.scatter(y_train, y_train_pred, color=train_color, alpha=0.5) # Test if y_test is not None: ax.scatter(y_test, y_test_pred, color=test_color, alpha=0.5) # Parity line min = np.min(np.concatenate([y_train, y_train_pred])) max = np.max(np.concatenate([y_train, y_train_pred])) ax.plot([min, max], [min, max], c="#747378") # Scores handles = [] r2_train = r2_score(y_train, y_train_pred) r2_train_patch = mpatches.Patch( label=f"Train R2 = {r2_train:.2f}", color=train_color ) handles.append(r2_train_patch) if y_test is not None: r2_test = r2_score(y_test, y_test_pred) r2_test_patch = mpatches.Patch( label=f"Test R2 = {r2_test:.2f}", color=test_color ) handles.append(r2_test_patch) # Formatting ax.legend(handles=handles, fontsize=12) ax.set_xlim(min, max) ax.set_ylim(min, max) ax.set_xlabel("Measured") ax.set_ylabel("Predicted") if title is not None: ax.set_title(title) ax.tick_params(direction="in") return ax def numpy_to_tensor(X): """Convert datasets into """ return torch.tensor(X).float() class UpdatedTransformedTargetRegressor(TransformedTargetRegressor): def fit(self, X, y, **fit_params): """Fit the model according to the given training data. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Training vector, where n_samples is the number of samples and n_features is the number of features. y : array-like of shape (n_samples,) Target values. **fit_params : dict Parameters passed to the ``fit`` method of the underlying regressor. Returns ------- self : object """ y = check_array( y, accept_sparse=False, force_all_finite=True, ensure_2d=False, dtype="numeric", ) # store the number of dimension of the target to predict an array of # similar shape at predict self._training_dim = y.ndim # transformers are designed to modify X which is 2d dimensional, we # need to modify y accordingly. if y.ndim == 1: y_2d = y.reshape(-1, 1) else: y_2d = y self._fit_transformer(y_2d) # transform y and convert back to 1d array if needed y_trans = self.transformer_.transform(y_2d) # Remove this stupid line # FIXME: a FunctionTransformer can return a 1D array even when validate # is set to True. Therefore, we need to check the number of dimension # first. # if y_trans.ndim == 2 and y_trans.shape[1] == 1: # y_trans = y_trans.squeeze(axis=1) if self.regressor is None: from ..linear_model import LinearRegression self.regressor_ = LinearRegression() else: self.regressor_ = clone(self.regressor) self.regressor_.fit(X, y_trans, **fit_params) return self class RecursiveNamespace(types.SimpleNamespace): # def __init__(self, /, **kwargs): # better, but Python 3.8+ def __init__(self, **kwargs): """Create a SimpleNamespace recursively""" self.__dict__.update({k: self.__elt(v) for k, v in kwargs.items()}) def __elt(self, elt): """Recurse into elt to create leaf namepace objects""" if type(elt) is dict: return type(self)(**elt) if type(elt) in (list, tuple): return [self.__elt(i) for i in elt] return elt class ProgressParallel(Parallel): def __init__(self, use_tqdm=True, total=None, *args, **kwargs): self._use_tqdm = use_tqdm self._total = total super().__init__(*args, **kwargs) def __call__(self, *args, **kwargs): with tqdm(disable=not self._use_tqdm, total=self._total) as self._pbar: return Parallel.__call__(self, *args, **kwargs) @property def total(self): return self._total @total.setter def total(self, val): self._total = val def print_progress(self): if self._total is None: self._pbar.total = self.n_dispatched_tasks self._pbar.n = self.n_completed_tasks self._pbar.refresh() class ProgressGridSearchCV(BaseSearchCV): @_deprecate_positional_args def __init__( self, estimator, param_grid, *, scoring=None, n_jobs=None, refit=True, cv=None, verbose=0, pre_dispatch="2*n_jobs", error_score=np.nan, return_train_score=False, ): super().__init__( estimator=estimator, scoring=scoring, n_jobs=n_jobs, refit=refit, cv=cv, verbose=verbose, pre_dispatch=pre_dispatch, error_score=error_score, return_train_score=return_train_score, ) self.param_grid = param_grid _check_param_grid(param_grid) def _run_search(self, evaluate_candidates): """Search all candidates in param_grid""" evaluate_candidates(ParameterGrid(self.param_grid)) @_deprecate_positional_args def fit(self, X, y=None, *, groups=None, **fit_params): """Run fit with all sets of parameters. Parameters ---------- X : array-like of shape (n_samples, n_features) Training vector, where n_samples is the number of samples and n_features is the number of features. y : array-like of shape (n_samples, n_output) \ or (n_samples,), default=None Target relative to X for classification or regression; None for unsupervised learning. groups : array-like of shape (n_samples,), default=None Group labels for the samples used while splitting the dataset into train/test set. Only used in conjunction with a "Group" :term:`cv` instance (e.g., :class:`~sklearn.model_selection.GroupKFold`). **fit_params : dict of str -> object Parameters passed to the ``fit`` method of the estimator """ estimator = self.estimator refit_metric = "score" if callable(self.scoring): scorers = self.scoring elif self.scoring is None or isinstance(self.scoring, str): scorers = check_scoring(self.estimator, self.scoring) else: scorers = _check_multimetric_scoring(self.estimator, self.scoring) # self._check_refit_for_multimetric(scorers) refit_metric = self.refit X, y, groups = indexable(X, y, groups) fit_params = _check_fit_params(X, fit_params) cv_orig = check_cv(self.cv, y, classifier=is_classifier(estimator)) n_splits = cv_orig.get_n_splits(X, y, groups) base_estimator = clone(self.estimator) parallel = ProgressParallel(n_jobs=self.n_jobs, pre_dispatch=self.pre_dispatch) fit_and_score_kwargs = dict( scorer=scorers, fit_params=fit_params, return_train_score=self.return_train_score, return_n_test_samples=True, return_times=True, return_parameters=False, error_score=self.error_score, verbose=self.verbose, ) results = {} with parallel: all_candidate_params = [] all_out = [] all_more_results = defaultdict(list) def evaluate_candidates(candidate_params, cv=None, more_results=None): cv = cv or cv_orig candidate_params = list(candidate_params) n_candidates = len(candidate_params) if self.verbose > 0: print( "Fitting {0} folds for each of {1} candidates," " totalling {2} fits".format( n_splits, n_candidates, n_candidates * n_splits ) ) runs = product( enumerate(candidate_params), enumerate(cv.split(X, y, groups)) ) parallel.total = len(list(deepcopy(runs))) out = parallel( delayed(_fit_and_score)( clone(base_estimator), X, y, train=train, test=test, parameters=parameters, split_progress=(split_idx, n_splits), candidate_progress=(cand_idx, n_candidates), **fit_and_score_kwargs, ) for (cand_idx, parameters), (split_idx, (train, test)) in runs ) if len(out) < 1: raise ValueError( "No fits were performed. " "Was the CV iterator empty? " "Were there no candidates?" ) elif len(out) != n_candidates * n_splits: raise ValueError( "cv.split and cv.get_n_splits returned " "inconsistent results. Expected {} " "splits, got {}".format(n_splits, len(out) // n_candidates) ) # For callable self.scoring, the return type is only know after # calling. If the return type is a dictionary, the error scores # can now be inserted with the correct key. The type checking # of out will be done in `_insert_error_scores`. if callable(self.scoring): _insert_error_scores(out, self.error_score) all_candidate_params.extend(candidate_params) all_out.extend(out) if more_results is not None: for key, value in more_results.items(): all_more_results[key].extend(value) nonlocal results results = self._format_results( all_candidate_params, n_splits, all_out, all_more_results ) return results self._run_search(evaluate_candidates) # multimetric is determined here because in the case of a callable # self.scoring the return type is only known after calling first_test_score = all_out[0]["test_scores"] self.multimetric_ = isinstance(first_test_score, dict) # check refit_metric now for a callabe scorer that is multimetric if callable(self.scoring) and self.multimetric_: self._check_refit_for_multimetric(first_test_score) refit_metric = self.refit # For multi-metric evaluation, store the best_index_, best_params_ and # best_score_ iff refit is one of the scorer names # In single metric evaluation, refit_metric is "score" if self.refit or not self.multimetric_: # If callable, refit is expected to return the index of the best # parameter set. if callable(self.refit): self.best_index_ = self.refit(results) if not isinstance(self.best_index_, numbers.Integral): raise TypeError("best_index_ returned is not an integer") if self.best_index_ < 0 or self.best_index_ >= len(results["params"]): raise IndexError("best_index_ index out of range") else: self.best_index_ = results["rank_test_%s" % refit_metric].argmin() self.best_score_ = results["mean_test_%s" % refit_metric][ self.best_index_ ] self.best_params_ = results["params"][self.best_index_] if self.refit: # we clone again after setting params in case some # of the params are estimators as well. self.best_estimator_ = clone( clone(base_estimator).set_params(**self.best_params_) ) refit_start_time = time.time() if y is not None: self.best_estimator_.fit(X, y, **fit_params) else: self.best_estimator_.fit(X, **fit_params) refit_end_time = time.time() self.refit_time_ = refit_end_time - refit_start_time # Store the only scorer not as a dict for single metric evaluation self.scorer_ = scorers self.cv_results_ = results self.n_splits_ = n_splits return self def _check_refit_for_multimetric(self, scores): """Check `refit` is compatible with `scores` is valid""" multimetric_refit_msg = ( "For multi-metric scoring, the parameter refit must be set to a " "scorer key or a callable to refit an estimator with the best " "parameter setting on the whole data and make the best_* " "attributes available for that metric. If this is not needed, " f"refit should be set to False explicitly. {self.refit!r} was " "passed." ) valid_refit_dict = isinstance(self.refit, str) and self.refit in scores if ( self.refit is not False and not valid_refit_dict and not callable(self.refit) ): raise ValueError(multimetric_refit_msg) def get_bnn(): from blitz.modules import BayesianLinear from blitz.utils import variational_estimator @variational_estimator class BNNRegressor(torch.nn.Module): """A Bayesian Neural Network pytorch lightining module""" val_str = "CI acc: {:.2f}, CI upper acc: {:.2f}, CI lower acc: {:.2f}" def __init__( self, input_dim, output_dim, n_examples=100, hidden_units=512, **kwargs ): super().__init__() self.blinear1 = BayesianLinear(input_dim, hidden_units) self.blinear2 = BayesianLinear(hidden_units, output_dim) self.n_examples = n_examples self.n_samples = kwargs.get("n_samples", 50) self.criterion = torch.nn.MSELoss() def forward(self, x): # for layer in self.layers[:-1]: # x = layer(x) # x = F.relu(x) # return self.layers[-1](x) x = self.blinear1(x) x = F.relu(x) return self.blinear2(x) def evaluate_regression(self, batch, samples=100, std_multiplier=1.96): """Evaluate Bayesian Neural Network This answers the question "How many correction predictions are in the confidence interval (CI)?" It also spits out the CI. Parameters ---------- batch : tuple The batch being evaluatd samples : int, optional The number of samples of the BNN for calculating the CI std_multiplier : float, optional The Z-score corresponding with the desired CI. Default is 1.96, which corresponds with a 95% CI. Returns ------- tuple of ic_acc, over_ci_lower, under_ci_upper icc_acc is the percentage within the CI. """ X, y = batch # Sample preds = torch.tensor([self(X) for i in range(samples)]) preds = torch.stack(preds) means = preds.mean(axis=0) stds = preds.std(axis=0) # Calculate CI ci_upper, ci_lower = self._calc_ci(means, stds, std_multiplier) ic_acc = (ci_lower <= y) * (ci_upper >= y) ic_acc = ic_acc.float().mean() under_ci_upper = (ci_upper >= y).float().mean() over_ci_lower = (ci_lower <= y).float().mean() ic_acc = (ci_lower <= y) * (ci_upper >= y) ic_acc = ic_acc.float().mean() return ic_acc, over_ci_lower, under_ci_upper def _calc_ci(self, means, stds, std_multiplier=1.96): ci_upper = means + (std_multiplier * stds) ci_lower = means - (std_multiplier * stds) return ci_lower, ci_upper class ANNRegressor(torch.nn.Module): """Artificial Neural Network Regressor Parameters ----------- input_dim : int The number of features in the input output_dim : int The number of outputs in the targets hidden_units : int, optional The number of hidden units. Default is 512. """ def __init__(self, input_dim, output_dim, hidden_units=512, **kwargs): super().__init__() self.num_hidden_layers = 1 self.input_layer = torch.nn.Linear(input_dim, hidden_units) self.output_layer = torch.nn.Linear(hidden_units, output_dim) def forward(self, x, **kwargs): x_ = F.relu(self.input_layer(x)) if self.num_hidden_layers > 1: x_ = self.hidden_layers(x_) x_ = F.relu(x_) return self.output_layer(x_) class RegressorRegistry: """Registry for Regressors Models registered using the register method are saved as the class name. """ regressors = {} def __getitem__(self, key): reg = self.regressors.get(key) if reg is not None: return reg else: raise KeyError( f"{key} is not in the §. Register using the register method." ) def __setitem__(self, key, value): reg = self.regressors.get(key) if reg is not None: self.regressors[key] = value def register(self, regressor): key = regressor.__name__ self.regressors[key] = regressor # Create global regressor registry registry = RegressorRegistry() registry.register(ANNRegressor) def get_data_path(): return pathlib.Path(pkg_resources.resource_filename("summit", "benchmarks/data")) def get_model_path(): return pathlib.Path(pkg_resources.resource_filename("summit", "benchmarks/models")) def get_pretrained_reizman_suzuki_emulator(case=1): model_name = f"reizman_suzuki_case_{case}" model_path = get_model_path() / model_name if not model_path.exists(): raise NotADirectoryError("Could not initialize from expected path.") exp = ReizmanSuzukiEmulator.load(model_path, case=case) data_path = get_data_path() exp.ds = DataSet.read_csv(data_path / f"{model_name}.csv") return exp
[docs]class ReizmanSuzukiEmulator(ExperimentalEmulator): """Reizman Suzuki Emulator Virtual experiments representing the Suzuki-Miyaura Cross-Coupling reaction similar to Reizman et al. (2016). Experimental outcomes are based on an emulator that is trained on the experimental data published by Reizman et al. Parameters ---------- case: int, optional, default=1 Reizman et al. (2016) reported experimental data for 4 different cases. Each case was has a different set of substrates but the same possible catalysts. Please see their paper for more information on the cases. Examples -------- >>> reizman_emulator = ReizmanSuzukiEmulator(case=1) Notes ----- This benchmark is based on data from [Reizman]_ et al. References ---------- .. [Reizman] B. J. Reizman et al., React. Chem. Eng., 2016, 1, 658–666. DOI: `10.1039/C6RE00153J <https://doi.org/10.1039/C6RE00153J>`_. """ def __init__(self, case=1, **kwargs): # Initialization model_name = kwargs.get("model_name", f"reizman_suzuki_case_{case}") domain = kwargs.pop("domain", self.setup_domain()) data_path = get_data_path() ds = DataSet.read_csv(data_path / f"{model_name}.csv") super().__init__(model_name, domain, dataset=ds, **kwargs) @staticmethod def setup_domain(): domain = Domain() # Decision variables des_1 = "Catalyst type - different ligands" domain += CategoricalVariable( name="catalyst", description=des_1, levels=[ "P1-L1", "P2-L1", "P1-L2", "P1-L3", "P1-L4", "P1-L5", "P1-L6", "P1-L7", ], ) des_2 = "Residence time in seconds (s)" domain += ContinuousVariable(name="t_res", description=des_2, bounds=[60, 600]) des_3 = "Reactor temperature in degrees Celsius (ºC)" domain += ContinuousVariable( name="temperature", description=des_3, bounds=[30, 110] ) des_4 = "Catalyst loading in mol%" domain += ContinuousVariable( name="catalyst_loading", description=des_4, bounds=[0.5, 2.5] ) # Objectives des_5 = ( "Turnover number - moles product generated divided by moles catalyst used" ) domain += ContinuousVariable( name="ton", description=des_5, bounds=[0, 200], # TODO: not sure about bounds, maybe redefine is_objective=True, maximize=False, ) des_6 = "Yield" domain += ContinuousVariable( name="yield", description=des_6, bounds=[0, 100], is_objective=True, maximize=True, ) return domain
[docs] @classmethod def load(cls, save_dir, case=1): model_name = f"reizman_suzuki_case_{case}" return super().load(model_name, save_dir)
[docs] @classmethod def to_dict(self): """Serialize the class to a dictionary""" experiment_params = dict( case=self.model_name[-1], ) return super().to_dict(**experiment_params)
def get_pretrained_baumgartner_cc_emulator(include_cost=False): model_name = "baumgartner_aniline_cn_crosscoupling" model_path = get_model_path() / model_name if not model_path.exists(): raise NotADirectoryError("Could not initialize from expected path.") data_path = get_data_path() ds = DataSet.read_csv(data_path / f"{model_name}.csv") exp = BaumgartnerCrossCouplingEmulator.load(model_path, dataset=ds) return exp
[docs]class BaumgartnerCrossCouplingEmulator(ExperimentalEmulator): """Baumgartner Cross Coupling Emulator Virtual experiments representing the Aniline Cross-Coupling reaction similar to Baumgartner et al. (2019). Experimental outcomes are based on an emulator that is trained on the experimental data published by Baumgartner et al. This is a five dimensional optimisation of temperature, residence time, base equivalents, catalyst and base. The categorical variables (catalyst and base) contain descriptors calculated using COSMO-RS. Specifically, the descriptors are the first two sigma moments. Parameters ---------- include_cost : bool, optional Include minimization of cost as an extra objective. Cost is calculated as a deterministic function of the inputs (i.e., no model is trained). Defaults to False. Examples -------- >>> bemul = BaumgartnerCrossCouplingEmulator() Notes ----- This benchmark is based on data from [Baumgartner]_ et al. References ---------- .. [Baumgartner] L. M. Baumgartner et al., Org. Process Res. Dev., 2019, 23, 1594–1601 DOI: `10.1021/acs.oprd.9b00236 <https://`doi.org/10.1021/acs.oprd.9b00236>`_ """ def __init__(self, include_cost=False, **kwargs): # TODO: make it possible to select model based on one-hot encoding or descriptors model_name = kwargs.pop("model_name", "baumgartner_aniline_cn_crosscoupling") self.include_cost = include_cost domain = kwargs.pop("domain", self.setup_domain(self.include_cost)) data_path = get_data_path() super().__init__(model_name, domain, **kwargs) @staticmethod def setup_domain(include_cost=False): domain = Domain() # Decision variables des_1 = "Catalyst type" catalyst_df = DataSet( [ [460.7543, 67.2057], # 30.8413, 2.3043, 0], #, 424.64, 421.25040226], [518.8408, 89.8738], # 39.4424, 2.5548, 0], #, 487.7, 781.11247064], [819.933, 129.0808], # 83.2017, 4.2959, 0], #, 815.06, 880.74916884], ], index=["tBuXPhos", "tBuBrettPhos", "AlPhos"], columns=[ "area_cat", "M2_cat", ], # , 'M3_cat', 'Macc3_cat', 'Mdon3_cat'] #,'mol_weight', 'sol'] ) domain += CategoricalVariable( name="catalyst", description=des_1, levels=["tBuXPhos", "tBuBrettPhos", "AlPhos"], descriptors=catalyst_df, ) des_2 = "Base" base_df = DataSet( [ [162.2992, 25.8165], # 40.9469, 3.0278, 0], #101.19, 642.2973283], [ 165.5447, 81.4847, ], # 107.0287, 10.215, 0.0169], # 115.18, 534.01544123], [227.3523, 30.554], # 14.3676, 1.1196, 0.0127], # 171.28, 839.81215], [192.4693, 59.8367], # 82.0661, 7.42, 0], # 152.24, 1055.82799], ], index=["TEA", "TMG", "BTMG", "DBU"], columns=["area", "M2"], # , 'M3', 'Macc3', 'Mdon3'], # 'mol_weight', 'sol'] ) domain += CategoricalVariable( name="base", description=des_2, levels=["DBU", "BTMG", "TMG", "TEA"], descriptors=base_df, ) des_3 = "Base equivalents" domain += ContinuousVariable( name="base_equivalents", description=des_3, bounds=[1.0, 2.5] ) des_4 = "Temperature in degrees Celsius (ºC)" domain += ContinuousVariable( name="temperature", description=des_4, bounds=[30, 100] ) des_5 = "residence time in seconds (s)" domain += ContinuousVariable(name="t_res", description=des_5, bounds=[60, 1800]) des_6 = "Yield" domain += ContinuousVariable( name="yield", description=des_6, bounds=[0.0, 1.0], is_objective=True, maximize=True, ) if include_cost: domain += ContinuousVariable( name="cost", description="cost in USD of 40 uL reaction", bounds=[0.0, 1.0], is_objective=True, maximize=False, ) return domain
[docs] @classmethod def load(cls, save_dir, **kwargs): """Load all the essential parameters of the BaumgartnerCrossCouplingEmulator from disc Parameters ---------- save_dir : str or pathlib.Path The directory from which to load emulator files. """ model_name = "baumgartner_aniline_cn_crosscoupling" return super().load(model_name, save_dir, **kwargs)
def _run(self, conditions, **kwargs): conditions, _ = super()._run(conditions=conditions, **kwargs) # Calculate costs if self.include_cost: costs = self._calculate_costs(conditions) conditions[("cost", "DATA")] = costs return conditions, {} @classmethod def _calculate_costs(cls, conditions): catalyst = conditions["catalyst"].values base = conditions["base"].values base_equiv = conditions["base_equivalents"].values # Calculate amounts droplet_vol = 40 * 1e-3 # mL mmol_triflate = 0.91 * droplet_vol mmol_anniline = 1.6 * mmol_triflate catalyst_equiv = { "tBuXPhos": 0.0095, "tBuBrettPhos": 0.0094, "AlPhos": 0.0094, } mmol_catalyst = [catalyst_equiv[c] * mmol_triflate for c in catalyst] mmol_base = base_equiv * mmol_triflate # Calculate costs cost_triflate = mmol_triflate * 5.91 # triflate is $5.91/mmol cost_anniline = mmol_anniline * 0.01 # anniline is $0.01/mmol cost_catalyst = np.array( [cls._get_catalyst_cost(c, m) for c, m in zip(catalyst, mmol_catalyst)] ) cost_base = np.array( [cls._get_base_cost(b, m) for b, m in zip(base, mmol_base)] ) tot_cost = cost_triflate + cost_anniline + cost_catalyst + cost_base if len(tot_cost) == 1: tot_cost = tot_cost[0] return tot_cost @staticmethod def _get_catalyst_cost(catalyst, catalyst_mmol): catalyst_prices = { "tBuXPhos": 94.08, "tBuBrettPhos": 182.85, "AlPhos": 594.18, } return float(catalyst_prices[catalyst] * catalyst_mmol) @staticmethod def _get_base_cost(base, mmol_base): # prices in $/mmol base_prices = { "DBU": 0.03, "BTMG": 1.2, "TMG": 0.001, "TEA": 0.01, } return float(base_prices[base] * mmol_base)