spark.version
2.4.8
import java.sql.Date
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Sample purchase data: one row per device purchase (purchase date, device
// category, price). Used by all the window-function examples below.
val devicesDf = Seq(
(Date.valueOf("2019-01-01"), "notebook", 600.00),
(Date.valueOf("2019-05-10"), "notebook", 1200.00),
(Date.valueOf("2019-03-05"), "small phone", 100.00),
(Date.valueOf("2019-02-20"), "camera",150.00),
(Date.valueOf("2019-01-20"), "small phone", 300.00),
(Date.valueOf("2019-02-15"), "large phone", 700.00),
(Date.valueOf("2019-07-01"), "camera", 300.00),
(Date.valueOf("2019-04-01"), "small phone", 50.00)
).toDF("purchase_date", "device", "price")
devicesDf = [purchase_date: date, device: string ... 1 more field]
[purchase_date: date, device: string ... 1 more field]
%%dataframe
// Show the purchases in chronological order (orderBy is an exact alias of sort).
devicesDf.orderBy(col("purchase_date"))
purchase_date | device | price |
---|---|---|
2019-01-01 | notebook | 600.0 |
2019-01-20 | small phone | 300.0 |
2019-02-15 | large phone | 700.0 |
2019-02-20 | camera | 150.0 |
2019-03-05 | small phone | 100.0 |
2019-04-01 | small phone | 50.0 |
2019-05-10 | notebook | 1200.0 |
2019-07-01 | camera | 300.0 |
%%dataframe
:paste
// Per-device average price attached to every row. The window has no orderBy,
// so the frame is the whole partition and each row sees the full-group mean.
devicesDf
.withColumn("average_price_in_group", mean("price") over Window.partitionBy("device"))
purchase_date | device | price | average_price_in_group |
---|---|---|---|
2019-02-15 | large phone | 700.0 | 700.0 |
2019-03-05 | small phone | 100.0 | 150.0 |
2019-01-20 | small phone | 300.0 | 150.0 |
2019-04-01 | small phone | 50.0 | 150.0 |
2019-01-01 | notebook | 600.0 | 900.0 |
2019-05-10 | notebook | 1200.0 | 900.0 |
2019-02-20 | camera | 150.0 | 225.0 |
2019-07-01 | camera | 300.0 | 225.0 |
%%dataframe
:paste
// Per-device maximum price, broadcast to every row of that device's partition.
devicesDf.withColumn("max_price_in_group", max("price") over Window.partitionBy("device"))
purchase_date | device | price | max_price_in_group |
---|---|---|---|
2019-02-15 | large phone | 700.0 | 700.0 |
2019-03-05 | small phone | 100.0 | 300.0 |
2019-01-20 | small phone | 300.0 | 300.0 |
2019-04-01 | small phone | 50.0 | 300.0 |
2019-01-01 | notebook | 600.0 | 1200.0 |
2019-05-10 | notebook | 1200.0 | 1200.0 |
2019-02-20 | camera | 150.0 | 300.0 |
2019-07-01 | camera | 300.0 | 300.0 |
%%dataframe
:paste
// "Top-1 per group" pattern: compute the group max with a window, then keep
// only the rows that attain it. Unlike groupBy/agg, all original columns survive.
devicesDf
.withColumn("max_price_in_group", max("price") over Window.partitionBy("device"))
.filter($"price" === $"max_price_in_group")
purchase_date | device | price | max_price_in_group |
---|---|---|---|
2019-02-15 | large phone | 700.0 | 700.0 |
2019-01-20 | small phone | 300.0 | 300.0 |
2019-05-10 | notebook | 1200.0 | 1200.0 |
2019-07-01 | camera | 300.0 | 300.0 |
%%dataframe
:paste
// Most recent purchase date per device, attached to each row of the group
// (max over a Date column picks the latest date).
devicesDf
.withColumn("most_recent_purchase_in_group", max("purchase_date") over Window.partitionBy("device"))
purchase_date | device | price | most_recent_purchase_in_group |
---|---|---|---|
2019-02-15 | large phone | 700.0 | 2019-02-15 |
2019-03-05 | small phone | 100.0 | 2019-04-01 |
2019-01-20 | small phone | 300.0 | 2019-04-01 |
2019-04-01 | small phone | 50.0 | 2019-04-01 |
2019-01-01 | notebook | 600.0 | 2019-05-10 |
2019-05-10 | notebook | 1200.0 | 2019-05-10 |
2019-02-20 | camera | 150.0 | 2019-07-01 |
2019-07-01 | camera | 300.0 | 2019-07-01 |
%%dataframe
:paste
// Same top-1-per-group pattern as above, but keyed on the purchase date:
// keeps only each device's most recent purchase, with all columns intact.
devicesDf
.withColumn("most_recent_purchase_in_group", max("purchase_date") over Window.partitionBy("device"))
.filter($"purchase_date" === $"most_recent_purchase_in_group")
purchase_date | device | price | most_recent_purchase_in_group |
---|---|---|---|
2019-02-15 | large phone | 700.0 | 2019-02-15 |
2019-04-01 | small phone | 50.0 | 2019-04-01 |
2019-05-10 | notebook | 1200.0 | 2019-05-10 |
2019-07-01 | camera | 300.0 | 2019-07-01 |
%%dataframe
:paste
// Relative rank (0.0 to 1.0) of each row by price across the whole dataset.
// NOTE: a global ordered window (no partitionBy) moves all rows to a single
// partition — fine for this demo, expensive on real data.
devicesDf
.withColumn("percentile", percent_rank() over Window.orderBy("price"))
purchase_date | device | price | percentile |
---|---|---|---|
2019-04-01 | small phone | 50.0 | 0.0 |
2019-03-05 | small phone | 100.0 | 0.14285714285714285 |
2019-02-20 | camera | 150.0 | 0.2857142857142857 |
2019-01-20 | small phone | 300.0 | 0.42857142857142855 |
2019-07-01 | camera | 300.0 | 0.42857142857142855 |
2019-01-01 | notebook | 600.0 | 0.7142857142857143 |
2019-02-15 | large phone | 700.0 | 0.8571428571428571 |
2019-05-10 | notebook | 1200.0 | 1.0 |
%%dataframe
:paste
// Approximate median: the first row whose percent_rank reaches 0.5.
// NOTE(review): limit(1) returning the *lowest* qualifying price relies on the
// window's orderBy ordering surviving the filter — works here, but Spark does
// not guarantee row order after a filter; confirm before relying on it.
devicesDf
.withColumn("percentile", percent_rank() over Window.orderBy("price"))
.filter($"percentile" >= 0.5)
.limit(1)
purchase_date | device | price | percentile |
---|---|---|---|
2019-01-01 | notebook | 600.0 | 0.7142857142857143 |
What is the lowest price at or above the 85th percentile?
%%dataframe
:paste
// Same percent_rank computation as above, repeated as a reference point for
// the 85th-percentile query that follows.
devicesDf
.withColumn("percentile", percent_rank() over Window.orderBy("price"))
purchase_date | device | price | percentile |
---|---|---|---|
2019-04-01 | small phone | 50.0 | 0.0 |
2019-03-05 | small phone | 100.0 | 0.14285714285714285 |
2019-02-20 | camera | 150.0 | 0.2857142857142857 |
2019-01-20 | small phone | 300.0 | 0.42857142857142855 |
2019-07-01 | camera | 300.0 | 0.42857142857142855 |
2019-01-01 | notebook | 600.0 | 0.7142857142857143 |
2019-02-15 | large phone | 700.0 | 0.8571428571428571 |
2019-05-10 | notebook | 1200.0 | 1.0 |
%%dataframe
:paste
// Lowest price whose percentile rank is at least 0.85.
// NOTE(review): as with the median query, limit(1) assumes the ascending
// price order from the window is preserved through the filter — confirm.
devicesDf
.withColumn("percentile", percent_rank() over Window.orderBy("price"))
.filter($"percentile" >= 0.85)
.limit(1)
purchase_date | device | price | percentile |
---|---|---|---|
2019-02-15 | large phone | 700.0 | 0.8571428571428571 |
cumulative sum requires an ordered window
%%dataframe
:paste
// Running total of price in chronological order. The default frame of an
// ordered window is RANGE UNBOUNDED PRECEDING .. CURRENT ROW, so rows sharing
// a purchase_date would get the same cumulative value (dates here are unique).
devicesDf
.withColumn("cumulative_sum", sum("price") over Window.orderBy("purchase_date"))
purchase_date | device | price | cumulative_sum |
---|---|---|---|
2019-01-01 | notebook | 600.0 | 600.0 |
2019-01-20 | small phone | 300.0 | 900.0 |
2019-02-15 | large phone | 700.0 | 1600.0 |
2019-02-20 | camera | 150.0 | 1750.0 |
2019-03-05 | small phone | 100.0 | 1850.0 |
2019-04-01 | small phone | 50.0 | 1900.0 |
2019-05-10 | notebook | 1200.0 | 3100.0 |
2019-07-01 | camera | 300.0 | 3400.0 |
%%dataframe
:paste
// Global 1-based sequence number by purchase date (no partitioning, so the
// numbering runs across the whole dataset).
devicesDf
.withColumn("row_number", row_number() over Window.orderBy("purchase_date"))
purchase_date | device | price | row_number |
---|---|---|---|
2019-01-01 | notebook | 600.0 | 1 |
2019-01-20 | small phone | 300.0 | 2 |
2019-02-15 | large phone | 700.0 | 3 |
2019-02-20 | camera | 150.0 | 4 |
2019-03-05 | small phone | 100.0 | 5 |
2019-04-01 | small phone | 50.0 | 6 |
2019-05-10 | notebook | 1200.0 | 7 |
2019-07-01 | camera | 300.0 | 8 |
row_number — similar to rank, but unlike rank it assigns distinct,
consecutive numbers even to tied values — requires an ordered window
%%dataframe
:paste
// Per-device purchase sequence: the numbering restarts at 1 inside each
// device partition, ordered by purchase date.
devicesDf
.withColumn("row_number", row_number() over Window.partitionBy("device").orderBy("purchase_date"))
purchase_date | device | price | row_number |
---|---|---|---|
2019-02-15 | large phone | 700.0 | 1 |
2019-01-20 | small phone | 300.0 | 1 |
2019-03-05 | small phone | 100.0 | 2 |
2019-04-01 | small phone | 50.0 | 3 |
2019-01-01 | notebook | 600.0 | 1 |
2019-05-10 | notebook | 1200.0 | 2 |
2019-02-20 | camera | 150.0 | 1 |
2019-07-01 | camera | 300.0 | 2 |
// Two tiny frames sharing key column "a": key "1" exists in both,
// "2" only in df1, "3" only in df2 — enough to demo inner vs outer joins.
// (Trailing semicolons removed — they are unnecessary in Scala.)
val df1 = Seq(("1", "x"), ("2", "y")).toDF("a", "b")
val df2 = Seq(("1", "x2"), ("3", "z")).toDF("a", "b")
df1 = [a: string, b: string] df2 = [a: string, b: string]
[a: string, b: string]
%%dataframe
df1
a | b |
---|---|
1 | x |
2 | y |
%%dataframe
df2
a | b |
---|---|
1 | x2 |
3 | z |
%%dataframe
// Full outer join on aliased frames: both sides keep their own "a"/"b"
// columns, with nulls where a key exists on only one side.
df1.as("df1").join(df2.as("df2"),col("df1.a")===col("df2.a"),"outer")
a | b | a | b |
---|---|---|---|
null | null | 3 | z |
1 | x | 1 | x2 |
2 | y | null | null |
%%dataframe
// Inner join via the Seq("a") form: the join column appears only once in the
// result, unlike the expression form above which duplicates it.
df1.join(df2, Seq("a"), "inner")
a | b | b |
---|---|---|
1 | x | x2 |