Categorical choice parameters
A basic example that jointly optimizes a set of scalar parameters and a set of categorical choices. The scalar parameters are optimized continuously, while the categorical choices are handled by brute force: the acquisition function is optimized once for every combination of choices and the best result is kept.
Python code
# =====================
# The optimization code
# =====================
import itertools
import math
import os
import botorch
import botorch.acquisition
import botorch.models
import botorch.optim
import botorch.utils
import gpytorch.mlls
import torch
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DTYPE = torch.double
# Botorch heuristics configuration
NUM_RESTARTS = 60   # restarts for each acquisition function optimization
RAW_SAMPLES = 1024  # raw samples used to seed the restarts
N_RANDOM = 10       # initial quasi-random observations
N_BATCH = 60        # Bayesian optimization iterations
MC_SAMPLES = 256    # Monte Carlo samples for the QMC sampler
def optimize_acqf_brute_force_choices(
acq_function, scalar_bounds, choices_shape, inequality_constraints,
post_processing_func):
"""
Similar to botorch's optimize_acqf_mixed, but implemented differently. (No
"fixed_features" logic, instead just pass the choice parameters into f
separately from the scalar parameters)
"""
all_results = []
for x_choices in itertools.product(*[range(num_choices)
for num_choices in choices_shape]):
        x_choices = torch.tensor(x_choices, device=scalar_bounds.device)
def wrapped_acq_function(x):
return acq_function(x, x_choices)
candidates, acq_values = botorch.optim.optimize_acqf(
acq_function=wrapped_acq_function,
bounds=scalar_bounds,
q=1,
num_restarts=NUM_RESTARTS,
raw_samples=RAW_SAMPLES,
inequality_constraints=inequality_constraints,
post_processing_func=post_processing_func,
)
x = candidates.detach()
all_results.append(((x, x_choices), acq_values))
((best_x,
best_x_choices),
acq_values) = max(all_results, key=lambda x: x[1])
return (best_x, best_x_choices), acq_values
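# Example (illustrative, hypothetical shapes): with choices_shape=(3, 2) the
# loop above enumerates all 3 * 2 = 6 choice combinations, runs a continuous
# optimize_acqf pass for each, and returns the candidate with the highest
# acquisition value.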
def optimize(f, X, Y, bounds, choices_shape, inequality_constraints,
rounding_function, model_class):
state_dict = None
model = None
for iteration in range(N_BATCH):
model = model_class(X, Y).to(X)
if state_dict is not None:
            # Warm-start fitting with the previous iteration's state dict.
model.load_state_dict(state_dict)
botorch.fit_gpytorch_model(
gpytorch.mlls.ExactMarginalLogLikelihood(model.likelihood,
model)
)
qNEI = botorch.acquisition.qNoisyExpectedImprovement(
model=model,
X_baseline=X,
sampler=botorch.sampling.SobolQMCNormalSampler(
num_samples=MC_SAMPLES),
            # Negate the log loss because qNEI maximizes the objective.
            objective=botorch.acquisition.GenericMCObjective(
                lambda Z: -Z[..., 0])
)
def concat_then_qNEI(x_scalars, x_choices):
# Ideally we would never concatenate these, but botorch models don't
# support multiple inputs. (gpytorch allows train_X to be a tuple
# but botorch doesn't)
x = torch.cat(
(x_scalars,
x_choices.unsqueeze(0).expand((x_scalars.shape[0], 1, -1))),
dim=-1)
return qNEI(x)
(x_scalars,
x_choices), _ = optimize_acqf_brute_force_choices(
concat_then_qNEI, bounds, choices_shape,
inequality_constraints, rounding_function)
X_new = torch.cat((x_scalars,
x_choices.expand((x_scalars.shape[0], -1))),
dim=-1)
Y_new = f(X_new)
X = torch.cat([X, X_new])
Y = torch.cat([Y, Y_new])
state_dict = model.state_dict()
print(f"Batch {(iteration + 1):>2}")
return X, Y, model
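# Note: each iteration of `optimize` refits the GP on all observations so far
# (warm-started from the previous state dict), picks one new point by
# maximizing qNEI over every choice combination, evaluates it with `f`, and
# appends the result to (X, Y).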
# ==========================================================
# A tiny library for dealing with parameters and constraints
# ==========================================================
class LogScalar:
def __init__(self, name, lower, upper):
self.name = name
self.bounds = [math.log(lower), math.log(upper)]
def to_user_space(self, x):
return math.exp(x)
def to_search_space(self, v):
return math.log(v)
def torch_round(self, x):
return x
class LogInt(LogScalar):
def to_user_space(self, x):
return round(math.exp(x))
def torch_round(self, x):
return x.exp().round().log()
class Int:
def __init__(self, name, lower, upper):
self.name = name
self.bounds = [lower, upper]
def to_user_space(self, x):
return int(round(x))
def to_search_space(self, v):
return float(v)
def torch_round(self, x):
return x.round()
class Choice(Int):
def __init__(self, name, choices):
self.name = name
self.choices = choices
self.bounds = [0, len(choices) - 1e-9]
def to_user_space(self, x):
return self.choices[int(x)]
def to_search_space(self, v):
return self.choices.index(v)
def torch_round(self, x):
return x.floor()
def round_individual_parameters(parameters, X):
X = X.clone()
for i, p in enumerate(parameters):
X[..., i] = p.torch_round(X[..., i])
return X
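# Example (illustrative): LogInt("batch_size", 16, 4096) searches over
# [log(16), log(4096)]; to_user_space(math.log(100.3)) returns 100, and
# torch_round snaps a search-space value to the log of the nearest integer.
# Choice parameters search over [0, n_choices) and are floored to an index.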
class LinearConstraint:
"""
Encodes w * x <= bound, using an indices-and-coefficients description of w
"""
def __init__(self, indices, coefficients, bound):
self.indices = indices
self.coefficients = coefficients
self.bound = bound
def check(self, x):
return (sum(c * v
for c, v in zip(self.coefficients, x[self.indices]))
<= self.bound)
    def as_botorch_inequality_constraint(self):
        # botorch expects inequality constraints of the form
        # sum_i coefficients[i] * x[indices[i]] >= rhs, so flip the signs.
        return (torch.tensor(self.indices, device=DEVICE),
                -torch.tensor(self.coefficients, dtype=DTYPE, device=DEVICE),
                -self.bound)
def botorch_inequality_constraints(linear_constraints):
return [c.as_botorch_inequality_constraint() for c in linear_constraints]
def botorch_bounds(parameters):
return torch.tensor([p.bounds for p in parameters],
device=DEVICE, dtype=DTYPE).T
def to_search_domain(parameters, config):
return [p.to_search_space(config[p.name])
for p in parameters]
def to_user_domain(parameters, x):
return {p.name: p.to_user_space(x[i])
for i, p in enumerate(parameters)}
# =======================
# Random search algorithm
# =======================
def unit_hypercube_to_search_space(parameters, X):
X = X.clone()
for i, p in enumerate(parameters):
X[..., i] *= (p.bounds[1] - p.bounds[0])
X[..., i] += p.bounds[0]
return X
def sobol_sample(n, parameters, constraints=(), init_position=0):
engine = torch.quasirandom.SobolEngine(
dimension=len(parameters), scramble=True
).fast_forward(init_position)
samples = []
while len(samples) < n:
sample = engine.draw(1, dtype=torch.double)[0]
sample = unit_hypercube_to_search_space(parameters, sample)
sample = round_individual_parameters(parameters, sample)
if all(c.check(sample) for c in constraints):
samples.append(sample)
return torch.vstack(samples)
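# Example (illustrative): sobol_sample(10, parameters, constraints) returns a
# (10, len(parameters)) tensor of rounded search-space points; quasi-random
# draws that violate any constraint are rejected and redrawn.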
# ===================
# Task-specific logic
# ===================
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.datasets import mnist
def result_to_y(result):
# Use the log so that all real numbers map to valid losses, so the GP will
# always output meaningful predictions (rather than predicting negative
# losses)
#
# This also makes it so that we don't typically need to configure the GP's
# covariance priors.
return [math.log(result["loss"])]
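# Example (illustrative): a test loss of 0.05 becomes log(0.05) ≈ -3.0 in the
# GP's output space, and any real-valued GP prediction maps back to a positive
# loss via exp.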
class MNISTTask:
def __init__(self, point_to_config_f):
(train_images,
train_labels), (test_images,
test_labels) = mnist.load_data()
train_images = train_images.reshape((60000, 28, 28, 1))
train_images = train_images.astype('float32') / 255
test_images = test_images.reshape((10000, 28, 28, 1))
test_images = test_images.astype('float32') / 255
self.train_images = train_images
self.train_labels = train_labels
self.test_images = test_images
self.test_labels = test_labels
self.point_to_config = point_to_config_f
def evaluate(self, config):
model = models.Sequential()
model.add(layers.Conv2D(32, (3, 3), activation='relu',
input_shape=(28, 28, 1)))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.Flatten())
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(10, activation="softmax"))
# print(model.summary())
optimizers = {
"adam": tf.keras.optimizers.Adam,
"sgd": tf.keras.optimizers.SGD,
"rmsprop": tf.keras.optimizers.RMSprop,
}
model.compile(
optimizer=optimizers[config["optimizer"]](
learning_rate=config["learning_rate"]
),
loss="sparse_categorical_crossentropy",
metrics=['accuracy'])
step_size = config["lr_schedule_step_size"]
alpha = config["lr_schedule_alpha"]
def scheduler(epoch, lr):
prev_stage = (epoch - 1) // step_size
stage = epoch // step_size
if stage > prev_stage and stage > 0:
return lr * alpha
return lr
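        # Example (illustrative): with step_size=3 and alpha=0.5, the learning
        # rate is halved at the start of epochs 3, 6, 9, ...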
model.fit(self.train_images, self.train_labels,
epochs=config["epochs"],
batch_size=config["batch_size"],
callbacks=[
tf.keras.callbacks.LearningRateScheduler(scheduler)
],
# verbose=0
)
test_loss, test_acc = model.evaluate(self.test_images, self.test_labels)
return {"accuracy": test_acc,
"loss": test_loss}
def optimized_f(self, X):
Y = []
for i, x in enumerate(X):
config = self.point_to_config(X[i].tolist())
print(config)
result = self.evaluate(config)
print(result)
Y.append(result_to_y(result))
        # Match X's dtype and device so the GP can be trained on (X, Y) directly.
        return torch.tensor(Y, dtype=X.dtype, device=X.device)
# =======================
# Connect it all together
# =======================
from functools import partial
max_num_epochs = 60
scalar_parameters = [
LogInt("epochs", 2, max_num_epochs),
LogInt("lr_schedule_step_size", 1, max_num_epochs),
LogInt("batch_size", 16, 4096),
LogScalar("learning_rate", 1e-6, 0.4),
LogScalar("lr_schedule_alpha", 0.05, 0.98),
]
choice_parameters = [
Choice("optimizer", ["adam", "sgd", "rmsprop"]),
]
parameters = scalar_parameters + choice_parameters
constraints = [
# log(lr_schedule_step_size) <= log(epochs)
LinearConstraint([0, 1], [-1., 1.], 0.)
]
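# Example: parameter index 0 is "epochs" and index 1 is "lr_schedule_step_size"
# (both in log space), so -1 * log(epochs) + 1 * log(lr_schedule_step_size) <= 0
# enforces lr_schedule_step_size <= epochs.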
class GP(botorch.models.SingleTaskGP):
"""
Similar to gpytorch.models.MixedSingleTaskGP, but takes integer choices
as input.
"""
def __init__(self, train_X, train_Y, **kwargs):
# Precompute offsets for converting between list of choice integers and
# indices in an n-hot vector.
offsets = []
tot = 0
for i, p in enumerate(choice_parameters):
offsets.append(tot)
tot += len(p.choices)
self.choice_offsets = torch.tensor(
offsets, device=train_X.device).unsqueeze(0)
self.choice_tot = tot
ord_dims = list(range(len(scalar_parameters)))
cat_dims = list(range(len(scalar_parameters),
len(scalar_parameters) + self.choice_tot))
        # This logic is needed to support multiple outputs: the batch shape
        # can't simply be hardcoded, because fit_gpytorch_model splits a
        # multi-output model into n single-output models, so this code can't
        # assume it is constructing an n-output model.
_, aug_batch_shape = self.get_batch_dimensions(train_X, train_Y)
sum_kernel = gpytorch.kernels.ScaleKernel(
gpytorch.kernels.MaternKernel(
nu=2.5,
batch_shape=aug_batch_shape,
ard_num_dims=len(ord_dims),
active_dims=ord_dims,
lengthscale_constraint=gpytorch.constraints.GreaterThan(1e-04),
)
+ gpytorch.kernels.ScaleKernel(
botorch.models.kernels.CategoricalKernel(
batch_shape=aug_batch_shape,
ard_num_dims=len(cat_dims),
active_dims=cat_dims,
lengthscale_constraint=gpytorch.constraints.GreaterThan(
1e-06),
)
)
)
prod_kernel = gpytorch.kernels.ScaleKernel(
gpytorch.kernels.MaternKernel(
nu=2.5,
batch_shape=aug_batch_shape,
ard_num_dims=len(ord_dims),
active_dims=ord_dims,
lengthscale_constraint=gpytorch.constraints.GreaterThan(1e-04),
)
* botorch.models.kernels.CategoricalKernel(
batch_shape=aug_batch_shape,
ard_num_dims=len(cat_dims),
active_dims=cat_dims,
lengthscale_constraint=gpytorch.constraints.GreaterThan(1e-06),
)
)
covar_module = sum_kernel + prod_kernel
super().__init__(train_X, train_Y, covar_module=covar_module, **kwargs)
def forward(self, x):
# Ideally the forward method would receive (scalar_x, choice_x) already
# split apart. botorch's models don't support this, though gpytorch
# does.
        scalar_x, choice_x = torch.split(
            x, [len(scalar_parameters), len(choice_parameters)], dim=-1)
# Convert choice ints to n-hot vector
on_indices = choice_x.long() + self.choice_offsets
choice_x_hot = torch.zeros((*choice_x.shape[:-1], self.choice_tot),
dtype=bool,
device=choice_x.device)
choice_x_hot.scatter_(-1, on_indices, True)
x = torch.cat((scalar_x, choice_x_hot), dim=-1)
return super().forward(x)
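# Example (illustrative): with the single "optimizer" choice parameter,
# choice_offsets is [[0]] and choice_tot is 3, so a choice index of 1 ("sgd")
# becomes the n-hot vector [0, 1, 0] appended to the scalar inputs before the
# kernels see them.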
if __name__ == "__main__":
to_user_domain_f = partial(to_user_domain, parameters)
task = MNISTTask(to_user_domain_f)
    # Get initial observations via a random search. Often it makes sense to
    # replace this step with a set of manually-specified suggestions.
X = sobol_sample(N_RANDOM, parameters, constraints).to(device=DEVICE)
Y = task.optimized_f(X)
# Then optimize
rounding_function = partial(round_individual_parameters, scalar_parameters)
X, Y, model = optimize(
task.optimized_f, X, Y,
botorch_bounds(scalar_parameters),
[len(p.choices) for p in choice_parameters],
botorch_inequality_constraints(constraints),
rounding_function,
GP)
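    # A minimal way to read back the best observed configuration (illustrative
    # addition, not part of the original recipe): Y holds log losses, so the
    # row with the smallest value corresponds to the best run.
    best_idx = int(Y[:, 0].argmin())
    print("Best config:", to_user_domain(parameters, X[best_idx].tolist()))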