Tick-Tock Bayesian Optimization

This is a hyperparameter search algorithm that optimizes both model performance and cost. You specify a desired maximum cost, and the algorithm uses a “tick-tock” approach to find the best hyperparameters that satisfies that constraint.

Each dot indicates the outcome of an experiment for a particular set of hyperparameters. As usual with Bayesian Optimization, these results are fed into a Bayesian regression model, which is used to predict the outcome for other candidate configurations, shown in red. These candidate configurations are then tested, and this process is repeated.

The algorithm alternates between optimizing training time (“tick”) and optimizing model loss (“tock”). On each “tick” stage the algorithm searches for configurations that improve cost without hurting performance. On each “tock” stage it searches for configurations that improve model performance while satisfying the maximum training time.

In practice, this leads to one of two end results:

  1. Scenario 1: Maximum model performance is reachable within budget. In this case, the loop will keep exploring downward, searching for a model and training regime that is maximally cheap while still achieving maximal model performance.
  2. Scenario 2: Maximum model performance is beyond your budget (think: large AI models). In this case, each “tick” creates some potential energy for improvement, which is then consumed by the “tock”. The loop searches for the best model that fits your budget.

You are free to wrap this search in yet another outer loop, where you increase or decrease the budget over time.

It is noteworthy that optimizing for this second objective can actually decrease your total cost. If you dedicate \(n\) percent of your experiments to optimizing cost, the whole thing pays for itself as soon as it reduces cost by \(n\) percent. Usually, adding a second objective to your hyperparameter optimization makes it cost more, but not here.

Python code

# =====================
# The optimization code
# =====================

import math

import botorch
import botorch.acquisition
import botorch.models
import botorch.optim
import botorch.utils
import gpytorch.mlls
import torch

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DTYPE = torch.double

# Botorch heuristics configuration

# When using joint optimization and batching, botorch seems to perform poorly on
# this task. Often one item in the batch is good and the rest are throwaway
# experiments with ~zero probability of even meeting the constraints. So we
# disable batching here. We could also enable batching and change optimize_acqf
# to use sequential=True if we want to run experiments in parallel.
# N_BATCH = 20
N_BATCH = 60


def loss_objective_constructor(model, X, Y):
    return botorch.acquisition.ConstrainedMCObjective(
        objective=lambda Z: -Z[..., 0],
        constraints=[lambda Z: Z[..., 1] - math.log(MAX_TRAINING_TIME)],
        eta=(math.log(MAX_TRAINING_TIME) / 1000),
        infeasible_cost=Y[..., 0].max() + 2.0

def cost_objective_constructor(model, X, Y):
    # Compute MC_SAMPLES different loss thresholds, one for each sample.
    # This cost_objective expects to be called on a MC_SAMPLES of
    # each batch item.
    with torch.no_grad():
        baseline_posterior = model.posterior(X)
    sampler = botorch.sampling.IIDNormalSampler(num_samples=MC_SAMPLES)
    baseline_samples = sampler(baseline_posterior)

    # Use the samples to decide on a best baseline configuration.
    loss_objective = loss_objective_constructor(model, X, Y)
    best_baseline = loss_objective(baseline_samples).max(
        dim=-1, keepdim=True).indices
    sampled_best_loss_within_budget = baseline_samples[..., 0].gather(
        -1, best_baseline

    return botorch.acquisition.ConstrainedMCObjective(
        objective=lambda Z: -Z[..., 1],
            lambda Z: (Z[..., 0]
                       - sampled_best_loss_within_budget.view(
                           [-1] + [1] * (len(Z.shape) - 2))
            # It is important to also include this constraint so that the
            # baseline "best training time" inside the qNoisyExpectedImprovement
            # doesn't get dominated by configurations with large training times
            # (either because they have low loss or just high variance in loss).
            # Otherwise it would be possible for candidates with long training
            # times to have a large expected improvement.
            lambda Z: Z[..., 1] - math.log(MAX_TRAINING_TIME)
        eta=(1 / 1000),
        infeasible_cost=Y[..., 1].max() + 2.0,

def tick_tock_optimize(f, X, Y, bounds, inequality_constraints,
                       rounding_function, model_class):
    f must take in a batch of points on a hypercube (defined by bounds) and
    return two values: an objective, which is maximized, and a training time,
    which is minimized
    state_dict = None
    model = None

    for iteration in range(N_BATCH):
        for command, objective_constructor in [
                # Phase 1: Optimize training time, creating more potential for
                # better loss
                ("cost", cost_objective_constructor),
                # Phase 2: Optimize loss, now that some training time has been
                # freed up.
                ("loss", loss_objective_constructor)]:
            model = model_class(X, Y).to(X)
            if state_dict is not None:
                # Use the current state dict to speed up fitting.

            candidates, acq_values = botorch.optim.optimize_acqf(
                    objective=objective_constructor(model, X, Y),
            X_new = candidates.detach()
            Y_new = f(X_new)
            X = torch.cat([X, X_new])
            Y = torch.cat([Y, Y_new])
            state_dict = model.state_dict()

        print(f"Batch {(iteration + 1):>2}")

    return X, Y, model

# ==========================================================
# A tiny library for dealing with parameters and constraints
# ==========================================================

class LogScalar:
    def __init__(self, name, lower, upper):
        self.name = name
        self.bounds = [math.log(lower), math.log(upper)]

    def to_user_space(self, x):
        return math.exp(x)

    def to_search_space(self, v):
        return math.log(v)

    def torch_round(self, x):
        return x

class LogInt(LogScalar):
    def to_user_space(self, x):
        return round(math.exp(x))

    def torch_round(self, x):
        return x.exp().round().log()

class Int:
    def __init__(self, name, lower, upper):
        self.name = name
        self.bounds = [lower, upper]

    def to_user_space(self, x):
        return int(round(x))

    def to_search_space(self, v):
        return float(v)

    def torch_round(self, x):
        return x.round()

def round_individual_parameters(parameters, X):
    X = X.clone()
    for i, p in enumerate(parameters):
        X[..., i] = p.torch_round(X[..., i])
    return X

class LinearConstraint:
    Encodes w * x <= bound, using an indices-and-coefficients description of w
    def __init__(self, indices, coefficients, bound):
        self.indices = indices
        self.coefficients = coefficients
        self.bound = bound

    def check(self, x):
        return (sum(c * v
                    for c, v in zip(self.coefficients, x[self.indices]))
                <= self.bound)

    def as_botorch_inequality_constraint(self):
        return (torch.tensor(self.indices, device=DEVICE),
                -torch.tensor(self.coefficients, dtype=DTYPE, device=DEVICE),

def botorch_inequality_constraints(linear_constraints):
    return [c.as_botorch_inequality_constraint() for c in linear_constraints]

def botorch_bounds(parameters):
    return torch.tensor([p.bounds for p in parameters],
                        device=DEVICE, dtype=DTYPE).T

def to_search_domain(parameters, config):
    return [p.to_search_space(config[p.name])
            for p in parameters]

def to_user_domain(parameters, x):
    return {p.name: p.to_user_space(x[i])
            for i, p in enumerate(parameters)}

# =======================
# Random search algorithm
# =======================

def unit_hypercube_to_search_space(parameters, X):
    X = X.clone()
    for i, p in enumerate(parameters):
        X[..., i] *= (p.bounds[1] - p.bounds[0])
        X[..., i] += p.bounds[0]
    return X

def sobol_sample(n, parameters, constraints=(), init_position=0):
    engine = torch.quasirandom.SobolEngine(
        dimension=len(parameters), scramble=True

    samples = []
    while len(samples) < n:
        sample = engine.draw(1, dtype=torch.double)[0]
        sample = unit_hypercube_to_search_space(parameters, sample)
        sample = round_individual_parameters(parameters, sample)
        if all(c.check(sample) for c in constraints):

    return torch.vstack(samples)

# ===================
# Task-specific logic
# ===================

import time

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.datasets import mnist

def result_to_y(result):
    # Use the log so that all real numbers map to valid losses and times, so the
    # GP will always output meaningful predictions (rather than predicting
    # negative losses or training times)
    # This also makes it so that we don't typically need to configure the GP's
    # covariance priors.
    return [math.log(result["loss"]),

class MNISTTask:
    def __init__(self, point_to_config_f):
         train_labels), (test_images,
                         test_labels) = mnist.load_data()
        train_images = train_images.reshape((60000, 28, 28, 1))
        train_images = train_images.astype('float32') / 255
        test_images = test_images.reshape((10000, 28, 28, 1))
        test_images = test_images.astype('float32') / 255
        self.train_images = train_images
        self.train_labels = train_labels
        self.test_images = test_images
        self.test_labels = test_labels

        self.point_to_config = point_to_config_f

    def evaluate(self, config):
        model = models.Sequential()
        model.add(layers.Conv2D(32, (3, 3), activation='relu',
                                input_shape=(28, 28, 1)))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.Dense(64, activation='relu'))
        model.add(layers.Dense(10, activation="softmax"))
        # print(model.summary())


        step_size = config["lr_schedule_step_size"]
        alpha = config["lr_schedule_alpha"]

        def scheduler(epoch, lr):
            prev_stage = (epoch - 1) // step_size
            stage = epoch // step_size
            if stage > prev_stage and stage > 0:
                return lr * alpha
            return lr

        tstart = time.time()

        model.fit(self.train_images, self.train_labels,
                  # verbose=0

        training_time = time.time() - tstart

        test_loss, test_acc = model.evaluate(self.test_images, self.test_labels)

        return {"accuracy": test_acc,
                "loss": test_loss,
                "training_time": training_time}

    def tick_tock_f(self, X):
        Y = []
        for i, x in enumerate(X):
            config = self.point_to_config(X[i].tolist())
            result = self.evaluate(config)

        return torch.tensor(Y)

# =======================
# Connect it all together
# =======================

from functools import partial

max_num_epochs = 60
parameters = [
    LogInt("epochs", 2, max_num_epochs),
    LogInt("lr_schedule_step_size", 1, max_num_epochs),
    LogInt("batch_size", 16, 4096),
    LogScalar("learning_rate", 1e-6, 0.4),
    LogScalar("lr_schedule_alpha", 0.05, 0.98),

constraints = [
    # log(lr_schedule_step_size) <= log(epochs)
    LinearConstraint([0, 1], [-1., 1.], 0.)

if __name__ == "__main__":
    to_user_domain_f = partial(to_user_domain, parameters)
    task = MNISTTask(to_user_domain_f)

    # Get initial observations via a random search. Often it makes sense to
    # replace this step with a set of manually-specified suggestions
    X = sobol_sample(N_RANDOM, parameters, constraints).to(device=DEVICE)
    Y = task.tick_tock_f(X)

    # Then optimize
    rounding_function = partial(round_individual_parameters, parameters)
    X, Y, model = tick_tock_optimize(
        task.tick_tock_f, X, Y,

This code demonstrates the core idea. A couple future improvements:

  • Create a followup example that adds the ability for the optimized function to return a partial result so that it can estimate the actual cost of a model without fully training it, stopping early if it’s too high. (This means there will be two GP models, one with more observations than the other.)

(Thanks to Rosanne Liu for early feedback on the visualization.)