<- Outer Loop Cookbook: All recipes

Tick-Tock Bayesian Optimization

This is a hyperparameter search algorithm that optimizes both model performance and cost. You specify a desired maximum cost, and the algorithm uses a “tick-tock” approach to find the best hyperparameters that satisfies that constraint.

Each dot indicates the outcome of an experiment for a particular set of hyperparameters. As usual with Bayesian Optimization, these results are fed into a Bayesian regression model, which is used to predict the outcome for other candidate configurations, shown in red. These candidate configurations are then tested, and this process is repeated.

The algorithm alternates between optimizing training time (“tick”) and optimizing model loss (“tock”). On each “tick” stage the algorithm searches for configurations that improve cost without hurting performance. On each “tock” stage it searches for configurations that improve model performance while satisfying the maximum training time.

In practice, this leads to one of two end results:

  1. Scenario 1: Maximum model performance is reachable within budget. In this case, the loop will keep exploring downward, searching for a model and training regime that is maximally cheap while still achieving maximal model performance.
  2. Scenario 2: Maximum model performance is beyond your budget (think: large AI models). In this case, each “tick” creates some potential energy for improvement, which is then consumed by the “tock”. The loop searches for the best model that fits your budget.

You are free to wrap this search in yet another outer loop, where you increase or decrease the budget over time.

It is noteworthy that optimizing for this second objective can actually decrease your total cost. If you dedicate \(n\) percent of your experiments to optimizing cost, the whole thing pays for itself as soon as it reduces cost by \(n\) percent. Usually, adding a second objective to your hyperparameter optimization makes it cost more, but not here.

Python code

# =====================
# The optimization code
# =====================

import math

import botorch
import botorch.acquisition
import botorch.models
import botorch.optim
import botorch.utils
import gpytorch.mlls
import torch


DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DTYPE = torch.double

# Botorch heuristics configuration
NUM_RESTARTS = 60
RAW_SAMPLES = 1024

N_RANDOM = 10
# When using joint optimization and batching, botorch seems to perform poorly on
# this task. Often one item in the batch is good and the rest are throwaway
# experiments with ~zero probability of even meeting the constraints. So we
# disable batching here. We could also enable batching and change optimize_acqf
# to use sequential=True if we want to run experiments in parallel.
# BATCH_SIZE = 3
# N_BATCH = 20
BATCH_SIZE = 1
N_BATCH = 60
MC_SAMPLES = 256

MAX_TRAINING_TIME = 60

def loss_objective_constructor(model, X, Y):
    return botorch.acquisition.ConstrainedMCObjective(
        objective=lambda Z: -Z[..., 0],
        constraints=[lambda Z: Z[..., 1] - math.log(MAX_TRAINING_TIME)],
        eta=(math.log(MAX_TRAINING_TIME) / 1000),
        infeasible_cost=Y[..., 0].max() + 2.0
    )


def cost_objective_constructor(model, X, Y):
    # Compute MC_SAMPLES different loss thresholds, one for each sample.
    # This cost_objective expects to be called on a MC_SAMPLES of
    # each batch item.
    with torch.no_grad():
        baseline_posterior = model.posterior(X)
    sampler = botorch.sampling.IIDNormalSampler(num_samples=MC_SAMPLES)
    baseline_samples = sampler(baseline_posterior)

    # Use the samples to decide on a best baseline configuration.
    loss_objective = loss_objective_constructor(model, X, Y)
    best_baseline = loss_objective(baseline_samples).max(
        dim=-1, keepdim=True).indices
    sampled_best_loss_within_budget = baseline_samples[..., 0].gather(
        -1, best_baseline
    )

    return botorch.acquisition.ConstrainedMCObjective(
        objective=lambda Z: -Z[..., 1],
        constraints=[
            lambda Z: (Z[..., 0]
                       - sampled_best_loss_within_budget.view(
                           [-1] + [1] * (len(Z.shape) - 2))
                       ),
            # It is important to also include this constraint so that the
            # baseline "best training time" inside the qNoisyExpectedImprovement
            # doesn't get dominated by configurations with large training times
            # (either because they have low loss or just high variance in loss).
            # Otherwise it would be possible for candidates with long training
            # times to have a large expected improvement.
            lambda Z: Z[..., 1] - math.log(MAX_TRAINING_TIME)
        ],
        eta=(1 / 1000),
        infeasible_cost=Y[..., 1].max() + 2.0,
    )


def _optimize(f, X, Y, objective_constructor, bounds, inequality_constraints,
              state_dict):
    model = botorch.models.SingleTaskGP(X, Y).to(X)
    if state_dict is not None:
        # use the current state dict to speed up fitting
        model.load_state_dict(state_dict)
    botorch.fit_gpytorch_model(
        gpytorch.mlls.ExactMarginalLogLikelihood(model.likelihood, model)
    )

    candidates, acq_values = botorch.optim.optimize_acqf(
        acq_function=botorch.acquisition.qNoisyExpectedImprovement(
            model=model,
            X_baseline=X,
            sampler=botorch.sampling.SobolQMCNormalSampler(
                num_samples=MC_SAMPLES),
            objective=objective_constructor(model, X, Y),
        ),
        bounds=bounds,
        inequality_constraints=inequality_constraints,
        q=BATCH_SIZE,
        num_restarts=NUM_RESTARTS,
        raw_samples=RAW_SAMPLES,
    )
    X_new = candidates.detach()
    Y_new = f(X_new)
    X = torch.cat([X, X_new])
    Y = torch.cat([Y, Y_new])
    return X, Y, model


def tick_tock_optimize(f, X, Y, bounds, inequality_constraints):
    """
    f must take in a batch of points on a hypercube (defined by bounds) and
    return two values: an objective, which is maximized, and a training time,
    which is minimized
    """
    state_dict = None
    model = None

    for iteration in range(N_BATCH):
        # Phase 1: Optimize training time, creating more potential for better
        # loss
        X, Y, model = _optimize(f, X, Y,
                                cost_objective_constructor,
                                bounds, inequality_constraints, state_dict)
        state_dict = model.state_dict()
        # Phase 2: Optimize loss, now that some training time has been freed
        # up.
        X, Y, model = _optimize(f, X, Y,
                                loss_objective_constructor,
                                bounds, inequality_constraints, state_dict)
        state_dict = model.state_dict()

        print(f"Batch {(iteration + 1):>2}")

    return X, Y, model


# =====================================================
# SearchSpace utilities
# (Ideally Ax would expose this functionality directly)
# =====================================================

import ax
import ax.core
import ax.modelbridge.registry
from ax.models.torch.utils import _to_inequality_constraints
from ax.modelbridge.modelbridge_utils import extract_parameter_constraints


def ax_bounds(search_space):
    return [(p.lower, p.upper)
            for p in search_space.parameters.values()]


def botorch_bounds(search_space):
    return torch.tensor(ax_bounds(search_space),
                        device=DEVICE, dtype=DTYPE).T


def ax_linear_constraints(search_space):
    return extract_parameter_constraints(search_space.parameter_constraints,
                                         list(search_space.parameters.keys()))


def botorch_inequality_constraints(search_space):
    return _to_inequality_constraints(
        tuple(torch.tensor(arr, device=DEVICE, dtype=DTYPE)
              for arr in ax_linear_constraints(search_space))
    )


class SearchSpaceTransformation:
    def __init__(self, search_space, transforms=ax.modelbridge.registry.Cont_X_trans):
        """
        transforms: Defaults to
        [RemoveFixed, OrderedChoiceEncode, OneHot, IntToFloat, Log, Logit, UnitX]
        """
        self.before = search_space
        transform_instances = []
        after = search_space.clone()
        for transform_class in transforms:
            t = transform_class(search_space=after,
                                observation_features=[],
                                observation_data=[])
            after = t.transform_search_space(after)
            transform_instances.append(t)
        self.transform_instances = transform_instances
        self.after = after

    def point_to_config(self, x):
        observation_features = ax.core.ObservationFeatures(
            dict(zip(self.after.parameters.keys(), x)))
        for t in reversed(self.transform_instances):
            [observation_features] = t.untransform_observation_features(
                [observation_features]
            )

        return observation_features.parameters

    def config_to_point(self, config):
        observation_features = ax.core.ObservationFeatures(config)
        for t in self.transform_instances:
            [observation_features] = t.transform_observation_features(
                [observation_features]
            )

        return list(observation_features.parameters.values())




# ===================
# Task-specific logic
# ===================

import time

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.datasets import mnist


def result_to_y(result):
    # Use the log so that all real numbers map to valid losses and times, so the
    # GP will always output meaningful predictions (rather than predicting
    # negative losses or training times)
    #
    # This also makes it so that we don't typically need to configure the GP's
    # covariance priors.
    return [math.log(result["loss"]),
            math.log(result["training_time"])]


class MNISTTask:
    def __init__(self, point_to_config_f):
        (train_images, train_labels), (test_images, test_labels) = mnist.load_data()
        train_images = train_images.reshape((60000, 28, 28, 1))
        train_images = train_images.astype('float32') / 255
        test_images = test_images.reshape((10000, 28, 28, 1))
        test_images = test_images.astype('float32') / 255
        self.train_images = train_images
        self.train_labels = train_labels
        self.test_images = test_images
        self.test_labels = test_labels

        self.point_to_config = point_to_config_f

    def evaluate(self, config):
        model = models.Sequential()
        model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.Flatten())
        model.add(layers.Dense(64, activation='relu'))
        model.add(layers.Dense(10, activation="softmax"))
        # print(model.summary())

        model.compile(
            optimizer=tf.keras.optimizers.Adam(
                learning_rate=config["learning_rate"]
            ),
            loss="sparse_categorical_crossentropy",
            metrics=['accuracy'])

        step_size = config["lr_schedule_step_size"]
        alpha = config["lr_schedule_alpha"]

        def scheduler(epoch, lr):
            prev_stage = (epoch - 1) // step_size
            stage = epoch // step_size
            if stage > prev_stage and stage > 0:
                return lr * alpha
            return lr

        tstart = time.time()

        model.fit(self.train_images, self.train_labels,
                  epochs=config["epochs"],
                  batch_size=config["batch_size"],
                  callbacks=[tf.keras.callbacks.LearningRateScheduler(scheduler)],
                  # verbose=0
                  )

        training_time = time.time() - tstart

        test_loss, test_acc = model.evaluate(self.test_images, self.test_labels)

        return {"accuracy": test_acc,
                "loss": test_loss,
                "training_time": training_time}

    def tick_tock_f(self, X):
        Y = []
        for i, x in enumerate(X):
            config = self.point_to_config(X[i].tolist())
            print(config)
            result = self.evaluate(config)
            print(result)
            Y.append(result_to_y(result))
        return torch.tensor(Y)


# =======================
# Connect it all together
# =======================

import ax.models.random.sobol
from ax.service.utils.instantiation import InstantiationBase

max_num_epochs = 60
parameters = [
    {"name": "epochs", "type": "range", "bounds": [2, max_num_epochs],
     "value_type": "int"},
    {"name": "batch_size", "type": "range", "bounds": [16, 4096],
     "value_type": "int", "log_scale": True},
    {"name": "learning_rate", "type": "range", "bounds": [1e-6, 0.4],
     "value_type": "float", "log_scale": True},
    {"name": "lr_schedule_step_size", "type": "range", "bounds": [1, max_num_epochs],
     "value_type": "int"},
    {"name": "lr_schedule_alpha", "type": "range", "bounds": [0.05, 0.98],
     "value_type": "float", "log_scale": True},
]

parameter_constraints = [
    "lr_schedule_step_size <= epochs"
]


if __name__ == "__main__":
    search_space = InstantiationBase.make_search_space(
        parameters=parameters,
        parameter_constraints=parameter_constraints
    )
    sst = SearchSpaceTransformation(search_space)

    task = MNISTTask(sst.point_to_config)

    # Get initial observations via a random search. Often it makes sense to
    # replace this step with a set of manually-specified suggestions.
    g = ax.models.random.sobol.SobolGenerator()
    points, _ = g.gen(N_RANDOM, ax_bounds(sst.after),
                      ax_linear_constraints(sst.after))
    X = torch.tensor(points, dtype=DTYPE, device=DEVICE)
    Y = task.tick_tock_f(X)
    # Then optimize
    X, Y, model = tick_tock_optimize(task.tick_tock_f, X, Y,
                                     botorch_bounds(sst.after),
                                     botorch_inequality_constraints(sst.after))

This code demonstrates the core idea. A couple future improvements:

  • Create a followup example that adds the ability for the optimized function to return a partial result so that it can estimate the actual cost of a model without fully training it, stopping early if it’s too high. (This means there will be two GP models, one with more observations than the other.)

(Thanks to Rosanne Liu for early feedback on the visualization.)