Sascha Spors, Professorship Signal Theory and Digital Signal Processing, Institute of Communications Engineering (INT), Faculty of Computer Science and Electrical Engineering (IEF), University of Rostock, Germany
Winter Semester 2023/24 (Master Course #24512)
Feel free to contact lecturer frank.schultz@uni-rostock.de
import keras_tuner as kt
import matplotlib.pyplot as plt
import numpy as np
import os
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, LabelBinarizer
import tensorflow as tf
from tensorflow import keras
import time
# Report library versions so a run's environment is documented in its output.
print("TF version", tf.__version__, "\nKeras Tuner version", kt.__version__)

# Global switches and naming conventions for this experiment.
verbose = 1  # verbosity level handed to Keras fit/evaluate/predict
CI_flag = True  # True -> toy-sized parameters so the notebook finishes in CI
ex_str = "ex12_"  # experiment prefix used in log-folder names
time_str = "%Y_%m_%d_%H_%M_"  # strftime pattern that timestamps log folders
def get_kt_logdir():
    """Return a timestamped subfolder of `root_logdir` for keras tuner results."""
    folder_name = time.strftime(time_str + ex_str + "kt")
    return os.path.join(root_logdir, folder_name)
def get_tf_kt_logdir():
    """Return a timestamped subfolder of `root_logdir` for TF logs written while tuning."""
    folder_name = time.strftime(time_str + ex_str + "tf_kt")
    return os.path.join(root_logdir, folder_name)
def get_tf_logdir():
    """Return a timestamped subfolder of `root_logdir` for best-model training logs."""
    folder_name = time.strftime(time_str + ex_str + "tf")
    return os.path.join(root_logdir, folder_name)
# All logging lives under ./tf_keras_logs; each run gets timestamped subfolders:
# kt_logdir    -> keras tuner results
# tf_kt_logdir -> TF checkpoints written while keras tuning
# tf_logdir    -> TF checkpoints for the final best-model training
root_logdir = os.path.join(os.curdir, "tf_keras_logs")
kt_logdir = get_kt_logdir()
tf_kt_logdir = get_tf_kt_logdir()
tf_logdir = get_tf_logdir()
for logdir in (root_logdir, kt_logdir, tf_kt_logdir, tf_logdir):
    print(logdir)
os.makedirs(tf_logdir, exist_ok=True)
# Problem dimensions and dataset split fractions.
m = 100000  # total number of data examples
nlabels = 3  # number of classes
labels = np.arange(nlabels)  # classes encoded as integers 0..nlabels-1
nx = nlabels * 2  # number of features, here 6
train_size = 7 / 10  # 7/10 of the whole data set goes to training
validate_size = 3 / 10 * 2 / 3  # 1/5 of the whole data set goes to validation
test_size = 1 - train_size - validate_size  # remainder for testing, must be > 0
# Synthetic classification task: m samples, nx informative features, nlabels classes.
X, Y = make_classification(
    n_samples=m,
    n_features=nx,
    n_informative=nx,
    n_redundant=0,
    n_classes=nlabels,
    n_clusters_per_class=1,
    class_sep=1,
    flip_y=1e-2,  # small amount of label noise
    random_state=None,
)
# One-hot encode the integer labels for the TF model.
encoder = OneHotEncoder(sparse_output=False)
Y = encoder.fit_transform(Y.reshape(-1, 1))

# Split twice: first carve off the training set, then divide the remainder
# into validation and test sets.
X_train, X_tmp, Y_train, Y_tmp = train_test_split(
    X, Y, train_size=train_size, random_state=None
)
# fraction of the leftover pool that becomes validation data
val_size = (validate_size * m) / ((1 - train_size) * m)
X_val, X_test, Y_val, Y_test = train_test_split(
    X_tmp, Y_tmp, train_size=val_size, random_state=None
)

m_train = X_train.shape[0]
m_val = X_val.shape[0]
m_test = X_test.shape[0]
print(train_size, validate_size, test_size)
print(m_train, m_val, m_test, m_train + m_val + m_test == m)
print(X_train.shape, X_val.shape, X_test.shape)
print(Y_train.shape, Y_val.shape, Y_test.shape)
# Stop training once the validation loss has stagnated for 2 epochs and
# roll back to the best weights seen so far (monitored on val data!).
earlystopping_cb = keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=2,
    restore_best_weights=True,
)
# as homework we might also consider dropout and regularization in the model
# as homework we might also consider dropout and regularization in the model
def build_model(hp):
    """Build a Sequential classifier from keras-tuner hyperparameters.

    Search space: 1-4 hidden Dense layers, 2-16 units each (step 2), and one
    shared hidden activation drawn from tanh/relu/sigmoid/softmax. sigmoid
    and softmax are deliberately offered as hidden activations too — they
    are not restricted to the output layer of a classification problem.
    """
    model = keras.Sequential()
    model.add(keras.Input(shape=(nx, )))  # input layer
    for layer in range(hp.Int("no_layers", 1, 4)):  # hidden layers
        units = hp.Int(
            f"no_perceptrons_{layer}", min_value=2, max_value=16, step=2
        )
        activation = hp.Choice(
            "activation", ["tanh", "relu", "sigmoid", "softmax"]
        )
        model.add(keras.layers.Dense(units=units, activation=activation))
    # fixed softmax output layer, one unit per class
    model.add(keras.layers.Dense(nlabels, activation="softmax"))
    # the learning rate could be tuned as well, e.g.:
    # learning_rate = hp.Float('learning_rate', min_value=1e-5, max_value=1e-1,
    #                          sampling='log')
    model.compile(
        optimizer=keras.optimizers.Adam(),  # learning_rate=learning_rate
        loss=keras.losses.CategoricalCrossentropy(
            from_logits=False, label_smoothing=0
        ),
        metrics=["CategoricalCrossentropy", "CategoricalAccuracy"],
    )
    return model
# number of candidate models to build and try: tiny search in CI, broad otherwise
max_trials = 5 if CI_flag else 20
executions_per_trial = 2  # trainings per candidate, averages out init noise
# sanity check that the default-hyperparameter model builds at all
model = build_model(kt.HyperParameters())
hptuner = kt.RandomSearch(
    hypermodel=build_model,
    objective='val_loss',  # rank candidates by performance on val data!
    max_trials=max_trials,
    executions_per_trial=executions_per_trial,
    overwrite=True,
    directory=kt_logdir,
    project_name=None,
)
print(hptuner.search_space_summary())
# keep epochs small in CI so the search terminates quickly
epochs = 3 if CI_flag else 50
tensorboard_cb = keras.callbacks.TensorBoard(tf_kt_logdir)
hptuner.search(
    X_train,
    Y_train,
    validation_data=(X_val, Y_val),
    epochs=epochs,
    callbacks=[earlystopping_cb, tensorboard_cb],
    verbose=verbose,
)
print(hptuner.results_summary())
# we could inspect the best XX models in detail; for didactical purposes
# we keep only the very best one, located at index [0]:
model = hptuner.get_best_models(num_models=1)[0]
model.save(tf_logdir + "/best_model.keras")
# taken from https://github.com/keras-team/keras/issues/341
# 183amir commented on 7 Oct 2019:
# "If you are using tensorflow 2, you can use this:"
def reset_weights(model):
    """Re-initialize all weights of `model` in place (TF2).

    Recurses into nested models. For plain layers, every attribute named
    `*_initializer` is re-applied to the variable of the same base name
    (e.g. kernel_initializer -> kernel) so training restarts from scratch.
    183amir: "I am not sure if it works in all cases, I have only tested
    the Dense and Conv2D layers."
    """
    for layer in model.layers:
        if isinstance(layer, tf.keras.Model):  # nested model: recurse
            reset_weights(layer)
            continue
        for k, initializer in layer.__dict__.items():
            if "initializer" not in k:
                continue
            # find the corresponding variable; it may not exist, e.g. a
            # bias_initializer is present even when use_bias=False
            var = getattr(layer, k.replace("_initializer", ""), None)
            if var is None:
                continue
            var.assign(initializer(var.shape, var.dtype))
# reload the best architecture and wipe its weights: the final training
# should start from scratch, not from the tuner's checkpoint
model = keras.models.load_model(tf_logdir + "/best_model.keras")
reset_weights(model)
print(model.summary())

batch_size = 32
epochs = 3 if CI_flag else 50  # toy-sized training in CI
tensorboard_cb = keras.callbacks.TensorBoard(tf_logdir)
history = model.fit(
    X_train,
    Y_train,
    epochs=epochs,
    batch_size=batch_size,
    validation_data=(X_val, Y_val),
    callbacks=[earlystopping_cb, tensorboard_cb],
    verbose=verbose,
)
model.save(tf_logdir + "/trained_best_model.keras")
print(model.summary())
def print_results(X, Y):
    """Evaluate the global `model` on (X, Y) and print cost, cross-entropy,
    accuracy and the confusion matrix.

    A LabelBinarizer maps the one-hot rows of Y (and the softmax rows of the
    predictions) back to integer labels, cf.
    https://stackoverflow.com/questions/48908641/how-to-get-a-single-value-from-softmax-instead-of-probability-get-confusion-ma
    """
    lb = LabelBinarizer()
    lb.fit(labels)
    m = X.shape[0]
    results = model.evaluate(X, Y, batch_size=m, verbose=verbose)
    Y_pred = model.predict(X)
    cm = tf.math.confusion_matrix(
        labels=lb.inverse_transform(Y),
        predictions=lb.inverse_transform(Y_pred),
        num_classes=nlabels,
    )
    print("data entries", m)
    print(
        "Cost",
        results[0],
        "\nCategoricalCrossentropy",
        results[1],
        "\nCategoricalAccuracy",
        results[2],
    )
    print(
        # bug fix: the original string "nCategoricalAccuracy..." was missing
        # the backslash of the "\n" escape (compare the prints above)
        "\nCategoricalAccuracy from Confusion Matrix = ",
        np.sum(np.diag(cm.numpy())) / m,
    )
    print("Confusion Matrix in %\n", cm / m * 100)
# Recall: the model should generalize well on never before seen data. After
# hyperparameter tuning found the best model and we re-trained it to its
# optimized state, only now do we judge it on (X_test, Y_test) — data that
# none of the training steps above ever used!
for split_name, X_split, Y_split in (
    ("train", X_train, Y_train),
    ("val", X_val, Y_val),
    ("never seen test", X_test, Y_test),
):
    print(f"\n\nmetrics on {split_name} data:")
    print_results(X_split, Y_split)