#!/usr/bin/env python # coding: utf-8 # # Time Series Processing Example # The goal of the present tutorial is to show how usage of the Neuraxle framework can make a difference in term of clean pipeline design through an example of time series processing. # # # ### The Dataset # # It'll be downloaded automatically for you in the code below. # # We're using a Human Activity Recognition (HAR) dataset captured using smartphones. The [dataset](https://archive.ics.uci.edu/ml/datasets/Human+Activity+Recognition+Using+Smartphones) can be found on the UCI Machine Learning Repository. # # ### The task # # Classify the type of movement amongst six categories from the phones' sensor data: # # - WALKING, # - WALKING_UPSTAIRS, # - WALKING_DOWNSTAIRS, # - SITTING, # - STANDING, # - LAYING. # # ### Video dataset overview # # Follow this link to see a video of the 6 activities recorded in the experiment with one of the participants: # #

# Video of the experiment #

[Watch video]
#

# # ### Details about the input data # # The dataset's description goes like this: # # > The sensor signals (accelerometer and gyroscope) were pre-processed by applying noise filters and then sampled in fixed-width sliding windows of 2.56 sec and 50% overlap (128 readings/window). The sensor acceleration signal, which has gravitational and body motion components, was separated using a Butterworth low-pass filter into body acceleration and gravity. The gravitational force is assumed to have only low frequency components, therefore a filter with 0.3 Hz cutoff frequency was used. # # Reference: # # > Davide Anguita, Alessandro Ghio, Luca Oneto, Xavier Parra and Jorge L. Reyes-Ortiz. A Public Domain Dataset for Human Activity Recognition Using Smartphones. 21th European Symposium on Artificial Neural Networks, Computational Intelligence and Machine Learning, ESANN 2013. Bruges, Belgium 24-26 April 2013. # # That said, I will use the almost raw data: only the gravity effect has been filtered out of the accelerometer as a preprocessing step for another 3D feature as an input to help learning. If you'd ever want to extract the gravity by yourself, you could use the following [Butterworth Low-Pass Filter (LPF)](https://github.com/guillaume-chevalier/filtering-stft-and-laplace-transform) and edit it to have the right cutoff frequency of 0.3 Hz which is a good frequency for activity recognition from body sensors. # # Here is how the 3D data cube looks like. So we'll have a train and a test data cube, and might create validation data cubes as well: # # ![](images/time-series-data.jpg) # # So we have 3D data of shape `[batch_size, time_steps, features]`. If this and the above is still unclear to you, you may want to [learn more on the 3D shape of time series data](https://www.quora.com/What-do-samples-features-time-steps-mean-in-LSTM/answers/79038267). # # ### Loading the Dataset # This first part downloads and load the dataset # In[1]: import urllib import os from subprocess import call def download_dataset_if_needed(): print("Downloading...") if not os.path.exists("UCI HAR Dataset.zip"): call( 'wget "https://archive.ics.uci.edu/ml/machine-learning-databases/00240/UCI HAR Dataset.zip"', shell=True ) print("Downloading done.\n") else: print("Dataset already downloaded. Did not download twice.\n") print("Extracting...") extract_directory = os.path.abspath("UCI HAR Dataset") if not os.path.exists(extract_directory): call( 'unzip -nq "UCI HAR Dataset.zip"', shell=True ) print("Extracting successfully done to {}.".format(extract_directory)) else: print("Dataset already extracted. Did not extract twice.\n") DATA_PATH = "data/" if not os.path.exists(DATA_PATH): os.mkdir(DATA_PATH) os.chdir(DATA_PATH) download_dataset_if_needed() os.chdir("..") DATASET_PATH = DATA_PATH + "UCI HAR Dataset/" print("\n" + "Dataset is now located at: " + DATASET_PATH) # In[2]: try: import neuraxle assert neuraxle.__version__ == '0.6.0' except: get_ipython().system('pip install neuraxle==0.6.0') # In[3]: import numpy as np # Useful Constants DATA_PATH = "data/" DATASET_PATH = DATA_PATH + "UCI HAR Dataset/" # Those are separate normalised input features for the neural network INPUT_SIGNAL_TYPES = [ "body_acc_x_", "body_acc_y_", "body_acc_z_", "body_gyro_x_", "body_gyro_y_", "body_gyro_z_", "total_acc_x_", "total_acc_y_", "total_acc_z_" ] # Output classes to learn how to classify LABELS = [ "WALKING", "WALKING_UPSTAIRS", "WALKING_DOWNSTAIRS", "SITTING", "STANDING", "LAYING" ] TRAIN = "train/" TEST = "test/" def load_all_data(): # Load "X" (the neural network's training and testing inputs) def load_X(X_signals_paths): X_signals = [] for signal_type_path in X_signals_paths: file = open(signal_type_path, 'r') # Read dataset from disk, dealing with text files' syntax X_signals.append( [np.array(serie, dtype=np.float32) for serie in [ row.replace(' ', ' ').strip().split(' ') for row in file ]] ) file.close() return np.transpose(np.array(X_signals), (1, 2, 0)) X_train_signals_paths = [ DATASET_PATH + TRAIN + "Inertial Signals/" + signal + "train.txt" for signal in INPUT_SIGNAL_TYPES ] X_test_signals_paths = [ DATASET_PATH + TEST + "Inertial Signals/" + signal + "test.txt" for signal in INPUT_SIGNAL_TYPES ] X_train = load_X(X_train_signals_paths) X_test = load_X(X_test_signals_paths) def load_y(y_path): file = open(y_path, 'r') # Read dataset from disk, dealing with text file's syntax y_ = np.array( [elem for elem in [ row.replace(' ', ' ').strip().split(' ') for row in file ]], dtype=np.int32 ) file.close() # Substract 1 to each output class for friendly 0-based indexing return y_ - 1 y_train_path = DATASET_PATH + TRAIN + "y_train.txt" y_test_path = DATASET_PATH + TEST + "y_test.txt" y_train = load_y(y_train_path) y_test = load_y(y_test_path) print("Some useful info to get an insight on dataset's shape and normalisation:") print("(X shape, y shape, every X's mean, every X's standard deviation)") print(X_test.shape, y_test.shape, np.mean(X_test), np.std(X_test)) return X_train, y_train, X_test, y_test # In[4]: # Finally load dataset! X_train, y_train, X_test, y_test = load_all_data() print("Dataset loaded!") # ### Part 1 - How would you code this in a typical ML project using Scikit-learn # # We'll first define functions that will extract features from the 3d data. # In[5]: import numpy as np from sklearn.metrics import accuracy_score from sklearn.tree import DecisionTreeClassifier def get_fft_peak_infos(real_fft, time_bins_axis=-2): """ Extract the indices of the bins with maximal amplitude, and the corresponding amplitude values. :param fft: real magnitudes of an fft. It could be of shape [N, bins, features]. :param time_bins_axis: axis of the frequency bins (e.g.: time axis before fft). :return: Two arrays without bins. One is an int, the other is a float. Shape: ([N, features], [N, features]) """ peak_bin = np.argmax(real_fft, axis=time_bins_axis) peak_bin_val = np.max(real_fft, axis=time_bins_axis) return peak_bin, peak_bin_val def fft_magnitudes(data_inputs, time_axis=-2): """ Apply a Fast Fourier Transform operation to analyze frequencies, and return real magnitudes. The bins past the half (past the nyquist frequency) are discarded, which result in shorter time series. :param data_inputs: ND array of dimension at least 1. For instance, this could be of shape [N, time_axis, features] :param time_axis: axis along which the time series evolve :return: real magnitudes of the data_inputs. For instance, this could be of shape [N, (time_axis / 2) + 1, features] so here, we have `bins = (time_axis / 2) + 1`. """ fft = np.fft.rfft(data_inputs, axis=time_axis) real_fft = np.absolute(fft) return real_fft def get_fft_features(x_data): """ Will featurize data with an FFT. :param x_data: 3D time series of shape [batch_size, time_steps, sensors] :return: featurized time series with FFT of shape [batch_size, features] """ real_fft = fft_magnitudes(x_data) flattened_fft = real_fft.reshape(real_fft.shape[0], -1) peak_bin, peak_bin_val = get_fft_peak_infos(real_fft) return flattened_fft, peak_bin, peak_bin_val def featurize_data(x_data): """ Will convert 3D time series of shape [batch_size, time_steps, sensors] to shape [batch_size, features] to prepare data for machine learning. :param x_data: 3D time series of shape [batch_size, time_steps, sensors] :return: featurized time series of shape [batch_size, features] """ print("Input shape before feature union:", x_data.shape) flattened_fft, peak_bin, peak_bin_val = get_fft_features(x_data) mean = np.mean(x_data, axis=-2) median = np.median(x_data, axis=-2) min = np.min(x_data, axis=-2) max = np.max(x_data, axis=-2) featurized_data = np.concatenate([ flattened_fft, peak_bin, peak_bin_val, mean, median, min, max, ], axis=-1) print("Shape after feature union, before classification:", featurized_data.shape) return featurized_data # In[6]: # Shape: [batch_size, time_steps, sensor_features] X_train_featurized = featurize_data(X_train) # Shape: [batch_size, remade_features] classifier = DecisionTreeClassifier() classifier.fit(X_train_featurized, y_train) # In[7]: # Shape: [batch_size, time_steps, sensor_features] X_test_featurized = featurize_data(X_test) # Shape: [batch_size, remade_features] y_pred = classifier.predict(X_test_featurized) print("Shape at output after classification:", y_pred.shape) # Shape: [batch_size] accuracy = accuracy_score(y_pred=y_pred, y_true=y_test) print("Accuracy of sklearn pipeline code:", accuracy) # ### Part 2 - How to code a similar pipeline - but cleaner - using Neuraxle # # To make ourselves a cleaner pipeline, we'll define each of our transformation as steps. Defining our pipeline in term of steps allows us to implement separately the behaviour on *fit* calls and on *transform* calls. In our present case though, we only need to define a *transform* behaviour. # In[8]: from neuraxle.base import BaseStep, NonFittableMixin from neuraxle.steps.numpy import NumpyConcatenateInnerFeatures, NumpyShapePrinter, NumpyFlattenDatum, NumpyRavel class NumpyStep(NonFittableMixin, BaseStep): def __init__(self): BaseStep.__init__(self) NonFittableMixin.__init__(self) class NumpyFFT(NumpyStep): def transform(self, data_inputs): """ Featurize time series data with FFT. :param data_inputs: time series data of 3D shape: [batch_size, time_steps, sensors_readings] :return: featurized data is of 2D shape: [batch_size, n_features] """ transformed_data = np.fft.rfft(data_inputs, axis=-2) return transformed_data class FFTPeakBinWithValue(NumpyStep): def transform(self, data_inputs): """ Will compute peak fft bins (int), and their magnitudes' value (float), to concatenate them. :param data_inputs: real magnitudes of an fft. It could be of shape [batch_size, bins, features]. :return: Two arrays without bins concatenated on feature axis. Shape: [batch_size, 2 * features] """ time_bins_axis = -2 peak_bin = np.argmax(data_inputs, axis=time_bins_axis) peak_bin_val = np.max(data_inputs, axis=time_bins_axis) # Notice that here another FeatureUnion could have been used with a joiner: transformed = np.concatenate([peak_bin, peak_bin_val], axis=-1) return transformed class NumpyAbs(NumpyStep): def transform(self, data_inputs): """ Will featurize data with a max. :param data_inputs: 3D time series of shape [batch_size, time_steps, sensors] :return: featurized time series of shape [batch_size, features] """ return np.abs(data_inputs) class NumpyMean(NumpyStep): def transform(self, data_inputs): """ Will featurize data with a mean. :param data_inputs: 3D time series of shape [batch_size, time_steps, sensors] :return: featurized time series of shape [batch_size, features] """ return np.mean(data_inputs, axis=-2) class NumpyMedian(NumpyStep): def transform(self, data_inputs): """ Will featurize data with a median. :param data_inputs: 3D time series of shape [batch_size, time_steps, sensors] :return: featurized time series of shape [batch_size, features] """ return np.median(data_inputs, axis=-2) class NumpyMin(NumpyStep): def transform(self, data_inputs): """ Will featurize data with a min. :param data_inputs: 3D time series of shape [batch_size, time_steps, sensors] :return: featurized time series of shape [batch_size, features] """ return np.min(data_inputs, axis=-2) class NumpyMax(NumpyStep): def transform(self, data_inputs): """ Will featurize data with a max. :param data_inputs: 3D time series of shape [batch_size, time_steps, sensors] :return: featurized time series of shape [batch_size, features] """ return np.max(data_inputs, axis=-2) # Next, we'll define a set of classifier we'd like to test and their respective hyperparameter space we'd like to explore. # In[9]: from neuraxle.hyperparams.distributions import Choice, Boolean from neuraxle.hyperparams.distributions import RandInt, LogUniform from neuraxle.hyperparams.space import HyperparameterSpace from neuraxle.pipeline import Pipeline from neuraxle.steps.output_handlers import OutputTransformerWrapper from neuraxle.steps.sklearn import SKLearnWrapper from sklearn.ensemble import RandomForestClassifier from sklearn.linear_model import RidgeClassifier, LogisticRegression from sklearn.tree import DecisionTreeClassifier, ExtraTreeClassifier decision_tree_classifier = SKLearnWrapper( DecisionTreeClassifier(), HyperparameterSpace({ 'criterion': Choice(['gini', 'entropy']), 'splitter': Choice(['best', 'random']), 'min_samples_leaf': RandInt(2, 5), 'min_samples_split': Choice([0.5, 1.0, 2, 3]), })).set_name('DecisionTreeClassifier') extra_tree_classifier = SKLearnWrapper( ExtraTreeClassifier(), HyperparameterSpace({ 'criterion': Choice(['gini', 'entropy']), 'splitter': Choice(['best', 'random']), 'min_samples_leaf': RandInt(2, 5), 'min_samples_split': Choice([0.5, 1.0, 2, 3]), })).set_name('ExtraTreeClassifier') ridge_classifier = Pipeline([OutputTransformerWrapper(NumpyRavel()), SKLearnWrapper( RidgeClassifier(), HyperparameterSpace({ 'alpha': Choice([0.0, 1.0, 10.0]), 'fit_intercept': Boolean(), 'normalize': Boolean() })) ]).set_name('RidgeClassifier') logistic_regression = Pipeline([OutputTransformerWrapper(NumpyRavel()), SKLearnWrapper( LogisticRegression(), HyperparameterSpace({ 'C': LogUniform(0.01, 10.0), 'fit_intercept': Boolean(), 'penalty': Choice(['none', 'l2']), 'max_iter': RandInt(20, 200) })) ]).set_name('LogisticRegression') random_forest_classifier = Pipeline([OutputTransformerWrapper(NumpyRavel()), SKLearnWrapper( RandomForestClassifier(), HyperparameterSpace({ 'n_estimators': RandInt(50, 600), 'criterion': Choice(['gini', 'entropy']), 'min_samples_leaf': RandInt(2, 5), 'min_samples_split': Choice([0.5, 1.0, 2, 3]), 'bootstrap': Boolean() })) ]).set_name('RandomForestClassifier') # We now have all the pieces to define a proper Neuraxle Pipeline. # In[10]: from neuraxle.steps.flow import TrainOnlyWrapper, ChooseOneStepOf from neuraxle.steps.numpy import NumpyConcatenateInnerFeatures, NumpyShapePrinter, NumpyFlattenDatum from neuraxle.union import FeatureUnion pipeline = Pipeline([ TrainOnlyWrapper(NumpyShapePrinter(custom_message="Input shape before feature union")), FeatureUnion([ Pipeline([ NumpyFFT(), NumpyAbs(), FeatureUnion([ NumpyFlattenDatum(), # Reshape from 3D to flat 2D: flattening data except on batch size FFTPeakBinWithValue() # Extract 2D features from the 3D FFT bins ], joiner=NumpyConcatenateInnerFeatures()) ]), NumpyMean(), NumpyMedian(), NumpyMin(), NumpyMax() ], joiner=NumpyConcatenateInnerFeatures()), TrainOnlyWrapper(NumpyShapePrinter(custom_message="Shape after feature union, before classification")), # Shape: [batch_size, remade_features] ChooseOneStepOf([ decision_tree_classifier, extra_tree_classifier, ridge_classifier, logistic_regression, random_forest_classifier ]), TrainOnlyWrapper(NumpyShapePrinter(custom_message="Shape at output after classification")), # Shape: [batch_size] ]) # Now that we have defined our pipeline, we can give it to an AutoML loop which will explore the hyperparameter space for us! # In[11]: import shutil # Clear cache if we've already ran the AutoML to start fresh: cache_folder = 'cache' if os.path.exists(cache_folder): shutil.rmtree(cache_folder) os.makedirs(cache_folder, exist_ok=True) # In[12]: from neuraxle.metaopt.auto_ml import AutoML, InMemoryHyperparamsRepository, ValidationSplitter, \ RandomSearchHyperparameterSelectionStrategy #from neuraxle.metaopt.tpe import TreeParzenEstimatorSelectionStrategy #from neuraxle.metaopt.auto_ml import HyperparamsJSONRepository from neuraxle.metaopt.callbacks import ScoringCallback from sklearn.metrics import accuracy_score auto_ml = AutoML( pipeline=pipeline, hyperparams_optimizer=RandomSearchHyperparameterSelectionStrategy(), validation_splitter=ValidationSplitter(test_size=0.20), scoring_callback=ScoringCallback(accuracy_score, higher_score_is_better=True), n_trials=10, epochs=1, hyperparams_repository=InMemoryHyperparamsRepository(cache_folder=cache_folder), refit_trial=True, # callbacks=[MetricCallbacks(...)] ) # In[13]: auto_ml = auto_ml.fit(X_train, y_train) # In[14]: best_pipeline = auto_ml.get_best_model() y_pred = best_pipeline.predict(X_test) accuracy = accuracy_score(y_true=y_test, y_pred=y_pred) print("Test accuracy score:", accuracy) assert accuracy > 0.9, "Try again harder!" # It's getting good on this dataset if you're over 92%. The current code is able to do this.