#!/usr/bin/env python
# coding: utf-8

# In[1]:

import os, sys, glob, datetime

# specify Spark version, Python version
spark_home = "/home/zero/spark-2.4.0-bin-hadoop2.7"  # MODIFY THIS
python_path = "/apps/anaconda3/bin/python"

# set environment variables
os.environ['SPARK_HOME'] = spark_home
os.environ['PYSPARK_PYTHON'] = python_path
os.environ['SPARK_LOCAL_IP'] = "127.0.0.1"


def setup_spark_env(app_name):
    # make the PySpark and py4j libraries importable
    spark_python = os.path.join(spark_home, 'python')
    py4j = glob.glob(os.path.join(spark_python, 'lib', 'py4j-*.zip'))[0]
    sys.path[:0] = [spark_python, py4j]

    # specify Spark application parameters
    PYSPARK_SUBMIT_ARGS = "--master local[2]"
    os.environ['PYSPARK_SUBMIT_ARGS'] = (PYSPARK_SUBMIT_ARGS
        + " --name '%s_%s'" % (app_name, datetime.datetime.now().strftime("%Y%m%d %H:%M"))
        + " pyspark-shell")
    return


# setup_spark_env("your_spark_process_name")  # MODIFY THIS

# launch the PySpark application (Python 3 replacement for execfile)
# execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))
filename = os.path.join(spark_home, 'python/pyspark/shell.py')
exec(compile(open(filename, "rb").read(), filename, 'exec'))

sc.setLogLevel('ERROR')
print("{}".format(sc.applicationId))


# In[2]:

from pyspark.sql import functions as sf
from pyspark.sql import Row
from pyspark.sql.types import *
import numpy as np


# In[3]:

import os, math, subprocess
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# some settings for displaying Pandas results
pd.set_option('display.width', 2000)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.precision', 4)
pd.set_option('display.max_colwidth', -1)


# In[4]:

# load data (columns are read as strings unless inferSchema is enabled)
data_path = "home-credit-default-risk/application_train.csv"
df = sqlContext.read.format("csv").option("header", "true").load(data_path)
print(df.take(1))


# In[5]:

total_records = df.count()
print("total_records:", total_records)


# In[6]:

# check dtypes
for n, t in df.dtypes:
    print("{} ({})".format(n, t))


# In[7]:

# count distinct values per column
for cname in df.columns:
    cnt_dist = df.select(cname).distinct().count()
    pct_dist = cnt_dist * 100.0 / total_records
    print("{}: {} ({:0.2f}%)".format(cname, cnt_dist, pct_dist))


# In[8]:

# count NULLs per column
for cname in df.columns:
    cnt_null = df.where("{} is NULL".format(cname)).count()
    pct_miss = cnt_null * 100.0 / total_records
    print("{}: {} ({:0.2f}%)".format(cname, cnt_null, pct_miss))


# In[9]:

# count zeros per column
for cname in df.columns:
    cnt_zeros = df.where("{} = 0.0".format(cname)).count()
    pct_zeros = cnt_zeros * 100.0 / total_records
    print("{}: {} ({:0.2f}%)".format(cname, cnt_zeros, pct_zeros))


# In[10]:

# count negative values per column
for cname in df.columns:
    cnt_neg = df.where("{} < 0".format(cname)).count()
    pct_neg = cnt_neg * 100.0 / total_records
    print("{}: {} ({:0.2f}%)".format(cname, cnt_neg, pct_neg))


# In[11]:

# summary statistics for all feature columns (excluding the ID column)
ls_features = [cname for cname in df.columns if cname != "SK_ID_CURR"]
pdf_stats = df.select(ls_features).describe().toPandas()
pdf_stats.T
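

# In[12]:

# Optional sketch, not part of the original notebook: the NULL/zero/negative
# profiling loops above launch one Spark job per column; the same counts can
# be gathered in a single pass with conditional aggregation. This assumes the
# `df`, `sf`, and `total_records` objects defined above, and casts to double
# (an assumption) so the zero/negative comparisons behave numerically on the
# string-typed CSV columns.
agg_exprs = []
for cname in df.columns:
    c = sf.col(cname)
    # count(when(cond, 1)) counts rows where cond is true (NULL otherwise)
    agg_exprs.append(sf.count(sf.when(c.isNull(), 1)).alias("{}__null".format(cname)))
    agg_exprs.append(sf.count(sf.when(c.cast("double") == 0.0, 1)).alias("{}__zero".format(cname)))
    agg_exprs.append(sf.count(sf.when(c.cast("double") < 0, 1)).alias("{}__neg".format(cname)))

profile_row = df.agg(*agg_exprs).collect()[0]
for cname in df.columns:
    print("{}: null={} zero={} neg={}".format(
        cname,
        profile_row["{}__null".format(cname)],
        profile_row["{}__zero".format(cname)],
        profile_row["{}__neg".format(cname)]))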