%pip install -qU tensorflow_addons
%pip install -qU gcsfs
import os
from pathlib import Path
import logging
import json, joblib
from datetime import datetime
from collections import namedtuple
from functools import partial
# Numerical, stats and ML
import pandas as pd
import numpy as np
import dask.array as da
from scipy import signal
from sklearn.model_selection import StratifiedKFold, KFold
import tensorflow as tf
from tensorflow import keras
import tensorflow_addons as tfa
from tensorflow.keras.callbacks import ModelCheckpoint
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1'
tf.get_logger().setLevel('WARNING')
tf.config.list_logical_devices()
class SignalFilter:
"""
Cell 33 of https://www.gw-openscience.org/LVT151012data/LOSC_Event_tutorial_LVT151012.html
https://scipy-cookbook.readthedocs.io/items/ButterworthBandpass.html
"""
def __init__(self, scaling: np.ndarray, filter_type: str='highpass',
filter_order: int=5, f_lo: float=20.43, f_hi: float=None,
sampling_rate: int=2048):
self.scaling = scaling
self.filter_type = filter_type
if filter_type.lower() == 'highpass':
Wn = f_lo
            self.filter_window = signal.windows.tukey(4096, 0.1)
self.filter_norm = np.sqrt(1 - f_lo / (sampling_rate / 2))
elif filter_type.lower() == 'bandpass':
Wn = [f_lo, f_hi]
            self.filter_window = signal.windows.tukey(4096, 0.1)
self.filter_norm = np.sqrt((f_hi - f_lo) / (sampling_rate / 2))
else:
raise ValueError('Unknown filter type.')
self.filter = signal.butter(N=filter_order, Wn=Wn, btype=filter_type, output='sos', fs=sampling_rate)
def filt(self, X):
X = self.scaling * X * self.filter_window
if self.filter_type.lower() == 'bandpass':
X = signal.sosfilt(self.filter, X)
elif self.filter_type.lower() == 'highpass':
X = signal.sosfiltfilt(self.filter, X)
X *= self.filter_norm
return X
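# Illustrative sanity check (not part of the pipeline): the filter acts on a batch shaped
# (batch, n_detectors=3, n_samples=4096) and preserves that shape. The scaling of 1.0 is a
# placeholder assumption; the training config further down uses 1e20.
_demo_filter = SignalFilter(scaling=1.0, filter_type='highpass')
assert _demo_filter.filt(np.random.randn(2, 3, 4096)).shape == (2, 3, 4096)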
class Data:
"""
Dataset builder.
"""
tfrec_format_train = {
"wave": tf.io.FixedLenFeature([], tf.string),
"target": tf.io.FixedLenFeature([], tf.int64),
"wave_id": tf.io.FixedLenFeature([], tf.string),
}
tfrec_format_test = {
"wave": tf.io.FixedLenFeature([], tf.string),
"wave_id": tf.io.FixedLenFeature([], tf.string)
}
AUTO = tf.data.AUTOTUNE
def __init__(self, config):
self.config = config
# Data file paths
self.train_files = [config.data_path + f'train{i}.tfrecords' for i in range(20)]
self.test_files = [config.data_path + f'test{i}.tfrecords' for i in range(10)]
# Front-end signal filter
self.filter = SignalFilter(**config.filter)
def _preprocess(self, X, y, train_or_test=True):
"""
Preprocess a batch of data: scaling, filtering, transpose. Casting to tf.float32 is done in wrapper.
"""
X = X.numpy()
X = self.filter.filt(X)
X = np.transpose(X, axes=(0, 2, 1))
if train_or_test:
return X, y
else:
return X
def _preprocess_wrapper(self, train_or_test=True):
if train_or_test:
def wrapper(X, y):
return tf.py_function(
self._preprocess,
inp=[X, y], Tout=[tf.float32, tf.float32])
else:
def wrapper(X):
return tf.py_function(
partial(self._preprocess, y=None, train_or_test=False),
inp=[X], Tout=tf.float32)
return wrapper
def _decode_train(self, tfrecord):
tensor_dict = tf.io.parse_single_example(tfrecord, self.tfrec_format_train)
X = tf.reshape(tf.io.decode_raw(tensor_dict['wave'], tf.float64), (3, 4096))
y = tf.reshape(tf.cast(tensor_dict['target'], tf.float32), [1])
        # wave_ids = tensor_dict['wave_id']
return X, y
def _decode_test(self, tfrecord):
tensor_dict = tf.io.parse_single_example(tfrecord, self.tfrec_format_test)
X = tf.reshape(tf.io.decode_raw(tensor_dict['wave'], tf.float64), (3, 4096))
return X
def get_dataset(self, train_or_test=True, shuffle=True, file_indices=None):
data_files = self.train_files if train_or_test else self.test_files
if file_indices is not None:
data_files = [data_files[i] for i in file_indices]
ds = tf.data.TFRecordDataset( # do not interleave test data files with parallel reads
data_files, num_parallel_reads=self.AUTO if train_or_test else 1, compression_type="GZIP")
if shuffle:
options = tf.data.Options()
options.experimental_deterministic = False
ds = ds.shuffle(self.config.shuffle_buf_size).with_options(options)
ds = ds.map(self._decode_train if train_or_test else self._decode_test, num_parallel_calls=self.AUTO)
ds = ds.batch(self.config.batch_size).map(self._preprocess_wrapper(train_or_test), num_parallel_calls=self.AUTO)
ds = ds.prefetch(self.AUTO)
return ds
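# Illustrative usage sketch (commented out: it needs the TFRecord files under config.data_path
# and the `config` object defined further down). Batches come out as (batch_size, 4096, 3)
# waves and (batch_size, 1) targets.
# data = Data(config)
# train_ds = data.get_dataset(train_or_test=True, shuffle=True, file_indices=[0, 1])
# X_batch, y_batch = next(iter(train_ds))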
class GeM(keras.layers.Layer):
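    """
    Generalized mean (GeM) pooling along the time axis: clamp at `eps`, raise to the power `p`,
    average-pool with `pool_size`, then take the p-th root. p=1 recovers average pooling;
    larger p approaches max pooling.
    """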
def __init__(self, pool_size, p=3, eps=1e-6, **kwargs):
super(GeM, self).__init__(**kwargs)
self.pool_size = pool_size
self.p = p
self.eps = eps
def call(self, x):
x = tf.math.maximum(x, self.eps)
x = tf.pow(x, self.p)
x = tf.nn.avg_pool(x, self.pool_size, self.pool_size, 'VALID')
x = tf.pow(x, 1./self.p)
return x
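# Illustrative sanity check: with p=1 (and inputs above eps) GeM reduces to plain average
# pooling over the time axis.
_x = tf.random.uniform((2, 16, 8)) + 0.1  # (batch, time, channels)
assert np.allclose(GeM(pool_size=4, p=1)(_x).numpy(),
                   tf.nn.avg_pool1d(_x, 4, 4, 'VALID').numpy(), atol=1e-6)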
def get_model():
"""
Modified from
https://journals.aps.org/prl/pdf/1check0.1103/PhysRevLett.120.141103
"""
model = keras.models.Sequential([
keras.layers.Conv1D(64, 64, padding='valid', input_shape=(4096, 3)),
keras.layers.BatchNormalization(),
keras.layers.Activation(tf.nn.silu),
keras.layers.Conv1D(64, 32, padding='valid'),
GeM(pool_size=8),
keras.layers.BatchNormalization(),
keras.layers.Activation(tf.nn.silu),
keras.layers.Conv1D(128, 32, padding='valid'),
keras.layers.BatchNormalization(),
keras.layers.Activation(tf.nn.silu),
keras.layers.Conv1D(128, 16, padding='valid'),
GeM(pool_size=6),
keras.layers.BatchNormalization(),
keras.layers.Activation(tf.nn.silu),
keras.layers.Conv1D(256, 16, padding='valid'),
keras.layers.BatchNormalization(),
keras.layers.Activation(tf.nn.silu),
keras.layers.Conv1D(256, 16, padding='valid'),
GeM(pool_size=4),
keras.layers.BatchNormalization(),
keras.layers.Activation(tf.nn.silu),
keras.layers.Flatten(),
# keras.layers.GlobalAveragePooling1D(),
keras.layers.Dense(64),
keras.layers.BatchNormalization(),
keras.layers.Dropout(0.25),
keras.layers.Activation(tf.nn.silu),
keras.layers.Dense(64),
keras.layers.BatchNormalization(),
keras.layers.Dropout(0.25),
keras.layers.Activation(tf.nn.silu),
keras.layers.Dense(1, activation='sigmoid')
])
return model
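# Illustrative sanity check: the network maps a batch of (4096, 3) time series to one sigmoid
# probability per example (building a throwaway model here is cheap).
assert get_model()(tf.zeros((1, 4096, 3))).shape == (1, 1)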
def seed_all(seed=42):
np.random.seed(seed)
tf.random.set_seed(seed)
def get_logger(
logger_name,
log_path=None,
file_level=logging.INFO,
stream_level=logging.INFO,
):
    if log_path is None and stream_level is None:
        raise ValueError("Both the file and stream loggers are disabled (None).")
logger = logging.getLogger(logger_name)
logger.setLevel(logging.INFO)
logger_format = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
# Add file handler
if log_path is not None and Path(log_path).expanduser().resolve().exists():
logger_file = (
Path(log_path).expanduser().resolve().joinpath(logger_name + ".log")
)
if logger_file.exists():
logger_file.unlink()
fh = logging.FileHandler(logger_file)
fh.setLevel(file_level)
fh.setFormatter(logger_format)
logger.addHandler(fh)
# Add stream handler
if stream_level is not None:
sh = logging.StreamHandler()
sh.setLevel(stream_level)
sh.setFormatter(logger_format)
logger.addHandler(sh)
return logger
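# Illustrative usage sketch (the log path is a placeholder; the directory must already exist
# for the file handler to be attached):
# run_logger = get_logger('example_run', log_path='results/')
# run_logger.info('hello')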
class Config(dict):
def __init__(self, *args, **kwargs):
super().__init__(**kwargs)
self.__dict__ = self
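# Config exposes its keys as attributes, so config.batch_size and config['batch_size'] are
# interchangeable (illustrative check):
assert Config(batch_size=128).batch_size == Config(batch_size=128)['batch_size'] == 128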
def check_save_config(config: Config):
config.results_path = str(Path(config.results_parent_path).joinpath(config.name))
p = Path(config.results_path).expanduser().resolve()
try:
p.mkdir(parents=True, exist_ok=False)
    except FileExistsError:
        if any(p.iterdir()):
            # Re-raise FileExistsError (not ValueError) so callers can catch it.
            raise FileExistsError("Results directory exists and is not empty.")
joblib.dump(config, p.joinpath('config.pkl'))
attrs = {k: v for k, v in vars(config).items() if not k.startswith('__')}
with open(p.joinpath('config.json'), 'w') as f: # Human readable JSON
try:
attrs['filter']['scaling'] = attrs['filter']['scaling'].tolist()
except AttributeError:
pass
json.dump(attrs, f)
def train(config: Config, train_data, val_data, fold_k=None):
"""
Train a single fold of training dataset.
"""
checkpoint_path = Path(config.results_path).joinpath(
'checkpoint' + ('' if fold_k is None else f"_fold{fold_k}" ))
step = tf.Variable(0, trainable=False)
lr_schedule_class = getattr(tf.keras.optimizers.schedules, config.lr_schedule)
lr_schedule = lr_schedule_class(**config.lr_schedule_paras[config.lr_schedule])
optimizer_class = getattr(tfa.optimizers, config.optimizer)
    # Note: the schedule is evaluated once at `step` (= 0), so the optimizer is created with a
    # constant learning rate unless `step` is advanced during training.
    optimizer = optimizer_class(learning_rate=lr_schedule(step), **config.optimizer_paras[config.optimizer])
# weight_decay = lambda: cfg.optimizer['weight_decay'] * lr_schedule(step))
model = get_model()
model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=[keras.metrics.AUC(name='AUC')])
checkpoint_cb = ModelCheckpoint(checkpoint_path, monitor='val_AUC', verbose=0, save_best_only=True, mode='max')
history = model.fit(train_data, epochs=config.epochs, callbacks=[checkpoint_cb], validation_data=val_data)
pd.DataFrame(history.history).to_csv(checkpoint_path.joinpath('train_history.csv'))
# return best model and the checkpoint path
model = keras.models.load_model(checkpoint_path)
return model, checkpoint_path
def get_score(y_true, y_pred):
"""
Compute ROC AUC score on tensors.
"""
auc = tf.keras.metrics.AUC()
auc.update_state(y_true, y_pred)
score = auc.result().numpy()
return score
def oof_predict(model, val_data):
"""
Predict on the validation dataset of a single fold.
"""
val_y_true = []
val_y_pred = []
for X, y in val_data:
val_y_true.append(y)
val_y_pred.append(model(X))
val_y_true = tf.concat(val_y_true, axis=0)
val_y_pred = tf.concat(val_y_pred, axis=0)
val_score = get_score(val_y_true, val_y_pred)
return val_y_true, val_y_pred, val_score
def make_inference(config, models, logger=None):
"""
Make inference on test dataset and create submission file.
"""
logger_func = logger.info if logger else print
data = Data(config)
test_data = data.get_dataset(train_or_test=False, shuffle=False)
test_preds = []
for k, model in zip(config.train_folds or range(config.K_fold), models):
logger_func(f"Make inference by fold {k} model")
y_pred = model.predict(test_data)
test_preds.append(y_pred)
test_preds = np.concatenate(test_preds, axis=1).mean(axis=1)
sample = pd.read_csv(config.data_path + 'sample_submission.csv')
test_preds_df = pd.DataFrame({'id': sample['id'].values, 'target': test_preds})
p_submission = Path(config.results_path).joinpath(f'{config.name}_submission.csv')
test_preds_df.to_csv(p_submission, index=False)
logger_func(f'Test inference written to {p_submission}.')
return test_preds_df
def train_K_folds_make_inference(config):
"""
Train K folds of the training dataset with out-of-fold validation.
"""
try:
check_save_config(config)
except FileExistsError:
print(f"Results path: {config.results_path} exists and not empty. Quit.")
return
logger = get_logger(config.name, log_path=config.results_path)
logger.info(f"{config.description}")
logger.info(f"Results path: {config.results_path}")
data = Data(config)
kf = KFold(n_splits=config.K_fold, shuffle=True, random_state=config.seed)
oof_models, oof_model_paths, oof_labels, oof_preds, k_fold_scores = [], [], [], [], []
for k, (train_idx, val_idx) in enumerate(kf.split(data.train_files)):
if config.train_folds and k not in config.train_folds:
continue
logger.info(f"--- Train fold {k} of {config.train_folds} ---")
logger.info(f"Train: {train_idx} Val: {val_idx}")
train_data = data.get_dataset(train_or_test=True, shuffle=True, file_indices=train_idx)
val_data = data.get_dataset(train_or_test=True, shuffle=False, file_indices=val_idx)
model, model_path = train(config, train_data, val_data, fold_k=k)
oof_models.append(model)
oof_model_paths.append(model_path)
# oof prediction
val_y_true, val_y_pred, val_score = oof_predict(model, val_data)
oof_labels.append(val_y_true)
oof_preds.append(val_y_pred)
k_fold_scores.append(val_score)
logger.info(f"Fold {k} val score: {val_score}")
oof_labels = tf.concat(oof_labels, axis=0)
oof_preds = tf.concat(oof_preds, axis=0)
oof_score = get_score(oof_labels, oof_preds)
logger.info(f"OOF val score: {oof_score}")
logger.info("--- Inference ---")
test_preds_df = make_inference(config, oof_models, logger)
return oof_models, oof_model_paths, oof_score, k_fold_scores, test_preds_df
def load_results_make_inference(results_path: str):
"""
Load saved models and make inference on test dataset.
"""
p = Path(results_path).expanduser().resolve()
if not p.exists():
print(f"Invalid results_path: {results_path}")
return
config = joblib.load(p.joinpath('config.pkl'))
models = []
for k in config.train_folds or range(config.K_fold):
checkpoint_k = p.joinpath(f'checkpoint_fold{k}')
model = keras.models.load_model(checkpoint_k)
models.append(model)
test_preds_df = make_inference(config, models)
return test_preds_df
config = Config(
name = "CNN1d_GeM_SGDW_Highpass_Tukey_" + datetime.now().strftime("%Y-%m-%d_%H-%M"),
description = "CNN1d GeM SGDW CosineDecay, 5 folds, 20 epochs, batch_size 128. Highpass signal filter.",
# paths
# data_path = "data/", # Dataset on local drive
data_path = "/content/drive/Shareddrives/ml/g2net/data/", # Dataset on Google Drive
# data_path = "gs://kds-8a5a5ceed201023b7b0d1880950ccc33c21b9bef067a7abe0dfb4aaa/", # Dataset on Kaggle GCS
# results_path = "results/", # Save results on local drive
results_parent_path = "/content/drive/Shareddrives/ml/g2net/results/", # Save results on Google Drive
    results_path = None, # results_parent_path + name, to be initialized in check_save_config().
shuffle_buf_size = 2048,
# train/test paras
K_fold = 5,
train_folds = [], # If not empty, only train a subset of folds.
batch_size = 128,
epochs = 20,
# warm start
warm_start = False,
warm_start_model_path = "",
# algorithm/model paras
filter = dict(
scaling = 1e20, # 1 / np.array([1.5e-20, 1.5e-20, 0.5e-20]).reshape(-1, 1),
#filter_type = 'bandpass', filter_order=8, f_lo=25, f_hi=1000, sampling_rate=2048
filter_type='highpass', filter_order=5, f_lo=20.43, f_hi=None, sampling_rate=2048
),
lr_schedule = 'CosineDecay',
    lr_schedule_paras = {'CosineDecay': dict(initial_learning_rate=1e-1, decay_steps=5, alpha=1e-6)}, # initial_learning_rate = eta_max, alpha = eta_min / eta_max
optimizer = 'SGDW',
optimizer_paras = {
'SGDW': dict(weight_decay=1e-4, momentum=0.9, nesterov=True),
'AdamW': dict(weight_decay=1e-5),
},
# misc
seed = 42,
)
oof_models, oof_model_paths, oof_score, k_fold_scores, test_preds_df = train_K_folds_make_inference(config)
Private score: 0.87249, public score: 0.87414
#results_path = "/content/drive/Shareddrives/ml/g2net/results/CNN1d_GeM_SGDW_Highpass_Tukey"
#test_preds_df = load_results_make_inference(results_path)