#!/usr/bin/env python
# coding: utf-8

# In[1]:

import os, sys, glob, datetime

# specify Spark home and Python interpreter
spark_home = "/home/zero/spark-2.4.0-bin-hadoop2.7"  # MODIFY THIS
python_path = "/apps/anaconda3/bin/python"

# set environment variables
os.environ['SPARK_HOME'] = spark_home
os.environ['PYSPARK_PYTHON'] = python_path
os.environ['SPARK_LOCAL_IP'] = "127.0.0.1"


def setup_spark_env(app_name):
    # make the PySpark and py4j libraries importable
    spark_python = os.path.join(spark_home, 'python')
    py4j = glob.glob(os.path.join(spark_python, 'lib', 'py4j-*.zip'))[0]
    sys.path[:0] = [spark_python, py4j]

    # specify Spark application parameters
    PYSPARK_SUBMIT_ARGS = "--master local[2]"
    os.environ['PYSPARK_SUBMIT_ARGS'] = (PYSPARK_SUBMIT_ARGS
                                         + " --name '%s_%s'" % (app_name, datetime.datetime.now().strftime("%Y%m%d %H:%M"))
                                         + " pyspark-shell")
    return


# setup_spark_env("your_spark_process_name")  # MODIFY THIS

# launch the PySpark application (shell.py defines sc, spark and sqlContext)
# execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))
filename = os.path.join(spark_home, 'python/pyspark/shell.py')
exec(compile(open(filename, "rb").read(), filename, 'exec'))
sc.setLogLevel('ERROR')
print("{}".format(sc.applicationId))


# In[2]:

from pyspark.sql import functions as sf
from pyspark.sql import Row
from pyspark.sql.types import *
import numpy as np


# In[3]:

import os, math, subprocess
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# some settings for displaying Pandas results
pd.set_option('display.width', 2000)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.precision', 4)
pd.set_option('display.max_colwidth', -1)


# In[4]:

# load data
data_path = "home-credit-default-risk/application_train.csv"
df_data = sqlContext.read.format("csv").option("header", "true").load(data_path)
df_data.printSchema()
print("Rows: {}, Cols: {}".format(df_data.count(), len(df_data.columns)))


# # SELECT, WHERE, DISTINCT, LIMIT

# In[5]:

print("""
SELECT *
FROM pdf_data
LIMIT 3
""")
df_data.limit(3).show()


# In[6]:

print("""
SELECT NAME_CONTRACT_TYPE
FROM pdf_data
WHERE CODE_GENDER = 'M'
""")
df_data.where("CODE_GENDER = 'M'").select("NAME_CONTRACT_TYPE").show(5)


# In[7]:

print("""
SELECT DISTINCT NAME_CONTRACT_TYPE
FROM pdf_data
""")
df_data.select("NAME_CONTRACT_TYPE").distinct().show(5)


# # SELECT with multiple conditions

# In[8]:

print("""
SELECT NAME_INCOME_TYPE, CODE_GENDER, AMT_INCOME_TOTAL
FROM pdf_data
WHERE CODE_GENDER = 'M'
  AND AMT_INCOME_TOTAL > 200000.0
""")
(df_data.where("CODE_GENDER = 'M' and AMT_INCOME_TOTAL > 200000.0")
        .select("NAME_INCOME_TYPE", "CODE_GENDER", "AMT_INCOME_TOTAL")).show(5)


# # ORDER BY

# In[9]:

print("""
SELECT NAME_INCOME_TYPE, AMT_INCOME_TOTAL
FROM pdf_data
ORDER BY AMT_INCOME_TOTAL
""")
df_data.select("NAME_INCOME_TYPE", "AMT_INCOME_TOTAL").orderBy(["AMT_INCOME_TOTAL"]).show(5)


# In[10]:

print("""
SELECT NAME_INCOME_TYPE, AMT_INCOME_TOTAL
FROM pdf_data
ORDER BY AMT_INCOME_TOTAL DESC
""")
df_data.select("NAME_INCOME_TYPE", "AMT_INCOME_TOTAL").orderBy(["AMT_INCOME_TOTAL"], ascending=[0]).show(5)


# # IN… NOT IN

# In[11]:

print("""
SELECT NAME_INCOME_TYPE, AMT_INCOME_TOTAL
FROM pdf_data
WHERE SK_ID_CURR IN (100002, 100010, 100011)
""")
(df_data.select("NAME_INCOME_TYPE", "AMT_INCOME_TOTAL")
        .where("SK_ID_CURR IN {}".format((100002, 100010, 100011)))).show()


# In[12]:

print("""
SELECT NAME_INCOME_TYPE, AMT_INCOME_TOTAL
FROM pdf_data
WHERE SK_ID_CURR NOT IN (100002, 100010, 100011)
""")
(df_data.select("NAME_INCOME_TYPE", "AMT_INCOME_TOTAL")
        .where("SK_ID_CURR NOT IN {}".format((100002, 100010, 100011)))).show(5)
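# In[ ]:

# The same IN / NOT IN filters can also be written with the Column API instead of
# a SQL string. This is a minimal sketch added for comparison (not in the original
# notebook), using the built-in Column.isin; note that SK_ID_CURR is read as a
# string here (no inferSchema), so the values are passed as strings.
ids = ["100002", "100010", "100011"]
(df_data.select("NAME_INCOME_TYPE", "AMT_INCOME_TOTAL")
        .where(sf.col("SK_ID_CURR").isin(ids))).show()       # IN
(df_data.select("NAME_INCOME_TYPE", "AMT_INCOME_TOTAL")
        .where(~sf.col("SK_ID_CURR").isin(ids))).show(5)     # NOT IN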
{}".format((100002, 100010, 100011)))).show(5) # # GROUP BY, COUNT, ORDER BY # In[13]: print(""" SELECT CODE_GENDER, COUNT(TARGET) FROM pdf_data GROUP BY CODE_GENDER ORDER BY NUM_TARGET """) df_data.groupBy("CODE_GENDER").agg(sf.count(sf.col("TARGET")).alias("count")).orderBy("count").show() # # Aggregate functions (MIN, MAX, MEAN) # In[14]: print(""" SELECT MAX(AMT_INCOME_TOTAL), MIN(AMT_INCOME_TOTAL), MEAN(AMT_INCOME_TOTAL) FROM pdf_data """) df_data.selectExpr(["MAX(AMT_INCOME_TOTAL)", "MIN(AMT_INCOME_TOTAL)", "MEAN(AMT_INCOME_TOTAL)"]).show() # # JOIN # # - (INNER) JOIN: Returns records that have matching values in both tables # - LEFT (OUTER) JOIN: Return all records from the left table, and the matched records from the right table # - RIGHT (OUTER) JOIN: Return all records from the right table, and the matched records from the left table # - FULL (OUTER) JOIN: Return all records when there is a match in either left or right table # In[15]: print(""" SELECT * FROM df1 INNER JOIN df2 ON df1.SK_ID_CURR = df2.SK_ID_CURR """) df1 = df_data.select("SK_ID_CURR", "AMT_INCOME_TOTAL") df2 = df_data.select("SK_ID_CURR", "CODE_GENDER", "FLAG_OWN_CAR") df1.join(df2, on="SK_ID_CURR", how="inner").show() # # UNION ALL and UNION # In[16]: print(""" SELECT * FROM df1 UNION ALL SELECT * FROM df2 """) df1 = df_data.select("CODE_GENDER", "FLAG_OWN_CAR") df2 = df_data.select("CODE_GENDER", "FLAG_OWN_CAR") print("Union all:", df1.unionAll(df2).count()) print("Union:", df1.unionAll(df2).distinct().count()) # # Insert, delete, update # # Spark data frame is immutable