# Copyright (c) 2020 ZZH
import tensorflow as tf
from matplotlib import pyplot as plt
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, AveragePooling2D, MaxPool2D
from tensorflow.keras.layers import GlobalAveragePooling2D, Dropout, Flatten, Dense, DepthwiseConv2D
from tensorflow.keras.models import Sequential
from tensorflow.keras import Model
import os
import numpy as np
import math
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
print(gpus)
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
epochs = 100
lr = 0.1
batch_size = 128
REGULARIZER = 0.0001
checkpoint_save_path = './Model/ShuffleNetV2/'
log_dir = os.path.join("Model","ShuffleNetV2_logs")
#数据导入及数据增强
cifar10 = tf.keras.datasets.cifar10
(x_train,y_train),(x_test,y_test) = cifar10.load_data()
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
y_train = tf.keras.utils.to_categorical(y_train, num_classes=10)
y_test = tf.keras.utils.to_categorical(y_test, num_classes=10)
mean = [125.307, 122.95, 113.865] #np.mean()
std = [62.9932, 62.0887, 66.7048] #np.std()
for i in range(3):
x_train[:,:,:,i] = (x_train[:,:,:,i] - mean[i]) / std[i]
x_test[:,:,:,i] = (x_test[:,:,:,i] - mean[i]) / std[i]
DataGenTrain = tf.keras.preprocessing.image.ImageDataGenerator(
rotation_range = 15,
width_shift_range = 0.1,
height_shift_range = 0.1,
horizontal_flip = True,
vertical_flip = False,
shear_range=0.1,
zoom_range = 0.1)
DataGenTrain.fit(x_train)
def scheduler(epoch): #HTD(-6,3) with WarmingUp
start = -6.0
end = 3.0
if epoch < 5:
return 0.02 * epoch + 0.02
return lr / 2.0 * (1 - math.tanh((end - start) * epoch / epochs + start))
def channel_shuffle(inputs,spilt,H,W):
C = inputs.shape.as_list()[-1]
x = tf.reshape(inputs, [-1, H, W, spilt, C // spilt])
x = tf.transpose(x, [0, 1, 2, 4, 3])
outputs = tf.reshape(x, [-1, H, W, C])
return outputs
class BNRelu(Model):
def __init__(self):
super(BNRelu,self).__init__()
self.bn = BatchNormalization(momentum=0.9)
self.relu = Activation('relu')
def call(self,inputs):
x = self.bn(inputs)
outputs = self.relu(x)
return outputs
class SEBlock(Model):
def __init__(self,channels):
super(SEBlock,self).__init__()
self.channels = channels
self.p1 = GlobalAveragePooling2D()
self.d1 = Dense(channels//16,activation='relu',kernel_initializer="he_normal",use_bias=False,
kernel_regularizer=tf.keras.regularizers.l2(REGULARIZER))
self.d2 = Dense(channels,activation='sigmoid',kernel_initializer="he_normal",use_bias=False,
kernel_regularizer=tf.keras.regularizers.l2(REGULARIZER))
self.m1 = tf.keras.layers.Multiply()
def call(self,inputs):
x = self.p1(inputs)
x = self.d1(x)
y = self.d2(x)
y = tf.reshape(y, [-1,1,1,self.channels])
outputs = self.m1([inputs,y])
return outputs
class ShuffleBlock(Model):
def __init__(self,channels,strides,H,W):
super(ShuffleBlock,self).__init__()
self.channels = channels
self.strides = strides
self.H = H
self.W = W
self.c1 = Conv2D(filters=channels//2, kernel_size=1, strides=1, padding='same',use_bias=False,
kernel_initializer="he_normal",kernel_regularizer=tf.keras.regularizers.l2(REGULARIZER))
self.b1 = BNRelu()
self.c2 = DepthwiseConv2D(kernel_size=3, strides=strides, padding='same', use_bias=False,
depthwise_initializer="he_normal",depthwise_regularizer=tf.keras.regularizers.l2(REGULARIZER))
self.b2 = BatchNormalization(momentum=0.9)
self.c3 = Conv2D(filters=channels//2, kernel_size=1, strides=1, padding='same', use_bias=False,
kernel_initializer="he_normal",kernel_regularizer=tf.keras.regularizers.l2(REGULARIZER))
self.b3 = BNRelu()
# self.se = SEBlock(channels=channels)
if self.strides != 1:
self.c4 = DepthwiseConv2D(kernel_size=3, strides=strides, padding='same', use_bias=False,
depthwise_initializer="he_normal",depthwise_regularizer=tf.keras.regularizers.l2(REGULARIZER))
self.b4 = BNRelu()
self.c5 = Conv2D(filters=channels//2, kernel_size=1, strides=1, padding='same',use_bias=False,
kernel_initializer="he_normal",kernel_regularizer=tf.keras.regularizers.l2(REGULARIZER))
self.b5 = BNRelu()
def call(self,inputs):
if self.strides == 1:
x1,x2 = tf.split(inputs,2,axis=3)
else:
x1 = inputs
x2 = inputs
x = self.c1(x1)
x = self.b1(x)
x = self.c2(x)
x = self.b2(x)
x = self.c3(x)
x = self.b3(x)
# x = self.se(x)
if self.strides != 1:
x2 = self.c4(x2)
x2 = self.b4(x2)
x2 = self.c5(x2)
x2 = self.b5(x2)
y = tf.concat([x,x2],axis=-1)
outputs = channel_shuffle(y,spilt=2,H=self.H,W=self.W)
return outputs
class ShuffleNet(Model):
def __init__(self,block_list):
super(ShuffleNet,self).__init__()
self.channels = 116
self.H = 32
self.W = 32
self.c1 = Conv2D(filters=24, kernel_size=3, strides=1, padding='same', use_bias=False, input_shape=(128, 32, 32, 3),
kernel_initializer="he_normal",kernel_regularizer=tf.keras.regularizers.l2(REGULARIZER))
self.b1 = BNRelu()
self.blocks = Sequential()
for i in range(len(block_list)):
self.blocks.add(ShuffleBlock(channels=self.channels,strides=2,H=self.H,W=self.W))
self.H = self.H // 2
self.W = self.W // 2
for _ in range(1,block_list[i]):
self.blocks.add(ShuffleBlock(channels=self.channels,strides=1,H=self.H,W=self.W))
self.channels *= 2
self.c2 = Conv2D(filters=1024, kernel_size=1, strides=1, padding='same', use_bias=False,
kernel_initializer="he_normal",kernel_regularizer=tf.keras.regularizers.l2(REGULARIZER))
self.p1 = GlobalAveragePooling2D()
self.f1 = Dense(10,activation='softmax',kernel_initializer="he_normal",
kernel_regularizer=tf.keras.regularizers.l2(REGULARIZER))
def call(self,inputs):
x = self.c1(inputs)
x = self.b1(x)
x = self.blocks(x)
x = self.c2(x)
x = self.p1(x)
y = self.f1(x)
return y
model = ShuffleNet([4,8,4])
model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=lr, momentum=0.9, nesterov=True, clipnorm=2.),
loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
metrics=['accuracy'])
callbacks = [
tf.keras.callbacks.LearningRateScheduler(scheduler), #学习率衰减表
#tf.keras.callbacks.ReduceLROnPlateau(monitor='val_accuracy', factor=0.1, min_lr=0.0001, patience=10, cooldown=0)
tf.keras.callbacks.ModelCheckpoint( #模型保存
filepath = checkpoint_save_path,
save_weights_only = False,
monitor = 'val_accuracy',
save_best_only = True),
# tf.keras.callbacks.EarlyStopping( #早停
# monitor = 'val_accuracy',
# patience=15,
# baseline=None),
tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1, write_graph=True, write_images=False) #保存计算图
]
hist = model.fit(DataGenTrain.flow(x_train,y_train,batch_size=batch_size,shuffle=True),
epochs=epochs,
validation_data=(x_test,y_test),
validation_freq=1,
callbacks=callbacks)
model.summary()
#结果可视化
%matplotlib inline
%config InlineBackend.figure_format = 'svg'
plt.style.use({'figure.figsize':(6,4)})
plt.plot(hist.history['loss'], label='loss')
plt.plot(hist.history['val_loss'], label='val_loss')
plt.legend()
plt.show()
plt.plot(hist.history['val_accuracy'], label='val_accuracy')
plt.legend()
plt.show()
#tensorboard可视化
#!tensorboard --logdir=./Model/ShuffleNetV2_logs
#http://localhost:6006/
print('best result: {:.2f}% ({}epochs)'.format(100*max(hist.history['val_accuracy']),1+hist.history['val_accuracy'].index(max(hist.history['val_accuracy']))))
# best result : 92.41% (97epochs)