
Scalars with linear constraints

A basic example that optimizes an array of scalar parameters, subject to linear constraints.

Python code

# =====================
# The optimization code
# =====================

import math
import os

import botorch
import botorch.acquisition
import botorch.models
import botorch.optim
import botorch.sampling
import botorch.utils
import gpytorch.mlls
import torch


DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DTYPE = torch.double

# Botorch heuristics configuration
NUM_RESTARTS = 60
RAW_SAMPLES = 1024

N_RANDOM = 10
# When using joint optimization and batching, botorch seems to perform poorly on
# this task. Often one item in the batch is good and the rest are throwaway
# experiments with ~zero probability of even meeting constraints. So we disable
# batching here. If we want to run experiments in parallel, we could instead
# keep batching enabled and pass sequential=True to optimize_acqf (see the
# sketch below).
# BATCH_SIZE = 3
# N_BATCH = 20
BATCH_SIZE = 1
N_BATCH = 60
MC_SAMPLES = 256
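
# A sketch of the batched alternative described above, using the commented-out
# values: set BATCH_SIZE = 3 and N_BATCH = 20, and pass sequential=True to
# optimize_acqf so the q candidates are optimized one at a time rather than
# jointly. Inside optimize, the call would look roughly like:
#
#     candidates, acq_values = botorch.optim.optimize_acqf(
#         acq_function=...,  # the qNoisyExpectedImprovement object as below
#         bounds=bounds,
#         q=BATCH_SIZE,
#         num_restarts=NUM_RESTARTS,
#         raw_samples=RAW_SAMPLES,
#         inequality_constraints=inequality_constraints,
#         post_processing_func=rounding_function,
#         sequential=True,
#     )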


def optimize(f, X, Y, bounds, inequality_constraints, rounding_function,
             model_class):
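    """Bayesian-optimization loop: fit a GP to (X, Y), maximize the acquisition
    function to choose new candidates, evaluate them with f, append the results
    to X and Y, and repeat for N_BATCH iterations."""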
    state_dict = None
    model = None

    for iteration in range(N_BATCH):
        model = model_class(X, Y).to(X)
        if state_dict is not None:
            # Use the current state dict to speed up fitting.
            model.load_state_dict(state_dict)
        botorch.fit_gpytorch_model(
            gpytorch.mlls.ExactMarginalLogLikelihood(model.likelihood,
                                                     model)
        )

        candidates, acq_values = botorch.optim.optimize_acqf(
            acq_function=botorch.acquisition.qNoisyExpectedImprovement(
                model=model,
                X_baseline=X,
                sampler=botorch.sampling.SobolQMCNormalSampler(
                    num_samples=MC_SAMPLES),
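                # The GP models log(loss) and botorch maximizes the objective,
                # so negate the model output in order to minimize the loss.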
                objective=botorch.acquisition.GenericMCObjective(
                    lambda Z: -Z[..., 0])
            ),
            bounds=bounds,
            q=BATCH_SIZE,
            num_restarts=NUM_RESTARTS,
            raw_samples=RAW_SAMPLES,
            inequality_constraints=inequality_constraints,
            post_processing_func=rounding_function,
        )
        X_new = candidates.detach()
        Y_new = f(X_new)
        X = torch.cat([X, X_new])
        Y = torch.cat([Y, Y_new])
        state_dict = model.state_dict()

        print(f"Batch {(iteration + 1):>2}")

    return X, Y, model


# ==========================================================
# A tiny library for dealing with parameters and constraints
# ==========================================================

class LogScalar:
    def __init__(self, name, lower, upper):
        self.name = name
        self.bounds = [math.log(lower), math.log(upper)]

    def to_user_space(self, x):
        return math.exp(x)

    def to_search_space(self, v):
        return math.log(v)

    def torch_round(self, x):
        return x


class LogInt(LogScalar):
    def to_user_space(self, x):
        return round(math.exp(x))

    def torch_round(self, x):
        return x.exp().round().log()
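
# For example, LogInt("epochs", 2, 60) (used below) searches over
# [log(2), log(60)]; the search-space value 3.0 maps to round(exp(3.0)) == 20
# epochs, and torch_round snaps a tensor to the log of the nearest integer.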


class Int:
    def __init__(self, name, lower, upper):
        self.name = name
        self.bounds = [lower, upper]

    def to_user_space(self, x):
        return int(round(x))

    def to_search_space(self, v):
        return float(v)

    def torch_round(self, x):
        return x.round()


def round_individual_parameters(parameters, X):
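    """Round each column of X using that parameter's torch_round.

    Used on the initial Sobol samples and as optimize_acqf's
    post_processing_func, so candidates always correspond to runnable
    configurations.
    """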
    X = X.clone()
    for i, p in enumerate(parameters):
        X[..., i] = p.torch_round(X[..., i])
    return X


class LinearConstraint:
    """
    Encodes w * x <= bound, using an indices-and-coefficients description of w
    """
    def __init__(self, indices, coefficients, bound):
        self.indices = indices
        self.coefficients = coefficients
        self.bound = bound

    def check(self, x):
        return (sum(c * v
                    for c, v in zip(self.coefficients, x[self.indices]))
                <= self.bound)

    def as_botorch_inequality_constraint(self):
        return (torch.tensor(self.indices, device=DEVICE),
                -torch.tensor(self.coefficients, dtype=DTYPE, device=DEVICE),
                -self.bound)


def botorch_inequality_constraints(linear_constraints):
    return [c.as_botorch_inequality_constraint() for c in linear_constraints]
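
# For example, the constraint used later in this recipe,
# LinearConstraint([0, 1], [-1., 1.], 0.), i.e. -x[0] + x[1] <= 0, becomes
# ([0, 1], [1., -1.], 0.) in botorch's format, which optimize_acqf interprets
# as 1*x[0] - 1*x[1] >= 0.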


def botorch_bounds(parameters):
    return torch.tensor([p.bounds for p in parameters],
                        device=DEVICE, dtype=DTYPE).T


def to_search_domain(parameters, config):
    return [p.to_search_space(config[p.name])
            for p in parameters]


def to_user_domain(parameters, x):
    return {p.name: p.to_user_space(x[i])
            for i, p in enumerate(parameters)}


# =======================
# Random search algorithm
# =======================

def unit_hypercube_to_search_space(parameters, X):
    X = X.clone()
    for i, p in enumerate(parameters):
        X[..., i] *= (p.bounds[1] - p.bounds[0])
        X[..., i] += p.bounds[0]
    return X


def sobol_sample(n, parameters, constraints=(), init_position=0):
    engine = torch.quasirandom.SobolEngine(
        dimension=len(parameters), scramble=True
    ).fast_forward(init_position)

    samples = []
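    # Rejection sampling: draw quasirandom points one at a time and keep only
    # those that satisfy every constraint.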
    while len(samples) < n:
        sample = engine.draw(1, dtype=torch.double)[0]
        sample = unit_hypercube_to_search_space(parameters, sample)
        sample = round_individual_parameters(parameters, sample)
        if all(c.check(sample) for c in constraints):
            samples.append(sample)

    return torch.vstack(samples)


# ===================
# Task-specific logic
# ===================

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.datasets import mnist


def result_to_y(result):
    # Model the log of the loss so that every real number maps back to a valid
    # (positive) loss; the GP's predictions then always correspond to
    # meaningful losses rather than to negative ones.
    #
    # This also means we typically don't need to hand-configure the GP's
    # covariance priors.
    return [math.log(result["loss"])]


class MNISTTask:
    def __init__(self, point_to_config_f):
        (train_images,
         train_labels), (test_images,
                         test_labels) = mnist.load_data()
        train_images = train_images.reshape((60000, 28, 28, 1))
        train_images = train_images.astype('float32') / 255
        test_images = test_images.reshape((10000, 28, 28, 1))
        test_images = test_images.astype('float32') / 255
        self.train_images = train_images
        self.train_labels = train_labels
        self.test_images = test_images
        self.test_labels = test_labels

        self.point_to_config = point_to_config_f

    def evaluate(self, config):
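        """Train a small CNN on MNIST with the given hyperparameters and
        return its test-set loss and accuracy."""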
        model = models.Sequential()
        model.add(layers.Conv2D(32, (3, 3), activation='relu',
                                input_shape=(28, 28, 1)))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.Flatten())
        model.add(layers.Dense(64, activation='relu'))
        model.add(layers.Dense(10, activation="softmax"))
        # print(model.summary())

        model.compile(
            optimizer=tf.keras.optimizers.Adam(
                learning_rate=config["learning_rate"]
            ),
            loss="sparse_categorical_crossentropy",
            metrics=['accuracy'])

        step_size = config["lr_schedule_step_size"]
        alpha = config["lr_schedule_alpha"]

        def scheduler(epoch, lr):
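            # Step decay: multiply the learning rate by alpha each time the
            # epoch crosses a multiple of step_size.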
            prev_stage = (epoch - 1) // step_size
            stage = epoch // step_size
            if stage > prev_stage and stage > 0:
                return lr * alpha
            return lr

        model.fit(self.train_images, self.train_labels,
                  epochs=config["epochs"],
                  batch_size=config["batch_size"],
                  callbacks=[
                      tf.keras.callbacks.LearningRateScheduler(scheduler)
                  ],
                  # verbose=0
                  )

        test_loss, test_acc = model.evaluate(self.test_images, self.test_labels)

        return {"accuracy": test_acc,
                "loss": test_loss}

    def optimized_f(self, X):
        Y = []
        for x in X:
            config = self.point_to_config(x.tolist())
            print(config)
            result = self.evaluate(config)
            print(result)
            Y.append(result_to_y(result))
        # Match X's dtype and device so these results can be concatenated with
        # the existing observations.
        return torch.tensor(Y, dtype=DTYPE, device=DEVICE)


# =======================
# Connect it all together
# =======================

from functools import partial

max_num_epochs = 60
parameters = [
    LogInt("epochs", 2, max_num_epochs),
    LogInt("lr_schedule_step_size", 1, max_num_epochs),
    LogInt("batch_size", 16, 4096),
    LogScalar("learning_rate", 1e-6, 0.4),
    LogScalar("lr_schedule_alpha", 0.05, 0.98),
]

constraints = [
    # log(lr_schedule_step_size) <= log(epochs)
    LinearConstraint([0, 1], [-1., 1.], 0.)
]
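
# The indices above refer to positions in `parameters`: 0 is "epochs" and 1 is
# "lr_schedule_step_size". Both are LogInt parameters, so the constraint is
# expressed on the log-scaled search values, where it remains linear.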


if __name__ == "__main__":
    to_user_domain_f = partial(to_user_domain, parameters)
    task = MNISTTask(to_user_domain_f)

    # Get initial observations via a random search. Often it makes sense to
    # replace this step with a set of manually-specified suggestions.
    X = sobol_sample(N_RANDOM, parameters, constraints).to(device=DEVICE)
    Y = task.optimized_f(X)

    # Then optimize
    rounding_function = partial(round_individual_parameters, parameters)
    X, Y, model = optimize(
        task.optimized_f, X, Y,
        botorch_bounds(parameters),
        botorch_inequality_constraints(constraints),
        rounding_function,
        botorch.models.SingleTaskGP)
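
    # One way to report the best configuration found (a sketch, not part of
    # the original recipe): Y holds log(loss), so lower is better.
    best = Y[:, 0].argmin()
    print("Best observed config:", to_user_domain_f(X[best].tolist()))
    print("Best observed test loss:", math.exp(Y[best, 0].item()))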