In [6]:
spark.version
Out[6]:
2.4.8

date_add / date_sub

In [8]:
import java.sql.Timestamp.valueOf
import org.apache.spark.sql.functions.{date_add, date_sub}

// the timestamps are created from strings via java.sql.Timestamp.valueOf
val df = Seq(
    ("notebook",    valueOf("2019-01-29 12:00:00")),
    ("notebook",    valueOf("2019-01-01 00:00:00")),
    ("small_phone", valueOf("2019-01-15 23:00:00")),
    ("small_phone", valueOf("2019-01-01 09:00:00"))
).toDF("device", "purchase_time").sort("device","purchase_time")
df = [device: string, purchase_time: timestamp]
Out[8]:
[device: string, purchase_time: timestamp]
In [9]:
%%dataframe
df
Out[9]:
device       purchase_time
notebook     2019-01-01 00:00:00.0
notebook     2019-01-29 12:00:00.0
small_phone  2019-01-01 09:00:00.0
small_phone  2019-01-15 23:00:00.0
In [10]:
%%dataframe
df.withColumn("plus_2_days", date_add($"purchase_time",2))
Out[10]:
device       purchase_time          plus_2_days
notebook     2019-01-01 00:00:00.0  2019-01-03
notebook     2019-01-29 12:00:00.0  2019-01-31
small_phone  2019-01-01 09:00:00.0  2019-01-03
small_phone  2019-01-15 23:00:00.0  2019-01-17
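
The import above also brings in date_sub, which works the same way but subtracts days instead of adding them; a minimal sketch on the same DataFrame (output not shown here):

    // date_sub returns a date column, just like date_add
    df.withColumn("minus_2_days", date_sub($"purchase_time", 2))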

datediff

In [11]:
import java.sql.Date.valueOf
import org.apache.spark.sql.functions.datediff

val df = Seq(
    ("notebook",    valueOf("2019-01-29"), valueOf("2019-02-10")),
    ("notebook",    valueOf("2019-01-01"), valueOf("2019-01-15")),
    ("small_phone", valueOf("2019-01-15"), valueOf("2019-01-05")),
    ("small_phone", valueOf("2019-01-01"), valueOf("2019-01-20"))
).toDF("device", "purchase_date", "arrival_date").sort("device","purchase_date")
df = [device: string, purchase_date: date ... 1 more field]
Out[11]:
[device: string, purchase_date: date ... 1 more field]
In [12]:
%%dataframe
df
Out[12]:
device       purchase_date  arrival_date
notebook     2019-01-01     2019-01-15
notebook     2019-01-29     2019-02-10
small_phone  2019-01-01     2019-01-20
small_phone  2019-01-15     2019-01-05
In [13]:
%%dataframe
df.withColumn("days_to_arrive",datediff($"arrival_date", $"purchase_date"))
Out[13]:
device       purchase_date  arrival_date  days_to_arrive
notebook     2019-01-01     2019-01-15    14
notebook     2019-01-29     2019-02-10    12
small_phone  2019-01-01     2019-01-20    19
small_phone  2019-01-15     2019-01-05    -10
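
As the last row shows, datediff is negative when the first argument is earlier than the second. If only the magnitude matters, wrapping the result in abs (from org.apache.spark.sql.functions) is a minimal sketch, not executed here:

    import org.apache.spark.sql.functions.abs

    // absolute number of days between the two dates, regardless of order
    df.withColumn("days_between", abs(datediff($"arrival_date", $"purchase_date")))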

difference in seconds

In [14]:
import java.sql.Timestamp.valueOf
import org.apache.spark.sql.functions.unix_timestamp

val df = Seq(
    ("foo", valueOf("2019-01-01 00:00:00"), valueOf("2019-01-01 01:00:00")), // 1 hour apart
    ("bar", valueOf("2019-01-01 00:00:00"), valueOf("2019-01-02 00:00:00")), // 24 hours apart
    ("baz", valueOf("2019-01-01 00:00:00"), valueOf("2019-01-07 00:00:00"))  // 7 days apart
).toDF("col1", "purchase_time", "arrival_time").sort("col1", "purchase_time")
df = [col1: string, purchase_time: timestamp ... 1 more field]
Out[14]:
[col1: string, purchase_time: timestamp ... 1 more field]
In [15]:
%%dataframe
df
Out[15]:
col1  purchase_time          arrival_time
bar   2019-01-01 00:00:00.0  2019-01-02 00:00:00.0
baz   2019-01-01 00:00:00.0  2019-01-07 00:00:00.0
foo   2019-01-01 00:00:00.0  2019-01-01 01:00:00.0
In [16]:
%%dataframe

df.withColumn("diff_in_seconds_2", unix_timestamp($"arrival_time") - unix_timestamp($"purchase_time"))
Out[16]:
col1  purchase_time          arrival_time           diff_in_seconds
bar   2019-01-01 00:00:00.0  2019-01-02 00:00:00.0  86400
baz   2019-01-01 00:00:00.0  2019-01-07 00:00:00.0  518400
foo   2019-01-01 00:00:00.0  2019-01-01 01:00:00.0  3600
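
An equivalent sketch (not run in this notebook) casts the timestamps to long, which also yields seconds since the Unix epoch, so the subtraction produces the same values:

    df.withColumn("diff_in_seconds",
        $"arrival_time".cast("long") - $"purchase_time".cast("long"))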

difference in milliseconds

In [17]:
import java.sql.Timestamp.valueOf

val df = Seq(
    ("foo", valueOf("2019-01-01 00:00:00.000"), valueOf("2019-01-01 00:00:00.400")), 
    ("bar", valueOf("2019-01-01 00:00:00.000"), valueOf("2019-01-01 00:00:00.650")), 
    ("baz", valueOf("2019-01-01 00:00:00.000"), valueOf("2019-01-01 00:01:00.000")) 
).toDF("col1", "time_before", "time_after")
df = [col1: string, time_before: timestamp ... 1 more field]
Out[17]:
[col1: string, time_before: timestamp ... 1 more field]
In [18]:
%%dataframe
df
Out[18]:
col1  time_before            time_after
foo   2019-01-01 00:00:00.0  2019-01-01 00:00:00.4
bar   2019-01-01 00:00:00.0  2019-01-01 00:00:00.65
baz   2019-01-01 00:00:00.0  2019-01-01 00:01:00.0
In [19]:
%%dataframe

(df
// casting a timestamp to double gives seconds since the Unix epoch, including fractional seconds
.withColumn("diff_millis", ($"time_after".cast("double") - $"time_before".cast("double")))
// convert the difference from seconds to milliseconds and drop the fraction
.withColumn("diff_millis", ($"diff_millis"*1000).cast("long")))
Out[19]:
col1  time_before            time_after              diff_millis
foo   2019-01-01 00:00:00.0  2019-01-01 00:00:00.4   400
bar   2019-01-01 00:00:00.0  2019-01-01 00:00:00.65  650
baz   2019-01-01 00:00:00.0  2019-01-01 00:01:00.0   60000
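
The two withColumn calls above can also be collapsed into a single expression; this sketch computes the same values:

    df.withColumn("diff_millis",
        (($"time_after".cast("double") - $"time_before".cast("double")) * 1000).cast("long"))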

expr interval

In [20]:
import java.sql.Timestamp.valueOf
import org.apache.spark.sql.functions.expr

val df = Seq(
    ("foo", valueOf("2019-10-10 00:45:00")), 
    ("bar", valueOf("2019-10-10 12:34:56")), 
    ("baz", valueOf("2019-10-10 23:59:00")) 
).toDF("col1", "timestamp_col")
df = [col1: string, timestamp_col: timestamp]
Out[20]:
[col1: string, timestamp_col: timestamp]
In [21]:
%%dataframe
df
Out[21]:
col1  timestamp_col
foo   2019-10-10 00:45:00.0
bar   2019-10-10 12:34:56.0
baz   2019-10-10 23:59:00.0
In [23]:
%%dataframe

df
// subtracting an interval literal shifts each timestamp back by 24 hours
.withColumn("timestamp_minus_24_hours", $"timestamp_col" - expr("INTERVAL 24 HOURS"))
Out[23]:
col1  timestamp_col          timestamp_minus_24_hours
foo   2019-10-10 00:45:00.0  2019-10-09 00:45:00.0
bar   2019-10-10 12:34:56.0  2019-10-09 12:34:56.0
baz   2019-10-10 23:59:00.0  2019-10-09 23:59:00.0
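
Intervals can be added as well as subtracted, and other units such as MINUTES or DAYS follow the same pattern; a minimal sketch, not executed here:

    // add a 90-minute interval to each timestamp
    df.withColumn("timestamp_plus_90_minutes", $"timestamp_col" + expr("INTERVAL 90 MINUTES"))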