IPython notebook
Easier to just run the following commands at a terminal
%%bash
# Miniconda lets you download prebuilt binary packages instead of compiling from source like pip
# get Miniconda from http://conda.pydata.org/miniconda.html
wget http://repo.continuum.io/miniconda/Miniconda-3.0.5-Linux-x86_64.sh
bash Miniconda-3.0.5-Linux-x86_64.sh
conda create -n pysc --yes ipython pyzmq tornado jinja2 pandas pygments pip pycrypto
source activate pysc
pip install --upgrade starcluster
Then open the IPython notebook within that conda environment, i.e. activate the environment and run "ipython notebook".
from ConfigParser import ConfigParser
# Global StarCluster configuration object; sections are added to it
# throughout the notebook and persisted with write_sc_conf().
config = ConfigParser()
credentials.csv
import pandas as pd
# Extract the first row of the AWS credentials CSV downloaded from the
# IAM console (columns: "Access Key Id", "Secret Access Key").
# NOTE: .ix was removed in pandas 1.0; .iloc[0] is the positional equivalent.
credentials = pd.read_csv("credentials.csv").iloc[0]
# Store the credentials in the [aws info] section of the StarCluster config.
config.add_section("aws info")
config.set("aws info", "aws_access_key_id", credentials["Access Key Id"])
config.set("aws info", "aws_secret_access_key", credentials["Secret Access Key"])
Create an EC2 keypair named starcluster, download the private key, and save it as starcluster.pem in your working folder. Then restrict its permissions: chmod 400 starcluster.pem
# key pairs are region-specific
config.set("aws info", "aws_region_name", "us-west-2")
config.set("aws info", "aws_region_host", "ec2.us-west-2.amazonaws.com")
# Point StarCluster at the private key saved from the EC2 console
# (starcluster.pem, chmod 400, in the working folder).
config.add_section("keypair starcluster")
config.set("keypair starcluster", "key_location", "starcluster.pem")
import os
import os.path
def write_sc_conf(sc_conf):
    """Write the given StarCluster configuration to ~/.starcluster/config.

    Parameters
    ----------
    sc_conf : ConfigParser
        Configuration object to persist.
    """
    folder = os.path.join(os.path.expanduser("~"), ".starcluster")
    try:
        os.makedirs(folder)
    except OSError:
        # The folder most likely already exists; if creation truly failed
        # for another reason, open() below will raise anyway.
        pass
    with open(os.path.join(folder, "config"), "w") as f:
        # BUG FIX: the original wrote the module-level `config` object,
        # silently ignoring the `sc_conf` argument.
        sc_conf.write(f)
write_sc_conf(config)
%%bash
starcluster listpublic | grep 64
64bit Images: [0] ami-04bedf34 us-west-2 starcluster-base-ubuntu-13.04-x86_64 (EBS) [1] ami-80bedfb0 us-west-2 starcluster-base-ubuntu-13.04-x86_64-hvm (HVM-EBS) [2] ami-486afe78 us-west-2 starcluster-base-ubuntu-12.04-x86_64-hvm (HVM-EBS) [3] ami-706afe40 us-west-2 starcluster-base-ubuntu-12.04-x86_64 (EBS) [4] ami-c6bd30f6 us-west-2 starcluster-base-ubuntu-11.10-x86_64 (EBS)
StarCluster - (http://star.mit.edu/cluster) (v. 0.95.3) Software Tools for Academics and Researchers (STAR) Please submit bug reports to starcluster@mit.edu
# Define the "pyec2" cluster template.
sec = "cluster pyec2"
config.add_section(sec)
config.set(sec, "keyname", "starcluster")
# ConfigParser.set requires string values on Python 3; the written file
# is identical ("1", "True") either way.
config.set(sec, "cluster_size", "1")
config.set(sec, "cluster_user", "ipuser")
# No SGE batch queue needed for an IPython-only cluster.
config.set(sec, "disable_queue", "True")
# 64-bit EBS-backed Ubuntu 12.04 StarCluster AMI in us-west-2
# (from `starcluster listpublic` above).
ami = "ami-706afe40"
# t1.micro is the smallest/cheapest instance type, fine for testing.
instance = "t1.micro"
for name in ["master", "node"]:
    config.set(sec, name + "_image_id", ami)
    config.set(sec, name + "_instance_type", instance)
# Make pyec2 the template used when none is given on the command line.
config.add_section("global")
config.set("global", "default_template", "pyec2")
write_sc_conf(config)
%%bash
starcluster createvolume -n ebs1gbwest2a -i ami-fa9cf1ca --detach-volume 1 us-west-2a
StarCluster - (http://star.mit.edu/cluster) (v. 0.95.2)
Software Tools for Academics and Researchers (STAR)
Please submit bug reports to starcluster@mit.edu
>>> No keypair specified, picking one from config...
>>> Using keypair: starcluster
>>> Creating security group @sc-volumecreator...
>>> No instance in group @sc-volumecreator for zone us-west-2a, launching one now.
Reservation:r-eb9f81e2
>>> Waiting for volume host to come up... (updating every 30s)
>>> Waiting for all nodes to be in a 'running' state...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Waiting for SSH to come up on all nodes...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Waiting for cluster to come up took 0.606 mins
>>> Checking for required remote commands...
>>> Creating 1GB volume in zone us-west-2a
>>> New volume id: vol-53829851
>>> Waiting for vol-53829851 to become 'available'...
>>> Attaching volume vol-53829851 to instance i-635cbd6b...
>>> Waiting for vol-53829851 to transition to: attached...
>>> Formatting volume...
Filesystem label=
OS type: Linux
Block size=4096 (log=2)
Fragment size=4096 (log=2)
Stride=0 blocks, Stripe width=0 blocks
65536 inodes, 262144 blocks
13107 blocks (5.00%) reserved for the super user
First data block=0
Maximum filesystem blocks=268435456
8 block groups
32768 blocks per group, 32768 fragments per group
8192 inodes per group
Superblock backups stored on blocks:
32768, 98304, 163840, 229376
Allocating group tables: done
Writing inode tables: done
Creating journal (8192 blocks): done
Writing superblocks and filesystem accounting information: done
mke2fs 1.42 (29-Nov-2011)
>>> Detaching volume vol-53829851 from instance i-635cbd6b
>>> Not terminating host instance i-635cbd6b
>>> Your new 1GB volume vol-53829851 has been created successfully
*** WARNING - There are still volume hosts running: i-635cbd6b
>>> Creating volume took 0.947 mins
# Register the EBS volume created above so StarCluster attaches it.
config.add_section("volume data")
# this is the Amazon EBS volume id
config.set("volume data", "volume_id", "vol-53829851")
# the path to mount this EBS volume on
# (this path will also be nfs shared to all nodes in the cluster)
config.set("volume data", "mount_path", "/data")
# Attach the "data" volume to the pyec2 cluster template.
config.set("cluster pyec2", "volumes", "data")
# Persist the updated configuration to ~/.starcluster/config.
write_sc_conf(config)
# Configure the IPCluster plugin: starts an IPython controller + engines
# and serves an IPython notebook on the master node.
sec = "plugin ipcluster"
config.add_section(sec)
config.set(sec, "setup_class", "starcluster.plugins.ipcluster.IPCluster")
# ConfigParser.set requires string values on Python 3; "True" is what
# gets written to the file either way.
config.set(sec, "enable_notebook", "True")
# set a password for the notebook for increased security
config.set(sec, "notebook_passwd", "mysupersecretpassword")
# store notebooks on EBS!
config.set(sec, "notebook_directory", "/data")
# pickle is faster for communication than the default JSON
config.set(sec, "packer", "pickle")
# PyPkgInstaller pip-installs extra packages on every node at startup.
config.add_section("plugin pypackages")
config.set("plugin pypackages", "setup_class", "starcluster.plugins.pypkginstaller.PyPkgInstaller")
config.set("plugin pypackages", "packages", "scikit-learn, psutil")
# Plugins run in order: install packages first, then start IPCluster.
config.set("cluster pyec2", "plugins", "pypackages, ipcluster")
write_sc_conf(config)
%%bash
starcluster start -s 1 pyec2
>>> Using default cluster template: pyec2 >>> Validating cluster template settings... >>> Cluster template settings are valid >>> Starting cluster... >>> Launching a 1-node cluster... >>> Creating security group @sc-pyec2... >>> Waiting for security group @sc-pyec2... | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / Reservation:r-02948d0b >>> Waiting for cluster to come up... (updating every 30s) >>> Waiting for all nodes to be in a 'running' state... >>> Waiting for SSH to come up on all nodes... >>> Waiting for cluster to come up took 1.057 mins >>> The master node is ec2-54-186-36-53.us-west-2.compute.amazonaws.com >>> Configuring cluster... >>> Attaching volume vol-53829851 to master node on /dev/sdz ... >>> Waiting for vol-53829851 to transition to: attached... | / - \ - | >>> Running plugin starcluster.clustersetup.DefaultClusterSetup >>> Configuring hostnames... >>> Mounting EBS volume vol-53829851 on /data... >>> Creating cluster user: ipuser (uid: 1001, gid: 1001) >>> Configuring scratch space for user(s): ipuser >>> Configuring /etc/hosts on each node >>> Starting NFS server on master >>> Setting up NFS took 0.031 mins >>> Configuring passwordless ssh for root >>> Configuring passwordless ssh for ipuser >>> Running plugin pypackages >>> Installing Python packages on all nodes: >>> $ pip install scikit-learn >>> $ pip install psutil >>> PyPkgInstaller took 2.622 mins >>> Running plugin ipcluster >>> Writing IPython cluster config files >>> Starting the IPython controller and 1 engines on master >>> Waiting for JSON connector file... 
| / >>> Creating IPCluster cache directory: /home/zonca/.starcluster/ipcluster >>> Authorizing tcp ports [1000-65535] on 0.0.0.0/0 for: IPython controller >>> Setting up IPython web notebook for user: ipuser >>> Creating SSL certificate for user ipuser >>> Authorizing tcp ports [8888-8888] on 0.0.0.0/0 for: notebook >>> IPython notebook URL: https://ec2-54-186-36-53.us-west-2.compute.amazonaws.com:8888 >>> The notebook password is: mysupersecretpassword *** WARNING - Please check your local firewall settings if you're having *** WARNING - issues connecting to the IPython notebook >>> IPCluster has been started on SecurityGroup:@sc-pyec2 for user 'ipuser' with 1 engines on 1 nodes. To connect to cluster from your local machine use: from IPython.parallel import Client client = Client('/home/zonca/.starcluster/ipcluster/SecurityGroup:@sc-pyec2-us-west-2.json', sshkey='starcluster.pem') See the IPCluster plugin doc for usage details: http://star.mit.edu/cluster/docs/latest/plugins/ipython.html >>> IPCluster took 0.364 mins >>> Configuring cluster took 3.475 mins >>> Starting cluster took 7.204 mins The cluster is now ready to use. To login to the master node as root, run: $ starcluster sshmaster pyec2 If you're having issues with the cluster you can reboot the instances and completely reconfigure the cluster from scratch using: $ starcluster restart pyec2 When you're finished using the cluster and wish to terminate it and stop paying for service: $ starcluster terminate pyec2 Alternatively, if the cluster uses EBS instances, you can use the 'stop' command to shutdown all nodes and put them into a 'stopped' state preserving the EBS volumes backing the nodes: $ starcluster stop pyec2 WARNING: Any data stored in ephemeral storage (usually /mnt) will be lost! You can activate a 'stopped' cluster by passing the -x option to the 'start' command: $ starcluster start -x pyec2 This will start all 'stopped' nodes and reconfigure the cluster.
StarCluster - (http://star.mit.edu/cluster) (v. 0.95.3) Software Tools for Academics and Researchers (STAR) Please submit bug reports to starcluster@mit.edu 1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100% 1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100% 1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100% 1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100% 1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100% 1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100% 1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100% /home/zonca/.starcluster/ipcluster/SecurityGroup:@sc-pyec2-us-west-2.json 100% || Time: 00:00:00 2.96 K/s
%%bash
starcluster sshmaster -A pyec2 # -A use local keys remotely
_ _ _ __/\_____| |_ __ _ _ __ ___| |_ _ ___| |_ ___ _ __ \ / __| __/ _` | '__/ __| | | | / __| __/ _ \ '__| /_ _\__ \ || (_| | | | (__| | |_| \__ \ || __/ | \/ |___/\__\__,_|_| \___|_|\__,_|___/\__\___|_| StarCluster Ubuntu 12.04 AMI Software Tools for Academics and Researchers (STAR) Homepage: http://star.mit.edu/cluster Documentation: http://star.mit.edu/cluster/docs/latest Code: https://github.com/jtriley/StarCluster Mailing list: starcluster@mit.edu This AMI Contains: * Open Grid Scheduler (OGS - formerly SGE) queuing system * Condor workload management system * OpenMPI compiled with Open Grid Scheduler support * OpenBLAS- Highly optimized Basic Linear Algebra Routines * NumPy/SciPy linked against OpenBlas * IPython 0.13 with parallel support * and more! (use 'dpkg -l' to show all installed packages) Open Grid Scheduler/Condor cheat sheet: * qstat/condor_q - show status of batch jobs * qhost/condor_status- show status of hosts, queues, and jobs * qsub/condor_submit - submit batch jobs (e.g. qsub -cwd ./job.sh) * qdel/condor_rm - delete batch jobs (e.g. qdel 7) * qconf - configure Open Grid Scheduler system Current System Stats: System load: 0.06 Processes: 93 Usage of /: 27.8% of 9.84GB Users logged in: 1 Memory usage: 60% IP address for eth0: 172.31.17.235 Swap usage: 0%
StarCluster - (http://star.mit.edu/cluster) (v. 0.95.3) Software Tools for Academics and Researchers (STAR) Please submit bug reports to starcluster@mit.edu Pseudo-terminal will not be allocated because stdin is not a terminal. stdin: is not a tty
%%bash
starcluster terminate -c pyec2 # -c does not prompt for confirm
>>> Running plugin starcluster.plugins.ipcluster.IPCluster >>> Running plugin starcluster.plugins.pypkginstaller.PyPkgInstaller >>> Running plugin starcluster.clustersetup.DefaultClusterSetup >>> Detaching volume vol-53829851 from master >>> Terminating node: master (i-1d9a7915) >>> Waiting for cluster to terminate... | >>> Removing security group: @sc-pyec2 | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ - | / - \ -
StarCluster - (http://star.mit.edu/cluster) (v. 0.95.3) Software Tools for Academics and Researchers (STAR) Please submit bug reports to starcluster@mit.edu
See example notebook on face recognition run on a t1.micro instance (half GB RAM)
Use rsync to copy data from your local drive to /data.
Supported out-of-the-box by scikit-learn
:
sklearn.cluster.KMeans(n_jobs=-1)
sklearn.ensemble.RandomForestClassifier(n_jobs=-1)
Just set n_jobs to the number of processes, or to -1 to automatically set it to the number of cores.
Multi-core support provided by joblib
: https://pythonhosted.org/joblib/parallel.html
%%bash
starcluster spothistory c3.2xlarge
>>> Fetching spot history for c3.2xlarge (VPC) >>> Current price: $0.1281 >>> Max price: $2.4000 >>> Average price: $0.3339
StarCluster - (http://star.mit.edu/cluster) (v. 0.95.2) Software Tools for Academics and Researchers (STAR) Please submit bug reports to starcluster@mit.edu
On-demand price was 0.6/h, so saving about a factor of 4.
Current largest instance: c3.8xlarge, at 2.4/h , was 0.5/h this morning!
%%bash
starcluster start -s 1 --force-spot-master -b 0.5 -I c3.2xlarge singlenode
Show local example to understand ipcontroller
, ipengines
, client
%%bash
starcluster start -c pyec2 -s 5 -b 0.5 -I c3.2xlarge fivenodescluster
def compute_evaluation(filename, model, params):
    """Worker-side helper: fit `model` with `params` on the pickled
    train/validation split stored at `filename` and return the
    validation score."""
    # Imports must happen inside the function so they are executed in
    # the worker process namespace, not on the client.
    from sklearn.externals import joblib
    # mmap_mode='c' (copy-on-write) lets engines share the data read-only.
    data = joblib.load(filename, mmap_mode='c')
    X_train, y_train, X_validation, y_validation = data
    model.set_params(**params)
    model.fit(X_train, y_train)
    return model.score(X_validation, y_validation)
import numpy as np
# Hyper-parameter grid for an RBF-kernel SVC:
# 4 values of C (0.1 .. 100) x 5 values of gamma (1e-4 .. 1) = 20 combos.
svc_params = {
    'C': np.logspace(-1, 2, 4),
    'gamma': np.logspace(-4, 0, 5),
}
from sklearn.grid_search import ParameterGrid
list(ParameterGrid(svc_params))
[{'C': 0.10000000000000001, 'gamma': 0.0001}, {'C': 0.10000000000000001, 'gamma': 0.001}, {'C': 0.10000000000000001, 'gamma': 0.01}, {'C': 0.10000000000000001, 'gamma': 0.10000000000000001}, {'C': 0.10000000000000001, 'gamma': 1.0}, {'C': 1.0, 'gamma': 0.0001}, {'C': 1.0, 'gamma': 0.001}, {'C': 1.0, 'gamma': 0.01}, {'C': 1.0, 'gamma': 0.10000000000000001}, {'C': 1.0, 'gamma': 1.0}, {'C': 10.0, 'gamma': 0.0001}, {'C': 10.0, 'gamma': 0.001}, {'C': 10.0, 'gamma': 0.01}, {'C': 10.0, 'gamma': 0.10000000000000001}, {'C': 10.0, 'gamma': 1.0}, {'C': 100.0, 'gamma': 0.0001}, {'C': 100.0, 'gamma': 0.001}, {'C': 100.0, 'gamma': 0.01}, {'C': 100.0, 'gamma': 0.10000000000000001}, {'C': 100.0, 'gamma': 1.0}]
len(list(ParameterGrid(svc_params)))
20
def compute_evaluation(filename, model, params):
    """Evaluate one parameter set on one persisted train/validation split.

    Runs on a worker engine: loads the split from `filename`, applies
    `params` to `model`, fits, and returns the validation score.
    """
    # Import in the worker namespace (the client's imports are not
    # available on the engines).
    from sklearn.externals import joblib
    # Copy-on-write memory mapping: the split file is shared read-only.
    split = joblib.load(filename, mmap_mode='c')
    X_train, y_train, X_validation, y_validation = split
    model.set_params(**params)
    model.fit(X_train, y_train)
    return model.score(X_validation, y_validation)
from IPython.parallel import Client
# Connect to the IPython cluster (reads the connector JSON written by
# the StarCluster IPCluster plugin).
rc = Client()
# create the balanced view object
lview = rc.load_balanced_view()
# Submit one asynchronous evaluation task per item of svc_params.
# NOTE(review): iterating svc_params yields its KEYS ('C', 'gamma'), not
# parameter combinations — presumably list(ParameterGrid(svc_params)) was
# intended here; also `model_1` is not defined anywhere in this notebook.
# Verify against the full grid-search version further below.
tasks = []
for each in svc_params:
    tasks.append(lview.apply_async(compute_evaluation, "data/input.pkl", model_1, each))
def progress(tasks):
    """Return the fraction (0.0-1.0) of `tasks` that have completed.

    Guard the empty case explicitly: np.mean([]) emits a RuntimeWarning
    and returns NaN, which would print as "Tasks completed: nan%".
    """
    if not tasks:
        return 0.0
    return np.mean([task.ready() for task in tasks])
print("Tasks completed: {0}%".format(100 * progress(tasks)))
def find_best(tasks, n_top=5):
    """Return the n_top highest scores among the tasks that have finished.

    Tasks still running are skipped; completed tasks are fetched with
    .get() and sorted in descending order.
    """
    completed_scores = (t.get() for t in tasks if t.ready())
    return sorted(completed_scores, reverse=True)[:n_top]
print("Tasks completed: {0}%".format(100 * progress(tasks)))
find_best(tasks)
EBS over NFS does not scale, so with large data and more than a few tens of engines it is better to use S3. StarCluster sets the number of engines equal to the number of cores; this can be reduced to give each engine more memory. Use memory mapping to share data across engines. See also model_selection.RandomizedGridSearch
: https://github.com/ogrisel/parallel_ml_tutorial/sdsc.edu
: zonca
# Hyper-parameter grid for the SVC: 4 C values x 5 gamma values = 20 combos.
svc_params = {
    'C': np.logspace(-1, 2, 4),
    'gamma': np.logspace(-4, 0, 5),
}
from sklearn.externals import joblib
from sklearn.cross_validation import ShuffleSplit
import os
def persist_cv_splits(X, y, n_cv_iter=5, name='data',
                      suffix="_cv_%03d.pkl", test_size=0.25, random_state=None):
    """Dump randomized train/test splits of (X, y) to disk with joblib.

    One file per CV fold, named `name + suffix % fold_index` (absolute
    path); the list of filenames is returned so worker engines can later
    memory-map the data.
    """
    splitter = ShuffleSplit(X.shape[0], n_iter=n_cv_iter,
                            test_size=test_size, random_state=random_state)
    filenames = []
    for fold_index, (train_idx, test_idx) in enumerate(splitter):
        payload = (X[train_idx], y[train_idx], X[test_idx], y[test_idx])
        target = os.path.abspath(name + suffix % fold_index)
        joblib.dump(payload, target)
        filenames.append(target)
    return filenames
from sklearn.datasets import load_digits
# Small 8x8 handwritten-digits dataset used as a demo.
digits = load_digits()
# Materialize 5 randomized train/test splits of the digits data on disk;
# fixed random_state makes the splits reproducible.
digits_split_filenames = persist_cv_splits(digits.data, digits.target,
    name='digits', random_state=42)
def compute_evaluation(cv_split_filename, model, params):
    """Fit and score `model` with `params` on one persisted CV split.

    Executed on a worker engine; returns the validation score for the
    split stored at `cv_split_filename`.
    """
    # Import inside the function: it must run in the worker namespace.
    from sklearn.externals import joblib
    # mmap_mode='c' maps the file copy-on-write so engines share it.
    fold = joblib.load(cv_split_filename, mmap_mode='c')
    X_train, y_train, X_validation, y_validation = fold
    model.set_params(**params)
    model.fit(X_train, y_train)
    return model.score(X_validation, y_validation)
def grid_search(lb_view, model, cv_split_filenames, param_grid):
    """Schedule one async evaluation per (parameter set, CV split) pair.

    Returns (all_parameters, all_tasks): the expanded parameter list and,
    aligned with it, one list of AsyncResult tasks per parameter set
    (one task per CV split).
    """
    all_parameters = list(ParameterGrid(param_grid))
    all_tasks = [
        [lb_view.apply(compute_evaluation, split_filename, model, params)
         for split_filename in cv_split_filenames]
        for params in all_parameters
    ]
    return all_parameters, all_tasks
from sklearn.svm import SVC
from IPython.parallel import Client
# Connect to the running IPCluster and get a load-balanced scheduler view.
client = Client()
lb_view = client.load_balanced_view()
model = SVC()
# Same grid as before: 4 C values x 5 gamma values = 20 combinations.
svc_params = {
    'C': np.logspace(-1, 2, 4),
    'gamma': np.logspace(-4, 0, 5),
}
# Launch 20 parameter sets x 5 CV splits = 100 asynchronous tasks.
all_parameters, all_tasks = grid_search(
    lb_view, model, digits_split_filenames, svc_params)
def find_best(all_parameters, all_tasks, n_top=5):
    """Return the n_top (mean_score, params) pairs, best score first.

    Only tasks that have completed contribute to each mean; parameter
    sets with no completed task yet are skipped entirely, so this can be
    called while the grid search is still running.
    """
    mean_scores = []
    for param, task_group in zip(all_parameters, all_tasks):
        scores = [t.get() for t in task_group if t.ready()]
        if len(scores) == 0:
            continue
        mean_scores.append((np.mean(scores), param))
    # Sort on the score alone: the original plain tuple sort falls back
    # to comparing the param dicts when scores tie, which raises
    # TypeError on Python 3 (dicts are not orderable).
    return sorted(mean_scores, key=lambda ms: ms[0], reverse=True)[:n_top]
find_best(all_parameters, all_tasks)