#!/usr/bin/env python
# coding: utf-8

# In[1]:

import h2o
import time
from h2o.estimators.glm import H2OGeneralizedLinearEstimator
from h2o.estimators.gbm import H2OGradientBoostingEstimator
from h2o.estimators.random_forest import H2ORandomForestEstimator
from h2o.estimators.deeplearning import H2ODeepLearningEstimator


# In[2]:

# Explore a typical Data Science workflow with H2O and Python
#
# Goal: assist the manager of CitiBike of NYC to load-balance the bicycles
# across the CitiBike network of stations, by predicting the number of bike
# trips taken from each station every day.  Use 10 million rows of historical
# data, and eventually add weather data.

# Connect to a cluster
h2o.init()


# In[3]:

# Private function, used to find files within the h2o git project directory.
from h2o.utils.shared_utils import _locate

# Set this to True if you want to fetch the data directly from S3.
# This is useful if your cluster is running in EC2.
data_source_is_s3 = False

def mylocate(s):
    if data_source_is_s3:
        return "s3n://h2o-public-test-data/" + s
    else:
        return _locate(s)


# In[4]:

# Pick either the big or the small demo.
# Big data is 10M rows
small_test = [mylocate("bigdata/laptop/citibike-nyc/2013-10.csv")]
big_test = [mylocate("bigdata/laptop/citibike-nyc/2013-07.csv"),
            mylocate("bigdata/laptop/citibike-nyc/2013-08.csv"),
            mylocate("bigdata/laptop/citibike-nyc/2013-09.csv"),
            mylocate("bigdata/laptop/citibike-nyc/2013-10.csv"),
            mylocate("bigdata/laptop/citibike-nyc/2013-11.csv"),
            mylocate("bigdata/laptop/citibike-nyc/2013-12.csv"),
            mylocate("bigdata/laptop/citibike-nyc/2014-01.csv"),
            mylocate("bigdata/laptop/citibike-nyc/2014-02.csv"),
            mylocate("bigdata/laptop/citibike-nyc/2014-03.csv"),
            mylocate("bigdata/laptop/citibike-nyc/2014-04.csv"),
            mylocate("bigdata/laptop/citibike-nyc/2014-05.csv"),
            mylocate("bigdata/laptop/citibike-nyc/2014-06.csv"),
            mylocate("bigdata/laptop/citibike-nyc/2014-07.csv"),
            mylocate("bigdata/laptop/citibike-nyc/2014-08.csv")]

# ----------
# 1 - Load data - 1 row per bicycle trip.  Has columns showing the start and
# end station, trip duration, and trip start time and day.  The larger
# dataset totals about 10 million rows.
print("Import and Parse bike data")
data = h2o.import_file(path=big_test)


# In[5]:

# ----------
# 2 - Light data munging: group the bike starts per day, converting the 10M
# rows of trips to about 140,000 station-and-day combos - predicting the
# number of trip starts per station per day.

# Convert start time to: days since the epoch
startime = data["starttime"]
secsPerDay = 1000 * 3600 * 24
data["Days"] = (startime.asnumeric() / secsPerDay).floor()
data.describe()


# In[6]:

# Now do a monster group-by.  Count bike starts per station per day.  Ends up
# with about 340 stations times 400 days (140,000 rows).  This is what we
# want to predict.
grouped = data.group_by(["Days", "start station name"])
bpd = grouped.count().get_frame()  # Compute bikes-per-day
bpd.set_name(2, "bikes")
bpd.show()
bpd.describe()
bpd.dim


# In[7]:

# Quantiles: the data is fairly unbalanced; some station/day combos are
# wildly more popular than others.
print("Quantiles of bikes-per-day")
bpd["bikes"].quantile().show()
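# In[ ]:

# Optional peek at the imbalance itself - not part of the original demo.
# A minimal sketch, assuming pandas is installed; the grouped frame is small
# (~140K rows), so pulling it to the client with as_data_frame() is cheap.
bpd_df = bpd.as_data_frame(use_pandas=True)
print(bpd_df.sort_values("bikes", ascending=False).head())  # busiest station/days
print(bpd_df.sort_values("bikes").head())                   # quietest station/days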
print("Quantiles of bikes-per-day") bpd["bikes"].quantile().show() # In[8]: # A little feature engineering # Add in month-of-year (seasonality; fewer bike rides in winter than summer) secs = bpd["Days"]*secsPerDay bpd["Month"] = secs.month().asfactor() # Add in day-of-week (work-week; more bike rides on Sunday than Monday) bpd["DayOfWeek"] = secs.dayOfWeek() print("Bikes-Per-Day") bpd.describe() # In[9]: # ---------- # 3- Fit a model on train; using test as validation # Function for doing class test/train/holdout split def split_fit_predict(data): global gbm0,drf0,glm0,dl0 # Classic Test/Train split r = data['Days'].runif() # Random UNIForm numbers, one per row train = data[ r < 0.6] test = data[(0.6 <= r) & (r < 0.9)] hold = data[ 0.9 <= r ] print("Training data has",train.ncol,"columns and",train.nrow,"rows, test has",test.nrow,"rows, holdout has",hold.nrow) bike_names_x = data.names bike_names_x.remove("bikes") # Run GBM s = time.time() gbm0 = H2OGradientBoostingEstimator(ntrees=500, # 500 works well max_depth=6, learn_rate=0.1) gbm0.train(x =bike_names_x, y ="bikes", training_frame =train, validation_frame=test) gbm_elapsed = time.time() - s # Run DRF s = time.time() drf0 = H2ORandomForestEstimator(ntrees=250, max_depth=30) drf0.train(x =bike_names_x, y ="bikes", training_frame =train, validation_frame=test) drf_elapsed = time.time() - s # Run GLM if "WC1" in bike_names_x: bike_names_x.remove("WC1") s = time.time() glm0 = H2OGeneralizedLinearEstimator(Lambda=[1e-5], family="poisson") glm0.train(x =bike_names_x, y ="bikes", training_frame =train, validation_frame=test) glm_elapsed = time.time() - s # Run DL s = time.time() dl0 = H2ODeepLearningEstimator(hidden=[50,50,50,50], epochs=50) dl0.train(x =bike_names_x, y ="bikes", training_frame =train, validation_frame=test) dl_elapsed = time.time() - s # ---------- # 4- Score on holdout set & report train_mse_gbm = gbm0.model_performance(train).mse() test_mse_gbm = gbm0.model_performance(test ).mse() hold_mse_gbm = gbm0.model_performance(hold ).mse() # print "GBM mse TRAIN=",train_mse_gbm,", mse TEST=",test_mse_gbm,", mse HOLDOUT=",hold_mse_gbm train_mse_drf = drf0.model_performance(train).mse() test_mse_drf = drf0.model_performance(test ).mse() hold_mse_drf = drf0.model_performance(hold ).mse() # print "DRF mse TRAIN=",train_mse_drf,", mse TEST=",test_mse_drf,", mse HOLDOUT=",hold_mse_drf train_mse_glm = glm0.model_performance(train).mse() test_mse_glm = glm0.model_performance(test ).mse() hold_mse_glm = glm0.model_performance(hold ).mse() # print "GLM mse TRAIN=",train_mse_glm,", mse TEST=",test_mse_glm,", mse HOLDOUT=",hold_mse_glm train_mse_dl = dl0.model_performance(train).mse() test_mse_dl = dl0.model_performance(test ).mse() hold_mse_dl = dl0.model_performance(hold ).mse() # print " DL mse TRAIN=",train_mse_dl,", mse TEST=",test_mse_dl,", mse HOLDOUT=",hold_mse_dl # make a pretty HTML table printout of the results header = ["Model", "mse TRAIN", "mse TEST", "mse HOLDOUT", "Model Training Time (s)"] table = [ ["GBM", train_mse_gbm, test_mse_gbm, hold_mse_gbm, round(gbm_elapsed,3)], ["DRF", train_mse_drf, test_mse_drf, hold_mse_drf, round(drf_elapsed,3)], ["GLM", train_mse_glm, test_mse_glm, hold_mse_glm, round(glm_elapsed,3)], ["DL ", train_mse_dl, test_mse_dl, hold_mse_dl , round(dl_elapsed,3) ], ] return h2o.display.H2OTableDisplay(table, columns_labels=header) # -------------- # In[10]: # Split the data (into test & train), fit some models and predict on the holdout data split_fit_predict(bpd) # Here we see an r^2 of 0.91 for GBM, 
# In[11]:

# ----------
# 5 - Now let's add some weather
# Load weather data
wthr1 = h2o.import_file(path=[mylocate("bigdata/laptop/citibike-nyc/31081_New_York_City__Hourly_2013.csv"),
                              mylocate("bigdata/laptop/citibike-nyc/31081_New_York_City__Hourly_2014.csv")])
# Peek at the data
wthr1.describe()


# In[12]:

# Lots of columns in there!  Let's plan on converting to time-since-epoch to
# do a 'join' with the bike data, plus gather weather info that might affect
# cyclists - rain, snow, temperature.  Alas, drop the "snow" column since
# it's all NAs.  Also add in dew point and humidity just in case.  Slice out
# just the columns of interest and drop the rest.
wthr2 = wthr1[["Year Local", "Month Local", "Day Local", "Hour Local",
               "Dew Point (C)", "Humidity Fraction", "Precipitation One Hour (mm)",
               "Temperature (C)", "Weather Code 1/ Description"]]
wthr2.set_name(wthr2.names.index("Precipitation One Hour (mm)"), "Rain (mm)")
wthr2.set_name(wthr2.names.index("Weather Code 1/ Description"), "WC1")
wthr2.describe()
# Much better!


# In[13]:

# Filter down to the weather at noon
wthr3 = wthr2[wthr2["Hour Local"] == 12]


# In[14]:

# Let's now get days since the epoch... we'll convert year/month/day into
# epoch time, and then back to epoch days.  Need zero-based months and days,
# but have 1-based.
wthr3["msec"] = h2o.H2OFrame.mktime(year=wthr3["Year Local"],
                                    month=wthr3["Month Local"] - 1,
                                    day=wthr3["Day Local"] - 1,
                                    hour=wthr3["Hour Local"])
secsPerDay = 1000 * 60 * 60 * 24
wthr3["Days"] = (wthr3["msec"] / secsPerDay).floor()
wthr3.describe()
# msec looks sane (numbers like 1.3e12 are in the correct range for msec
# since 1970).  Epoch Days matches closely with the epoch day numbers from
# the CitiBike dataset.


# In[15]:

# Let's drop the extra time columns to make an easy-to-handle dataset.
wthr4 = wthr3.drop("Year Local").drop("Month Local").drop("Day Local").drop("Hour Local").drop("msec")


# In[16]:

# Also, most rain numbers are missing - let's assume those are zero-rain days
rain = wthr4["Rain (mm)"]
rain[rain.isna()] = 0
wthr4["Rain (mm)"] = rain


# In[17]:

# ----------
# 6 - Join the weather data-per-day to the bike-starts-per-day
print("Merge Daily Weather with Bikes-Per-Day")
bpd_with_weather = bpd.merge(wthr4, all_x=True, all_y=False)
bpd_with_weather.describe()
bpd_with_weather.show()


# In[18]:

# 7 - Test/train split again, model build again, this time with weather
split_fit_predict(bpd_with_weather)
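# In[ ]:

# Optional: check whether the weather columns actually helped - not part of
# the original demo.  A minimal sketch, assuming the global gbm0 refitted by
# split_fit_predict() above; varimp(use_pandas=True) needs pandas installed.
print(gbm0.varimp(use_pandas=True).head(10))  # top features by relative importance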