# =====================
# The optimization code
# =====================
import math
import os
import botorch
import botorch.acquisition
import botorch.models
import botorch.optim
import botorch.utils
import gpytorch.mlls
import torch
# Run on GPU when available; BoTorch works best in double precision.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DTYPE = torch.double
# Setting the SMOKE_TEST environment variable selects small, fast settings
# for a quick end-to-end sanity check.
SMOKE_TEST = os.environ.get("SMOKE_TEST")
# Botorch heuristics configuration
NUM_RESTARTS = 60 if not SMOKE_TEST else 2      # acqf multi-start restarts
RAW_SAMPLES = 1024 if not SMOKE_TEST else 32    # raw samples for restart init
N_RANDOM = 10 if not SMOKE_TEST else 1          # initial random-search points
# When using joint optimization and batching, botorch seems to perform poorly on
# this task. Often one item in the batch is good and the rest are throwaway
# experiments with ~zero probability of even meeting the constraints. So we
# disable batching here. We could also enable batching and change optimize_acqf
# to use sequential=True if we want to run experiments in parallel.
# BATCH_SIZE = 3 if not SMOKE_TEST else 2
# N_BATCH = 20 if not SMOKE_TEST else 2
BATCH_SIZE = 1                                  # candidates per acqf optimization
N_BATCH = 60 if not SMOKE_TEST else 2           # outer optimization iterations
MC_SAMPLES = 256 if not SMOKE_TEST else 32      # Monte Carlo samples per posterior
# Training-time budget (seconds) enforced as a constraint by both objectives.
MAX_TRAINING_TIME = 60
def loss_objective_constructor(model, X, Y):
    """Build a ConstrainedMCObjective that minimizes log loss (channel 0),
    constrained so the log training time (channel 1) stays within budget.

    Y's first channel is used to derive an infeasible_cost large enough to
    dominate every observed objective value.
    """
    log_budget = math.log(MAX_TRAINING_TIME)
    return botorch.acquisition.ConstrainedMCObjective(
        objective=lambda Z: -Z[..., 0],
        constraints=[lambda Z: Z[..., 1] - log_budget],
        # Small eta => near-hard sigmoid constraint on training time.
        eta=log_budget / 1000,
        infeasible_cost=Y[..., 0].max() + 2.0
    )
def cost_objective_constructor(model, X, Y):
    """Build a ConstrainedMCObjective that minimizes log training time
    (channel 1), subject to (a) the candidate's loss beating the sampled best
    baseline loss achieved within the time budget, and (b) the candidate's own
    training time staying within budget.

    model: GP fitted on (X, Y); Y holds [log loss, log training time] columns
    (channel 0 is treated as loss, channel 1 as time, matching
    loss_objective_constructor).
    """
    # Compute MC_SAMPLES different loss thresholds, one for each sample.
    # This cost_objective expects to be called on a MC_SAMPLES of
    # each batch item.
    with torch.no_grad():
        baseline_posterior = model.posterior(X)
        sampler = botorch.sampling.IIDNormalSampler(num_samples=MC_SAMPLES)
        baseline_samples = sampler(baseline_posterior)
        # Use the samples to decide on a best baseline configuration.
        loss_objective = loss_objective_constructor(model, X, Y)
        # Per MC sample: index of the baseline point with the best
        # (constraint-weighted) loss objective.
        best_baseline = loss_objective(baseline_samples).max(
            dim=-1, keepdim=True).indices
        # Per MC sample: the loss value of that best baseline point.
        sampled_best_loss_within_budget = baseline_samples[..., 0].gather(
            -1, best_baseline
        )
    return botorch.acquisition.ConstrainedMCObjective(
        objective=lambda Z: -Z[..., 1],
        constraints=[
            # Candidate loss must be <= its own sample's baseline threshold.
            # The view reshapes the per-sample thresholds to broadcast across
            # Z's trailing batch dims (assumes Z's leading dim is the
            # MC-sample dim -- TODO confirm against ConstrainedMCObjective).
            lambda Z: (Z[..., 0]
                       - sampled_best_loss_within_budget.view(
                           [-1] + [1] * (len(Z.shape) - 2))
                       ),
            # It is important to also include this constraint so that the
            # baseline "best training time" inside the qNoisyExpectedImprovement
            # doesn't get dominated by configurations with large training times
            # (either because they have low loss or just high variance in loss).
            # Otherwise it would be possible for candidates with long training
            # times to have a large expected improvement.
            lambda Z: Z[..., 1] - math.log(MAX_TRAINING_TIME)
        ],
        eta=(1 / 1000),
        infeasible_cost=Y[..., 1].max() + 2.0,
    )
def _optimize(f, X, Y, objective_constructor, bounds, inequality_constraints,
              state_dict, tracer):
    """Run one fit/optimize/evaluate step.

    Fits a SingleTaskGP to (X, Y) (optionally warm-started from state_dict),
    maximizes qNoisyExpectedImprovement under the given objective and
    constraints, evaluates f at the chosen candidates, and returns the
    augmented (X, Y) plus the fitted model.
    """
    model = botorch.models.SingleTaskGP(X, Y).to(X)
    if state_dict is not None:
        # use the current state dict to speed up fitting
        model.load_state_dict(state_dict)
    mll = gpytorch.mlls.ExactMarginalLogLikelihood(model.likelihood, model)
    botorch.fit_gpytorch_model(mll)
    if tracer is not None:
        tracer.log_model(model)
    sampler = botorch.sampling.SobolQMCNormalSampler(num_samples=MC_SAMPLES)
    acq_function = botorch.acquisition.qNoisyExpectedImprovement(
        model=model,
        X_baseline=X,
        sampler=sampler,
        objective=objective_constructor(model, X, Y),
    )
    candidates, _acq_values = botorch.optim.optimize_acqf(
        acq_function=acq_function,
        bounds=bounds,
        inequality_constraints=inequality_constraints,
        q=BATCH_SIZE,
        num_restarts=NUM_RESTARTS,
        raw_samples=RAW_SAMPLES,
    )
    X_new = candidates.detach()
    if tracer is not None:
        # Log what the model expected before actually running the experiment.
        with torch.no_grad():
            posterior = model.posterior(X_new)
        tracer.log_predictions(X_new, posterior.mean, posterior.variance)
    Y_new = f(X_new)
    if tracer is not None:
        tracer.log_results(Y_new)
    return torch.cat([X, X_new]), torch.cat([Y, Y_new]), model
def tick_tock_optimize(f, X, Y, bounds, inequality_constraints, tracer=None):
    """
    f must take in a batch of points on a hypercube (defined by bounds) and
    return two values: an objective, which is maximized, and a training time,
    which is minimized
    """
    state_dict = None
    model = None
    # Alternate between two phases each iteration:
    #   "cost": optimize training time, creating more potential for better loss
    #   "loss": optimize loss, now that some training time has been freed up
    phases = (("cost", cost_objective_constructor),
              ("loss", loss_objective_constructor))
    for iteration in range(N_BATCH):
        for command, objective_constructor in phases:
            if tracer is not None:
                tracer.log(command=command)
            X, Y, model = _optimize(f, X, Y,
                                    objective_constructor,
                                    bounds, inequality_constraints,
                                    state_dict, tracer)
            # Warm-start the next fit from this model's hyperparameters.
            state_dict = model.state_dict()
        print(f"Batch {(iteration + 1):>2}")
    return X, Y, model
# =============
# Tracing logic
# =============
import json
class Tracer:
    """Appends a simple line-oriented trace of the optimization run to a file.

    Commands are written as bare lines and structured payloads as JSON lines,
    so the trace can be replayed by reading the file line by line.
    """

    def __init__(self, point_to_config_f, filename, clear=True):
        """
        point_to_config_f: maps a transformed-space point (list of floats)
            back to a human-readable config dict.
        filename: path of the trace file (opened in append mode per log call).
        clear: truncate any existing trace file on construction.
        """
        self.point_to_config_f = point_to_config_f
        self.filename = filename
        if clear:
            # Truncate via a context manager rather than open(...).close().
            with open(filename, "w"):
                pass

    def log(self, command=None, data=None):
        """Append an optional command line and/or an optional JSON data line."""
        with open(self.filename, "a") as fout:
            if command is not None:
                print(command, file=fout)
            if data is not None:
                print(json.dumps(data), file=fout)

    def log_model(self, model):
        # Record the fitted GP hyperparameters for later inspection.
        self.log(
            data={"likelihood.noise_covar.noise":
                      model.likelihood.noise_covar.noise.tolist(),
                  "mean_module.constant":
                      model.mean_module.constant.tolist(),
                  "covar_module.outputscale":
                      model.covar_module.outputscale.tolist(),
                  "covar_module.base_kernel.lengthscale":
                      model.covar_module.base_kernel.lengthscale.tolist()}
        )

    def log_random(self, X):
        """Log the initial random-search points as human-readable configs."""
        configs = [self.point_to_config_f(x.tolist())
                   for x in X]
        self.log(command="random", data=configs)

    def log_initial_values(self, configs, results):
        """Log manually-provided initial configs and their results."""
        self.log(command="initial", data=[configs, results])

    def log_predictions(self, X, mean, variance):
        """Log model predictions (mean/variance) for candidate points X."""
        configs = [self.point_to_config_f(x.tolist())
                   for x in X]
        self.log(data=[configs, mean.tolist(), variance.tolist()])

    def log_results(self, Y):
        # Don't do anything, instead rely on log_result
        pass

    def log_result(self, index, result):
        """Log one experiment result keyed by its index within the batch."""
        self.log(data=[index, result])
# =====================================================
# SearchSpace utilities
# (Ideally Ax would expose this functionality directly)
# =====================================================
import ax
import ax.core
import ax.modelbridge.registry
from ax.models.torch.utils import _to_inequality_constraints
from ax.modelbridge.modelbridge_utils import extract_parameter_constraints
def ax_bounds(search_space):
    """Return [(lower, upper), ...] for each parameter, in declaration order."""
    return [(param.lower, param.upper)
            for param in search_space.parameters.values()]
def botorch_bounds(search_space):
    """Return the search-space bounds as a 2 x d tensor (rows: lower, upper)."""
    bounds = torch.tensor(ax_bounds(search_space), device=DEVICE, dtype=DTYPE)
    return bounds.T
def ax_linear_constraints(search_space):
    """Extract the search space's parameter constraints in matrix (A, b) form."""
    parameter_names = list(search_space.parameters.keys())
    return extract_parameter_constraints(search_space.parameter_constraints,
                                         parameter_names)
def botorch_inequality_constraints(search_space):
    """Convert the search space's linear constraints to botorch's
    inequality-constraint format (via Ax's helper), as DEVICE/DTYPE tensors."""
    tensors = tuple(torch.tensor(arr, device=DEVICE, dtype=DTYPE)
                    for arr in ax_linear_constraints(search_space))
    return _to_inequality_constraints(tensors)
class SearchSpaceTransformation:
    """Applies a chain of Ax search-space transforms and supports mapping
    points/configs back and forth between the original and transformed spaces.
    """

    def __init__(self, search_space, transforms=ax.modelbridge.registry.Cont_X_trans):
        """
        transforms: Defaults to
        [RemoveFixed, OrderedChoiceEncode, OneHot, IntToFloat, Log, Logit, UnitX]
        """
        self.before = search_space
        instances = []
        transformed = search_space.clone()
        for transform_class in transforms:
            transform = transform_class(search_space=transformed,
                                        observation_features=[],
                                        observation_data=[])
            transformed = transform.transform_search_space(transformed)
            instances.append(transform)
        self.transform_instances = instances
        self.after = transformed

    def point_to_config(self, x):
        """Map a transformed-space point (sequence of floats) back to a config
        dict in the original space by undoing each transform in reverse."""
        features = ax.core.ObservationFeatures(
            dict(zip(self.after.parameters.keys(), x)))
        for transform in reversed(self.transform_instances):
            [features] = transform.untransform_observation_features(
                [features]
            )
        return features.parameters

    def config_to_point(self, config):
        """Map an original-space config dict to a transformed-space point by
        applying each transform in order."""
        features = ax.core.ObservationFeatures(config)
        for transform in self.transform_instances:
            [features] = transform.transform_observation_features(
                [features]
            )
        return list(features.parameters.values())
# ===================
# Task-specific logic
# ===================
import time
import tensorflow as tf
from tensorflow.keras import layers , models
from tensorflow.keras.datasets import mnist
def result_to_y(result):
    """Convert an evaluation result dict to the model's Y row:
    [log(loss), log(training_time)]."""
    # Use the log so that all real numbers map to valid losses and times, so
    # the GP will always output meaningful predictions (rather than predicting
    # negative losses or training times)
    #
    # This also makes it so that we don't typically need to configure the GP's
    # covariance priors.
    return [math.log(result[key]) for key in ("loss", "training_time")]
class MNISTTask:
    """Trains a small CNN on MNIST for a given hyperparameter config and
    reports test loss/accuracy plus wall-clock training time."""

    def __init__(self, point_to_config_f, tracer=None):
        """
        point_to_config_f: maps a transformed-space point (list of floats) to
            a config dict with keys epochs, batch_size, learning_rate,
            lr_schedule_step_size, lr_schedule_alpha.
        tracer: optional Tracer; receives per-point results via log_result.
        """
        (train_images, train_labels), (test_images, test_labels) = mnist.load_data()
        train_images = train_images.reshape((60000, 28, 28, 1))
        train_images = train_images.astype('float32') / 255
        test_images = test_images.reshape((10000, 28, 28, 1))
        test_images = test_images.astype('float32') / 255
        self.train_images = train_images
        self.train_labels = train_labels
        self.test_images = test_images
        self.test_labels = test_labels
        self.point_to_config = point_to_config_f
        self.tracer = tracer

    def evaluate(self, config):
        """Train the CNN under `config` and return
        {"accuracy", "loss", "training_time"} measured on the test set."""
        model = models.Sequential()
        model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.Flatten())
        model.add(layers.Dense(64, activation='relu'))
        model.add(layers.Dense(10, activation="softmax"))
        # print(model.summary())
        model.compile(
            optimizer=tf.keras.optimizers.Adam(
                learning_rate=config["learning_rate"]
            ),
            loss="sparse_categorical_crossentropy",
            metrics=['accuracy'])
        step_size = config["lr_schedule_step_size"]
        alpha = config["lr_schedule_alpha"]

        def scheduler(epoch, lr):
            # Multiply the learning rate by alpha every step_size epochs
            # (no decay at epoch 0).
            prev_stage = (epoch - 1) // step_size
            stage = epoch // step_size
            if stage > prev_stage and stage > 0:
                return lr * alpha
            return lr

        tstart = time.time()
        model.fit(self.train_images, self.train_labels,
                  epochs=config["epochs"],
                  batch_size=config["batch_size"],
                  callbacks=[tf.keras.callbacks.LearningRateScheduler(scheduler)],
                  # verbose=0
                  )
        training_time = time.time() - tstart
        test_loss, test_acc = model.evaluate(self.test_images, self.test_labels)
        return {"accuracy": test_acc,
                "loss": test_loss,
                "training_time": training_time}

    def tick_tock_f(self, X):
        """Evaluate each point in the batch X and return a (len(X), 2) tensor
        of [log loss, log training time] rows, matching X's dtype/device."""
        Y = []
        for i, x in enumerate(X):
            config = self.point_to_config(x.tolist())
            print(config)
            result = self.evaluate(config)
            print(result)
            if self.tracer is not None:
                self.tracer.log_result(i, result)
            Y.append(result_to_y(result))
        # Match X's dtype and device: a bare torch.tensor(Y) would produce a
        # float32 CPU tensor, which breaks torch.cat with the double (and
        # possibly CUDA) X/Y data used by the GP fitting code.
        return torch.tensor(Y, dtype=X.dtype, device=X.device)
# =======================
# Connect it all together
# =======================
import ax.models.random.sobol
from ax.service.utils.instantiation import InstantiationBase
# Search-space definition for the MNIST task.
max_num_epochs = 60 if not SMOKE_TEST else 3
# NOTE(review): in SMOKE_TEST mode the batch-size lower bound is *raised* to
# 256, presumably so sampled configs train quickly -- confirm this is intended.
min_batch_size = 16 if not SMOKE_TEST else 256
parameters = [
    {"name": "epochs", "type": "range", "bounds": [2, max_num_epochs],
     "value_type": "int"},
    {"name": "batch_size", "type": "range", "bounds": [min_batch_size, 4096],
     "value_type": "int", "log_scale": True},
    {"name": "learning_rate", "type": "range", "bounds": [1e-6, 0.4],
     "value_type": "float", "log_scale": True},
    {"name": "lr_schedule_step_size", "type": "range", "bounds": [1, max_num_epochs],
     "value_type": "int"},
    {"name": "lr_schedule_alpha", "type": "range", "bounds": [0.05, 0.98],
     "value_type": "float", "log_scale": True},
]
# The LR schedule can only step within the training run.
parameter_constraints = [
    "lr_schedule_step_size <= epochs"
]
# Previously-evaluated configs and their results.
# NOTE(review): these are not referenced by the __main__ block below, which
# warm-starts with a Sobol random search instead -- presumably kept as an
# alternative warm start (e.g. via Tracer.log_initial_values); confirm.
known_configs = [{"epochs": 58, "batch_size": 1650, "learning_rate": 4.0241610227744396e-05, "lr_schedule_step_size": 56, "lr_schedule_alpha": 0.08748550224528025},
                 {"epochs": 42, "batch_size": 116, "learning_rate": 0.1770803853571978, "lr_schedule_step_size": 23, "lr_schedule_alpha": 0.11006912054461343},
                 {"epochs": 34, "batch_size": 392, "learning_rate": 0.012227021604924818, "lr_schedule_step_size": 29, "lr_schedule_alpha": 0.1671634509906181},
                 {"epochs": 11, "batch_size": 2930, "learning_rate": 0.07922604697796556, "lr_schedule_step_size": 2, "lr_schedule_alpha": 0.3667039198827423},
                 {"epochs": 49, "batch_size": 30, "learning_rate": 4.1578447907298945e-06, "lr_schedule_step_size": 48, "lr_schedule_alpha": 0.06314665815905156},
                 {"epochs": 48, "batch_size": 520, "learning_rate": 0.019616439244964836, "lr_schedule_step_size": 39, "lr_schedule_alpha": 0.30361084410606004},
                 {"epochs": 35, "batch_size": 40, "learning_rate": 0.00014420850956597492, "lr_schedule_step_size": 5, "lr_schedule_alpha": 0.7494470022424606},
                 {"epochs": 43, "batch_size": 2189, "learning_rate": 1.6577737963037582e-05, "lr_schedule_step_size": 12, "lr_schedule_alpha": 0.5407511626603911},
                 {"epochs": 57, "batch_size": 153, "learning_rate": 0.0015068446519504613, "lr_schedule_step_size": 31, "lr_schedule_alpha": 0.46093042621480435},
                 {"epochs": 55, "batch_size": 335, "learning_rate": 1.8871472361824504e-06, "lr_schedule_step_size": 4, "lr_schedule_alpha": 0.12430535630860752}]
known_results = [{"accuracy": 0.9776000380516052, "loss": 0.07705902308225632, "training_time": 113.51355695724487},
                 {"accuracy": 0.11350000649690628, "loss": 2.3023123741149902, "training_time": 170.52794313430786},
                 {"accuracy": 0.9914000630378723, "loss": 0.08842651546001434, "training_time": 80.80227994918823},
                 {"accuracy": 0.9047000408172607, "loss": 0.3004710376262665, "training_time": 21.348045825958252},
                 {"accuracy": 0.9754000306129456, "loss": 0.08216287940740585, "training_time": 544.683718919754},
                 {"accuracy": 0.9877000451087952, "loss": 0.09254179149866104, "training_time": 106.59501481056213},
                 {"accuracy": 0.9922000765800476, "loss": 0.032735444605350494, "training_time": 296.68169689178467},
                 {"accuracy": 0.8461000323295593, "loss": 0.6623826026916504, "training_time": 88.24818873405457},
                 {"accuracy": 0.9937000274658203, "loss": 0.053511716425418854, "training_time": 209.23796105384827},
                 {"accuracy": 0.26520001888275146, "loss": 2.2552361488342285, "training_time": 149.95389699935913}]
if __name__ == "__main__" :
search_space = InstantiationBase . make_search_space (
parameters = parameters ,
parameter_constraints = parameter_constraints
)
sst = SearchSpaceTransformation ( search_space )
tracer = Tracer ( sst . point_to_config , "tick_tock_out.txt" )
task = MNISTTask ( sst . point_to_config , tracer )
# Get initial observations via a random search. Often it makes sense to
# replace this step with a set of manually-specified suggestions.
g = ax . models . random . sobol . SobolGenerator ()
points , _ = g . gen ( N_RANDOM , ax_bounds ( sst . after ),
ax_linear_constraints ( sst . after ))
X = torch . tensor ( points , dtype = DTYPE , device = DEVICE )
tracer . log_random ( X )
Y = task . tick_tock_f ( X )
# Then optimize
X , Y , model = tick_tock_optimize ( task . tick_tock_f , X , Y ,
botorch_bounds ( sst . after ),
botorch_inequality_constraints ( sst . after ),
tracer = tracer )