# =====================
# The optimization code
# =====================
import math
import os
import botorch
import botorch.acquisition
import botorch.models
import botorch.optim
import botorch.utils
import gpytorch.mlls
import torch
# Device and dtype used for all tensors created by this script.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DTYPE = torch.double
# Set the SMOKE_TEST environment variable to run a fast sanity check.
SMOKE_TEST = os.environ.get("SMOKE_TEST")

# Botorch heuristics configuration
if SMOKE_TEST:
    NUM_RESTARTS = 2
    RAW_SAMPLES = 32
    N_RANDOM = 1
else:
    NUM_RESTARTS = 60
    RAW_SAMPLES = 1024
    N_RANDOM = 10

# When using joint optimization and batching, botorch seems to perform poorly on
# this task. Often one item in the batch is good and the rest are throwaway
# experiments with ~zero probability of even meeting the constraints. So we
# disable batching here. We could also enable batching and change optimize_acqf
# to use sequential=True if we want to run experiments in parallel.
# BATCH_SIZE = 3 if not SMOKE_TEST else 2
# N_BATCH = 20 if not SMOKE_TEST else 2
BATCH_SIZE = 1
N_BATCH = 2 if SMOKE_TEST else 60
MC_SAMPLES = 32 if SMOKE_TEST else 256
# Per-run training-time budget, in seconds (compared in log space below).
MAX_TRAINING_TIME = 60
def loss_objective_constructor(model, X, Y):
    """Build an MC objective that minimizes log(loss) within the time budget.

    Y[..., 0] holds log(loss) and Y[..., 1] holds log(training_time)
    (see result_to_y), so maximizing -Z[..., 0] minimizes the loss, and
    Z[..., 1] - log(MAX_TRAINING_TIME) <= 0 enforces the time budget.
    Infeasible points are penalized by a cost above the worst observed
    log-loss.
    """
    budget = math.log(MAX_TRAINING_TIME)
    return botorch.acquisition.ConstrainedMCObjective(
        objective=lambda Z: -Z[..., 0],
        constraints=[lambda Z: Z[..., 1] - budget],
        eta=budget / 1000,
        infeasible_cost=Y[..., 0].max() + 2.0,
    )
def cost_objective_constructor(model, X, Y):
    """Build an MC objective that minimizes log(training time).

    The objective maximizes -Z[..., 1] (i.e. minimizes log training
    time) subject to two constraints: the candidate's sampled loss must
    beat a per-MC-sample "best loss achievable within the budget"
    threshold derived from the current model over the observed X, and
    the candidate's training time must stay under MAX_TRAINING_TIME.
    Y[..., 0] is log(loss) and Y[..., 1] is log(training_time); see
    result_to_y.
    """
    # Compute MC_SAMPLES different loss thresholds, one for each sample.
    # This cost_objective expects to be called on a MC_SAMPLES of
    # each batch item.
    with torch.no_grad():
        baseline_posterior = model.posterior(X)
        sampler = botorch.sampling.IIDNormalSampler(num_samples=MC_SAMPLES)
        baseline_samples = sampler(baseline_posterior)
        # Use the samples to decide on a best baseline configuration.
        loss_objective = loss_objective_constructor(model, X, Y)
        # Per MC sample: index of the baseline point with the best
        # constrained loss objective.
        best_baseline = loss_objective(baseline_samples).max(
            dim=-1, keepdim=True).indices
        # Per MC sample: the log-loss of that sample's best baseline point.
        sampled_best_loss_within_budget = baseline_samples[..., 0].gather(
            -1, best_baseline
        )
    return botorch.acquisition.ConstrainedMCObjective(
        objective=lambda Z: -Z[..., 1],
        constraints=[
            # Candidate loss must not exceed the per-sample baseline
            # threshold. The view reshapes the thresholds so they
            # broadcast across Z's trailing batch/q dimensions.
            lambda Z: (Z[..., 0]
                       - sampled_best_loss_within_budget.view(
                           [-1] + [1] * (len(Z.shape) - 2))
                       ),
            # It is important to also include this constraint so that the
            # baseline "best training time" inside the qNoisyExpectedImprovement
            # doesn't get dominated by configurations with large training times
            # (either because they have low loss or just high variance in loss).
            # Otherwise it would be possible for candidates with long training
            # times to have a large expected improvement.
            lambda Z: Z[..., 1] - math.log(MAX_TRAINING_TIME)
        ],
        eta=(1 / 1000),
        infeasible_cost=Y[..., 1].max() + 2.0,
    )
def tick_tock_optimize(f, X, Y, bounds, inequality_constraints,
                       rounding_function, model_class, tracer=None):
    """
    f must take in a batch of points on a hypercube (defined by bounds) and
    return two values: an objective, which is maximized, and a training time,
    which is minimized

    Runs N_BATCH iterations, each alternating ("tick-tock") between two
    acquisition phases: first optimize training time, then optimize
    loss. Returns the augmented observations (X, Y) and the last fitted
    model.
    """
    state_dict = None
    model = None
    for iteration in range(N_BATCH):
        for command, objective_constructor in [
                # Phase 1: Optimize training time, creating more potential for
                # better loss
                ("cost", cost_objective_constructor),
                # Phase 2: Optimize loss, now that some training time has been
                # freed up.
                ("loss", loss_objective_constructor)]:
            if tracer is not None:
                tracer.log(command=command)
            # Refit the GP on all data gathered so far, matching X's
            # device/dtype.
            model = model_class(X, Y).to(X)
            if state_dict is not None:
                # Use the current state dict to speed up fitting.
                model.load_state_dict(state_dict)
            botorch.fit_gpytorch_model(
                gpytorch.mlls.ExactMarginalLogLikelihood(model.likelihood,
                                                         model)
            )
            if tracer is not None:
                tracer.log_model(model)
            # Maximize noisy expected improvement under this phase's
            # constrained objective.
            candidates, acq_values = botorch.optim.optimize_acqf(
                acq_function=botorch.acquisition.qNoisyExpectedImprovement(
                    model=model,
                    X_baseline=X,
                    sampler=botorch.sampling.SobolQMCNormalSampler(
                        num_samples=MC_SAMPLES),
                    objective=objective_constructor(model, X, Y),
                ),
                bounds=bounds,
                q=BATCH_SIZE,
                num_restarts=NUM_RESTARTS,
                raw_samples=RAW_SAMPLES,
                inequality_constraints=inequality_constraints,
                post_processing_func=rounding_function,
            )
            X_new = candidates.detach()
            if tracer is not None:
                with torch.no_grad():
                    posterior = model.posterior(X_new)
                    tracer.log_predictions(X_new, posterior.mean,
                                           posterior.variance)
            # Evaluate the expensive black-box objective at the candidates.
            Y_new = f(X_new)
            if tracer is not None:
                tracer.log_results(Y_new)
            X = torch.cat([X, X_new])
            Y = torch.cat([Y, Y_new])
            # Carry the hyperparameters forward to warm-start the next fit.
            state_dict = model.state_dict()
        print(f"Batch {(iteration + 1):>2}")
    return X, Y, model
# =============
# Tracing logic
# =============
import json
class Tracer:
    """Append-only logger that records optimization events to a text file.

    Each event is written as an optional bare command line followed by
    an optional JSON-encoded data line.
    """

    def __init__(self, point_to_config_f, filename, clear=True):
        # Converts a search-space point (list of floats) to a config dict.
        self.point_to_config_f = point_to_config_f
        self.filename = filename
        if clear:
            # Truncate any log left over from a previous run.
            open(filename, "w").close()

    def log(self, command=None, data=None):
        """Append a command line and/or a JSON data line to the log."""
        with open(self.filename, "a") as sink:
            if command is not None:
                sink.write(f"{command}\n")
            if data is not None:
                sink.write(json.dumps(data) + "\n")

    def log_model(self, model):
        """Record the GP's fitted hyperparameters."""
        hyperparameters = {
            "likelihood.noise_covar.noise":
                model.likelihood.noise_covar.noise.tolist(),
            "mean_module.constant":
                model.mean_module.constant.tolist(),
            "covar_module.outputscale":
                model.covar_module.outputscale.tolist(),
            "covar_module.base_kernel.lengthscale":
                model.covar_module.base_kernel.lengthscale.tolist(),
        }
        self.log(data=hyperparameters)

    def log_random(self, X):
        """Record randomly-drawn initial configurations."""
        self.log(command="random",
                 data=[self.point_to_config_f(point.tolist())
                       for point in X])

    def log_initial_values(self, configs, results):
        """Record precomputed (config, result) seed data."""
        self.log(command="initial", data=[configs, results])

    def log_predictions(self, X, mean, variance):
        """Record the model's posterior mean/variance at candidate points."""
        configs = [self.point_to_config_f(point.tolist())
                   for point in X]
        self.log(data=[configs, mean.tolist(), variance.tolist()])

    def log_results(self, Y):
        # Don't do anything, instead rely on log_result
        pass

    def log_result(self, index, result):
        """Record a single evaluation result by its batch index."""
        self.log(data=[index, result])
# ==========================================================
# A tiny library for dealing with parameters and constraints
# ==========================================================
class LogScalar:
    """A continuous parameter searched on a logarithmic scale.

    Search space holds log(value); user space holds the raw value.
    """

    def __init__(self, name, lower, upper):
        self.name = name
        # Bounds are stored in (log-transformed) search space.
        self.bounds = [math.log(lower), math.log(upper)]

    def to_user_space(self, x):
        """Map a search-space point back to a user-facing value."""
        return math.exp(x)

    def to_search_space(self, v):
        """Map a user-facing value into search space."""
        return math.log(v)

    def torch_round(self, x):
        """Continuous parameter: rounding is the identity."""
        return x
class LogInt(LogScalar):
    """An integer parameter searched on a logarithmic scale."""

    def to_user_space(self, x):
        # Leave log space first, then snap to the nearest integer.
        return round(math.exp(x))

    def torch_round(self, x):
        # Round in user space, then return to log space.
        return x.exp().round().log()
class Int:
    """An integer parameter searched on a linear scale."""

    def __init__(self, name, lower, upper):
        self.name = name
        # Bounds are in search space, which equals user space here.
        self.bounds = [lower, upper]

    def to_user_space(self, x):
        """Snap a search-space point to the nearest integer."""
        return int(round(x))

    def to_search_space(self, v):
        """Represent an integer value as a float for the optimizer."""
        return float(v)

    def torch_round(self, x):
        """Round a tensor of search-space values elementwise."""
        return x.round()
def round_individual_parameters ( parameters , X ):
X = X . clone ()
for i , p in enumerate ( parameters ):
X [..., i ] = p . torch_round ( X [..., i ])
return X
class LinearConstraint:
    """
    Encodes w * x <= bound, using an indices-and-coefficients description of w
    """

    def __init__(self, indices, coefficients, bound):
        self.indices = indices
        self.coefficients = coefficients
        self.bound = bound

    def check(self, x):
        """Return whether x satisfies the constraint."""
        lhs = sum(coefficient * value
                  for coefficient, value in zip(self.coefficients,
                                                x[self.indices]))
        return lhs <= self.bound

    def as_botorch_inequality_constraint(self):
        """Translate to botorch's (indices, coefficients, rhs) >= form.

        botorch expects sum_i coefficients[i] * x[indices[i]] >= rhs,
        so both the coefficients and the bound are negated.
        """
        indices = torch.tensor(self.indices, device=DEVICE)
        coefficients = -torch.tensor(self.coefficients,
                                     dtype=DTYPE, device=DEVICE)
        return (indices, coefficients, -self.bound)
def botorch_inequality_constraints(linear_constraints):
    """Convert LinearConstraint objects to botorch's inequality format."""
    converted = []
    for constraint in linear_constraints:
        converted.append(constraint.as_botorch_inequality_constraint())
    return converted
def botorch_bounds(parameters):
    """Stack parameter bounds into botorch's 2 x d layout (lower row,
    upper row)."""
    per_parameter = [parameter.bounds for parameter in parameters]
    return torch.tensor(per_parameter, device=DEVICE, dtype=DTYPE).T
def to_search_domain(parameters, config):
    """Map a user-space config dict to a list of search-space values,
    ordered like `parameters`."""
    point = []
    for parameter in parameters:
        point.append(parameter.to_search_space(config[parameter.name]))
    return point
def to_user_domain(parameters, x):
    """Map a search-space point back to a {name: value} config dict."""
    config = {}
    for position, parameter in enumerate(parameters):
        config[parameter.name] = parameter.to_user_space(x[position])
    return config
# =======================
# Random search algorithm
# =======================
def unit_hypercube_to_search_space(parameters, X):
    """Affinely rescale points from [0, 1]^d into each parameter's bounds.

    Returns a new tensor; the input is left untouched.
    """
    rescaled = X.clone()
    for column, parameter in enumerate(parameters):
        low, high = parameter.bounds
        rescaled[..., column] = rescaled[..., column] * (high - low) + low
    return rescaled
def sobol_sample(n, parameters, constraints=(), init_position=0,
                 max_draws=None):
    """Draw n quasi-random feasible points from the search space.

    Uses a scrambled Sobol engine over the unit hypercube, rescales
    each draw into the parameters' bounds, rounds integer parameters,
    and rejection-samples until every constraint accepts the point.

    Args:
        n: number of accepted samples to return.
        parameters: objects with `bounds` and `torch_round` (LogScalar,
            LogInt, Int).
        constraints: objects with a `check(x)` method; draws failing
            any check are discarded.
        init_position: how far to fast-forward the Sobol sequence
            (useful to continue from a previous run).
        max_draws: cap on total draws (default 10000 * n). Guards
            against an infinite loop when the constraints are
            (near-)infeasible; raises RuntimeError when exceeded.

    Returns:
        An (n, len(parameters)) double tensor of accepted samples.
    """
    if max_draws is None:
        max_draws = 10000 * max(n, 1)
    engine = torch.quasirandom.SobolEngine(
        dimension=len(parameters), scramble=True
    ).fast_forward(init_position)
    samples = []
    draws = 0
    while len(samples) < n:
        if draws >= max_draws:
            raise RuntimeError(
                f"sobol_sample: exceeded {max_draws} draws while seeking "
                f"{n} feasible samples; constraints may be infeasible")
        draws += 1
        sample = engine.draw(1, dtype=torch.double)[0]
        sample = unit_hypercube_to_search_space(parameters, sample)
        sample = round_individual_parameters(parameters, sample)
        if all(c.check(sample) for c in constraints):
            samples.append(sample)
    return torch.vstack(samples)
# ===================
# Task-specific logic
# ===================
import time
import tensorflow as tf
from tensorflow.keras import layers , models
from tensorflow.keras.datasets import mnist
def result_to_y(result):
    """Convert an evaluation result dict into the GP's target pair.

    Use the log so that all real numbers map to valid losses and times, so the
    GP will always output meaningful predictions (rather than predicting
    negative losses or training times).

    This also makes it so that we don't typically need to configure the GP's
    covariance priors.
    """
    loss = result["loss"]
    training_time = result["training_time"]
    return [math.log(loss), math.log(training_time)]
class MNISTTask:
    """Trains a small CNN on MNIST and reports loss, accuracy and
    wall-clock training time.

    This is the expensive black-box function being optimized.
    """

    def __init__(self, point_to_config_f, tracer=None):
        # MNIST: 60000 train / 10000 test 28x28 grayscale images,
        # normalized to [0, 1].
        (train_images,
         train_labels), (test_images,
                         test_labels) = mnist.load_data()
        train_images = train_images.reshape((60000, 28, 28, 1))
        train_images = train_images.astype('float32') / 255
        test_images = test_images.reshape((10000, 28, 28, 1))
        test_images = test_images.astype('float32') / 255
        self.train_images = train_images
        self.train_labels = train_labels
        self.test_images = test_images
        self.test_labels = test_labels
        # Maps a search-space point (list of floats) to a config dict.
        self.point_to_config = point_to_config_f
        self.tracer = tracer

    def evaluate(self, config):
        """Train one model with `config` and return its test metrics.

        config keys used: learning_rate, lr_schedule_step_size,
        lr_schedule_alpha, epochs, batch_size.
        Returns {"accuracy", "loss", "training_time"} where
        training_time is the fit() wall-clock time in seconds.
        """
        # Small fixed CNN; only the training hyperparameters vary.
        model = models.Sequential()
        model.add(layers.Conv2D(32, (3, 3), activation='relu',
                                input_shape=(28, 28, 1)))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.Flatten())
        model.add(layers.Dense(64, activation='relu'))
        model.add(layers.Dense(10, activation="softmax"))
        # print(model.summary())
        model.compile(
            optimizer=tf.keras.optimizers.Adam(
                learning_rate=config["learning_rate"]
            ),
            loss="sparse_categorical_crossentropy",
            metrics=['accuracy'])
        step_size = config["lr_schedule_step_size"]
        alpha = config["lr_schedule_alpha"]

        def scheduler(epoch, lr):
            # Multiply the LR by alpha each time the epoch count crosses
            # a step_size boundary (stage 0, i.e. the first stage, keeps
            # the initial LR).
            prev_stage = (epoch - 1) // step_size
            stage = epoch // step_size
            if stage > prev_stage and stage > 0:
                return lr * alpha
            return lr

        tstart = time.time()
        model.fit(self.train_images, self.train_labels,
                  epochs=config["epochs"],
                  batch_size=config["batch_size"],
                  callbacks=[
                      tf.keras.callbacks.LearningRateScheduler(scheduler)
                  ],
                  # verbose=0
                  )
        training_time = time.time() - tstart
        test_loss, test_acc = model.evaluate(self.test_images, self.test_labels)
        return {"accuracy": test_acc,
                "loss": test_loss,
                "training_time": training_time}

    def tick_tock_f(self, X):
        """Evaluate each point in X; return a (len(X), 2) tensor of
        [log(loss), log(training_time)] rows (see result_to_y)."""
        Y = []
        for i, x in enumerate(X):
            config = self.point_to_config(X[i].tolist())
            print(config)
            result = self.evaluate(config)
            print(result)
            if self.tracer is not None:
                self.tracer.log_result(i, result)
            Y.append(result_to_y(result))
        # NOTE(review): torch.tensor(Y) defaults to float32 on CPU while
        # X elsewhere in this file is double (and possibly CUDA);
        # consider torch.tensor(Y, dtype=DTYPE, device=DEVICE) — confirm
        # against SingleTaskGP's input expectations.
        return torch.tensor(Y)
# =======================
# Connect it all together
# =======================
from functools import partial
# Smaller epoch budget / larger batches make smoke-test runs fast.
max_num_epochs = 60 if not SMOKE_TEST else 3
min_batch_size = 16 if not SMOKE_TEST else 256
# Search-space definition. Positions matter: constraints below refer to
# parameters by index.
parameters = [
    LogInt("epochs", 2, max_num_epochs),
    LogInt("lr_schedule_step_size", 1, max_num_epochs),
    LogInt("batch_size", min_batch_size, 4096),
    LogScalar("learning_rate", 1e-6, 0.4),
    LogScalar("lr_schedule_alpha", 0.05, 0.98),
]
constraints = [
    # log(lr_schedule_step_size) <= log(epochs)
    LinearConstraint([0, 1], [-1., 1.], 0.)
]
# Some previous randomly-generated configs and their results
known_configs = [
    {"epochs": 58, "batch_size": 1650, "learning_rate": 4.0241610227744396e-05,
     "lr_schedule_step_size": 56, "lr_schedule_alpha": 0.08748550224528025},
    {"epochs": 42, "batch_size": 116, "learning_rate": 0.1770803853571978,
     "lr_schedule_step_size": 23, "lr_schedule_alpha": 0.11006912054461343},
    {"epochs": 34, "batch_size": 392, "learning_rate": 0.012227021604924818,
     "lr_schedule_step_size": 29, "lr_schedule_alpha": 0.1671634509906181},
    {"epochs": 11, "batch_size": 2930, "learning_rate": 0.07922604697796556,
     "lr_schedule_step_size": 2, "lr_schedule_alpha": 0.3667039198827423},
    {"epochs": 49, "batch_size": 30, "learning_rate": 4.1578447907298945e-06,
     "lr_schedule_step_size": 48, "lr_schedule_alpha": 0.06314665815905156},
    {"epochs": 48, "batch_size": 520, "learning_rate": 0.019616439244964836,
     "lr_schedule_step_size": 39, "lr_schedule_alpha": 0.30361084410606004},
    {"epochs": 35, "batch_size": 40, "learning_rate": 0.00014420850956597492,
     "lr_schedule_step_size": 5, "lr_schedule_alpha": 0.7494470022424606},
    {"epochs": 43, "batch_size": 2189, "learning_rate": 1.6577737963037582e-05,
     "lr_schedule_step_size": 12, "lr_schedule_alpha": 0.5407511626603911},
    {"epochs": 57, "batch_size": 153, "learning_rate": 0.0015068446519504613,
     "lr_schedule_step_size": 31, "lr_schedule_alpha": 0.46093042621480435},
    {"epochs": 55, "batch_size": 335, "learning_rate": 1.8871472361824504e-06,
     "lr_schedule_step_size": 4, "lr_schedule_alpha": 0.12430535630860752}]
# Results aligned index-for-index with known_configs.
known_results = [
    {"accuracy": 0.9776000380516052, "loss": 0.07705902308225632,
     "training_time": 113.51355695724487},
    {"accuracy": 0.11350000649690628, "loss": 2.3023123741149902,
     "training_time": 170.52794313430786},
    {"accuracy": 0.9914000630378723, "loss": 0.08842651546001434,
     "training_time": 80.80227994918823},
    {"accuracy": 0.9047000408172607, "loss": 0.3004710376262665,
     "training_time": 21.348045825958252},
    {"accuracy": 0.9754000306129456, "loss": 0.08216287940740585,
     "training_time": 544.683718919754},
    {"accuracy": 0.9877000451087952, "loss": 0.09254179149866104,
     "training_time": 106.59501481056213},
    {"accuracy": 0.9922000765800476, "loss": 0.032735444605350494,
     "training_time": 296.68169689178467},
    {"accuracy": 0.8461000323295593, "loss": 0.6623826026916504,
     "training_time": 88.24818873405457},
    {"accuracy": 0.9937000274658203, "loss": 0.053511716425418854,
     "training_time": 209.23796105384827},
    {"accuracy": 0.26520001888275146, "loss": 2.2552361488342285,
     "training_time": 149.95389699935913}]
if __name__ == "__main__":
    # Translate search-space points to user-facing config dicts.
    to_user_domain_f = partial(to_user_domain, parameters)
    tracer = Tracer(to_user_domain_f, "tick_tock_c02.txt")
    task = MNISTTask(to_user_domain_f, tracer)
    # Alternative warm start from the precomputed data above:
    # tracer.log_initial_values(known_configs, known_results)
    # X = torch.tensor([to_search_domain(parameters, config)
    #                   for config in known_configs],
    #                  dtype=DTYPE, device=DEVICE)
    # Y = torch.tensor([result_to_y(result)
    #                   for result in known_results],
    #                  dtype=DTYPE, device=DEVICE)
    # Get initial observations via a random search. Often it makes sense to
    # replace this step with a set of manually-specified suggestions (above)
    X = sobol_sample(N_RANDOM, parameters, constraints).to(device=DEVICE)
    tracer.log_random(X)
    Y = task.tick_tock_f(X)
    # Then optimize
    rounding_function = partial(round_individual_parameters, parameters)
    X, Y, model = tick_tock_optimize(
        task.tick_tock_f, X, Y,
        botorch_bounds(parameters),
        botorch_inequality_constraints(constraints),
        rounding_function,
        botorch.models.SingleTaskGP,
        tracer=tracer)