#!/usr/bin/env python
# coding: utf-8

# ## Index
#
# - [Load workspace](#Load-workspace)
# - [Create / connect to an experiment](#Create-/-connect-to-an-experiment)
# - [Upload data files into datastore](#Upload-data-files-into-datastore)
# - [Create training scripts](#Create-training-scripts)
# - [Create / connect to a remote compute target](#Create-/-connect-to-a-remote-compute-target)
# - [Create an estimator](#Create-an-estimator)
# - [Submit the job to the cluster & Run](#Submit-the-job-to-the-cluster-&-Run)
# - [Register model](#Register-model)
# - [Clean up the compute target](#Clean-up-the-compute-target)

# In[1]:

try:
    get_ipython().run_line_magic('matplotlib', 'inline')
except NameError:
    # get_ipython() is only defined under IPython/Jupyter; skip the magic elsewhere.
    pass

import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt

import azureml
from azureml.core import Workspace, Run

print("Azure ML SDK Version: ", azureml.core.VERSION)

# ## Load workspace

# In[2]:

# load workspace configuration from the config.json file in the current folder.
ws = Workspace.from_config()
# Previously ws.location was printed twice; each property is now printed once.
print(ws.name, ws.location, ws.resource_group, sep='\t')

# ## Create / connect to an experiment

# In[3]:

# create an experiment
experiment_name = 'nyc-taxi-batch-AI'

from azureml.core import Experiment
exp = Experiment(workspace=ws, name=experiment_name)

# ## Upload data files into datastore
#
# Every workspace comes with a default datastore which is backed by the Azure blob storage account associated with the workspace. We can use it to transfer data from local to the cloud, and access it from the compute target.
# In[4]:

ds = ws.get_default_datastore()
print(ds.datastore_type, ds.account_name, ds.container_name)

# In[5]:

ds.upload_files(['./data_after_prep.pkl'], target_path='nyc-taxi', overwrite=True, show_progress=True)
# Alternative: upload the whole current directory instead of a single file.
#ds.upload(src_dir='.', target_path='nyc-taxi', overwrite=True, show_progress=True)

# ## Create training scripts
#
# ### Create a script directory
#
# Create a directory to deliver the necessary code from local to the remote target.

# In[6]:

import os
script_folder = './scripts_batch_AI'
os.makedirs(script_folder, exist_ok=True)

# ### Create scripts
#
# To submit the job to the cluster, we need to create a training script.
#
# _**Note**: The data path settings of DSVM and Batch AI cluster are different. Be careful!_

# In[7]:

# The training script is written with open() instead of the %%writefile cell
# magic, so this cell also works outside IPython.
# The remote conda packages are unpinned, so the script avoids APIs that recent
# releases removed: np.float (NumPy 1.24) and sklearn.externals.joblib
# (scikit-learn 0.23).
train_script_source = """
import os
import argparse
import numpy as np
import pandas as pd

from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

try:
    import joblib
except ImportError:
    # older scikit-learn releases vendored joblib under sklearn.externals
    from sklearn.externals import joblib

from azureml.core import Run

# get hold of the current run (get_context supersedes the older get_submitted_run)
run = Run.get_context() if hasattr(Run, 'get_context') else Run.get_submitted_run()

parser = argparse.ArgumentParser()
parser.add_argument('--data-folder', type=str, dest='data_folder', help='data folder mounting point')
args = parser.parse_args()

data_folder = os.path.join(args.data_folder, 'nyc-taxi')
run.log('Data folder', data_folder)

data_path = os.path.join(data_folder, 'data_after_prep.pkl')
run.log('Data path', data_path)

# load the prepared dataset into a pandas DataFrame
pd_dataframe = pd.read_pickle(data_path)
run.log('Data loading', 'finished')

# data processing
le = preprocessing.LabelEncoder()
le.fit(["N", "Y"])
pd_dataframe["store_and_fwd_flag"] = le.transform(pd_dataframe["store_and_fwd_flag"])

le.fit(["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"])
pd_dataframe["pickup_weekday"] = le.transform(pd_dataframe["pickup_weekday"])
pd_dataframe["dropoff_weekday"] = le.transform(pd_dataframe["dropoff_weekday"])
run.log('Data processing', 'finished')

# load dataset into numpy arrays
y = np.array(pd_dataframe["trip_duration"]).astype(float)
y = np.log(y)
X = np.array(pd_dataframe.drop(["trip_duration"], axis=1))

# normalize data
# NOTE(review): the fitted scaler is not saved, so inference must reproduce this scaling.
scaler = preprocessing.StandardScaler().fit(X)
X = scaler.transform(X)
run.log('Normalization', 'finished')

# split data into train and validation datasets
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.25, random_state=20)

# train LR model
lm = LinearRegression()
lm.fit(X_train, y_train)
run.log('Model training', 'finished')

y_pred = lm.predict(X_val)
run.log('Prediction', 'finished')

# evaluation
mse = mean_squared_error(y_val, y_pred)
run.log('Evaluation', 'finished')
run.log('Mean Squared Error', float(mse))

os.makedirs('outputs', exist_ok=True)
# note!!! file saved in the outputs folder is automatically uploaded into experiment record
joblib.dump(value=lm, filename='outputs/nyc_taxi_model_cluster.pkl')
"""

with open(os.path.join(script_folder, 'train.py'), 'w', encoding='utf-8') as script_file:
    script_file.write(train_script_source)

# ## Create / connect to a remote compute target

# In[8]:

from azureml.core.compute import ComputeTarget, BatchAiCompute
from azureml.core.compute_target import ComputeTargetException

# choose a name for your cluster
batchai_cluster_name = "traincluster"

try:
    # look for the existing cluster by name
    compute_target = ComputeTarget(workspace=ws, name=batchai_cluster_name)
except ComputeTargetException:
    print('creating a new compute target...')
    compute_config = BatchAiCompute.provisioning_configuration(
        vm_size="STANDARD_D2_V2",  # small CPU-based VM
        # vm_priority='lowpriority',  # optional
        autoscale_enabled=True,
        cluster_min_nodes=0,
        cluster_max_nodes=4)

    # create the cluster
    compute_target = ComputeTarget.create(ws, batchai_cluster_name, compute_config)

    # can poll for a minimum number of nodes and for a specific timeout.
    # if no min node count is provided it uses the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

    # Use the 'status' property to get a detailed status for the current cluster.
    print(compute_target.status.serialize())
else:
    if not isinstance(compute_target, BatchAiCompute):
        # Stop here rather than submitting the job to a target of the wrong type.
        raise ValueError('{} exists but it is not a Batch AI cluster. Please choose a different name.'.format(batchai_cluster_name))
    print('found compute target {}, just use it.'.format(batchai_cluster_name))

# ## Create an estimator
#
# An estimator object is used to submit the run.

# In[9]:

from azureml.train.estimator import Estimator

script_params = {
    '--data-folder': ds.as_mount()
}

est = Estimator(source_directory=script_folder,
                script_params=script_params,
                compute_target=compute_target,
                entry_script='train.py',
                conda_packages=['numpy', 'pandas', 'scikit-learn'])

# ## Submit the job to the cluster & Run
#
# Run the experiment by submitting the estimator object.

# In[10]:

run = exp.submit(config=est)
run  # as the last expression of a notebook cell, this displays the run

# Show running details.

# In[11]:

from azureml.train.widgets import RunDetails
RunDetails(run).show()

# ### Display run results

# In[12]:

# Block until the remote run finishes, so the metrics and output files are complete.
run.wait_for_completion()
print(run.get_metrics())

# ## Register model
#
# `outputs` is a special directory: all content written to it is automatically uploaded to your workspace. Hence, the model file will also be available in the workspace.
#
# The following line lists the files associated with the run.

# In[13]:

print(run.get_file_names())

# Register the model in the workspace so that you (or other collaborators) can later query, examine, and deploy this model.
# In[14]:

# Register the pickled model from the run's outputs folder under a fixed name,
# then show its identifying fields separated by tabs.
model = run.register_model(
    model_name='nyc_taxi_model_cluster',
    model_path='outputs/nyc_taxi_model_cluster.pkl',
)
model_fields = (model.name, model.id, model.version)
print(*model_fields, sep='\t')

# ## Clean up the compute target

# In[47]:

# Optional: delete the compute cluster once it is no longer needed.
compute_target.delete()