import os, sys, glob, datetime

# Local Spark distribution and the Python interpreter PySpark should use.
spark_home = "/home/zero/spark-2.4.0-bin-hadoop2.7" # MODIFY THIS
python_path = "/apps/anaconda2/bin/python"

# Point Spark at the distribution above and pin the driver/worker Python.
os.environ['SPARK_HOME'] = spark_home
os.environ['PYSPARK_PYTHON'] = python_path
os.environ['SPARK_LOCAL_IP'] = "127.0.0.1"


def setup_spark_env(app_name):
    """Prepare this interpreter to launch a local pyspark-shell application.

    Prepends the bundled pyspark and py4j libraries to ``sys.path`` and sets
    ``PYSPARK_SUBMIT_ARGS`` so the application is named
    ``"<app_name>_<timestamp>"`` and runs on ``local[2]``.

    :param app_name: base name for the Spark application
    :raises RuntimeError: if no py4j-*.zip exists under $SPARK_HOME/python/lib
    """
    spark_python = os.path.join(spark_home, 'python')
    py4j_zips = glob.glob(os.path.join(spark_python, 'lib', 'py4j-*.zip'))
    if not py4j_zips:
        # Fail with an actionable message instead of a bare IndexError.
        raise RuntimeError("no py4j-*.zip found under %s/lib; check spark_home"
                           % spark_python)
    # Prepend so the bundled pyspark/py4j shadow any site-packages copies.
    sys.path[:0] = [spark_python, py4j_zips[0]]
    # Run locally on 2 cores; tag the application name with the launch time.
    timestamp = datetime.datetime.now().strftime("%Y%m%d %H:%M")
    os.environ['PYSPARK_SUBMIT_ARGS'] = (
        "--master local[2] --name '%s_%s' pyspark-shell" % (app_name, timestamp))
#
setup_spark_env("your_name_test_spark") # MODIFY THIS
# launching PySpark application: Python 2's execfile runs pyspark's shell.py
# in the current globals, which defines `sc`, `spark` and `sqlContext` here.
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))
# Show only ERROR-level Spark log output from now on.
sc.setLogLevel('ERROR')
# Print this session's Spark application id.
print("{}".format(sc.applicationId))
Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.4.0 /_/ Using Python version 2.7.15 (default, Dec 14 2018 19:04:19) SparkSession available as 'spark'. local-1560753877291
from pyspark.sql import Row
from pyspark.sql.types import *
import pyspark.sql.functions as sf
from pyspark.sql.window import Window
import numpy as np
import pandas as pd
# Toy product/category/revenue rows used by every window-function example below.
data = [
    ["Thin", "Cell phone", 6000],
    ["Normal", "Tablet", 1500],
    ["Mini", "Tablet", 5500],
    ["Ultra thin", "Cell phone", 5000],
    ["Very thin", "Cell phone", 6000],
    ["Big", "Tablet", 2500],
    ["Bendable", "Cell phone", 3000],
    ["Foldable", "Cell phone", 3000],
    ["Pro", "Tablet", 4500],
    ["Pro2", "Tablet", 6500],
]
columns = ["product", "category", "revenue"]
productReveneue = pd.DataFrame.from_records(data, columns=columns)
# Peek at all ten rows.
productReveneue.head(10)
product | category | revenue | |
---|---|---|---|
0 | Thin | Cell phone | 6000 |
1 | Normal | Tablet | 1500 |
2 | Mini | Tablet | 5500 |
3 | Ultra thin | Cell phone | 5000 |
4 | Very thin | Cell phone | 6000 |
5 | Big | Tablet | 2500 |
6 | Bendable | Cell phone | 3000 |
7 | Foldable | Cell phone | 3000 |
8 | Pro | Tablet | 4500 |
9 | Pro2 | Tablet | 6500 |
# convert to Spark Dataframe (sqlContext comes from pyspark's shell.py;
# column types are inferred from the pandas dtypes)
df_productReveneue = sqlContext.createDataFrame(productReveneue)
df_productReveneue.show()
+----------+----------+-------+ | product| category|revenue| +----------+----------+-------+ | Thin|Cell phone| 6000| | Normal| Tablet| 1500| | Mini| Tablet| 5500| |Ultra thin|Cell phone| 5000| | Very thin|Cell phone| 6000| | Big| Tablet| 2500| | Bendable|Cell phone| 3000| | Foldable|Cell phone| 3000| | Pro| Tablet| 4500| | Pro2| Tablet| 6500| +----------+----------+-------+
# Signature: sf.dense_rank()
# Docstring:
# Window function: returns the rank of rows within a window partition, without any gaps.
# The difference between rank and dense_rank is that dense_rank leaves no gaps in ranking
# sequence when there are ties. That is, if you were ranking a competition using dense_rank
# and had three people tie for second place, you would say that all three were in second
# place and that the next person came in third. rank, by contrast, assigns sequential
# numbers, so the person after a three-way tie for second would come in fifth.
# This is equivalent to the DENSE_RANK function in SQL.
# Rank products by revenue (ascending) within each category; ties share a rank.
query_window = Window.partitionBy("category").orderBy("revenue")
df_query = df_productReveneue.withColumn("dense_rank", sf.dense_rank().over(query_window))
df_query.show()
+----------+----------+-------+----------+ | product| category|revenue|dense_rank| +----------+----------+-------+----------+ | Bendable|Cell phone| 3000| 1| | Foldable|Cell phone| 3000| 1| |Ultra thin|Cell phone| 5000| 2| | Thin|Cell phone| 6000| 3| | Very thin|Cell phone| 6000| 3| | Normal| Tablet| 1500| 1| | Big| Tablet| 2500| 2| | Pro| Tablet| 4500| 3| | Mini| Tablet| 5500| 4| | Pro2| Tablet| 6500| 5| +----------+----------+-------+----------+
# Signature: sf.rank()
# Docstring:
# Window function: returns the rank of rows within a window partition.
# Highest revenue first within each category; ties share a rank and leave gaps.
rank_window = Window.partitionBy("category").orderBy(sf.desc("revenue"))
df_query = df_productReveneue.select("*", sf.rank().over(rank_window).alias("rank"))
df_query.show()
+----------+----------+-------+----+ | product| category|revenue|rank| +----------+----------+-------+----+ | Thin|Cell phone| 6000| 1| | Very thin|Cell phone| 6000| 1| |Ultra thin|Cell phone| 5000| 3| | Bendable|Cell phone| 3000| 4| | Foldable|Cell phone| 3000| 4| | Pro2| Tablet| 6500| 1| | Mini| Tablet| 5500| 2| | Pro| Tablet| 4500| 3| | Big| Tablet| 2500| 4| | Normal| Tablet| 1500| 5| +----------+----------+-------+----+
# Signature: sf.percent_rank()
# Docstring:
# Window function: returns the relative rank (i.e. percentile) of rows within a window partition.
# Computed as (rank - 1) / (partition size - 1), over revenue descending per category.
pct_window = Window.partitionBy("category").orderBy(sf.desc("revenue"))
df_query = df_productReveneue.select("*", sf.percent_rank().over(pct_window).alias("percent_rank"))
df_query.show()
+----------+----------+-------+------------+ | product| category|revenue|percent_rank| +----------+----------+-------+------------+ | Thin|Cell phone| 6000| 0.0| | Very thin|Cell phone| 6000| 0.0| |Ultra thin|Cell phone| 5000| 0.5| | Bendable|Cell phone| 3000| 0.75| | Foldable|Cell phone| 3000| 0.75| | Pro2| Tablet| 6500| 0.0| | Mini| Tablet| 5500| 0.25| | Pro| Tablet| 4500| 0.5| | Big| Tablet| 2500| 0.75| | Normal| Tablet| 1500| 1.0| +----------+----------+-------+------------+
# Signature: sf.ntile(n)
# Docstring:
# Window function: returns the ntile group id (from 1 to `n` inclusive)
# in an ordered window partition. For example, if `n` is 4, the first
# quarter of the rows will get value 1, the second quarter will get 2,
# the third quarter will get 3, and the last quarter will get 4.
# Split each category's rows (revenue descending) into 3 roughly-equal buckets.
query_window = Window.partitionBy("category").orderBy(sf.col("revenue").desc())
df_query = df_productReveneue.withColumn("ntile", sf.ntile(3).over(query_window))
df_query.show()
+----------+----------+-------+-----+ | product| category|revenue|ntile| +----------+----------+-------+-----+ | Thin|Cell phone| 6000| 1| | Very thin|Cell phone| 6000| 1| |Ultra thin|Cell phone| 5000| 2| | Bendable|Cell phone| 3000| 2| | Foldable|Cell phone| 3000| 3| | Pro2| Tablet| 6500| 1| | Mini| Tablet| 5500| 1| | Pro| Tablet| 4500| 2| | Big| Tablet| 2500| 2| | Normal| Tablet| 1500| 3| +----------+----------+-------+-----+
# Signature: sf.row_number()
# Docstring:
# Window function: returns a sequential number starting at 1 within a window partition.
# Unlike rank(), tied rows still get distinct consecutive numbers.
numbering_window = Window.partitionBy("category").orderBy(sf.desc("revenue"))
numbered = df_productReveneue.withColumn("row_number", sf.row_number().over(numbering_window))
numbered.show()
+----------+----------+-------+----------+ | product| category|revenue|row_number| +----------+----------+-------+----------+ | Thin|Cell phone| 6000| 1| | Very thin|Cell phone| 6000| 2| |Ultra thin|Cell phone| 5000| 3| | Bendable|Cell phone| 3000| 4| | Foldable|Cell phone| 3000| 5| | Pro2| Tablet| 6500| 1| | Mini| Tablet| 5500| 2| | Pro| Tablet| 4500| 3| | Big| Tablet| 2500| 4| | Normal| Tablet| 1500| 5| +----------+----------+-------+----------+
# Signature: sf.cume_dist()
# Docstring:
# Window function: returns the cumulative distribution of values within a window partition,
# i.e. the fraction of rows that are below the current row.
# (Concretely: rows ordered at-or-before the current row / partition size —
# ties share the same value, as the output below shows.)
query_window = Window.partitionBy("category").orderBy(sf.col("revenue").desc())
df_query = df_productReveneue.withColumn("cume_dist", sf.cume_dist().over(query_window))
df_query.show()
+----------+----------+-------+---------+ | product| category|revenue|cume_dist| +----------+----------+-------+---------+ | Thin|Cell phone| 6000| 0.4| | Very thin|Cell phone| 6000| 0.4| |Ultra thin|Cell phone| 5000| 0.6| | Bendable|Cell phone| 3000| 1.0| | Foldable|Cell phone| 3000| 1.0| | Pro2| Tablet| 6500| 0.2| | Mini| Tablet| 5500| 0.4| | Pro| Tablet| 4500| 0.6| | Big| Tablet| 2500| 0.8| | Normal| Tablet| 1500| 1.0| +----------+----------+-------+---------+
# Signature: sf.first(col, ignorenulls=False)
# Docstring:
# Aggregate function: returns the first value in a group.
# The function by default returns the first values it sees. It will return the first non-null
# value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
# With revenue descending, "first" is each category's top-selling product.
top_window = Window.partitionBy("category").orderBy(sf.desc("revenue"))
df_query = df_productReveneue.select("*", sf.first("product").over(top_window).alias("first"))
df_query.show()
+----------+----------+-------+-----+ | product| category|revenue|first| +----------+----------+-------+-----+ | Thin|Cell phone| 6000| Thin| | Very thin|Cell phone| 6000| Thin| |Ultra thin|Cell phone| 5000| Thin| | Bendable|Cell phone| 3000| Thin| | Foldable|Cell phone| 3000| Thin| | Pro2| Tablet| 6500| Pro2| | Mini| Tablet| 5500| Pro2| | Pro| Tablet| 4500| Pro2| | Big| Tablet| 2500| Pro2| | Normal| Tablet| 1500| Pro2| +----------+----------+-------+-----+
# Signature: sf.last(col, ignorenulls=False)
# Docstring:
# Aggregate function: returns the last value in a group.
# The function by default returns the last values it sees. It will return the last non-null
# value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
# NOTE: not reliable — with an orderBy but no explicit frame, the frame ends at the
# current row (including its ties), so last() returns the current row's (or its last
# tied peer's) value rather than the partition's overall last value; see output below.
query_window = Window.partitionBy("category").orderBy(sf.col("revenue").desc())
df_query = df_productReveneue.withColumn("last", sf.last("product").over(query_window))
df_query.show()
+----------+----------+-------+----------+ | product| category|revenue| last| +----------+----------+-------+----------+ | Thin|Cell phone| 6000| Very thin| | Very thin|Cell phone| 6000| Very thin| |Ultra thin|Cell phone| 5000|Ultra thin| | Bendable|Cell phone| 3000| Foldable| | Foldable|Cell phone| 3000| Foldable| | Pro2| Tablet| 6500| Pro2| | Mini| Tablet| 5500| Mini| | Pro| Tablet| 4500| Pro| | Big| Tablet| 2500| Big| | Normal| Tablet| 1500| Normal| +----------+----------+-------+----------+
# Signature: sf.lag(col, count=1, default=None)
# Docstring:
# Window function: returns the value that is `offset` rows before the current row, and
# `defaultValue` if there is less than `offset` rows before the current row. For example,
# an `offset` of one will return the previous row at any given point in the window partition.
# This is equivalent to the LAG function in SQL.
# :param col: name of column or expression
# :param count: number of row to extend
# :param default: default value
# Pair each row with the previous (higher-or-equal) revenue in its category;
# the first row of a partition has no predecessor, hence null.
lag_window = Window.partitionBy("category").orderBy(sf.desc("revenue"))
with_prev = df_productReveneue.withColumn("lag", sf.lag("revenue").over(lag_window))
with_prev.show()
+----------+----------+-------+----+ | product| category|revenue| lag| +----------+----------+-------+----+ | Thin|Cell phone| 6000|null| | Very thin|Cell phone| 6000|6000| |Ultra thin|Cell phone| 5000|6000| | Bendable|Cell phone| 3000|5000| | Foldable|Cell phone| 3000|3000| | Pro2| Tablet| 6500|null| | Mini| Tablet| 5500|6500| | Pro| Tablet| 4500|5500| | Big| Tablet| 2500|4500| | Normal| Tablet| 1500|2500| +----------+----------+-------+----+
# Signature: sf.lead(col, count=1, default=None)
# Docstring:
# Window function: returns the value that is `offset` rows after the current row, and
# `defaultValue` if there is less than `offset` rows after the current row. For example,
# an `offset` of one will return the next row at any given point in the window partition.
# This is equivalent to the LEAD function in SQL.
# :param col: name of column or expression
# :param count: number of row to extend
# :param default: default value
# Pair each row with the next (lower-or-equal) revenue in its category; the
# last row of a partition has no successor, hence null.
query_window = Window.partitionBy("category").orderBy(sf.col("revenue").desc())
df_query = df_productReveneue.withColumn("lead", sf.lead("revenue").over(query_window))
df_query.show()
+----------+----------+-------+----+ | product| category|revenue|lead| +----------+----------+-------+----+ | Thin|Cell phone| 6000|6000| | Very thin|Cell phone| 6000|5000| |Ultra thin|Cell phone| 5000|3000| | Bendable|Cell phone| 3000|3000| | Foldable|Cell phone| 3000|null| | Pro2| Tablet| 6500|5500| | Mini| Tablet| 5500|4500| | Pro| Tablet| 4500|2500| | Big| Tablet| 2500|1500| | Normal| Tablet| 1500|null| +----------+----------+-------+----+
# populate same result across rows
# Without an orderBy the frame is the whole partition, so every row in a
# category carries the same max / avg / sum.
category_window = Window.partitionBy("category")
summary = (df_productReveneue
           .withColumn("max_revenue", sf.max("revenue").over(category_window))
           .withColumn("avg_revenue", sf.avg("revenue").over(category_window))
           .withColumn("total_revenue", sf.sum("revenue").over(category_window)))
summary.show()
+----------+----------+-------+-----------+-----------+-------------+ | product| category|revenue|max_revenue|avg_revenue|total_revenue| +----------+----------+-------+-----------+-----------+-------------+ | Thin|Cell phone| 6000| 6000| 4600.0| 23000| |Ultra thin|Cell phone| 5000| 6000| 4600.0| 23000| | Very thin|Cell phone| 6000| 6000| 4600.0| 23000| | Bendable|Cell phone| 3000| 6000| 4600.0| 23000| | Foldable|Cell phone| 3000| 6000| 4600.0| 23000| | Normal| Tablet| 1500| 6500| 4100.0| 20500| | Mini| Tablet| 5500| 6500| 4100.0| 20500| | Big| Tablet| 2500| 6500| 4100.0| 20500| | Pro| Tablet| 4500| 6500| 4100.0| 20500| | Pro2| Tablet| 6500| 6500| 4100.0| 20500| +----------+----------+-------+-----------+-----------+-------------+
# will accumulate if use orderBy
# Adding orderBy changes the default frame to end at the current row, so
# max/avg/sum become running aggregates over the rows up to (and tied with)
# the current revenue value — compare with the whole-partition results above.
query_window = Window.partitionBy("category").orderBy("revenue")
df_query = df_productReveneue.withColumn("max_revenue", sf.max("revenue").over(query_window))
df_query = df_query.withColumn("avg_revenue", sf.avg("revenue").over(query_window))
df_query = df_query.withColumn("total_revenue", sf.sum("revenue").over(query_window))
df_query.show()
+----------+----------+-------+-----------+------------------+-------------+ | product| category|revenue|max_revenue| avg_revenue|total_revenue| +----------+----------+-------+-----------+------------------+-------------+ | Bendable|Cell phone| 3000| 3000| 3000.0| 6000| | Foldable|Cell phone| 3000| 3000| 3000.0| 6000| |Ultra thin|Cell phone| 5000| 5000|3666.6666666666665| 11000| | Thin|Cell phone| 6000| 6000| 4600.0| 23000| | Very thin|Cell phone| 6000| 6000| 4600.0| 23000| | Normal| Tablet| 1500| 1500| 1500.0| 1500| | Big| Tablet| 2500| 2500| 2000.0| 4000| | Pro| Tablet| 4500| 4500|2833.3333333333335| 8500| | Mini| Tablet| 5500| 5500| 3500.0| 14000| | Pro2| Tablet| 6500| 6500| 4100.0| 20500| +----------+----------+-------+-----------+------------------+-------------+
# Sliding 3-row frame (previous, current, next row by ascending revenue):
# max over that neighbourhood within each category.
sliding_window = Window.partitionBy("category").orderBy("revenue").rowsBetween(-1, 1)
neighbour_max = df_productReveneue.withColumn("max_revenue", sf.max("revenue").over(sliding_window))
neighbour_max.show()
+----------+----------+-------+-----------+ | product| category|revenue|max_revenue| +----------+----------+-------+-----------+ | Bendable|Cell phone| 3000| 3000| | Foldable|Cell phone| 3000| 5000| |Ultra thin|Cell phone| 5000| 6000| | Thin|Cell phone| 6000| 6000| | Very thin|Cell phone| 6000| 6000| | Normal| Tablet| 1500| 2500| | Big| Tablet| 2500| 4500| | Pro| Tablet| 4500| 5500| | Mini| Tablet| 5500| 6500| | Pro2| Tablet| 6500| 6500| +----------+----------+-------+-----------+
# rangeBetween works on the ORDER BY value itself: the frame holds the rows
# whose revenue lies within [current, current + 1000], regardless of position.
query_window = Window.partitionBy("category").orderBy("revenue").rangeBetween(0, 1000)
df_query = df_productReveneue.withColumn("max_revenue", sf.max("revenue").over(query_window))
df_query.show()
+----------+----------+-------+-----------+ | product| category|revenue|max_revenue| +----------+----------+-------+-----------+ | Bendable|Cell phone| 3000| 3000| | Foldable|Cell phone| 3000| 3000| |Ultra thin|Cell phone| 5000| 6000| | Thin|Cell phone| 6000| 6000| | Very thin|Cell phone| 6000| 6000| | Normal| Tablet| 1500| 2500| | Big| Tablet| 2500| 2500| | Pro| Tablet| 4500| 5500| | Mini| Tablet| 5500| 6500| | Pro2| Tablet| 6500| 6500| +----------+----------+-------+-----------+
# Classic running total: the frame grows from the partition start to the current row.
cumulative_window = (Window.partitionBy("category")
                     .orderBy("revenue")
                     .rowsBetween(Window.unboundedPreceding, Window.currentRow))
running = df_productReveneue.withColumn("accum_revenue", sf.sum("revenue").over(cumulative_window))
running.show()
+----------+----------+-------+-------------+ | product| category|revenue|accum_revenue| +----------+----------+-------+-------------+ | Bendable|Cell phone| 3000| 3000| | Foldable|Cell phone| 3000| 6000| |Ultra thin|Cell phone| 5000| 11000| | Thin|Cell phone| 6000| 17000| | Very thin|Cell phone| 6000| 23000| | Normal| Tablet| 1500| 1500| | Big| Tablet| 2500| 4000| | Pro| Tablet| 4500| 8500| | Mini| Tablet| 5500| 14000| | Pro2| Tablet| 6500| 20500| +----------+----------+-------+-------------+