<- Outer Loop Cookbook: All recipes

Categorical choice parameters

Basic example that optimizes a scalar array and a set of choices.

Python code

# =====================
# The optimization code
# =====================

import itertools
import math
import os

import botorch
import botorch.acquisition
import botorch.models
import botorch.optim
import botorch.utils
import gpytorch.mlls
import torch


DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DTYPE = torch.double

# Botorch heuristics configuration
NUM_RESTARTS = 60
RAW_SAMPLES = 1024

N_RANDOM = 10
N_BATCH = 60
MC_SAMPLES = 256


def optimize_acqf_brute_force_choices(
        acq_function, scalar_bounds, choices_shape, inequality_constraints,
        post_processing_func):
    """
    Similar to botorch's optimize_acqf_mixed, but implemented differently. (No
    "fixed_features" logic, instead just pass the choice parameters into f
    separately from the scalar parameters)
    """
    all_results = []
    for x_choices in itertools.product(*[range(num_choices)
                                         for num_choices in choices_shape]):
        x_choices = torch.tensor(x_choices, device=X.device)

        def wrapped_acq_function(x):
            return acq_function(x, x_choices)

        candidates, acq_values = botorch.optim.optimize_acqf(
            acq_function=wrapped_acq_function,
            bounds=scalar_bounds,
            q=1,
            num_restarts=NUM_RESTARTS,
            raw_samples=RAW_SAMPLES,
            inequality_constraints=inequality_constraints,
            post_processing_func=post_processing_func,
        )

        x = candidates.detach()
        all_results.append(((x, x_choices), acq_values))

    ((best_x,
      best_x_choices),
     acq_values) = max(all_results, key=lambda x: x[1])

    return (best_x, best_x_choices), acq_values


def optimize(f, X, Y, bounds, choices_shape, inequality_constraints,
             rounding_function, model_class):
    state_dict = None
    model = None

    for iteration in range(N_BATCH):
        model = model_class(X, Y).to(X)
        if state_dict is not None:
            # Use the current state dict to speed up fitting.
            model.load_state_dict(state_dict)
        botorch.fit_gpytorch_model(
            gpytorch.mlls.ExactMarginalLogLikelihood(model.likelihood,
                                                     model)
        )

        qNEI = botorch.acquisition.qNoisyExpectedImprovement(
            model=model,
            X_baseline=X,
            sampler=botorch.sampling.SobolQMCNormalSampler(
                num_samples=MC_SAMPLES),
            objective=botorch.acquisition.GenericMCObjective(
                lambda Z: -Z[..., 0])
        )

        def concat_then_qNEI(x_scalars, x_choices):
            # Ideally we would never concatenate these, but botorch models don't
            # support multiple inputs. (gpytorch allows train_X to be a tuple
            # but botorch doesn't)
            x = torch.cat(
                (x_scalars,
                 x_choices.unsqueeze(0).expand((x_scalars.shape[0], 1, -1))),
                dim=-1)
            return qNEI(x)

        (x_scalars,
         x_choices), _ = optimize_acqf_brute_force_choices(
             concat_then_qNEI, bounds, choices_shape,
             inequality_constraints, rounding_function)

        X_new = torch.cat((x_scalars,
                           x_choices.expand((x_scalars.shape[0], -1))),
                          dim=-1)
        Y_new = f(X_new)
        X = torch.cat([X, X_new])
        Y = torch.cat([Y, Y_new])
        state_dict = model.state_dict()

        print(f"Batch {(iteration + 1):>2}")

    return X, Y, model


# ==========================================================
# A tiny library for dealing with parameters and constraints
# ==========================================================

class LogScalar:
    def __init__(self, name, lower, upper):
        self.name = name
        self.bounds = [math.log(lower), math.log(upper)]

    def to_user_space(self, x):
        return math.exp(x)

    def to_search_space(self, v):
        return math.log(v)

    def torch_round(self, x):
        return x


class LogInt(LogScalar):
    def to_user_space(self, x):
        return round(math.exp(x))

    def torch_round(self, x):
        return x.exp().round().log()


class Int:
    def __init__(self, name, lower, upper):
        self.name = name
        self.bounds = [lower, upper]

    def to_user_space(self, x):
        return int(round(x))

    def to_search_space(self, v):
        return float(v)

    def torch_round(self, x):
        return x.round()


class Choice(Int):
    def __init__(self, name, choices):
        self.name = name
        self.choices = choices
        self.bounds = [0, len(choices) - 1e-9]

    def to_user_space(self, x):
        return self.choices[int(x)]

    def to_search_space(self, v):
        return self.choices.index(v)

    def torch_round(self, x):
        return x.floor()


def round_individual_parameters(parameters, X):
    X = X.clone()
    for i, p in enumerate(parameters):
        X[..., i] = p.torch_round(X[..., i])
    return X


class LinearConstraint:
    """
    Encodes w * x <= bound, using an indices-and-coefficients description of w
    """
    def __init__(self, indices, coefficients, bound):
        self.indices = indices
        self.coefficients = coefficients
        self.bound = bound

    def check(self, x):
        return (sum(c * v
                    for c, v in zip(self.coefficients, x[self.indices]))
                <= self.bound)

    def as_botorch_inequality_constraint(self):
        return (torch.tensor(self.indices, device=DEVICE),
                -torch.tensor(self.coefficients, dtype=DTYPE, device=DEVICE),
                -self.bound)


def botorch_inequality_constraints(linear_constraints):
    return [c.as_botorch_inequality_constraint() for c in linear_constraints]


def botorch_bounds(parameters):
    return torch.tensor([p.bounds for p in parameters],
                        device=DEVICE, dtype=DTYPE).T


def to_search_domain(parameters, config):
    return [p.to_search_space(config[p.name])
            for p in parameters]


def to_user_domain(parameters, x):
    return {p.name: p.to_user_space(x[i])
            for i, p in enumerate(parameters)}


# =======================
# Random search algorithm
# =======================

def unit_hypercube_to_search_space(parameters, X):
    X = X.clone()
    for i, p in enumerate(parameters):
        X[..., i] *= (p.bounds[1] - p.bounds[0])
        X[..., i] += p.bounds[0]
    return X


def sobol_sample(n, parameters, constraints=(), init_position=0):
    engine = torch.quasirandom.SobolEngine(
        dimension=len(parameters), scramble=True
    ).fast_forward(init_position)

    samples = []
    while len(samples) < n:
        sample = engine.draw(1, dtype=torch.double)[0]
        sample = unit_hypercube_to_search_space(parameters, sample)
        sample = round_individual_parameters(parameters, sample)
        if all(c.check(sample) for c in constraints):
            samples.append(sample)

    return torch.vstack(samples)



# ===================
# Task-specific logic
# ===================

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.datasets import mnist


def result_to_y(result):
    # Use the log so that all real numbers map to valid losses, so the GP will
    # always output meaningful predictions (rather than predicting negative
    # losses)
    #
    # This also makes it so that we don't typically need to configure the GP's
    # covariance priors.
    return [math.log(result["loss"])]


class MNISTTask:
    def __init__(self, point_to_config_f):
        (train_images,
         train_labels), (test_images,
                         test_labels) = mnist.load_data()
        train_images = train_images.reshape((60000, 28, 28, 1))
        train_images = train_images.astype('float32') / 255
        test_images = test_images.reshape((10000, 28, 28, 1))
        test_images = test_images.astype('float32') / 255
        self.train_images = train_images
        self.train_labels = train_labels
        self.test_images = test_images
        self.test_labels = test_labels

        self.point_to_config = point_to_config_f

    def evaluate(self, config):
        model = models.Sequential()
        model.add(layers.Conv2D(32, (3, 3), activation='relu',
                                input_shape=(28, 28, 1)))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.Flatten())
        model.add(layers.Dense(64, activation='relu'))
        model.add(layers.Dense(10, activation="softmax"))
        # print(model.summary())

        optimizers = {
            "adam": tf.keras.optimizers.Adam,
            "sgd": tf.keras.optimizers.SGD,
            "rmsprop": tf.keras.optimizers.RMSprop,
        }

        model.compile(
            optimizer=optimizers[config["optimizer"]](
                learning_rate=config["learning_rate"]
            ),
            loss="sparse_categorical_crossentropy",
            metrics=['accuracy'])

        step_size = config["lr_schedule_step_size"]
        alpha = config["lr_schedule_alpha"]

        def scheduler(epoch, lr):
            prev_stage = (epoch - 1) // step_size
            stage = epoch // step_size
            if stage > prev_stage and stage > 0:
                return lr * alpha
            return lr

        model.fit(self.train_images, self.train_labels,
                  epochs=config["epochs"],
                  batch_size=config["batch_size"],
                  callbacks=[
                      tf.keras.callbacks.LearningRateScheduler(scheduler)
                  ],
                  # verbose=0
                  )

        test_loss, test_acc = model.evaluate(self.test_images, self.test_labels)

        return {"accuracy": test_acc,
                "loss": test_loss}

    def optimized_f(self, X):
        Y = []
        for i, x in enumerate(X):
            config = self.point_to_config(X[i].tolist())
            print(config)
            result = self.evaluate(config)
            print(result)
            Y.append(result_to_y(result))
        return torch.tensor(Y)


# =======================
# Connect it all together
# =======================

from functools import partial

max_num_epochs = 60
scalar_parameters = [
    LogInt("epochs", 2, max_num_epochs),
    LogInt("lr_schedule_step_size", 1, max_num_epochs),
    LogInt("batch_size", 16, 4096),
    LogScalar("learning_rate", 1e-6, 0.4),
    LogScalar("lr_schedule_alpha", 0.05, 0.98),
]

choice_parameters = [
    Choice("optimizer", ["adam", "sgd", "rmsprop"]),
]

parameters = scalar_parameters + choice_parameters

constraints = [
    # log(lr_schedule_step_size) <= log(epochs)
    LinearConstraint([0, 1], [-1., 1.], 0.)
]


class GP(botorch.models.SingleTaskGP):
    """
    Similar to gpytorch.models.MixedSingleTaskGP, but takes integer choices
    as input.
    """
    def __init__(self, train_X, train_Y, **kwargs):
        # Precompute offsets for converting between list of choice integers and
        # indices in an n-hot vector.
        offsets = []
        tot = 0
        for i, p in enumerate(choice_parameters):
            offsets.append(tot)
            tot += len(p.choices)
        self.choice_offsets = torch.tensor(
            offsets, device=train_X.device).unsqueeze(0)
        self.choice_tot = tot

        ord_dims = list(range(len(scalar_parameters)))
        cat_dims = list(range(len(scalar_parameters),
                              len(scalar_parameters) + self.choice_tot))
        # Logic that is needed if you decide to use multiple outputs (you can't
        # simply hardcode the number of outputs to the known number, because
        # fit_gpytorch_model breaks this into n single-output models; so this
        # code can't just assume it's a n-output model)
        _, aug_batch_shape = self.get_batch_dimensions(train_X, train_Y)

        sum_kernel = gpytorch.kernels.ScaleKernel(
            gpytorch.kernels.MaternKernel(
                nu=2.5,
                batch_shape=aug_batch_shape,
                ard_num_dims=len(ord_dims),
                active_dims=ord_dims,
                lengthscale_constraint=gpytorch.constraints.GreaterThan(1e-04),
            )
            + gpytorch.kernels.ScaleKernel(
                botorch.models.kernels.CategoricalKernel(
                    batch_shape=aug_batch_shape,
                    ard_num_dims=len(cat_dims),
                    active_dims=cat_dims,
                    lengthscale_constraint=gpytorch.constraints.GreaterThan(
                        1e-06),
                )
            )
        )
        prod_kernel = gpytorch.kernels.ScaleKernel(
            gpytorch.kernels.MaternKernel(
                nu=2.5,
                batch_shape=aug_batch_shape,
                ard_num_dims=len(ord_dims),
                active_dims=ord_dims,
                lengthscale_constraint=gpytorch.constraints.GreaterThan(1e-04),
            )
            * botorch.models.kernels.CategoricalKernel(
                batch_shape=aug_batch_shape,
                ard_num_dims=len(cat_dims),
                active_dims=cat_dims,
                lengthscale_constraint=gpytorch.constraints.GreaterThan(1e-06),
            )
        )

        covar_module = sum_kernel + prod_kernel
        super().__init__(train_X, train_Y, covar_module=covar_module, **kwargs)

    def forward(self, x):
        # Ideally the forward method would receive (scalar_x, choice_x) already
        # split apart. botorch's models don't support this, though gpytorch
        # does.
        scalar_x, choice_x = torch.split(x, len(scalar_parameters), -1)
        # Convert choice ints to n-hot vector
        on_indices = choice_x.long() + self.choice_offsets
        choice_x_hot = torch.zeros((*choice_x.shape[:-1], self.choice_tot),
                                   dtype=bool,
                                   device=choice_x.device)
        choice_x_hot.scatter_(-1, on_indices, True)

        x = torch.cat((scalar_x, choice_x_hot), dim=-1)
        return super().forward(x)



if __name__ == "__main__":
    to_user_domain_f = partial(to_user_domain, parameters)
    task = MNISTTask(to_user_domain_f)

    # Get initial observations via a random search. Often it makes sense to
    # replace this step with a set of manually-specified suggestions (above)
    X = sobol_sample(N_RANDOM, parameters, constraints).to(device=DEVICE)
    Y = task.optimized_f(X)

    # Then optimize
    rounding_function = partial(round_individual_parameters, scalar_parameters)
    X, Y, model = optimize(
        task.optimized_f, X, Y,
        botorch_bounds(scalar_parameters),
        [len(p.choices) for p in choice_parameters],
        botorch_inequality_constraints(constraints),
        rounding_function,
        GP)