%pip install -qU tensorflow_addons %pip install -qU gcsfs import os from pathlib import Path import logging import json, joblib from datetime import datetime from collections import namedtuple from functools import partial # Numerical, stats and ML import pandas as pd import numpy as np import dask.array as da from scipy import signal from sklearn.model_selection import StratifiedKFold, KFold import tensorflow as tf from tensorflow import keras import tensorflow_addons as tfa from keras.callbacks import ModelCheckpoint os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1' tf.get_logger().setLevel('WARNING') tf.config.list_logical_devices() class SignalFilter: """ Cell 33 of https://www.gw-openscience.org/LVT151012data/LOSC_Event_tutorial_LVT151012.html https://scipy-cookbook.readthedocs.io/items/ButterworthBandpass.html """ def __init__(self, scaling: np.ndarray, filter_type: str='highpass', filter_order: int=5, f_lo: float=20.43, f_hi: float=None, sampling_rate: int=2048): self.scaling = scaling self.filter_type = filter_type if filter_type.lower() == 'highpass': Wn = f_lo self.filter_window = signal.tukey(4096, 0.1) self.filter_norm = np.sqrt(1 - f_lo / (sampling_rate / 2)) elif filter_type.lower() == 'bandpass': Wn = [f_lo, f_hi] self.filter_window = signal.tukey(4096, 0.1) self.filter_norm = np.sqrt((f_hi - f_lo) / (sampling_rate / 2)) else: raise ValueError('Unknown filter type.') self.filter = signal.butter(N=filter_order, Wn=Wn, btype=filter_type, output='sos', fs=sampling_rate) def filt(self, X): X = self.scaling * X * self.filter_window if self.filter_type.lower() == 'bandpass': X = signal.sosfilt(self.filter, X) elif self.filter_type.lower() == 'highpass': X = signal.sosfiltfilt(self.filter, X) X *= self.filter_norm return X class Data: """ Dataset builder. """ tfrec_format_train = { "wave": tf.io.FixedLenFeature([], tf.string), "target": tf.io.FixedLenFeature([], tf.int64), "wave_id": tf.io.FixedLenFeature([], tf.string), } tfrec_format_test = { "wave": tf.io.FixedLenFeature([], tf.string), "wave_id": tf.io.FixedLenFeature([], tf.string) } AUTO = tf.data.AUTOTUNE def __init__(self, config): self.config = config # Data file paths self.train_files = [config.data_path + f'train{i}.tfrecords' for i in range(20)] self.test_files = [config.data_path + f'test{i}.tfrecords' for i in range(10)] # Front-end signal filter self.filter = SignalFilter(**config.filter) def _preprocess(self, X, y, train_or_test=True): """ Preprocess a batch of data: scaling, filtering, transpose. Casting to tf.float32 is done in wrapper. """ X = X.numpy() X = self.filter.filt(X) X = np.transpose(X, axes=(0, 2, 1)) if train_or_test: return X, y else: return X def _preprocess_wrapper(self, train_or_test=True): if train_or_test: def wrapper(X, y): return tf.py_function( self._preprocess, inp=[X, y], Tout=[tf.float32, tf.float32]) else: def wrapper(X): return tf.py_function( partial(self._preprocess, y=None, train_or_test=False), inp=[X], Tout=tf.float32) return wrapper def _decode_train(self, tfrecord): tensor_dict = tf.io.parse_single_example(tfrecord, self.tfrec_format_train) X = tf.reshape(tf.io.decode_raw(tensor_dict['wave'], tf.float64), (3, 4096)) y = tf.reshape(tf.cast(tensor_dict['target'], tf.float32), [1]) # sample_ids = tensor_dict['sample_id'] return X, y def _decode_test(self, tfrecord): tensor_dict = tf.io.parse_single_example(tfrecord, self.tfrec_format_test) X = tf.reshape(tf.io.decode_raw(tensor_dict['wave'], tf.float64), (3, 4096)) return X def get_dataset(self, train_or_test=True, shuffle=True, file_indices=None): data_files = self.train_files if train_or_test else self.test_files if file_indices is not None: data_files = [data_files[i] for i in file_indices] ds = tf.data.TFRecordDataset( # do not interleave test data files with parallel reads data_files, num_parallel_reads=self.AUTO if train_or_test else 1, compression_type="GZIP") if shuffle: options = tf.data.Options() options.experimental_deterministic = False ds = ds.shuffle(self.config.shuffle_buf_size).with_options(options) ds = ds.map(self._decode_train if train_or_test else self._decode_test, num_parallel_calls=self.AUTO) ds = ds.batch(self.config.batch_size).map(self._preprocess_wrapper(train_or_test), num_parallel_calls=self.AUTO) ds = ds.prefetch(self.AUTO) return ds class GeM(keras.layers.Layer): def __init__(self, pool_size, p=3, eps=1e-6, **kwargs): super(GeM, self).__init__(**kwargs) self.pool_size = pool_size self.p = p self.eps = eps def call(self, x): x = tf.math.maximum(x, self.eps) x = tf.pow(x, self.p) x = tf.nn.avg_pool(x, self.pool_size, self.pool_size, 'VALID') x = tf.pow(x, 1./self.p) return x def get_model(): """ Modified from https://journals.aps.org/prl/pdf/1check0.1103/PhysRevLett.120.141103 """ model = keras.models.Sequential([ keras.layers.Conv1D(64, 64, padding='valid', input_shape=(4096, 3)), keras.layers.BatchNormalization(), keras.layers.Activation(tf.nn.silu), keras.layers.Conv1D(64, 32, padding='valid'), GeM(pool_size=8), keras.layers.BatchNormalization(), keras.layers.Activation(tf.nn.silu), keras.layers.Conv1D(128, 32, padding='valid'), keras.layers.BatchNormalization(), keras.layers.Activation(tf.nn.silu), keras.layers.Conv1D(128, 16, padding='valid'), GeM(pool_size=6), keras.layers.BatchNormalization(), keras.layers.Activation(tf.nn.silu), keras.layers.Conv1D(256, 16, padding='valid'), keras.layers.BatchNormalization(), keras.layers.Activation(tf.nn.silu), keras.layers.Conv1D(256, 16, padding='valid'), GeM(pool_size=4), keras.layers.BatchNormalization(), keras.layers.Activation(tf.nn.silu), keras.layers.Flatten(), # keras.layers.GlobalAveragePooling1D(), keras.layers.Dense(64), keras.layers.BatchNormalization(), keras.layers.Dropout(0.25), keras.layers.Activation(tf.nn.silu), keras.layers.Dense(64), keras.layers.BatchNormalization(), keras.layers.Dropout(0.25), keras.layers.Activation(tf.nn.silu), keras.layers.Dense(1, activation='sigmoid') ]) return model def seed_all(seed=42): np.random.seed(seed) tf.random.set_seed(seed) def get_logger( logger_name, log_path=None, file_level=logging.INFO, stream_level=logging.INFO, ): if log_path is None and stream_level is None: raise ValueError("Both file and stream logger is None.") logger = logging.getLogger(logger_name) logger.setLevel(logging.INFO) logger_format = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") # Add file handler if log_path is not None and Path(log_path).expanduser().resolve().exists(): logger_file = ( Path(log_path).expanduser().resolve().joinpath(logger_name + ".log") ) if logger_file.exists(): logger_file.unlink() fh = logging.FileHandler(logger_file) fh.setLevel(file_level) fh.setFormatter(logger_format) logger.addHandler(fh) # Add stream handler if stream_level is not None: sh = logging.StreamHandler() sh.setLevel(stream_level) sh.setFormatter(logger_format) logger.addHandler(sh) return logger class Config(dict): def __init__(self, *args, **kwargs): super().__init__(**kwargs) self.__dict__ = self def check_save_config(config: Config): config.results_path = str(Path(config.results_parent_path).joinpath(config.name)) p = Path(config.results_path).expanduser().resolve() try: p.mkdir(parents=True, exist_ok=False) except FileExistsError: if any(p.iterdir()): raise ValueError("Non-empty results directory.") joblib.dump(config, p.joinpath('config.pkl')) attrs = {k: v for k, v in vars(config).items() if not k.startswith('__')} with open(p.joinpath('config.json'), 'w') as f: # Human readable JSON try: attrs['filter']['scaling'] = attrs['filter']['scaling'].tolist() except AttributeError: pass json.dump(attrs, f) def train(config: Config, train_data, val_data, fold_k=None): """ Train a single fold of training dataset. """ checkpoint_path = Path(config.results_path).joinpath( 'checkpoint' + ('' if fold_k is None else f"_fold{fold_k}" )) step = tf.Variable(0, trainable=False) lr_schedule_class = getattr(tf.keras.optimizers.schedules, config.lr_schedule) lr_schedule = lr_schedule_class(**config.lr_schedule_paras[config.lr_schedule]) optimizer_class = getattr(tfa.optimizers, config.optimizer) optimizer = optimizer_class(learning_rate=1*lr_schedule(step), **config.optimizer_paras[config.optimizer]) # weight_decay = lambda: cfg.optimizer['weight_decay'] * lr_schedule(step)) model = get_model() model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=[keras.metrics.AUC(name='AUC')]) checkpoint_cb = ModelCheckpoint(checkpoint_path, monitor='val_AUC', verbose=0, save_best_only=True, mode='max') history = model.fit(train_data, epochs=config.epochs, callbacks=[checkpoint_cb], validation_data=val_data) pd.DataFrame(history.history).to_csv(checkpoint_path.joinpath('train_history.csv')) # return best model and the checkpoint path model = keras.models.load_model(checkpoint_path) return model, checkpoint_path def get_score(y_true, y_pred): """ Compute ROC AUC score on tensors. """ auc = tf.keras.metrics.AUC() auc.update_state(y_true, y_pred) score = auc.result().numpy() return score def oof_predict(model, val_data): """ Predict on the validation dataset of a single fold. """ val_y_true = [] val_y_pred = [] for X, y in val_data: val_y_true.append(y) val_y_pred.append(model(X)) val_y_true = tf.concat(val_y_true, axis=0) val_y_pred = tf.concat(val_y_pred, axis=0) val_score = get_score(val_y_true, val_y_pred) return val_y_true, val_y_pred, val_score def make_inference(config, models, logger=None): """ Make inference on test dataset and create submission file. """ logger_func = logger.info if logger else print data = Data(config) test_data = data.get_dataset(train_or_test=False, shuffle=False) test_preds = [] for k, model in zip(config.train_folds or range(config.K_fold), models): logger_func(f"Make inference by fold {k} model") y_pred = model.predict(test_data) test_preds.append(y_pred) test_preds = np.concatenate(test_preds, axis=1).mean(axis=1) sample = pd.read_csv(config.data_path + 'sample_submission.csv') test_preds_df = pd.DataFrame({'id': sample['id'].values, 'target': test_preds}) p_submission = Path(config.results_path).joinpath(f'{config.name}_submission.csv') test_preds_df.to_csv(p_submission, index=False) logger_func(f'Test inference written to {p_submission}.') return test_preds_df def train_K_folds_make_inference(config): """ Train K folds of the training dataset with out-of-fold validation. """ try: check_save_config(config) except FileExistsError: print(f"Results path: {config.results_path} exists and not empty. Quit.") return logger = get_logger(config.name, log_path=config.results_path) logger.info(f"{config.description}") logger.info(f"Results path: {config.results_path}") data = Data(config) kf = KFold(n_splits=config.K_fold, shuffle=True, random_state=config.seed) oof_models, oof_model_paths, oof_labels, oof_preds, k_fold_scores = [], [], [], [], [] for k, (train_idx, val_idx) in enumerate(kf.split(data.train_files)): if config.train_folds and k not in config.train_folds: continue logger.info(f"--- Train fold {k} of {config.train_folds} ---") logger.info(f"Train: {train_idx} Val: {val_idx}") train_data = data.get_dataset(train_or_test=True, shuffle=True, file_indices=train_idx) val_data = data.get_dataset(train_or_test=True, shuffle=False, file_indices=val_idx) model, model_path = train(config, train_data, val_data, fold_k=k) oof_models.append(model) oof_model_paths.append(model_path) # oof prediction val_y_true, val_y_pred, val_score = oof_predict(model, val_data) oof_labels.append(val_y_true) oof_preds.append(val_y_pred) k_fold_scores.append(val_score) logger.info(f"Fold {k} val score: {val_score}") oof_labels = tf.concat(oof_labels, axis=0) oof_preds = tf.concat(oof_preds, axis=0) oof_score = get_score(oof_labels, oof_preds) logger.info(f"OOF val score: {oof_score}") logger.info("--- Inference ---") test_preds_df = make_inference(config, oof_models, logger) return oof_models, oof_model_paths, oof_score, k_fold_scores, test_preds_df def load_results_make_inference(results_path: str): """ Load saved models and make inference on test dataset. """ p = Path(results_path).expanduser().resolve() if not p.exists(): print(f"Invalid results_path: {results_path}") return config = joblib.load(p.joinpath('config.pkl')) models = [] for k in config.train_folds or range(config.K_fold): checkpoint_k = p.joinpath(f'checkpoint_fold{k}') model = keras.models.load_model(checkpoint_k) models.append(model) test_preds_df = make_inference(config, models) return test_preds_df config = Config( name = "CNN1d_GeM_SGDW_Highpass_Tukey_" + datetime.now().strftime("%Y-%m-%d_%H-%M"), description = "CNN1d GeM SGDW CosineDecay, 5 folds, 20 epochs, batch_size 128. Highpass signal filter.", # paths # data_path = "data/", # Dataset on local drive data_path = "/content/drive/Shareddrives/ml/g2net/data/", # Dataset on Google Drive # data_path = "gs://kds-8a5a5ceed201023b7b0d1880950ccc33c21b9bef067a7abe0dfb4aaa/", # Dataset on Kaggle GCS # results_path = "results/", # Save results on local drive results_parent_path = "/content/drive/Shareddrives/ml/g2net/results/", # Save results on Google Drive results_path = None, # results_parent_path + name, to be initialized in check_and_save_config(). shuffle_buf_size = 2048, # train/test paras K_fold = 5, train_folds = [], # If not empty, only train a subset of folds. batch_size = 128, epochs = 20, # warm start warm_start = False, warm_start_model_path = "", # algorithm/model paras filter = dict( scaling = 1e20, # 1 / np.array([1.5e-20, 1.5e-20, 0.5e-20]).reshape(-1, 1), #filter_type = 'bandpass', filter_order=8, f_lo=25, f_hi=1000, sampling_rate=2048 filter_type='highpass', filter_order=5, f_lo=20.43, f_hi=None, sampling_rate=2048 ), lr_schedule = 'CosineDecay', lr_schedule_paras = {'CosineDecay': dict(initial_learning_rate=1e-1, decay_steps=5, alpha=1e-6)}, # init_lr=eta_max alpha= eta_min/eta_max optimizer = 'SGDW', optimizer_paras = { 'SGDW': dict(weight_decay=1e-4, momentum=0.9, nesterov=True), 'AdamW': dict(weight_decay=1e-5), }, # misc seed = 42, ) oof_models, oof_model_paths, oof_score, k_fold_scores, test_preds_df = train_K_folds_make_inference(config) #results_path = "/content/drive/Shareddrives/ml/g2net/results/CNN1d_GeM_SGDW_Highpass_Tukey" #test_preds_df = load_results_make_inference(results_path)