Source code for summit.strategies.deep_reaction_optimizer

from .base import Strategy, Transform
from summit.domain import *
from summit.utils.dataset import DataSet
from summit import get_summit_config_path


import numpy as np
import pandas as pd

import logging
import json
import os.path as osp
import os
import pathlib
import uuid
from collections import namedtuple
from copy import deepcopy


class DRO(Strategy):
    """Deep Reaction Optimizer (DRO)

    The DRO relies on a pretrained RL policy that can predict a next set of experiments
    given a set of past experiments. We suggest reading the notes below before using the DRO.

    Parameters
    ----------
    domain: :class:`~summit.domain.Domain`
        A summit domain object
    transform: :class:`~summit.strategies.base.Transform`, optional
        A transform class (i.e., not the object itself). By default, no transformation
        will be done to the input variables or objectives.
    pretrained_model_config_path: string, optional
        Path to the config file of a pretrained DRO model (note that the number of input
        parameters should match the domain inputs).
        By default, a pretrained model will be used.
    model_size: string, optional
        Whether the model (policy) has the same size as originally published by the
        developers of the DRO ("standard"), or whether the model is bigger with respect
        to the number of pretraining epochs, LSTM hidden size, and unroll length ("bigger").
        Note that the pretraining time can increase exponentially when changing these
        hyperparameters and the number of input variables; the number of epochs each
        bigger model was trained for can be found in the "checkpoint" file in the
        respective `save directory <https://github.com/sustainable-processes/chemopt/tree/master/chemopt/save>`_.
        By default: "standard" (these models were all pretrained for 50 epochs).

    Attributes
    ----------
    xbest, internal state
        Best point from all iterations.
    fbest, internal state
        Objective value at the best point from all iterations.
    param, internal state
        A dict containing: the LSTM state of the DRO, the last requested point, xbest,
        fbest, and the number of iterations (corresponding to the unroll length of the LSTM).

    Examples
    --------
    >>> from summit.domain import Domain, ContinuousVariable
    >>> from summit.strategies import DRO
    >>> from summit.utils.dataset import DataSet
    >>> domain = Domain()
    >>> domain += ContinuousVariable(name='temperature', description='reaction temperature in celsius', bounds=[50, 100])
    >>> domain += ContinuousVariable(name='flowrate_a', description='flow of reactant a in mL/min', bounds=[0.1, 0.5])
    >>> domain += ContinuousVariable(name='flowrate_b', description='flow of reactant b in mL/min', bounds=[0.1, 0.5])
    >>> strategy = DRO(domain)

    Notes
    -----
    The DRO requires Tensorflow version 1, while all other parts of Summit use Tensorflow
    version 2. Therefore, we have created a Docker container for running the DRO which has
    TFv1 installed. We also have an option in the pip package to install TFv1. However, if
    you simply want to analyse results from a DRO run (i.e., use from_dict), you will not
    get a tensorflow import error.

    We have pretrained policies for domains with up to six continuous decision variables.

    When applying the DRO it is necessary to define reasonable bounds for the objective
    variable, e.g., yield in [0, 1], since the DRO normalizes the objective function
    values to be between 0 and 1.

    The DRO is based on the paper in ACS Central Science by [Zhou]_.

    References
    ----------
    .. [Zhou] Z. Zhou et al., ACS Cent. Sci., 2017, 3, 1337–1344.
       DOI: `10.1021/acscentsci.7b00492 <https://doi.org/10.1021/acscentsci.7b00492>`_
    """

    def __init__(
        self,
        domain: Domain,
        transform: Transform = None,
        pretrained_model_config_path=None,
        model_size="standard",
        **kwargs
    ):
        Strategy.__init__(self, domain, transform)

        # Create directories to store temporary files
        summit_config_path = get_summit_config_path()
        self.uuid_val = uuid.uuid4()  # Unique identifier for this run
        tmp_dir = summit_config_path / "dro" / str(self.uuid_val)
        if not os.path.isdir(tmp_dir):
            os.makedirs(tmp_dir)

        self._pretrained_model_config_path = pretrained_model_config_path
        self._infer_model_path = tmp_dir
        self._model_size = model_size
        self.prev_param = None
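
    # A hedged, worked illustration of the scaling mentioned in the Notes above: the DRO
    # internally works on values scaled to [0, 1], which is why reasonable objective
    # bounds matter. The numbers are illustrative only and are not taken from this module:
    #
    #     objective bounds [0, 100], measured value 75, maximize=True
    #     scaled:  y = (75 - 0) / (100 - 0) = 0.75
    #     flipped: y = 1 - 0.75 = 0.25   # maximization is recast as minimization
    #
    # ``suggest_experiments`` below applies this min-max scaling to every input variable
    # (using its bounds) and to the single objective, flipping the objective when it is
    # to be maximized.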

    def suggest_experiments(self, prev_res: DataSet = None, **kwargs):
        """Suggest experiments using the Deep Reaction Optimizer

        Parameters
        ----------
        num_experiments: int, optional
            The number of experiments (i.e., samples) to generate. Default is 1.
        prev_res: :class:`~summit.utils.data.DataSet`, optional
            Dataset with data from previous experiments.
            If no data is passed, the DRO optimization algorithm
            will be initialized and suggest initial experiments.

        Returns
        -------
        next_experiments : :class:`~summit.utils.data.DataSet`
            A Dataset object with the suggested experiments
        """
        import tensorflow as tf

        if tf.__version__ != "1.13.1":
            raise ImportError(
                """Tensorflow version 1.13.1 is needed for the DRO, which is different from
                the versions needed for the other strategies. We suggest using the docker
                container marcosfelt/summit:dro.
                """
            )

        # Extract dimension of input domain
        self.dim = self.domain.num_continuous_dimensions()

        # Get bounds of input variables
        bounds = []
        for v in self.domain.input_variables:
            if isinstance(v, ContinuousVariable):
                bounds.append(v.bounds)
            elif isinstance(v, CategoricalVariable):
                if v.ds is not None:
                    descriptor_names = v.ds.data_columns
                    descriptors = np.asarray(
                        [v.ds.loc[:, [l]].values.tolist() for l in v.ds.data_columns]
                    )
                else:
                    raise ValueError("No descriptors given for {}".format(v.name))
                for d in descriptors:
                    bounds.append([np.min(np.asarray(d)), np.max(np.asarray(d))])

        # Get bounds of objective
        obj_maximize = False
        obj_bounds = None
        for v in self.domain.output_variables:
            if obj_bounds is not None:
                raise ValueError(
                    "DRO can not handle multiple objectives. Please use transform."
                )
            obj_bounds = v.bounds
            if v.maximize:
                obj_maximize = True

        self.bounds = np.asarray(bounds, dtype=float)

        # Initialization
        self.x0 = None
        self.y0 = None

        # Get previous results
        if prev_res is not None:
            inputs, outputs = self.transform.transform_inputs_outputs(
                prev_res, categorical_method="descriptors"
            )

            # Set up maximization and minimization and normalize inputs (x) and outputs (y)
            for v in self.domain.variables:
                if v.is_objective:
                    a, b = np.asarray(v.bounds, dtype=float)
                    y = outputs[v.name]
                    y = (y - a) / (b - a)
                    if v.maximize:
                        y = 1 - y
                    outputs[v.name] = y
                else:
                    a, b = np.asarray(v.bounds, dtype=float)
                    x = inputs[v.name]
                    x = (x - a) / (b - a)
                    inputs[v.name] = x

            self.x0 = inputs.data_to_numpy()
            self.y0 = outputs.data_to_numpy()
        # If no prev_res are given but prev_param exists, raise an error
        elif self.prev_param is not None:
            raise ValueError(
                "Parameters from the previous optimization iteration are given but "
                "previous results are missing!"
            )

        # TODO: how does DRO handle constraints?
        if self.domain.constraints != []:
            raise NotImplementedError("DRO can not handle constraints yet.")

        next_experiments, param = self.main(
            num_input=self.dim, prev_res=[self.x0, self.y0], prev_param=self.prev_param
        )

        objective_dir = -1.0 if obj_maximize else 1.0
        self.fbest = (
            objective_dir * param["fbest"] * (obj_bounds[1] - obj_bounds[0])
            + obj_bounds[0]
        )
        self.prev_param = param

        # Do any necessary transformations back
        next_experiments = self.transform.un_transform(
            next_experiments, categorical_method="descriptors"
        )

        return next_experiments
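
    # Minimal usage sketch for ``suggest_experiments`` (hedged: ``run_experiment`` and the
    # objective name "yld" are placeholders for user code, not part of this module):
    #
    #     strategy = DRO(domain)
    #     prev_res = None
    #     for _ in range(5):
    #         next_experiments = strategy.suggest_experiments(prev_res=prev_res)
    #         next_experiments[("yld", "DATA")] = run_experiment(next_experiments)
    #         prev_res = next_experiments
    #
    # Each call suggests the next experiment; the measured result is fed back through
    # ``prev_res`` on the following call so the internal LSTM state can be advanced.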

    def reset(self):
        """Reset internal parameters"""
        self.prev_param = None

    def to_dict(self):
        """Convert hyperparameters and internal state to a dictionary"""
        if self.prev_param is not None:
            params = deepcopy(self.prev_param)
            tup_to_json = [
                list(e) for e in [list(t) for t in list([params["state"]])][0]
            ]
            params["state"] = [
                [tup_to_json[0][0].tolist(), tup_to_json[0][1].tolist()],
                [tup_to_json[1][0].tolist(), tup_to_json[1][1].tolist()],
            ]
            params["xbest"] = params["xbest"].tolist()
            params["fbest"] = params["fbest"].tolist()
            params["last_requested_point"] = params["last_requested_point"].tolist()
        else:
            params = None
        strategy_params = dict(
            prev_param=params,
            pretrained_model_config_path=self._pretrained_model_config_path,
            model_size=self._model_size,
        )
        return super().to_dict(**strategy_params)

    @classmethod
    def from_dict(cls, d):
        dro = super().from_dict(d)
        params = d["strategy_params"]["prev_param"]
        if params is not None:
            params["state"] = tuple(
                [
                    tuple([np.array(s, dtype=np.float32) for s in e])
                    for e in params["state"]
                ]
            )
            params["xbest"] = np.array(params["xbest"])
            params["fbest"] = np.array(params["fbest"])
            params["last_requested_point"] = np.array(params["last_requested_point"])
        dro.prev_param = params
        return dro
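
    # Hedged sketch of persisting the strategy between physical experiments using the
    # ``to_dict``/``from_dict`` pair above (the file name is illustrative):
    #
    #     import json
    #     with open("dro_state.json", "w") as f:
    #         json.dump(strategy.to_dict(), f)
    #     ...
    #     with open("dro_state.json") as f:
    #         strategy = DRO.from_dict(json.load(f))
    #
    # ``to_dict`` converts the LSTM state tuples and numpy arrays to nested lists so the
    # dictionary is JSON-serializable; ``from_dict`` reverses that conversion.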

    def x_convert(self, x):
        real_x = np.zeros([self.dim])
        for i in range(self.dim):
            a, b = self.bounds[i]
            real_x[i] = x[0, i] * (b - a) + a
        return real_x

    def main(self, num_input=3, prev_res=None, prev_param=None):
        # TensorFlow 1.x is imported lazily (as in suggest_experiments) so the module can
        # be imported without TF installed; it is needed here for tf.reset_default_graph().
        import tensorflow as tf
        import chemopt
        from chemopt.logger import get_handlers

        x0, y0 = prev_res[0], prev_res[1]
        module_path = os.path.dirname(chemopt.__file__)
        if self._pretrained_model_config_path:
            path = self._pretrained_model_config_path
        else:
            path = osp.join(
                module_path,
                "config_"
                + str(num_input)
                + "_inputs_"
                + str(self._model_size)
                + ".json",
            )
        config_file = open(path)
        config = json.load(
            config_file, object_hook=lambda d: namedtuple("x", d.keys())(*d.values())
        )
        saved_model_path = osp.join(
            os.path.dirname(os.path.realpath(path)), str(config.save_path)
        )

        if prev_param:
            if prev_param["iteration"] > config.unroll_length:
                raise ValueError(
                    "Number of iterations exceeds unroll length of the pretrained model!"
                )

        logging.basicConfig(level=logging.WARNING, handlers=get_handlers())
        logger = logging.getLogger()

        cell = chemopt.rnn.StochasticRNNCell(
            cell=chemopt.rnn.LSTM,
            kwargs={"hidden_size": config.hidden_size},
            nlayers=config.num_layers,
            reuse=config.reuse,
        )
        optimizer = self.StepOptimizer(
            cell=cell,
            ndim=config.num_params,
            nsteps=config.num_steps,
            ckpt_path=saved_model_path,
            infer_model_path=self._infer_model_path,
            logger=logger,
            constraints=True,
            x=x0,
            y=y0,
        )
        x, state = optimizer.run(prev_res=y0, prev_param=prev_param)
        real_x = self.x_convert(x)

        next_experiments = {}
        i_inp = 0
        for v in self.domain.variables:
            if not v.is_objective:
                next_experiments[v.name] = [real_x[i_inp]]
                i_inp += 1
        next_experiments = DataSet.from_df(pd.DataFrame(data=next_experiments))
        next_experiments[("strategy", "METADATA")] = ["DRO"]

        param = {}
        if not y0:
            y0 = np.array([[float("inf")]])
            param["iteration"] = 0
        else:
            param["iteration"] = prev_param["iteration"] + 1
        if not prev_param:
            self.fbest = y0[0]
            self.xbest = real_x
        elif y0 < prev_param["fbest"]:
            self.fbest = y0[0]
            self.xbest = real_x
        else:
            self.fbest = prev_param["fbest"]
            self.xbest = prev_param["xbest"]
        param.update(
            {
                "state": state,
                "last_requested_point": x,
                "xbest": self.xbest,
                "fbest": self.fbest,
            }
        )

        tf.reset_default_graph()

        return next_experiments, param

    class StepOptimizer:
        def __init__(
            self,
            cell,
            ndim,
            nsteps,
            ckpt_path,
            infer_model_path,
            logger,
            constraints,
            x,
            y,
        ):
            self.logger = logger
            self.cell = cell
            self.ndim = ndim
            self.nsteps = nsteps
            self.ckpt_path = ckpt_path
            self._infer_model_path = infer_model_path
            self.constraints = constraints

            self.init_state = self.cell.get_initial_state(1, tf.float32)
            self.results = self.build_graph()
            self.x, self.y = x, y

            self.saver = tf.train.Saver(tf.global_variables())

        def get_state_shapes(self):
            return [
                (s[0].get_shape().as_list(), s[1].get_shape().as_list())
                for s in self.init_state
            ]

        def step(self, sess, x, y, state):
            feed_dict = {"input_x:0": x, "input_y:0": y}
            for i in range(len(self.init_state)):
                feed_dict["state_l{0}_c:0".format(i)] = state[i][0]
                feed_dict["state_l{0}_h:0".format(i)] = state[i][1]
            new_x, new_state = sess.run(self.results, feed_dict=feed_dict)
            return new_x, new_state

        def build_graph(self):
            x = tf.placeholder(tf.float32, shape=[1, self.ndim], name="input_x")
            y = tf.placeholder(tf.float32, shape=[1, 1], name="input_y")
            state = []
            for i in range(len(self.init_state)):
                state.append(
                    (
                        tf.placeholder(
                            tf.float32,
                            shape=self.init_state[i][0].get_shape(),
                            name="state_l{0}_c".format(i),
                        ),
                        tf.placeholder(
                            tf.float32,
                            shape=self.init_state[i][1].get_shape(),
                            name="state_l{0}_h".format(i),
                        ),
                    )
                )
            with tf.name_scope("opt_cell"):
                new_x, new_state = self.cell(x, y, state)
                if self.constraints:
                    new_x = tf.clip_by_value(new_x, 0.001, 0.999)
            return new_x, new_state

        def load(self, sess, model_path):
            try:
                self.saver.restore(sess, model_path)
            except:
                raise FileNotFoundError("No checkpoint available")

        def get_init(self):
            x = np.random.normal(loc=0.5, scale=0.2, size=(1, self.ndim))
            x = np.maximum(np.minimum(x, 0.9), 0.1)
            init_state = [
                (np.zeros(s[0]), np.zeros(s[1])) for s in self.get_state_shapes()
            ]
            return x, init_state

        def run(self, prev_res=None, prev_param=None):
            with tf.Session() as sess:
                if prev_res is None:
                    x, state = self.get_init()
                    ckpt = tf.train.get_checkpoint_state(self.ckpt_path)
                    model_id = ckpt.model_checkpoint_path.split("/")[-1]
                    init_model = os.path.join(
                        os.path.dirname(self.ckpt_path), model_id
                    )
                    self.load(sess, init_model)
                    self._infer_model_path = os.path.join(
                        self._infer_model_path, model_id
                    )
                    self.saver.save(sess, self._infer_model_path)
                else:
                    ckpt = tf.train.get_checkpoint_state(self.ckpt_path)
                    model_id = ckpt.model_checkpoint_path.split("/")[-1]
                    self._infer_model_path = os.path.join(
                        self._infer_model_path, model_id
                    )
                    self.load(sess, self._infer_model_path)
                    state = prev_param["state"]
                    if not np.allclose(self.x, prev_param["last_requested_point"]):
                        raise ValueError(
                            "Values for input variables do not match requested points: {} != {}".format(
                                str(self.x), str(prev_param["last_requested_point"])
                            )
                        )
                    x, state = self.step(sess, self.x, self.y, state)
                    self.saver.save(sess, self._infer_model_path)
            return x, state