# Source code for summit.strategies.snobfit

import pdb
from .base import Strategy, Transform
from summit.domain import *
from summit.utils.dataset import DataSet

import math
import numpy
from copy import deepcopy
import pandas as pd
import warnings


class SNOBFIT(Strategy):
    """Stable Noisy Optimization by Branch and Fit (SNOBFIT)

    SNOBFIT is designed to quickly optimise noisy functions.

    Parameters
    ----------
    domain : :class:`~summit.domain.Domain`
        The domain of the optimization
    transform : :class:`~summit.strategies.base.Transform`, optional
        A transform object. By default no transformation will be done
        on the input variables or objectives.
    probability_p: float, optional
        The probability p that a point of class 4 is generated, i.e.,
        higher p leads to more exploration. Default is 0.5.
    dx_dim: float, optional
        only used for the definition of a new problem: two points are
        considered to be different if they differ by at least dx(i) in
        at least one coordinate i. Default is 1E-5.

    Examples
    --------
    >>> from summit.domain import Domain, ContinuousVariable
    >>> from summit.strategies import SNOBFIT
    >>> from summit.utils.dataset import DataSet
    >>> import pandas as pd
    >>> domain = Domain()
    >>> domain += ContinuousVariable(name='temperature', description='reaction temperature in celsius', bounds=[0, 100])
    >>> domain += ContinuousVariable(name='flowrate_a', description='flow of reactant a in mL/min', bounds=[0, 1])
    >>> domain += ContinuousVariable(name='flowrate_b', description='flow of reactant b in mL/min', bounds=[0.1, 0.9])
    >>> domain += ContinuousVariable(name="yld", description='relative conversion to xyz', bounds=[0,100], is_objective=True, maximize=True)
    >>> d = {'temperature': [50,40,70,30], 'flowrate_a': [0.6,0.3,0.2,0.1], 'flowrate_b': [0.1,0.3,0.2,0.1], 'yld': [0.7,0.6,0.3,0.1]}
    >>> df = pd.DataFrame(data=d)
    >>> initial = DataSet.from_df(df)
    >>> strategy = SNOBFIT(domain)
    >>> next_experiments = strategy.suggest_experiments(5, initial)

    Notes
    ------
    SNOBFIT was created by [Huyer]_ et al. This implementation is based on the
    python reimplementation [SQSnobFit]_ of the original MATLAB code by
    [Neumaier]_.

    Note that SNOBFIT sometimes returns more experiments than requested when
    the number of experiments requested is small (i.e., 1 or 2). This seems to
    be a general issue with the algorithm instead of the specific
    implementation used here.

    References
    ----------
    .. [Huyer] W. Huyer et al., ACM Trans. Math. Softw., 2008, 35, 1-25.
       DOI: `10.1145/1377612.1377613 <https://doi.org/10.1145/1377612.1377613>`_.
    .. [SQSnobFit] Lavrijsen, W. SQSnobFit `<https://pypi.org/project/SQSnobFit/>`_
    .. [Neumaier] `<https://www.mat.univie.ac.at/~neum/software/snobfit/>`_
    """

    def __init__(self, domain: Domain, **kwargs):
        Strategy.__init__(self, domain, **kwargs)
        # Probability that an exploratory (class 4) point is generated.
        self._p = kwargs.get("probability_p", 0.5)
        # Relative resolution: two points differing by less than dx in every
        # coordinate are considered identical.
        self._dx_dim = kwargs.get("dx_dim", 1e-5)
        # Internal SNOBFIT state carried between calls to suggest_experiments.
        self.prev_param = None
[docs] def suggest_experiments( self, num_experiments=1, prev_res: DataSet = None, **kwargs ): """Suggest experiments using the SNOBFIT method Parameters ---------- num_experiments: int, optional The number of experiments (i.e., samples) to generate. Default is 1. prev_res: summit.utils.data.DataSet, optional Dataset with data from previous experiments. If no data is passed, the SNOBFIT optimization algorithm will be initialized and suggest initial experiments. Returns ------- next_experiments: DataSet A `Dataset` object with the suggested experiments by SNOBFIT algorithm """ silence_warnings = kwargs.get("silence_warnings", True) if silence_warnings: warnings.filterwarnings("ignore", category=DeprecationWarning) # get objective name and whether optimization is maximization problem obj_name = None obj_maximize = False for v in self.domain.variables: i = 0 if v.is_objective: i += 1 if i > 1: raise ValueError( "SNOBFIT is not able to optimize multiple objectives, please use transform." ) obj_name = v.name if v.maximize: obj_maximize = True # get parameters from previous iterations inner_prev_param = None if self.prev_param is not None: # get parameters for Nelder-Mead from previous iterations inner_prev_param = self.prev_param[0] # recover invalid experiments from previous iteration if self.prev_param[1] is not None: invalid_res = self.prev_param[1][0].drop(("constraint", "DATA"), axis=1) prev_res = pd.concat([prev_res, invalid_res]) ## Generation of new suggested experiments. # An inner function is called loop-wise to get valid experiments and # avoid suggestions of experiments that violate constraints. # If no valid experiment is found after #<inner_iter_tol>, an error is raised. 
inner_iter_tol = 5 c_iter = 0 valid_next_experiments = False next_experiments = None while not valid_next_experiments and c_iter < inner_iter_tol: valid_next_experiments = False next_experiments, xbest, fbest, param = self._inner_suggest_experiments( num_experiments=num_experiments, prev_res=prev_res, prev_param=inner_prev_param, ) # Invalid experiments hidden from data returned to user but stored internally elswehere invalid_experiments = next_experiments.loc[ next_experiments[("constraint", "DATA")] == False ] next_experiments = next_experiments.loc[ next_experiments[("constraint", "DATA")] != False ] prev_res = prev_res if len(next_experiments) and len(invalid_experiments): valid_next_experiments = True # pass NaN if at least one constraint is violated invalid_experiments[(obj_name, "DATA")] = numpy.nan elif len(invalid_experiments): # pass NaN if at least one constraint is violated invalid_experiments[(obj_name, "DATA")] = numpy.nan prev_res = invalid_experiments else: valid_next_experiments = True inner_prev_param = param param = [param, [invalid_experiments]] c_iter += 1 if c_iter >= inner_iter_tol: warnings.warn( "No new points found. Internal stopping criterion is reached." ) # return only valid experiments (invalid experiments are stored in param[1]) next_experiments = next_experiments.drop(("constraint", "DATA"), axis=1) objective_dir = -1.0 if obj_maximize else 1.0 self.fbest = objective_dir * fbest self.prev_param = param self.xbest = xbest return next_experiments
[docs] def reset(self): """Reset internal parameters""" self.prev_param = None
[docs] def to_dict(self): """Convert hyperparameters and internal state to a dictionary""" if self.prev_param is not None: params = deepcopy(self.prev_param) params[0] = (params[0][0].tolist(), params[0][1], params[0][2].tolist()) params[1] = [p.to_dict() for p in params[1]] else: params = None strategy_params = dict( probability_p=self._p, dx_dim=self._dx_dim, prev_param=params ) return super().to_dict(**strategy_params)
[docs] @classmethod def from_dict(cls, d): snobfit = super().from_dict(d) params = d["strategy_params"]["prev_param"] if params is not None: params[0] = ( numpy.array(params[0][0]), params[0][1], numpy.array(params[0][2]), ) params[1] = [DataSet.from_dict(p) for p in params[1]] snobfit.prev_param = params return snobfit
def _inner_suggest_experiments( self, num_experiments, prev_res: DataSet = None, prev_param=None ): """Inner loop for generation of suggested experiments using the SNOBFIT method Parameters ---------- num_experiments: int The number of experiments (i.e., samples) to generate prev_res: summit.utils.data.DataSet, optional Dataset with data from previous experiments. If no data is passed, the SNOBFIT optimization algorithm will be initialized will suggest initial experiments. prev_param: file.txt TODO: how to handle this? File with parameters of SNOBFIT algorithm from previous iterations of a optimization problem. If no data is passed, the SNOBFIT optimization algorithm will be initialized. Returns ------- next_experiments: DataSet A `Dataset` object with the suggested experiments by SNOBFIT algorithm xbest: list List with variable settings of experiment with best outcome fbest: float Objective value at xbest param: list List with parameters and prev_param of SNOBFIT algorithm (required for next iteration) """ # Extract dimension of input domain dim = self.domain.num_continuous_dimensions() # intern stay_inner = False # Get bounds of input variables bounds = [] input_var_names = [] output_var_names = [] for v in self.domain.variables: if not v.is_objective: if isinstance(v, ContinuousVariable): bounds.append(v.bounds) input_var_names.append(v.name) elif isinstance(v, CategoricalVariable): if v.ds is not None: descriptor_names = v.ds.data_columns descriptors = numpy.asarray( [ v.ds.loc[:, [l]].values.tolist() for l in v.ds.data_columns ] ) else: raise ValueError("No descriptors given for {}".format(v.name)) for d in descriptors: bounds.append( [numpy.min(numpy.asarray(d)), numpy.max(numpy.asarray(d))] ) input_var_names.extend(descriptor_names) else: raise TypeError( "SNOBFIT can not handle variable type: {}".format(v.type) ) else: output_var_names.extend(v.name) bounds = numpy.asarray(bounds, dtype=float) # Initialization x0 = [] y0 = [] # Get previous results if 
prev_res is not None: # get always the same order according to the ordering in the domain -> this is actually done within transform # ordered_var_names = input_var_names + output_var_names # prev_res = prev_res[ordered_var_names] # transform inputs, outputs = self.transform.transform_inputs_outputs( prev_res, categorical_method="descriptors" ) # Set up maximization and minimization for v in self.domain.variables: if v.is_objective and v.maximize: outputs[v.name] = -1 * outputs[v.name] x0 = inputs.data_to_numpy().astype(float) y0 = outputs.data_to_numpy().astype(float) # Add uncertainties to measurements TODO: include uncertainties in input y = [] for i in range(y0.shape[0]): y.append([y0[i].tolist()[0], math.sqrt(numpy.spacing(1))]) y0 = numpy.asarray(y, dtype=float) # If no prev_res are given but prev_param -> raise error elif prev_param is not None: raise ValueError( "Parameter from previous optimization iteration are given but previous results are " "missing!" ) # if no previous results are given initialize with empty lists if not len(x0): x0 = numpy.array(x0).reshape(0, len(bounds)).astype(float) y0 = numpy.array(y0).reshape(0, 2).astype(float) """ Determine SNOBFIT parameters config structure variable defining the box [u,v] in which the points are to be generated, the number nreq of points to be generated and the probability p that a point of type 4 is generated config = struct('bounds',{u,v},'nreq',nreq,'p',p) dx only used for the definition of a new problem (when the program should continue from the values stored in file.mat, the call should have only 4 input parameters!) 
n-vector (n = dimension of the problem) of minimal stnumpy.spacing(1), i.e., two points are considered to be different if they differ by at least dx(i) in at least one coordinate i """ config = {"bounds": bounds, "p": self._p, "nreq": num_experiments} dx = (bounds[:, 1] - bounds[:, 0]) * self._dx_dim # Run SNOBFIT for one iteration request, xbest, fbest, param = self.snobfit(x0, y0, config, dx, prev_param) # Generate DataSet object with variable values of next experiments next_experiments = {} for i, v in enumerate(input_var_names): next_experiments[v] = request[:, i] next_experiments = DataSet.from_df(pd.DataFrame(data=next_experiments)) # Violate constraint mask_valid_next_experiments = self.check_constraints(next_experiments) if not any(mask_valid_next_experiments): stay_inner = True if stay_inner: # add infinity as next_experiments[("constraint", "DATA")] = False else: # add optimization strategy next_experiments[("constraint", "DATA")] = mask_valid_next_experiments next_experiments[("strategy", "METADATA")] = ["SNOBFIT"] * len(request) # Do any necessary transformation back next_experiments = self.transform.un_transform( next_experiments, categorical_method="descriptors" ) return next_experiments, xbest, fbest, param
    # NOTE(review): the body of `snobfit` below is a modified copy of the
    # SQSnobFit package's core routine (see the copyright docstring inside).
    # The scraped source collapsed its statements onto a few physical lines,
    # and the exact nesting of the later while/for/if chains cannot be
    # reconstructed from this view with certainty, so the code is preserved
    # byte-identical here; restore formatting against the upstream summit /
    # SQSnobFit sources before editing.
[docs] def snobfit(self, x, f, config, dx=None, prev_param=None): """ The following snobfit code was copied and modified from the SQSnobFit package and was originally published by Wim Lavrijsen. The SQSnobFit package includes a python version of SNOBFIT which was originally published by A. Neumaier. Copyright of SNOBFIT (v2.1): Neumaier, University of Vienna Website: https://www.mat.univie.ac.at/~neum/software/snobfit/ Copyright of SQSnobfit (v0.4.2) UC Regents, Berkeley Website: https://pypi.org/project/SQSnobFit/ """ """ request, xbest, fbest = snobfit(x, f, config, dx=None) minimization of a function over a box in R^n Input: file name of file for input and output if nargin < 5, the program continues a previous run and reads from file.mat the output is (again) stored in file.mat ^^do not use file - store variables globally, or make them available to be passed in? x the rows are a set of new points entering the optimization algorithm together with their function values f matrix containing the corresponding function values and their uncertainties, i.e., f(j,1) = f(x(j)) and f(j,2) = df(x(j)) a value f(j,2)<=0 indicates that the corresponding uncertainty is not known, and the program resets it to sqrt(numpy.spacing(1)) config structure variable defining the box [u,v] in which the points are to be generated, the number nreq of points to be generated and the probability p that a point of type 4 is generated config = struct('bounds',{u,v},'nreq',nreq,'p',p) dx only used for the definition of a new problem (when the program should continue from the values stored in file.mat, the call should have only 4 input parameters!) 
n-vector (n = dimension of the problem) of minimal stnumpy.spacing(1), i.e., two points are considered to be different if they differ by at least dx(i) in at least one coordinate i prev_res results of previous iterations Output: request nreq x (n+3)-matrix request[j,1:n] is the jth newly generated point, request[j,n+1] is its estimated function value and request[j,n+3] indicates for which reason the point request[j,1:n] has been generated request[j,n+3] = 1 best prediction = 2 putative local minimizer = 3 alternative good point = 4 explore empty region = 5 to fill up the required number of function values if too little points of the other classes are found xbest current best point fbest current best function value (i.e. function value at xbest) res current results (this iteration) including results from previous iterations """ from SQSnobFit._gen_utils import diag, max_, min_, find, extend, rand, sort from SQSnobFit._snobinput import snobinput from SQSnobFit._snoblocf import snoblocf, snobround from SQSnobFit._snoblp import snoblp from SQSnobFit._snobnan import snobnan from SQSnobFit._snobnn import snobnn from SQSnobFit._snobpoint import snobpoint from SQSnobFit._snobqfit import snobqfit from SQSnobFit._snobsplit import snobsplit from SQSnobFit._snobupdt import snobupdt from SQSnobFit._snob5 import snob5 ind = find(f[:, 1] <= 0) if not (ind.size <= 0 or numpy.all(ind == 0)): f[ind, 1] = math.sqrt(numpy.spacing(1)) # may be wrong rho = 0.5 * (math.sqrt(5) - 1) # golden section number bounds = config["bounds"] u1 = bounds[:, 0].reshape(1, len(bounds)) # lower v1 = bounds[:, 1].reshape(1, len(bounds)) # upper nreq = config["nreq"] p = config["p"] n = u1.shape[1] # dimension of the problem nneigh = n + 5 # number of nearest neighbors dy = 0.1 * (v1 - u1) # defines the vector of minimal distances between two # points suggested in a single call to Snobfit if prev_param is None: # a new job is started if numpy.any(dx <= 0): raise ValueError("dx should contain only 
positive entries") if dx.shape[0] > 1: dx = dx.T if x.size > 0: u = numpy.minimum(x.min(axis=0), u1) v = numpy.maximum(x.max(axis=0), v1) else: u = u1.copy() v = v1.copy() x, f, np, t = snobinput(x, f) # throw out duplicates among the points # and compute mean function value and # deviation if x.size > 0: xl, xu, x, f, nsplit, small = snobsplit(x, f, u, v, None, u, v) d = numpy.inf * numpy.ones((1, len(x))) else: xl = numpy.array([]) xu = numpy.array([]) nsplit = numpy.array([]) small = numpy.array([]) notnan = find(numpy.isfinite(f[:, 0])) if notnan.size > 0: fmn = min_(f[notnan, 1]) fmx = max_(f[notnan, 1]) else: fmn = 1 fmx = 0 if len(x) >= nneigh + 1 and fmn < fmx: inew = range(len(x)) near = numpy.zeros((len(x), nneigh)) d = numpy.zeros(len(x)) for j in inew: near[j], d[j] = snobnn(x[j], x, nneigh, dx) fnan = find(numpy.isnan(f[:, 0])) if fnan.size > 0: f = snobnan(fnan, f, near, inew) jsize = inew[-1] y = numpy.zeros((jsize, 2)) g = numpy.zeros((jsize, 2)) sigma = numpy.zeros(jsize) f = extend(f, 1) for j in inew: y[j], f[j, 2], c, sigma[j] = snoblocf( j, x, f[:, 0:2], near, dx, u, v ) g[j] = c.reshape(1, len(c)) fbest, jbest = min_(f[:, 0]) xbest = x[jbest] else: fnan = numpy.array([], dtype=int) near = numpy.array([], dtype=int) d = numpy.inf * numpy.ones((1, len(x))) x5 = snob5(x, u1, v1, dx, nreq) request = numpy.concatenate( (x5, numpy.nan * numpy.ones((nreq, 1)), 5 * numpy.ones((nreq, 1))), 1, ) if x.size > 0 and f.size > 0: fbest, jbest = min_(f[:, 0]) xbest = x[jbest] else: xbest = numpy.nan * numpy.ones((1, n)) fbest = numpy.inf if len(request) < nreq: snobwarn() y = None im_storage = ( xbest, fbest, x, f, xl, xu, y, nsplit, small, near, d, np, t, fnan, u, v, dx, ) return request, xbest, fbest, im_storage else: xnew = x.copy() fnew = f.copy() ( xbest, fbest, x, f, xl, xu, y, nsplit, small, near, d, np, t, fnan, u, v, dx, ) = prev_param nx = len(xnew) oldxbest = xbest xl, xu, x, f, nsplit, small, near, d, np, t, inew, fnan, u, v = snobupdt( xl, xu, x, 
f, nsplit, small, near, d, np, t, xnew, fnew, fnan, u, v, u1, v1, dx, ) if near.size > 0: ind = find(numpy.isnan(f[:, 0])) if ind.size > 0: fnan = numpy.concatenate((fnan, ind.flatten())) if fnan.size > 0: f = snobnan(fnan, f, near, inew) fbest, jbest = min_(f[:, 0]) xbest = x[jbest] jsize = int(inew[-1] + 1) if y is None: y = numpy.zeros((jsize, x.shape[1])) else: y = numpy.append( y, numpy.zeros((jsize - len(y), x.shape[1])), axis=0 ) g = numpy.zeros((jsize, x.shape[1])) sigma = numpy.zeros(jsize) f = extend(f, x.shape[1] - 1) for j in inew: y[j], f[j, 2], c, sigma[j] = snoblocf( j, x, f[:, 0:2], near, dx, u, v ) g[j] = c else: x5 = snob5(x, u1, v1, dx, nreq) request = numpy.concatenate( (x5, numpy.NaN * numpy.ones((nreq, 1)), 5 * numpy.ones((nreq, 1))), 1, ) if x.size > 0: (fbest, ibest) = min_(f[:, 0]) xbest = x[ibest] else: xbest = numpy.array([]) fbest = numpy.inf if request.shape[0] < nreq: snobwarn() im_storage = ( xbest, fbest, x, f, xl, xu, y, nsplit, small, near, d, np, t, fnan, u, v, dx, ) return request, xbest, fbest, im_storage sx = len(x) request = numpy.array([]).reshape(0, x.shape[1] + 2) ind = find( numpy.sum( numpy.logical_and( xl <= numpy.outer(numpy.ones(sx), v1), xu >= numpy.outer(numpy.ones(sx), u1), ), 1, ) == n ) minsmall, k = min_(small[ind]) maxsmall = small[ind].max(0) m1 = numpy.floor((maxsmall - minsmall) / 3) k = find(small[ind] == minsmall) k = ind[k].flatten() fsort, j = sort(f[k, 0]) k = k[j] isplit = k[0] if numpy.sum(numpy.logical_and(u1 <= xbest, xbest <= v1)) == n: z, f1 = snobqfit(jbest, x, f[:, 0], near, dx, u1, v1) else: fbes, jbes = min_(f[ind, 0]) jbes = ind[jbes] xbes = x[jbes] z, f1 = snobqfit(jbes, x, f[:, 0], near, dx, u1, v1) z = snobround(z, u1, v1, dx) zz = numpy.outer(numpy.ones(sx), z) j = find(numpy.sum(numpy.logical_and(xl <= zz, zz <= xu), 1) == n) if len(j) > 1: msmall, j1 = min_(small[j]) j = j[j1] if numpy.min( numpy.max( numpy.abs(x - numpy.outer(numpy.ones(sx), z)) - numpy.outer(numpy.ones(sx), dx) ) ) >= 
-numpy.spacing(1): dmax = numpy.max((xu[j] - xl[j]) / (v - u)) dmin = numpy.min((xu[j] - xl[j]) / (v - u)) if dmin <= 0.05 * dmax: isplit = numpy.append(isplit, j) else: request = numpy.vstack( ( request, numpy.concatenate((z, numpy.array((f1, 1), ndmin=2)), axis=1), ) ) if len(request) < nreq: globloc = nreq - len(request) glob1 = globloc * p glob2 = math.floor(glob1) if rand(1) < glob1 - glob2: glob = glob2 + 1 else: glob = glob2 loc = globloc - glob if loc: local, nlocal = snoblp(f[:, 0], near, ind) fsort, k = sort(f[local, 2]) # uhhhhhh j = 0 sreq = len(request) while sreq < (nreq - glob) and j < len(local): l0 = local[k[j]] y1 = snobround(y[l0], u1, v1, dx) yy = numpy.outer(numpy.ones((len(x), 1)), y1) l = find(numpy.sum(numpy.logical_and(xl <= yy, yy <= xu), 1) == n) if len(l) > 1: msmall, j1 = min_(small[l]) l = l[j1] dmax = numpy.max((xu[l] - xl[l]) / (v - u)) dmin = numpy.min((xu[l] - xl[l]) / (v - u)) if dmin <= 0.05 * dmax: isplit = numpy.append(isplit, l) j += 1 continue if numpy.max(abs(y1 - x[l]) - dx) >= -numpy.spacing(1) and ( not sreq or numpy.min( numpy.max( numpy.abs( request[:, 0:n] - numpy.outer(numpy.ones(sreq), y1) ) - numpy.outer(numpy.ones(sreq), numpy.maximum(dy, dx)), axis=1, ) ) >= -numpy.spacing(1) ): if numpy.sum(y1 == y[l0]) < n: D = f[l0, 1] / dx ** 2 # Possibly problem area! 
f1 = ( f[l0, 0] + g[l0].dot((y1 - x[l0]).T) + sigma[l0] * ( (y1 - x[l0]).dot( diag(D).dot((y1 - x[l0]).T) + f[l0, 1] ) ) ) else: f1 = f[l0, 2] request = numpy.vstack( ( request, numpy.concatenate( (y1, numpy.array((f1, 2), ndmin=2)), axis=1 ), ) ) sreq = len(request) j += 1 if sreq < nreq - glob: fsort, k = sort(f[nlocal, 2]) j = 0 while sreq < (nreq - glob) and j < len(nlocal): l0 = nlocal[k[j]] y1 = snobround(y[l0], u1, v1, dx) yy = numpy.outer(numpy.ones(len(x)), y1) l = find(numpy.sum(numpy.logical_and(xl <= yy, yy <= xu), 1) == n) if len(l) > 1: msmall, j1 = min_(small[l]) l = l[j1] dmax = numpy.max((xu[l] - xl[l]) / (v - u)) dmin = numpy.min((xu[l] - xl[l]) / (v - u)) if dmin <= 0.05 * dmax: isplit = numpy.append(isplit, l) j += 1 continue if numpy.max(numpy.abs(y1 - x[l]) - dx) >= -numpy.spacing(1) and ( not sreq or numpy.min( numpy.max( numpy.abs( request[:, :n] - numpy.outer(numpy.ones(sreq), y1) ) - numpy.outer(numpy.ones(sreq), numpy.maximum(dy, dx)), axis=1, ) ) >= -numpy.spacing(1) ): if numpy.sum(y1 == y[l0]) < n: D = f[l0, 1] / (dx ** 2) f1 = ( f[l0, 0] + g[l0].dot((y1 - x[l0]).T) + sigma[l0] * ( ((y1 - x[l0]).dot(diag(D).dot((y1 - x[l0]).T))) + f[l0, 1] ) ) else: f1 = f[l0, 2] request = numpy.vstack( ( request, numpy.concatenate( (y1, numpy.array((f1, 3), ndmin=2)), axis=1 ), ) ) sreq = len(request) j += 1 sreq = len(request) for l in isplit.flatten(): jj = find(ind == l) ind = numpy.delete(ind, jj) # ind(jj) = [] y1, f1 = snobpoint( x[l], xl[l], xu[l], f[l, 0:2], g[l], sigma[l], u1, v1, dx ) if numpy.max(numpy.abs(y1 - x[l]) - dx) >= -numpy.spacing(1) and ( not sreq or numpy.min( numpy.max( numpy.abs(request[:, :n] - numpy.outer(numpy.ones(sreq), y1)) - numpy.outer(numpy.ones(sreq), dx), axis=1, ) ) >= -numpy.spacing(1) ): request = numpy.vstack( ( request, numpy.concatenate((y1, numpy.array((f1, 4), ndmin=2)), axis=1), ) ) sreq = len(request) if sreq == nreq: break first = True while ( sreq < nreq ) and ind.size > 0: # and find(small[ind] <= 
(minsmall + m1)).any(): for m in range(int(m1 + 1)): if first: first = False continue m = 0 k = find(small[ind] == minsmall + m) while k.size <= 0: m += 1 k = find(small[ind] == minsmall + m) if k.size > 0: k = ind[k].flatten() fsort, j = sort(f[k, 0]) k = k[j] l = int(k[0]) jj = find(ind == l) ind = numpy.delete(ind, jj) y1, f1 = snobpoint( x[l], xl[l], xu[l], f[l, 0:2], g[l], sigma[l], u1, v1, dx ) if numpy.max(numpy.abs(y1 - x[l]) - dx) >= -numpy.spacing(1) and ( not sreq or numpy.min( numpy.max( numpy.abs( request[:, :n] - numpy.outer(numpy.ones(sreq), y1) ) - numpy.outer(numpy.ones(sreq), numpy.maximum(dy, dx)), axis=1, ) ) >= -numpy.spacing(1) ): request = numpy.vstack( ( request, numpy.concatenate( (y1, numpy.array((f1, 4), ndmin=2)), axis=1 ), ) ) sreq = len(request) if sreq == nreq: break m = 0 if len(request) < nreq: x5 = snob5( numpy.concatenate((x, request[:, :n])), u1, v1, dx, nreq - len(request) ) nx = len(x) for j in range(len(x5)): x5j = x5[j, :] i = find( ( numpy.sum(xl <= numpy.outer(numpy.ones(nx), x5j)) and (numpy.outer(numpy.ones((nx, 1)), x5j) <= xu), 1, ) == n ) if len(i) > 1: minv, i1 = min_(small[i]) i = i[i1] D = f[i, 1] / (dx ** 2) f1 = ( f[i, 0] + (x5j - x[i]).dot(g[i].T) + sigma[i] * ((x5j - x[i]).dot(diag(D).dot((x5j - x[i]).T))) + f[i, 1] ) request = numpy.vstack( ( request, numpy.concatenate((x5j, numpy.array((f1, 5), ndmin=2)), axis=1), ) ) if len(request) < nreq: snobwarn() im_storage = ( xbest, fbest, x, f, xl, xu, y, nsplit, small, near, d, np, t, fnan, u, v, dx, ) return request, xbest, fbest, im_storage
# Function to check whether a point meets the constraints of the domain def check_constraints(self, tmp_next_experiments): constr_mask = numpy.asarray([True] * len(tmp_next_experiments)).T if len(self.domain.constraints) > 0: constr = [c.constraint_type + "0" for c in self.domain.constraints] constr_mask = [ pd.eval(c.lhs + constr[i], resolvers=[tmp_next_experiments]) for i, c in enumerate(self.domain.constraints) ] constr_mask = numpy.asarray([c.tolist() for c in constr_mask]).T constr_mask = constr_mask.all(1) return constr_mask