%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import azureml
from azureml.core import Workspace, Run
print("Azure ML SDK Version: ", azureml.core.VERSION)
Azure ML SDK Version: 0.1.59
# load workspace configuration from the config.json file in the current folder.
ws = Workspace.from_config()
print(ws.name, ws.location, ws.resource_group, sep = '\t')
Found the config file in: /home/nbuser/library/aml_config/config.json
Xiangzhe-WS westeurope Xiangzhe-ML
# create an experiment
experiment_name = 'nyc-taxi-batch-AI'
from azureml.core import Experiment
exp = Experiment(workspace = ws, name = experiment_name)
Every workspace comes with a default datastore backed by the Azure storage account associated with the workspace. We can use it to transfer data from the local machine to the cloud and then access it from the compute target.
ds = ws.get_default_datastore()
print(ds.datastore_type, ds.account_name, ds.container_name)
AzureFile xiangzhews1068013949 azureml-filestore-bc063c69-64a6-48ce-90f5-33cb3c8d43b2
ds.upload_files(['./data_after_prep.pkl'], target_path='nyc-taxi', overwrite=True, show_progress=True)
#ds.upload(src_dir='.', target_path='nyc-taxi', overwrite=True, show_progress=True)
$AZUREML_DATAREFERENCE_5fe09e9af7b443fa8d09db5c1b02df2c
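As a quick sanity check, the uploaded file can be pulled back from the datastore; this is a minimal sketch, and the exact download signature may vary between SDK versions:
# download the uploaded folder back to a local directory to verify the transfer
ds.download(target_path='./check', prefix='nyc-taxi', overwrite=True, show_progress=True)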
import os
script_folder = './scripts_batch_AI'
os.makedirs(script_folder, exist_ok=True)
To submit the job to the cluster, we first need to create a training script.
Note: the data path settings of a DSVM and of a Batch AI cluster are different, so be careful!
%%writefile $script_folder/train.py
import os
import argparse
import numpy as np
import pandas as pd
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.externals import joblib
from azureml.core import Run
# get hold of the current run
run = Run.get_submitted_run()
parser = argparse.ArgumentParser()
parser.add_argument('--data-folder', type=str, dest='data_folder', help='data folder mounting point')
args = parser.parse_args()
data_folder = os.path.join(args.data_folder, 'nyc-taxi')
run.log('Data folder', data_folder)
data_path = os.path.join(data_folder, 'data_after_prep.pkl')
run.log('Data path', data_path)
# load the prepared dataframe from the pickle file
pd_dataframe = pd.read_pickle(data_path)
run.log('Data loading', 'finished')
# data processing
le = preprocessing.LabelEncoder()
le.fit(["N", "Y"])
pd_dataframe["store_and_fwd_flag"] = le.transform(pd_dataframe["store_and_fwd_flag"])
le.fit(["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"])
pd_dataframe["pickup_weekday"] = le.transform(pd_dataframe["pickup_weekday"])
pd_dataframe["dropoff_weekday"] = le.transform(pd_dataframe["dropoff_weekday"])
run.log('Data processing', 'finished')
# load dataset into numpy arrays
y = np.array(pd_dataframe["trip_duration"]).astype(float)
y = np.log(y)
X = np.array(pd_dataframe.drop(["trip_duration"],axis = 1))
# normalize data
scaler = preprocessing.StandardScaler().fit(X)
X = scaler.transform(X)
run.log('Normalization', 'finished')
# split data into train and validation datasets
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size = 0.25, random_state = 20)
# train LR model
lm = LinearRegression()
lm.fit(X_train, y_train)
run.log('Model training', 'finished')
y_pred = lm.predict(X_val)
run.log('Prediction', 'finished')
# evaluation
mse = mean_squared_error(y_val, y_pred)
run.log('Evaluation', 'finished')
run.log('Mean Squared Error', float(mse))
os.makedirs('outputs', exist_ok=True)
# note!!! file saved in the outputs folder is automatically uploaded into experiment record
joblib.dump(value=lm, filename='outputs/nyc_taxi_model_cluster.pkl')
Writing ./scripts_batch_AI/train.py
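Before submitting, we can optionally verify that the script we just wrote at least compiles. This is a small local sanity check, not part of the original workflow:
import py_compile
# raise immediately if train.py has a syntax error, instead of failing on the cluster
py_compile.compile(os.path.join(script_folder, 'train.py'), doraise=True)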
from azureml.core.compute import ComputeTarget, BatchAiCompute
from azureml.core.compute_target import ComputeTargetException
# choose a name for your cluster
batchai_cluster_name = "traincluster"
try:
    # look for an existing cluster by name
    compute_target = ComputeTarget(workspace=ws, name=batchai_cluster_name)
    if type(compute_target) is BatchAiCompute:
        print('found compute target {}, just use it.'.format(batchai_cluster_name))
    else:
        print('{} exists but it is not a Batch AI cluster. Please choose a different name.'.format(batchai_cluster_name))
except ComputeTargetException:
    print('creating a new compute target...')
    compute_config = BatchAiCompute.provisioning_configuration(vm_size="STANDARD_D2_V2", # small CPU-based VM
                                                               #vm_priority='lowpriority', # optional
                                                               autoscale_enabled=True,
                                                               cluster_min_nodes=0,
                                                               cluster_max_nodes=4)

    # create the cluster
    compute_target = ComputeTarget.create(ws, batchai_cluster_name, compute_config)

    # can poll for a minimum number of nodes and for a specific timeout;
    # if no min node count is provided, it uses the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

    # use the 'status' property to get a detailed status for the current cluster
    print(compute_target.status.serialize())
found compute target traincluster, just use it.
An estimator object is used to submit the run.
from azureml.train.estimator import Estimator
script_params = {
    '--data-folder': ds.as_mount()
}

est = Estimator(source_directory=script_folder,
                script_params=script_params,
                compute_target=compute_target,
                entry_script='train.py',
                conda_packages=['numpy', 'pandas', 'scikit-learn'])
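Each key in script_params becomes a command-line argument of train.py, and ds.as_mount() is resolved to the datastore's mount point on the cluster node. As a hypothetical variant, you could mount only the nyc-taxi subfolder instead of the datastore root (train.py would then use args.data_folder directly rather than joining 'nyc-taxi'):
# hypothetical variant: mount just the nyc-taxi subfolder of the datastore
script_params_alt = {
    '--data-folder': ds.path('nyc-taxi').as_mount()
}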
Run the experiment by submitting the estimator object.
run = exp.submit(config = est)
run
Experiment | Id | Type | Status | Details Page | Docs Page |
---|---|---|---|---|---|
nyc-taxi-batch-AI | nyc-taxi-batch-AI_1538578635582 | azureml.scriptrun | Running | Link to Azure Portal | Link to Documentation |
Show the run's details with a widget.
from azureml.train.widgets import RunDetails
RunDetails(run).show()
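If the widget is not available in your environment, you can instead block until the run finishes and stream the logs with the standard Run API:
# block until the run completes, streaming log output into the notebook
run.wait_for_completion(show_output=True)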
print(run.get_metrics())
{'Data folder': '/mnt/batch/tasks/shared/LS_root/jobs/traincluster8575340143/azureml/nyc-taxi-batch-ai_1538578635582/mounts/workspacefilestore/nyc-taxi', 'Data path': '/mnt/batch/tasks/shared/LS_root/jobs/traincluster8575340143/azureml/nyc-taxi-batch-ai_1538578635582/mounts/workspacefilestore/nyc-taxi/data_after_prep.pkl', 'Data loading': 'finished', 'Data processing': 'finished', 'Normalization': 'finished', 'Model training': 'finished', 'Prediction': 'finished', 'Evaluation': 'finished', 'Mean Squared Error': 0.3878969301600042}
outputs is a special directory: all content in it is automatically uploaded to your workspace, so the model file will also be available there.
We can list the files associated with the run as follows.
print(run.get_file_names())
['azureml-logs/60_control_log.txt', 'azureml-logs/80_driver_log.txt', 'outputs/nyc_taxi_model_cluster.pkl', 'driver_log', 'azureml-logs/azureml.log', 'azureml-logs/55_batchai_execution.txt']
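Because the model sits in the run's outputs folder, it can also be downloaded straight from the run. A minimal sketch using run.download_file, where the local target path is just an example:
# pull the serialized model out of the run's outputs folder
run.download_file(name='outputs/nyc_taxi_model_cluster.pkl', output_file_path='./nyc_taxi_model_cluster.pkl')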
Register the model in the workspace so that you (or other collaborators) can later query, examine, and deploy this model.
# register model
model = run.register_model(model_name='nyc_taxi_model_cluster', model_path='outputs/nyc_taxi_model_cluster.pkl')
print(model.name, model.id, model.version, sep = '\t')
nyc_taxi_model_cluster nyc_taxi_model_cluster:1 1
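Later, you or a collaborator can look the registered model up by name and fetch it from the workspace registry; a sketch assuming the azureml.core.model.Model API:
from azureml.core.model import Model
# fetch the latest registered version by name and download the .pkl locally
registered_model = Model(workspace=ws, name='nyc_taxi_model_cluster')
print(registered_model.download(target_dir='.'))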
# optionally, delete the Batch AI compute cluster
compute_target.delete()