#!/usr/bin/env python
# coding: utf-8

# In[1]:


from __future__ import absolute_import, division, print_function, unicode_literals

import os, sys
from os.path import abspath

module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

import warnings
warnings.filterwarnings('ignore')

import keras.backend as k
from keras.models import Sequential
from keras.layers import Dense, Flatten, Conv2D, MaxPooling2D, Activation, Dropout
import numpy as np
import matplotlib.pyplot as plt
get_ipython().run_line_magic('matplotlib', 'inline')

import tensorflow as tf
tf.compat.v1.disable_eager_execution()
tf.get_logger().setLevel('ERROR')

from art.estimators.classification import KerasClassifier
from art.attacks.poisoning import PoisoningAttackBackdoor, PoisoningAttackCleanLabelBackdoor
from art.attacks.poisoning.perturbations import add_pattern_bd
from art.utils import load_mnist, preprocess, to_categorical
from art.defences.trainer import AdversarialTrainerMadryPGD
from art.estimators.classification.deep_partition_ensemble import DeepPartitionEnsemble


# # Load the Data

# In[2]:


(x_raw, y_raw), (x_raw_test, y_raw_test), min_, max_ = load_mnist(raw=True)

# Random Selection:
n_train = np.shape(x_raw)[0]
num_selection = 10000
random_selection_indices = np.random.choice(n_train, num_selection)
x_raw = x_raw[random_selection_indices]
y_raw = y_raw[random_selection_indices]

# Poison training data
percent_poison = .33
x_train, y_train = preprocess(x_raw, y_raw)
x_train = np.expand_dims(x_train, axis=3)

x_test, y_test = preprocess(x_raw_test, y_raw_test)
x_test = np.expand_dims(x_test, axis=3)

# Shuffle training data
n_train = np.shape(y_train)[0]
shuffled_indices = np.arange(n_train)
np.random.shuffle(shuffled_indices)
x_train = x_train[shuffled_indices]
y_train = y_train[shuffled_indices]


# # Initialize the Model Architecture

# In[3]:


# Create Keras convolutional neural network - basic architecture from Keras examples
# Source here: https://github.com/keras-team/keras/blob/master/examples/mnist_cnn.py
def create_model():
    model = Sequential()
    model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=x_train.shape[1:]))
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(10, activation='softmax'))

    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model


# # Set up the Model Backdoor

# In[4]:


backdoor = PoisoningAttackBackdoor(add_pattern_bd)
example_target = np.array([0, 0, 0, 0, 0, 0, 0, 0, 0, 1])
pdata, plabels = backdoor.poison(x_test, y=example_target)

plt.imshow(pdata[0].squeeze())
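
# The next cell is an optional sketch that is not part of the original notebook flow: it
# plots a clean test digit next to the triggered copy produced above by `backdoor.poison`,
# so the pixel pattern inserted by `add_pattern_bd` is easy to see.

# In[ ]:


# Side-by-side view of a clean digit and the same digit with the backdoor trigger
fig, axes = plt.subplots(1, 2, figsize=(6, 3))
axes[0].imshow(x_test[0].squeeze())
axes[0].set_title("Clean")
axes[1].imshow(pdata[0].squeeze())
axes[1].set_title("Triggered")
plt.show()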

# # Create the poison data

# For this example, we will select 9 as the target class. The adversary's goal is to poison the model so that adding the trigger causes the trained model to misclassify the triggered input as a 9.
#
# First, the adversary creates a proxy classifier (i.e., a classifier similar to the target classifier). Because the clean label attack uses PGD-generated noise to encourage the trained classifier to rely on the trigger, the noise must transfer to the victim model; for this reason the proxy is trained with adversarial training.

# In[5]:


# The target class is 9; the clean label attack will perturb and trigger a percentage
# of the training images of this class without changing their labels.
targets = to_categorical([9], 10)[0]

proxy = AdversarialTrainerMadryPGD(KerasClassifier(create_model()), nb_epochs=10, eps=0.15, eps_step=0.001)
proxy.fit(x_train, y_train)


# In[6]:


attack = PoisoningAttackCleanLabelBackdoor(backdoor=backdoor, proxy_classifier=proxy.get_classifier(),
                                           target=targets, pp_poison=percent_poison, norm=2, eps=5,
                                           eps_step=0.1, max_iter=200)
pdata, plabels = attack.poison(x_train, y_train)


# In[7]:


# Select the samples of the target class and display one that was perturbed
# (a nonzero top-left pixel is used here as a simple heuristic that noise was added)
poisoned = pdata[np.all(plabels == targets, axis=1)]
poisoned_labels = plabels[np.all(plabels == targets, axis=1)]
print(len(poisoned))

for i in range(len(poisoned)):
    if poisoned[i][0][0] != 0:
        plt.imshow(poisoned[i].squeeze())
        plt.show()
        print(f"Index: {i} Label: {np.argmax(poisoned_labels[i])}")
        break
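
# As a quick sanity check (an optional sketch, not part of the original notebook), we can
# verify the two defining properties of a clean label attack: the labels returned by the
# attack should be identical to the original labels, and only a fraction of the images
# (controlled by pp_poison) should actually have been modified.

# In[ ]:


# A clean label attack is expected to leave the labels untouched
print("Labels unchanged:", np.array_equal(plabels, y_train))

# Count how many training images were modified (perturbed and/or triggered)
n_modified = np.sum(np.any(pdata != x_train, axis=(1, 2, 3)))
print(f"Modified images: {n_modified} of {len(x_train)} ({n_modified / len(x_train):.1%} of the training set)")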

# # Initialize the classification models

# We will initialize four models. The first is a single, undefended model. The other three are DPA models with varying ensemble sizes, to demonstrate the tradeoff between clean accuracy and poison accuracy. This may take some time because of the model copying.

# In[8]:


model = KerasClassifier(create_model())
dpa_model_10 = DeepPartitionEnsemble(model, ensemble_size=10)
dpa_model_20 = DeepPartitionEnsemble(model, ensemble_size=20)
dpa_model_30 = DeepPartitionEnsemble(model, ensemble_size=30)


# Train the models on the poisoned data

# In[9]:


model.fit(pdata, plabels, nb_epochs=10)
dpa_model_10.fit(pdata, plabels, nb_epochs=10)
dpa_model_20.fit(pdata, plabels, nb_epochs=10)
dpa_model_30.fit(pdata, plabels, nb_epochs=10)


# # Evaluate the performance of the trained models on unpoisoned data

# The performance of the models appears normal. For the DPA models, clean accuracy drops slightly as the ensemble size increases.

# In[10]:


clean_preds = np.argmax(model.predict(x_test), axis=1)
clean_correct = np.sum(clean_preds == np.argmax(y_test, axis=1))
clean_total = y_test.shape[0]

clean_acc = clean_correct / clean_total
print("\nClean test set accuracy (model): %.2f%%" % (clean_acc * 100))

# Display image, label, and prediction for a clean sample to show how the poisoned model classifies a clean sample
c = 0  # class to display
i = 0  # image of the class to display

c_idx = np.where(np.argmax(y_test, 1) == c)[0][i]  # index of the image in clean arrays

plt.imshow(x_test[c_idx].squeeze())
plt.show()
clean_label = c

print("Prediction: " + str(clean_preds[c_idx]))


# In[11]:


clean_preds = np.argmax(dpa_model_10.predict(x_test), axis=1)
clean_correct = np.sum(clean_preds == np.argmax(y_test, axis=1))
clean_total = y_test.shape[0]

clean_acc = clean_correct / clean_total
print("\nClean test set accuracy (DPA model_10): %.2f%%" % (clean_acc * 100))

# Display image, label, and prediction for a clean sample to show how the poisoned model classifies a clean sample
c = 0  # class to display
i = 0  # image of the class to display

c_idx = np.where(np.argmax(y_test, 1) == c)[0][i]  # index of the image in clean arrays

plt.imshow(x_test[c_idx].squeeze())
plt.show()
clean_label = c

print("Prediction: " + str(clean_preds[c_idx]))


# In[12]:


clean_preds = np.argmax(dpa_model_20.predict(x_test), axis=1)
clean_correct = np.sum(clean_preds == np.argmax(y_test, axis=1))
clean_total = y_test.shape[0]

clean_acc = clean_correct / clean_total
print("\nClean test set accuracy (DPA model_20): %.2f%%" % (clean_acc * 100))

# Display image, label, and prediction for a clean sample to show how the poisoned model classifies a clean sample
c = 0  # class to display
i = 0  # image of the class to display

c_idx = np.where(np.argmax(y_test, 1) == c)[0][i]  # index of the image in clean arrays

plt.imshow(x_test[c_idx].squeeze())
plt.show()
clean_label = c

print("Prediction: " + str(clean_preds[c_idx]))


# In[13]:


clean_preds = np.argmax(dpa_model_30.predict(x_test), axis=1)
clean_correct = np.sum(clean_preds == np.argmax(y_test, axis=1))
clean_total = y_test.shape[0]

clean_acc = clean_correct / clean_total
print("\nClean test set accuracy (DPA model_30): %.2f%%" % (clean_acc * 100))

# Display image, label, and prediction for a clean sample to show how the poisoned model classifies a clean sample
c = 0  # class to display
i = 0  # image of the class to display

c_idx = np.where(np.argmax(y_test, 1) == c)[0][i]  # index of the image in clean arrays

plt.imshow(x_test[c_idx].squeeze())
plt.show()
clean_label = c

print("Prediction: " + str(clean_preds[c_idx]))


# # Evaluate the performance of the trained models on poisoned data

# When the trigger is added, we see a shift in performance. The single model performs the worst, as no defense is in place to mitigate the effect of the poison. The DPA models show some robustness to the poison because they partition the training data, which spreads the effect of the poison across the models in the ensemble.

# In[14]:


not_target = np.logical_not(np.all(y_test == targets, axis=1))
px_test, py_test = backdoor.poison(x_test[not_target], y_test[not_target])

poison_preds = np.argmax(model.predict(px_test), axis=1)
poison_correct = np.sum(poison_preds == np.argmax(y_test[not_target], axis=1))
poison_total = px_test.shape[0]  # accuracy is measured over the triggered (non-9) test images

poison_acc = poison_correct / poison_total
print("\nPoison test set accuracy (model): %.2f%%" % (poison_acc * 100))

c = 0  # index to display
plt.imshow(px_test[c].squeeze())
plt.show()

print("Prediction: " + str(poison_preds[c]))


# In[15]:


poison_preds = np.argmax(dpa_model_10.predict(px_test), axis=1)
poison_correct = np.sum(poison_preds == np.argmax(y_test[not_target], axis=1))
poison_total = px_test.shape[0]

poison_acc = poison_correct / poison_total
print("\nPoison test set accuracy (DPA model_10): %.2f%%" % (poison_acc * 100))

c = 0  # index to display
plt.imshow(px_test[c].squeeze())
plt.show()

print("Prediction: " + str(poison_preds[c]))


# In[16]:


poison_preds = np.argmax(dpa_model_20.predict(px_test), axis=1)
poison_correct = np.sum(poison_preds == np.argmax(y_test[not_target], axis=1))
poison_total = px_test.shape[0]

poison_acc = poison_correct / poison_total
print("\nPoison test set accuracy (DPA model_20): %.2f%%" % (poison_acc * 100))

c = 0  # index to display
plt.imshow(px_test[c].squeeze())
plt.show()

print("Prediction: " + str(poison_preds[c]))


# In[17]:


poison_preds = np.argmax(dpa_model_30.predict(px_test), axis=1)
poison_correct = np.sum(poison_preds == np.argmax(y_test[not_target], axis=1))
poison_total = px_test.shape[0]

poison_acc = poison_correct / poison_total
print("\nPoison test set accuracy (DPA model_30): %.2f%%" % (poison_acc * 100))

c = 0  # index to display
plt.imshow(px_test[c].squeeze())
plt.show()

print("Prediction: " + str(poison_preds[c]))
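
# The final cell is an optional summary sketch that is not part of the original notebook:
# for each trained model it reports clean accuracy alongside the attack success rate,
# i.e. the fraction of triggered, originally non-9 test images that the model now
# classifies as the target class 9. This gives a compact view of how well DPA blunts
# the backdoor while retaining clean accuracy.

# In[ ]:


summary_models = {
    "single model": model,
    "DPA ensemble_size=10": dpa_model_10,
    "DPA ensemble_size=20": dpa_model_20,
    "DPA ensemble_size=30": dpa_model_30,
}
target_class = np.argmax(targets)

for name, classifier in summary_models.items():
    clean_acc = np.mean(np.argmax(classifier.predict(x_test), axis=1) == np.argmax(y_test, axis=1))
    trigger_preds = np.argmax(classifier.predict(px_test), axis=1)
    attack_success = np.mean(trigger_preds == target_class)
    print("%s: clean accuracy %.2f%%, attack success rate %.2f%%" % (name, clean_acc * 100, attack_success * 100))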