import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import IsolationForest
from sklearn.naive_bayes import MultinomialNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
class UrlDatasetLoader():
def __init__(self):
print('init Loader notebook')
def load_data(self, url="https://raw.githubusercontent.com/quickheaven/scs-3253-machine-learning/master/datasets/ISCX-URL2016_All.csv"):
"""
        (string) --> DataFrame
        This function returns a DataFrame of the malicious-URL dataset.
        Parameters
        ----------
        url : By default, the data is fetched from GitHub; a local path or URL can be provided instead so the data loads faster.
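        Example
        -------
        Illustrative only; the default URL points to the GitHub copy of the dataset::

            df = UrlDatasetLoader().load_data()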
"""
df = pd.read_csv(url)
return df
def prepare_data(self, data, fill_na=True, feature_selection=True, show_graph=False):
"""
        (DataFrame, boolean, boolean, boolean) --> X and y of the dataframe.
        This function returns the X and y of the malicious-URL dataframe.
        Parameters
        ----------
        fill_na : True to fill missing values with the column mean, otherwise drop the rows that contain missing values.
        feature_selection : True to remove one or more features that have a pairwise correlation higher than 0.9, otherwise skip that type of feature selection.
            https://towardsdatascience.com/feature-selection-correlation-and-p-value-da8921bfb3cf
        show_graph : True to display a graph after applying fill_na or feature_selection.
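        Example
        -------
        Illustrative only; assumes the dataframe comes from load_data()::

            loader = UrlDatasetLoader()
            df = loader.load_data()
            X, y = loader.prepare_data(df, fill_na=True, feature_selection=True)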
"""
        from sklearn.preprocessing import MinMaxScaler
        from sklearn.preprocessing import LabelEncoder
data = data.copy()
y_feature = 'URL_Type_obf_Type'
if (fill_na == True):
            # Impute missing values in these numeric features with their column means.
            na_columns = ['avgpathtokenlen', 'NumberRate_DirectoryName', 'NumberRate_FileName',
                          'NumberRate_Extension', 'NumberRate_AfterPath', 'Entropy_DirectoryName',
                          'Entropy_Filename', 'Entropy_Extension', 'Entropy_Afterpath']
            for column in na_columns:
                data[column] = data[column].fillna(data[column].mean())
else:
data.dropna(axis='index', inplace=True)
data = data.drop("argPathRatio", axis=1) # simply drop this since it does not affect the scores.
if (show_graph == True):
plt.figure(figsize=(10, 8))
ax = plt.axes()
sns.heatmap(data.isnull(), ax=ax, yticklabels=False, cbar=False, cmap="cividis")
plt.show()
le = LabelEncoder()
data[y_feature] = le.fit_transform(data[y_feature])
if (feature_selection == True):
corr = data.corr()
# Selecting features based on correlation:
            # compare the correlation between features and remove one or more features that have a correlation higher than 0.9
# https://towardsdatascience.com/feature-selection-correlation-and-p-value-da8921bfb3cf
columns = np.full((corr.shape[0],), True, dtype=bool)
for i in range(corr.shape[0]):
for j in range(i+1, corr.shape[0]):
if corr.iloc[i,j] >= 0.9:
if columns[j]:
columns[j] = False
selected_columns = data.columns[columns]
data = data[selected_columns]
if (show_graph == True):
corr = data.corr()
plt.figure(figsize=(18,15))
sns.heatmap(corr, annot=True, vmin=-1.0, cmap='cividis')
plt.title('Correlation Heatmap')
plt.show()
        # Scale the feature columns to [0, 1]; the target column is excluded from X.
        scaler = MinMaxScaler()
        X = pd.DataFrame(scaler.fit_transform(data.loc[:, data.columns != y_feature]), columns=data.columns[data.columns != y_feature])
y = data[y_feature]
return X, y
def perform_anomaly_detection(self, X, y):
'''
(X, y) --> X, y
        This function performs unsupervised anomaly detection using Isolation Forest.
https://practicaldatascience.co.uk/machine-learning/how-to-use-the-isolation-forest-model-for-outlier-detection
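        Example
        -------
        Illustrative only; usually called on the training and test splits separately::

            X_clean, y_clean = loader.perform_anomaly_detection(X, y)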
'''
        # Fit an Isolation Forest that assumes ~1% of the samples are outliers, then keep only the inliers (predicted label != -1).
        iso_forest = IsolationForest(contamination=0.01, random_state=42).fit(X)
        y_pred_iso_forest = iso_forest.predict(X)
        X_new, y_new = X[(y_pred_iso_forest != -1)], y[(y_pred_iso_forest != -1)]
print('The shape after unsupervised anomaly detection:')
print(X_new.shape)
print(y_new.shape)
return X_new, y_new
def train_test_split(self, X, y, test_size, random_state, anomaly_detection=True):
'''
        This is a convenience method that performs a train/test split, with an option to run unsupervised anomaly detection on each split afterwards.
Read more in sklearn.model_selection.train_test_split
Parameters
----------
anomaly_detection: True to perform unsupervised anomaly detection using Isolation Forest.
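        Example
        -------
        Illustrative only; the test_size and random_state values are arbitrary::

            X_train, X_test, y_train, y_test = loader.train_test_split(
                X, y, test_size=0.2, random_state=42, anomaly_detection=True)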
'''
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)
print('The X_train, y_train shape:')
print(X_train.shape)
print(y_train.shape)
if anomaly_detection == True:
X_train, y_train = self.perform_anomaly_detection(X_train, y_train)
print('The X_test, y_test shape:')
print(X_test.shape)
print(y_test.shape)
if anomaly_detection == True:
X_test, y_test = self.perform_anomaly_detection(X_test, y_test)
return X_train, X_test, y_train, y_test
def get_models_to_train(self):
'''
(None) --> dict
This function returns the models to be tested.
'''
RANDOM_STATE=42
MAX_ITER=1000
models = dict()
models['RandomForestClassifier'] = RandomForestClassifier(random_state=RANDOM_STATE)
models['DecisionTreeClassifier'] = DecisionTreeClassifier(random_state=RANDOM_STATE)
models['LogisticRegression'] = LogisticRegression(random_state=RANDOM_STATE, max_iter=MAX_ITER)
models['AdaBoostClassifier'] = AdaBoostClassifier(random_state=RANDOM_STATE)
models['MultinomialNB'] = MultinomialNB()
        # Dropped from model selection:
#models['GradientBoostingClassifier'] = GradientBoostingClassifier(random_state=RANDOM_STATE)
#models['KNeighborsClassifier'] = KNeighborsClassifier()
#models['SupportVectorMachine'] = SVC(random_state=RANDOM_STATE)
return models
def get_parameters_to_train(self, is_best_params=False):
'''
(Boolean) --> dict
        This function returns the parameters to be used for model testing.
Parameters
----------
is_best_params : True to use the already defined best params from previous runs.
False to use all the possible hyperparameters.
Sources:
https://machinelearningmastery.com/hyperparameters-for-classification-machine-learning-algorithms
https://medium.com/swlh/the-hyperparameter-cheat-sheet-770f1fed32ff
https://medium.com/@chaudhurysrijani/tuning-of-adaboost-with-computational-complexity-8727d01a9d20
'''
params_knn = dict()
params_tre = dict()
params_ran = dict()
params_gra = dict()
params_log = dict()
params_svc = dict()
params_ada = dict()
params_mnb = dict()
if is_best_params == False:
params_knn['n_neighbors'] = [2, 4, 6]
params_knn['weights'] = ['uniform','distance']
params_knn['metric'] = ['minkowski','euclidean','manhattan']
params_tre['criterion'] = ['gini', 'entropy']
params_tre['max_depth'] = [1, 3, 5, 10]
params_tre['min_samples_split'] = [5, 10]
params_tre['min_samples_leaf'] = [5, 10]
params_ran['criterion'] = ['gini', 'entropy']
params_ran['n_estimators'] = [100, 150, 200]
params_ran['max_depth'] = [1, 3, 5, 10]
params_ran['min_samples_split'] = [5, 10]
params_ran['min_samples_leaf'] = [5, 10]
params_gra['learning_rate'] = [0.001, 0.01, 0.1]
# params_gra['n_estimators'] = [100, 1000] no significant impact
params_gra['subsample'] = [0.5, 0.7, 1.0]
params_gra['max_depth'] = [3, 7, 9]
# https://www.kaggle.com/code/satishgunjal/multiclass-logistic-regression-using-sklearn/notebook
            # Since we are using the One-vs-Rest strategy, set multi_class='ovr'.
            # Note: since we are using One-vs-Rest, pair it with the 'liblinear' solver.
params_log['multi_class'] = ['ovr']
params_log['solver'] = ['liblinear']
params_log['penalty'] = ['l2']
params_log['C'] = [100, 10, 1.0, 0.1]
# https://www.baeldung.com/cs/svm-multiclass-classification
#params_svc['kernel'] = ['rbf']
#params_svc['gamma'] = [0.1, 0.5, 1.0]
#params_svc['C'] = [0.01, 0.1]
params_ada['learning_rate'] = [0.01, 0.1, 1.0]
params_ada['algorithm'] = ['SAMME', 'SAMME.R']
params_mnb['alpha']=[0.50, 1.0, 2.0]
params_mnb['class_prior']=[None]
params_mnb['fit_prior']=[True, False]
else:
params_knn['n_neighbors'] = [2]
params_knn['weights'] = ['distance']
params_knn['metric'] = ['manhattan']
params_tre['criterion'] = ['entropy']
params_tre['max_depth'] = [10]
params_tre['min_samples_leaf'] = [5]
params_tre['min_samples_split'] = [5]
params_ran['criterion'] = ['entropy']
params_ran['n_estimators'] = [200]
params_ran['max_depth'] = [10]
params_ran['min_samples_leaf'] = [5]
params_ran['min_samples_split'] = [5]
params_gra['learning_rate'] = [0.1]
params_gra['subsample'] = [0.7]
params_gra['max_depth'] = [9]
params_log['multi_class'] = ['ovr']
params_log['solver'] = ['liblinear']
params_log['penalty'] = ['l2']
params_log['C'] = [100]
# https://www.baeldung.com/cs/svm-multiclass-classification
params_svc['kernel'] = ['rbf']
params_svc['gamma'] = [1.0]
params_svc['C'] = [0.1]
params_ada['learning_rate'] = [1.0]
params_ada['algorithm'] = ['SAMME']
params_mnb['alpha']=[1.0]
params_mnb['class_prior']=[None]
params_mnb['fit_prior']=[True]
params = dict()
params['KNeighborsClassifier'] = params_knn
params['DecisionTreeClassifier'] = params_tre
params['RandomForestClassifier'] = params_ran
params['GradientBoostingClassifier'] = params_gra
params['LogisticRegression'] = params_log
#params['SupportVectorMachine'] = params_svc
params['AdaBoostClassifier'] = params_ada
params['MultinomialNB'] = params_mnb
return params
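

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). This block is an assumption about how the
# loader and the models/parameters dictionaries are meant to be combined; the
# GridSearchCV loop and the test_size/random_state values are not part of the
# original loader and are shown purely as an example.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from sklearn.model_selection import GridSearchCV

    loader = UrlDatasetLoader()
    df = loader.load_data()  # or pass a local CSV path/URL so the data loads faster
    X, y = loader.prepare_data(df, fill_na=True, feature_selection=True)
    X_train, X_test, y_train, y_test = loader.train_test_split(
        X, y, test_size=0.2, random_state=42, anomaly_detection=True)

    models = loader.get_models_to_train()
    params = loader.get_parameters_to_train(is_best_params=True)

    for name, model in models.items():
        # Grid-search each candidate model with its parameter grid (if one is defined).
        grid = GridSearchCV(model, params.get(name, {}), cv=3, n_jobs=-1)
        grid.fit(X_train, y_train)
        print(name, grid.best_params_, grid.score(X_test, y_test))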