!wget https://data.mendeley.com/datasets/rscbjbr9sj/2/files/41d542e7-7f91-47f6-9ff2-dd8e5a5a7861/ChestXRay2017.zip
!unzip -q ChestXRay2017.zip

try:
    %tensorflow_version 2.x
except Exception:
    pass

import tensorflow as tf
tf.__version__

from tensorflow.keras.optimizers import SGD, Adam
from tensorflow.keras import layers
from tensorflow.keras.utils import Sequence, to_categorical  # public path, not tensorflow.python.keras
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ReduceLROnPlateau, ModelCheckpoint, LearningRateScheduler
from tensorflow.keras.preprocessing.image import ImageDataGenerator

import glob
import itertools
import os

import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
import PIL
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import LabelEncoder

# Class balance in the training set
class_1_len = len(os.listdir("chest_xray/train/NORMAL"))
class_2_len = len(os.listdir("chest_xray/train/PNEUMONIA"))
class_1_len, class_2_len

# Inverse frequencies, a first look at how imbalanced the classes are
1 / class_1_len, 1 / class_2_len

batch_size = 32
img_size = 256
input_img_size = (256, 256, 1)
num_classes = 2

# preprocess_input from mobilenet_v2 rescales pixel values to [-1, 1]
train_generator = ImageDataGenerator(
    preprocessing_function=preprocess_input,
    rotation_range=20,
    zoom_range=0.05,
    width_shift_range=0.05,
    height_shift_range=0.05,
    shear_range=0.05,
    horizontal_flip=True,
    fill_mode="nearest").flow_from_directory(
        "chest_xray/train",
        target_size=(img_size, img_size),
        color_mode="grayscale",
        shuffle=True,
        batch_size=batch_size)

val_generator = ImageDataGenerator(
    preprocessing_function=preprocess_input).flow_from_directory(
        "chest_xray/test",
        target_size=(img_size, img_size),
        color_mode="grayscale",
        shuffle=False,
        batch_size=batch_size)

training_folder_size = 5232
validation_folder_size = 624
train_steps = training_folder_size // batch_size
val_steps = validation_folder_size // batch_size
train_steps, val_steps
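# Aside (a minimal sketch, not part of the original pipeline): the class_weight
# values used during training below can be derived from the folder counts above.
# This assumes class 0 = NORMAL and class 1 = PNEUMONIA, matching
# flow_from_directory's alphabetical class ordering. Normalizing the inverse
# frequencies so that NORMAL gets weight 1.0 gives a PNEUMONIA weight of
# roughly class_1_len / class_2_len, in the 0.33-0.35 range for this dataset.
class_weights = {0: 1.0, 1: class_1_len / class_2_len}
print(class_weights)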
def create_vgg_model():
    # VGG16-style convolutional stack, with global average pooling instead of
    # the original fully connected head, on single-channel 256x256 input
    inputs = layers.Input(input_img_size)
    model = layers.Conv2D(64, 3, padding='same', activation='relu')(inputs)
    model = layers.Conv2D(64, 3, padding='same', activation='relu')(model)
    model = layers.MaxPool2D(2, strides=2, padding='same')(model)
    model = layers.Conv2D(128, 3, padding='same', activation='relu')(model)
    model = layers.Conv2D(128, 3, padding='same', activation='relu')(model)
    model = layers.MaxPool2D(2, strides=2, padding='same')(model)
    model = layers.Conv2D(256, 3, padding='same', activation='relu')(model)
    model = layers.Conv2D(256, 3, padding='same', activation='relu')(model)
    model = layers.Conv2D(256, 3, padding='same', activation='relu')(model)
    model = layers.MaxPool2D(2, strides=2, padding='same')(model)
    model = layers.Conv2D(512, 3, padding='same', activation='relu')(model)
    model = layers.Conv2D(512, 3, padding='same', activation='relu')(model)
    model = layers.Conv2D(512, 3, padding='same', activation='relu')(model)
    model = layers.MaxPool2D(2, strides=2, padding='same')(model)
    model = layers.Conv2D(512, 3, padding='same', activation='relu')(model)
    model = layers.Conv2D(512, 3, padding='same', activation='relu')(model)
    model = layers.Conv2D(512, 3, padding='same', activation='relu')(model)
    model = layers.GlobalAveragePooling2D()(model)
    model = layers.Dense(num_classes, activation="softmax", kernel_initializer='uniform')(model)
    model = Model(inputs=inputs, outputs=model)
    return model

class LearningRateFinder:
    def __init__(self, model):
        self.model = model
        self.steps_per_epoch = train_steps
        self.epochs = 5
        self.start_lr = 1e-10
        self.end_lr = 1e+1
        self.learning_rates_tested = []
        self.losses = []
        self.batch_number = 0
        self.average_loss = 0
        self.best_loss = 1e9
        self.beta = 0.98  # smoothing factor for the moving average of the loss
        self.batch_updates = self.epochs * self.steps_per_epoch
        # Multiply the learning rate by a constant factor each batch, so it
        # grows exponentially from start_lr to end_lr over all batch updates
        self.lr_multiplier = (self.end_lr / self.start_lr) ** (1.0 / self.batch_updates)
        self.stop_factor = 4  # stop once the smoothed loss exceeds 4x the best loss

    def on_batch_end(self, batch, logs):
        current_lr = tf.keras.backend.get_value(self.model.optimizer.learning_rate)
        self.learning_rates_tested.append(current_lr)
        loss_value = logs["loss"]
        self.batch_number += 1
        self.average_loss = (self.beta * self.average_loss) + ((1 - self.beta) * loss_value)
        # Bias-corrected exponential moving average of the loss
        smooth_loss = self.average_loss / (1 - (self.beta ** self.batch_number))
        self.losses.append(smooth_loss)
        max_loss_allowed = self.stop_factor * self.best_loss
        if self.batch_number > 1 and smooth_loss > max_loss_allowed:
            self.model.stop_training = True
            return
        if self.batch_number == 1 or smooth_loss < self.best_loss:
            self.best_loss = smooth_loss
        new_lr = current_lr * self.lr_multiplier
        tf.keras.backend.set_value(self.model.optimizer.learning_rate, new_lr)

    def plot_model_results(self):
        # Skip the first few noisy points and the last (possibly diverged) one
        selected_learning_rates = self.learning_rates_tested[10:-1]
        selected_losses = self.losses[10:-1]
        plt.plot(selected_learning_rates, selected_losses)
        plt.xscale("log")
        plt.xlabel("Learning Rate")
        plt.ylabel("Loss")
        plt.title("Learning rate finder")

    def find_lr(self):
        callback = tf.keras.callbacks.LambdaCallback(
            on_batch_end=lambda batch, logs: self.on_batch_end(batch, logs))
        optimizer = SGD(learning_rate=self.start_lr, momentum=0.9)
        self.model.compile(loss="categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"])
        # Model.fit accepts generators directly in TF 2.x; fit_generator is deprecated
        self.model.fit(train_generator,
                       steps_per_epoch=self.steps_per_epoch,
                       epochs=self.epochs,
                       verbose=1,
                       callbacks=[callback])
        self.plot_model_results()

# Named finder_model to avoid clashing with the test_model() helper defined later
finder_model = create_vgg_model()
lr_finder = LearningRateFinder(finder_model)
lr_finder.find_lr()
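# How to read the sweep (a sketch of the arithmetic, assumed interpretation,
# not from the original notebook): after k batches the learning rate is
# start_lr * lr_multiplier**k, so printing a few checkpoints makes the scale
# of the exponential sweep concrete.
batch_updates = 5 * train_steps
lr_multiplier = (1e+1 / 1e-10) ** (1.0 / batch_updates)
for k in (0, batch_updates // 4, batch_updates // 2, batch_updates - 1):
    print(k, 1e-10 * lr_multiplier ** k)
# Pick min_learning_rate roughly where the loss first starts falling steadily,
# and max_learning_rate just before the loss bottoms out and diverges.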
epochs = 32
min_learning_rate = 1e-3
max_learning_rate = 1e-1

class CycleLearningRate(tf.keras.callbacks.Callback):
    def __init__(self, model, policy="triangular"):
        super(CycleLearningRate, self).__init__()
        self.model = model
        self.training_iterations = 0
        self.cycle_iterations = 0
        self.history = {}
        self.step_size = 4 * train_steps  # half-cycle length, in batches
        if policy == "triangular":
            # Constant amplitude each cycle
            self.policy_fn = lambda x: 1.
        else:
            # "triangular2": halve the amplitude every cycle
            self.policy_fn = lambda x: 1 / (2. ** (x - 1))

    def compute_learning_rate(self):
        cycle = np.floor(1 + self.cycle_iterations / (2 * self.step_size))
        x = np.abs(self.cycle_iterations / self.step_size - 2 * cycle + 1)
        return min_learning_rate + (max_learning_rate - min_learning_rate) * np.maximum(0, (1 - x)) * self.policy_fn(cycle)

    def on_train_begin(self, logs=None):
        if self.cycle_iterations == 0:
            tf.keras.backend.set_value(self.model.optimizer.learning_rate, min_learning_rate)
        else:
            tf.keras.backend.set_value(self.model.optimizer.learning_rate, self.compute_learning_rate())

    def on_batch_end(self, batch, logs=None):
        logs = logs or {}
        self.training_iterations += 1
        self.cycle_iterations += 1
        self.history.setdefault('lr', []).append(tf.keras.backend.get_value(self.model.optimizer.learning_rate))
        self.history.setdefault('iterations', []).append(self.training_iterations)
        for k, v in logs.items():
            self.history.setdefault(k, []).append(v)
        tf.keras.backend.set_value(self.model.optimizer.learning_rate, self.compute_learning_rate())

# Underscore separators, so the pattern matches the checkpoint file loaded below
weights_name = "epoch={epoch:02d}_accuracy={acc:.4f}_val_accuracy={val_acc:.4f}.h5"
checkpoint = ModelCheckpoint(weights_name,
                             monitor="val_acc",
                             verbose=1,
                             save_best_only=True,
                             save_weights_only=True,
                             mode="max",
                             save_freq='epoch')

def categorical_focal_loss(alpha=0.25, gamma=2.0):
    def focal_loss(y_true, y_pred):
        # Clip predictions away from zero to keep log() finite
        y_pred = tf.keras.backend.clip(y_pred, tf.keras.backend.epsilon(), 1.0)
        crossentropy_loss = -y_true * tf.keras.backend.log(y_pred)
        # Down-weight well-classified examples by the (1 - p)^gamma factor
        focal_loss = alpha * tf.keras.backend.pow(1 - y_pred, gamma) * crossentropy_loss
        return tf.keras.backend.sum(focal_loss, axis=1)
    return focal_loss

normal_model = create_vgg_model()
cycle_lr = CycleLearningRate(normal_model, policy="triangular")
optimizer = SGD(learning_rate=min_learning_rate, momentum=0.9)
normal_model.compile(loss=categorical_focal_loss(), optimizer=optimizer, metrics=['acc'])
trained_model = normal_model.fit(train_generator,
                                 epochs=epochs,
                                 steps_per_epoch=train_steps,
                                 callbacks=[checkpoint, cycle_lr],
                                 class_weight={0: 1.0, 1: 0.33},
                                 validation_data=val_generator,
                                 validation_steps=val_steps,
                                 verbose=1)

def plot_validation_training(metric, trained_model):
    validation_metric = trained_model.history[f'val_{metric}']
    training_metric = trained_model.history[metric]
    epochs = range(len(training_metric))
    plt.plot(epochs, training_metric, 'b', label=f'Training {metric}')
    plt.plot(epochs, validation_metric, 'r', label=f'Validation {metric}')
    plt.ylim(bottom=0)
    plt.xlabel('Epochs', fontsize=16)
    plt.ylabel(metric, fontsize=16)
    loc = 'upper right' if metric == "loss" else 'lower right'
    plt.legend(loc=loc)
    plt.title(f'Training and validation {metric}', fontsize=20)
    plt.show()

plot_validation_training("loss", trained_model)
plot_validation_training("acc", trained_model)

iterations = np.arange(0, len(cycle_lr.history["lr"]))
plt.figure()
plt.plot(iterations, cycle_lr.history["lr"])
plt.title("Cyclical Learning Rate (CLR)")
plt.xlabel("Training Iterations")
plt.ylabel("Learning Rate")
plt.show()

normal_model.load_weights("/content/epoch=13_accuracy=0.8870_val_accuracy=0.8651.h5")
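# A quick numeric sanity check of the focal loss (a sketch, not from the
# original notebook): a confidently correct prediction should contribute far
# less loss than an uncertain one, because of the (1 - p)^gamma factor.
loss_fn = categorical_focal_loss()
y_true = tf.constant([[0., 1.], [0., 1.]])
y_pred = tf.constant([[0.05, 0.95],   # confident and correct
                      [0.50, 0.50]])  # uncertain
print(loss_fn(y_true, y_pred).numpy())  # roughly 3e-05 vs 0.043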
def plot_confusion_matrix(cm, classes, normalize=False, title='Confusion matrix', cmap=plt.cm.Blues):
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()

val_generator.reset()
val_generator.class_indices  # confirms NORMAL -> 0, PNEUMONIA -> 1

def test_model(model, generator):
    predicted_classes = model.predict(generator, verbose=1)
    max_class_predicted = np.argmax(predicted_classes, axis=1)
    classes_names = ['NORMAL', 'PNEUMONIA']
    labels = generator.classes
    cm = confusion_matrix(labels, max_class_predicted, labels=range(num_classes))
    plot_confusion_matrix(cm, classes_names)
    # With PNEUMONIA (class 1) as the positive class:
    # sensitivity = TP / (TP + FN), specificity = TN / (TN + FP)
    sensitivity = cm[1, 1] / (cm[1, 1] + cm[1, 0])
    print(f'sensitivity: {sensitivity}')
    specificity = cm[0, 0] / (cm[0, 0] + cm[0, 1])
    print(f'specificity: {specificity}')
    incorrect_labels = np.where(max_class_predicted != labels)[0]
    return max_class_predicted, incorrect_labels

val_class_predicted, val_incorrect_labels = test_model(normal_model, val_generator)

from sklearn.metrics import classification_report
labels = val_generator.classes
classes_names = ['NORMAL', 'PNEUMONIA']
# classification_report expects (y_true, y_pred) in that order
report = classification_report(labels, val_class_predicted, target_names=classes_names)
print(report)

normal_path = "chest_xray/test/NORMAL/"
pneumonia_path = "chest_xray/test/PNEUMONIA/"
# Sort the filenames so the paths line up with the ordering flow_from_directory uses
validation_images_paths = [normal_path + img_path for img_path in sorted(os.listdir(normal_path))] + \
                          [pneumonia_path + img_path for img_path in sorted(os.listdir(pneumonia_path))]

print("Found %d incorrectly classified images" % len(val_incorrect_labels))

def load_image(img_path):
    img = PIL.Image.open(img_path)
    img = img.resize((128, 128))
    img = np.asarray(img)
    return img

def check_predictions(from_num, to_num):
    plt.figure(figsize=(15, 15))
    for index, incorrect_label in enumerate(val_incorrect_labels[from_num:to_num]):
        plt.subplot(3, 3, index + 1)
        img = load_image(validation_images_paths[incorrect_label])
        plt.imshow(img, cmap='gray')
        plt.title("Predicted {}, Class {}".format(val_class_predicted[incorrect_label],
                                                  val_generator.classes[incorrect_label]))
    plt.tight_layout()

check_predictions(0, 9)
check_predictions(-10, -1)

!pip install -q tensorflow-model-optimization
from tensorflow_model_optimization.sparsity import keras as sparsity
import tempfile
import zipfile

_, keras_file = tempfile.mkstemp('.h5')
print('model saved: ', keras_file)
tf.keras.models.save_model(normal_model, keras_file, include_optimizer=False)

epochs = 8
start_step = 0
end_step = train_steps * epochs
# Ramp the sparsity from 0% to 50% over the fine-tuning run
pruning_params = {
    'pruning_schedule': sparsity.PolynomialDecay(initial_sparsity=0.0,
                                                 final_sparsity=0.50,
                                                 begin_step=start_step,
                                                 end_step=end_step,
                                                 frequency=100)
}

loaded_model = tf.keras.models.load_model(keras_file)
pruned_model = sparsity.prune_low_magnitude(loaded_model, **pruning_params)
optimizer = SGD(learning_rate=min_learning_rate, momentum=0.9)
pruned_model.compile(loss=categorical_focal_loss(), optimizer=optimizer, metrics=['acc'])

cycle_lr = CycleLearningRate(pruned_model, policy="triangular")
pruned_weights_name = "pruned_weight_epoch={epoch:02d}_accuracy={acc:.4f}_val_accuracy={val_acc:.4f}.h5"
checkpoint = ModelCheckpoint(pruned_weights_name,
                             monitor="val_acc",
                             verbose=1,
                             save_weights_only=True,
                             mode="max",
                             save_freq='epoch')

callbacks = [
    sparsity.UpdatePruningStep(),
    cycle_lr,
    checkpoint
]

pruned_model.fit(train_generator,
                 epochs=epochs,
                 steps_per_epoch=train_steps,
                 callbacks=callbacks,
                 validation_data=val_generator,
                 validation_steps=val_steps,
                 verbose=1)

iterations = np.arange(0, len(cycle_lr.history["lr"]))
plt.figure()
plt.plot(iterations, cycle_lr.history["lr"])
plt.title("Cyclical Learning Rate")
plt.xlabel("Training Iterations")
plt.ylabel("Learning Rate")
plt.show()

pruned_model.load_weights("pruned_weight_epoch=03_accuracy=0.9208_val_accuracy=0.8553.h5")
final_model = sparsity.strip_pruning(pruned_model)
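# A minimal verification sketch (not in the original notebook): per the
# PolynomialDecay schedule above, roughly 50% of each pruned layer's kernel
# should now be exactly zero. Counting zeros in the stripped model's kernels
# confirms the sparsity actually landed in the weights.
for layer in final_model.layers:
    for w in layer.get_weights():
        if w.ndim > 1:  # kernels only, skip biases
            print(f"{layer.name}: {np.mean(w == 0):.2%} zeros")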
_, _ = test_model(pruned_model, val_generator)

_, pruned_keras_file = tempfile.mkstemp('.h5')
print('pruned model saved: ', pruned_keras_file)
tf.keras.models.save_model(final_model, pruned_keras_file, include_optimizer=False)

# Zeroed weights compress well, so zipping shows the real benefit of pruning
_, zip1 = tempfile.mkstemp('.zip')
with zipfile.ZipFile(zip1, 'w', compression=zipfile.ZIP_DEFLATED) as f:
    f.write(keras_file)
print("Size of the unpruned model before compression: %.2f MB" % (os.path.getsize(keras_file) / float(2**20)))
print("Size of the unpruned model after compression: %.2f MB" % (os.path.getsize(zip1) / float(2**20)))

_, zip2 = tempfile.mkstemp('.zip')
with zipfile.ZipFile(zip2, 'w', compression=zipfile.ZIP_DEFLATED) as f:
    f.write(pruned_keras_file)
print("Size of the pruned model before compression: %.2f MB" % (os.path.getsize(pruned_keras_file) / float(2**20)))
print("Size of the pruned model after compression: %.2f MB" % (os.path.getsize(zip2) / float(2**20)))
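# A possible next step (a sketch, not part of the original notebook): convert
# the stripped model to TensorFlow Lite for on-device deployment, where the
# zeroed weights also compress well. tf.lite.TFLiteConverter.from_keras_model
# and tf.lite.Optimize.DEFAULT are standard TF 2.x APIs.
converter = tf.lite.TFLiteConverter.from_keras_model(final_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()
with open("pruned_model.tflite", "wb") as f:
    f.write(tflite_model)
print("TFLite model size: %.2f MB" % (os.path.getsize("pruned_model.tflite") / float(2**20)))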