Basic Operations
from pyspark.sql import SparkSession
# Start (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("Operations")
    .getOrCreate()
)

# Load the Apple stock CSV; the first line holds the column names and the
# column types are inferred from the data instead of defaulting to strings.
df = spark.read.csv(
    "dbfs:/FileStore/shared_uploads/dizhen@hsph.harvard.edu/appl_stock.csv",
    inferSchema=True,
    header=True,
)

# Preview the first rows.
df.show()
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+ | Date| Open| High| Low| Close| Volume| Adj Close| +-------------------+------------------+------------------+------------------+------------------+---------+------------------+ |2010-01-04 00:00:00| 213.429998| 214.499996|212.38000099999996| 214.009998|123432400| 27.727039| |2010-01-05 00:00:00| 214.599998| 215.589994| 213.249994| 214.379993|150476200|27.774976000000002| |2010-01-06 00:00:00| 214.379993| 215.23| 210.750004| 210.969995|138040000|27.333178000000004| |2010-01-07 00:00:00| 211.75| 212.000006| 209.050005| 210.58|119282800| 27.28265| |2010-01-08 00:00:00| 210.299994| 212.000006|209.06000500000002|211.98000499999998|111902700| 27.464034| |2010-01-11 00:00:00|212.79999700000002| 213.000002| 208.450005|210.11000299999998|115557400| 27.221758| |2010-01-12 00:00:00|209.18999499999998|209.76999500000002| 206.419998| 207.720001|148614900| 26.91211| |2010-01-13 00:00:00| 207.870005|210.92999500000002| 204.099998| 210.650002|151473000| 27.29172| |2010-01-14 00:00:00|210.11000299999998|210.45999700000002| 209.020004| 209.43|108223500| 27.133657| |2010-01-15 00:00:00|210.92999500000002|211.59999700000003| 205.869999| 205.93|148516900|26.680197999999997| |2010-01-19 00:00:00| 208.330002|215.18999900000003| 207.240004| 215.039995|182501900|27.860484999999997| |2010-01-20 00:00:00| 214.910006| 215.549994| 209.500002| 211.73|153038200| 27.431644| |2010-01-21 00:00:00| 212.079994|213.30999599999998| 207.210003| 208.069996|152038600| 26.957455| |2010-01-22 00:00:00|206.78000600000001| 207.499996| 197.16| 197.75|220441900| 25.620401| |2010-01-25 00:00:00|202.51000200000001| 204.699999| 200.190002| 203.070002|266424900|26.309658000000002| |2010-01-26 00:00:00|205.95000100000001| 213.710005| 202.580004| 205.940001|466777500| 26.681494| |2010-01-27 00:00:00| 206.849995| 210.58| 199.530001| 207.880005|430642100|26.932840000000002| |2010-01-28 
00:00:00| 204.930004| 205.500004| 198.699995| 199.289995|293375600|25.819922000000002| |2010-01-29 00:00:00| 201.079996| 202.199995| 190.250002| 192.060003|311488100| 24.883208| |2010-02-01 00:00:00|192.36999699999998| 196.0|191.29999899999999| 194.729998|187469100| 25.229131| +-------------------+------------------+------------------+------------------+------------------+---------+------------------+ only showing top 20 rows
# Print the inferred column names and types to confirm the CSV was parsed as expected.
df.printSchema()
root |-- Date: timestamp (nullable = true) |-- Open: double (nullable = true) |-- High: double (nullable = true) |-- Low: double (nullable = true) |-- Close: double (nullable = true) |-- Volume: integer (nullable = true) |-- Adj Close: double (nullable = true)
Filter Data
# Filter using a SQL-style condition string: keep rows whose Close is below 500.
df.filter("Close<500").show()
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+ | Date| Open| High| Low| Close| Volume| Adj Close| +-------------------+------------------+------------------+------------------+------------------+---------+------------------+ |2010-01-04 00:00:00| 213.429998| 214.499996|212.38000099999996| 214.009998|123432400| 27.727039| |2010-01-05 00:00:00| 214.599998| 215.589994| 213.249994| 214.379993|150476200|27.774976000000002| |2010-01-06 00:00:00| 214.379993| 215.23| 210.750004| 210.969995|138040000|27.333178000000004| |2010-01-07 00:00:00| 211.75| 212.000006| 209.050005| 210.58|119282800| 27.28265| |2010-01-08 00:00:00| 210.299994| 212.000006|209.06000500000002|211.98000499999998|111902700| 27.464034| |2010-01-11 00:00:00|212.79999700000002| 213.000002| 208.450005|210.11000299999998|115557400| 27.221758| |2010-01-12 00:00:00|209.18999499999998|209.76999500000002| 206.419998| 207.720001|148614900| 26.91211| |2010-01-13 00:00:00| 207.870005|210.92999500000002| 204.099998| 210.650002|151473000| 27.29172| |2010-01-14 00:00:00|210.11000299999998|210.45999700000002| 209.020004| 209.43|108223500| 27.133657| |2010-01-15 00:00:00|210.92999500000002|211.59999700000003| 205.869999| 205.93|148516900|26.680197999999997| |2010-01-19 00:00:00| 208.330002|215.18999900000003| 207.240004| 215.039995|182501900|27.860484999999997| |2010-01-20 00:00:00| 214.910006| 215.549994| 209.500002| 211.73|153038200| 27.431644| |2010-01-21 00:00:00| 212.079994|213.30999599999998| 207.210003| 208.069996|152038600| 26.957455| |2010-01-22 00:00:00|206.78000600000001| 207.499996| 197.16| 197.75|220441900| 25.620401| |2010-01-25 00:00:00|202.51000200000001| 204.699999| 200.190002| 203.070002|266424900|26.309658000000002| |2010-01-26 00:00:00|205.95000100000001| 213.710005| 202.580004| 205.940001|466777500| 26.681494| |2010-01-27 00:00:00| 206.849995| 210.58| 199.530001| 207.880005|430642100|26.932840000000002| |2010-01-28 
00:00:00| 204.930004| 205.500004| 198.699995| 199.289995|293375600|25.819922000000002| |2010-01-29 00:00:00| 201.079996| 202.199995| 190.250002| 192.060003|311488100| 24.883208| |2010-02-01 00:00:00|192.36999699999998| 196.0|191.29999899999999| 194.729998|187469100| 25.229131| +-------------------+------------------+------------------+------------------+------------------+---------+------------------+ only showing top 20 rows
# Same filter, then project a single column (Open) from the matching rows.
df.filter("Close<500").select('Open').show()
+------------------+ | Open| +------------------+ | 213.429998| | 214.599998| | 214.379993| | 211.75| | 210.299994| |212.79999700000002| |209.18999499999998| | 207.870005| |210.11000299999998| |210.92999500000002| | 208.330002| | 214.910006| | 212.079994| |206.78000600000001| |202.51000200000001| |205.95000100000001| | 206.849995| | 204.930004| | 201.079996| |192.36999699999998| +------------------+ only showing top 20 rows
# Same filter, projecting several columns by passing a list of column names.
df.filter("Close<500").select(['Open','Close']).show()
+------------------+------------------+ | Open| Close| +------------------+------------------+ | 213.429998| 214.009998| | 214.599998| 214.379993| | 214.379993| 210.969995| | 211.75| 210.58| | 210.299994|211.98000499999998| |212.79999700000002|210.11000299999998| |209.18999499999998| 207.720001| | 207.870005| 210.650002| |210.11000299999998| 209.43| |210.92999500000002| 205.93| | 208.330002| 215.039995| | 214.910006| 211.73| | 212.079994| 208.069996| |206.78000600000001| 197.75| |202.51000200000001| 203.070002| |205.95000100000001| 205.940001| | 206.849995| 207.880005| | 204.930004| 199.289995| | 201.079996| 192.060003| |192.36999699999998| 194.729998| +------------------+------------------+ only showing top 20 rows
# Filter using a column expression instead of a SQL string: Close below 200.
df.filter(df["Close"] < 200).show()
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+ | Date| Open| High| Low| Close| Volume| Adj Close| +-------------------+------------------+------------------+------------------+------------------+---------+------------------+ |2010-01-22 00:00:00|206.78000600000001| 207.499996| 197.16| 197.75|220441900| 25.620401| |2010-01-28 00:00:00| 204.930004| 205.500004| 198.699995| 199.289995|293375600|25.819922000000002| |2010-01-29 00:00:00| 201.079996| 202.199995| 190.250002| 192.060003|311488100| 24.883208| |2010-02-01 00:00:00|192.36999699999998| 196.0|191.29999899999999| 194.729998|187469100| 25.229131| |2010-02-02 00:00:00| 195.909998| 196.319994|193.37999299999998| 195.859997|174585600|25.375532999999997| |2010-02-03 00:00:00| 195.169994| 200.200003| 194.420004| 199.229994|153832000|25.812148999999998| |2010-02-04 00:00:00| 196.730003| 198.370001| 191.570005| 192.050003|189413000| 24.881912| |2010-02-05 00:00:00|192.63000300000002| 196.0| 190.850002| 195.460001|212576700|25.323710000000002| |2010-02-08 00:00:00| 195.690006|197.88000300000002| 193.999994|194.11999699999998|119567700| 25.1501| |2010-02-09 00:00:00| 196.419996| 197.499994| 194.749998|196.19000400000002|158221700| 25.418289| |2010-02-10 00:00:00| 195.889997| 196.6| 194.26|195.12000700000002| 92590400| 25.27966| |2010-02-11 00:00:00| 194.880001| 199.750006|194.05999599999998| 198.669994|137586400| 25.739595| |2010-02-23 00:00:00| 199.999998| 201.330002| 195.709993| 197.059998|143773700| 25.531005| |2014-06-09 00:00:00| 92.699997| 93.879997| 91.75| 93.699997| 75415000| 88.906324| |2014-06-10 00:00:00| 94.730003| 95.050003| 93.57| 94.25| 62777000| 89.428189| |2014-06-11 00:00:00| 94.129997| 94.760002| 93.470001| 93.860001| 45681000| 89.058142| |2014-06-12 00:00:00| 94.040001| 94.120003| 91.900002| 92.290001| 54749000| 87.568463| |2014-06-13 00:00:00| 92.199997| 92.440002| 90.879997| 91.279999| 54525000| 86.610132| |2014-06-16 
00:00:00| 91.510002| 92.75| 91.449997| 92.199997| 35561000| 87.483064| |2014-06-17 00:00:00| 92.309998| 92.699997| 91.800003| 92.08000200000001| 29726000| 87.36920699999999| +-------------------+------------------+------------------+------------------+------------------+---------+------------------+ only showing top 20 rows
# Combine conditions with & (logical AND). Each comparison must be wrapped in
# parentheses because & binds more tightly than < and > in Python.
df.filter( (df["Close"] < 200) & (df['Open'] > 200) ).show()
+-------------------+------------------+----------+----------+----------+---------+------------------+ | Date| Open| High| Low| Close| Volume| Adj Close| +-------------------+------------------+----------+----------+----------+---------+------------------+ |2010-01-22 00:00:00|206.78000600000001|207.499996| 197.16| 197.75|220441900| 25.620401| |2010-01-28 00:00:00| 204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002| |2010-01-29 00:00:00| 201.079996|202.199995|190.250002|192.060003|311488100| 24.883208| +-------------------+------------------+----------+----------+----------+---------+------------------+
# Combine conditions with | (logical OR); the parentheses are required for the
# same operator-precedence reason as with &.
df.filter( (df["Close"] < 200) | (df['Open'] > 200) ).show()
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+ | Date| Open| High| Low| Close| Volume| Adj Close| +-------------------+------------------+------------------+------------------+------------------+---------+------------------+ |2010-01-04 00:00:00| 213.429998| 214.499996|212.38000099999996| 214.009998|123432400| 27.727039| |2010-01-05 00:00:00| 214.599998| 215.589994| 213.249994| 214.379993|150476200|27.774976000000002| |2010-01-06 00:00:00| 214.379993| 215.23| 210.750004| 210.969995|138040000|27.333178000000004| |2010-01-07 00:00:00| 211.75| 212.000006| 209.050005| 210.58|119282800| 27.28265| |2010-01-08 00:00:00| 210.299994| 212.000006|209.06000500000002|211.98000499999998|111902700| 27.464034| |2010-01-11 00:00:00|212.79999700000002| 213.000002| 208.450005|210.11000299999998|115557400| 27.221758| |2010-01-12 00:00:00|209.18999499999998|209.76999500000002| 206.419998| 207.720001|148614900| 26.91211| |2010-01-13 00:00:00| 207.870005|210.92999500000002| 204.099998| 210.650002|151473000| 27.29172| |2010-01-14 00:00:00|210.11000299999998|210.45999700000002| 209.020004| 209.43|108223500| 27.133657| |2010-01-15 00:00:00|210.92999500000002|211.59999700000003| 205.869999| 205.93|148516900|26.680197999999997| |2010-01-19 00:00:00| 208.330002|215.18999900000003| 207.240004| 215.039995|182501900|27.860484999999997| |2010-01-20 00:00:00| 214.910006| 215.549994| 209.500002| 211.73|153038200| 27.431644| |2010-01-21 00:00:00| 212.079994|213.30999599999998| 207.210003| 208.069996|152038600| 26.957455| |2010-01-22 00:00:00|206.78000600000001| 207.499996| 197.16| 197.75|220441900| 25.620401| |2010-01-25 00:00:00|202.51000200000001| 204.699999| 200.190002| 203.070002|266424900|26.309658000000002| |2010-01-26 00:00:00|205.95000100000001| 213.710005| 202.580004| 205.940001|466777500| 26.681494| |2010-01-27 00:00:00| 206.849995| 210.58| 199.530001| 207.880005|430642100|26.932840000000002| |2010-01-28 
00:00:00| 204.930004| 205.500004| 198.699995| 199.289995|293375600|25.819922000000002| |2010-01-29 00:00:00| 201.079996| 202.199995| 190.250002| 192.060003|311488100| 24.883208| |2010-02-01 00:00:00|192.36999699999998| 196.0|191.29999899999999| 194.729998|187469100| 25.229131| +-------------------+------------------+------------------+------------------+------------------+---------+------------------+ only showing top 20 rows
# Negate a condition with ~ (logical NOT): Close below 200 and Open NOT below 200.
df.filter( (df["Close"] < 200) & ~(df['Open'] < 200) ).show()
+-------------------+------------------+----------+----------+----------+---------+------------------+ | Date| Open| High| Low| Close| Volume| Adj Close| +-------------------+------------------+----------+----------+----------+---------+------------------+ |2010-01-22 00:00:00|206.78000600000001|207.499996| 197.16| 197.75|220441900| 25.620401| |2010-01-28 00:00:00| 204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002| |2010-01-29 00:00:00| 201.079996|202.199995|190.250002|192.060003|311488100| 24.883208| +-------------------+------------------+----------+----------+----------+---------+------------------+
# Select rows by exact equality on Low.
# NOTE(review): Low is a double column, so == is an exact floating-point
# comparison; it only matches values stored as exactly this double.
df.filter(df["Low"] == 197.16).show()
+-------------------+------------------+----------+------+------+---------+---------+ | Date| Open| High| Low| Close| Volume|Adj Close| +-------------------+------------------+----------+------+------+---------+---------+ |2010-01-22 00:00:00|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401| +-------------------+------------------+----------+------+------+---------+---------+
# collect() returns the matching rows as a Python list of Row objects
# rather than printing them like show() does.
df.filter(df["Low"] == 197.16).collect()
Out[14]: [Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]
# Keep the collected rows so individual Row objects can be inspected.
result = df.filter(df["Low"] == 197.16).collect()
# Inspect the type of the first collected element (assumes at least one row matched).
type(result[0])
Out[16]: pyspark.sql.types.Row
# Take the first (here, only) matching Row ...
row = result[0]
# ... and convert it to a plain dict keyed by column name.
row.asDict()
Out[19]: {'Date': datetime.datetime(2010, 1, 22, 0, 0), 'Open': 206.78000600000001, 'High': 207.499996, 'Low': 197.16, 'Close': 197.75, 'Volume': 220441900, 'Adj Close': 25.620401}
# Iterating a Row yields its field values in column order.
for item in result[0]:
    print(item)
2010-01-22 00:00:00 206.78000600000001 207.499996 197.16 197.75 220441900 25.620401
Dates and Timestamps
from pyspark.sql.functions import format_number,dayofmonth,hour,dayofyear,month,year,weekofyear,date_format
# Extract the hour component of each timestamp in the Date column.
df.select(hour(df['Date'])).show()
+----------+ |hour(Date)| +----------+ | 0| | 0| | 0| | 0| | 0| | 0| | 0| | 0| | 0| | 0| | 0| | 0| | 0| | 0| | 0| | 0| | 0| | 0| | 0| | 0| +----------+ only showing top 20 rows
# Extract the day-of-year number from each Date.
df.select(dayofyear(df['Date'])).show()
+---------------+ |dayofyear(Date)| +---------------+ | 4| | 5| | 6| | 7| | 8| | 11| | 12| | 13| | 14| | 15| | 19| | 20| | 21| | 22| | 25| | 26| | 27| | 28| | 29| | 32| +---------------+ only showing top 20 rows
# Extract the month number from each Date.
df.select(month(df['Date'])).show()
+-----------+ |month(Date)| +-----------+ | 1| | 1| | 1| | 1| | 1| | 1| | 1| | 1| | 1| | 1| | 1| | 1| | 1| | 1| | 1| | 1| | 1| | 1| | 1| | 2| +-----------+ only showing top 20 rows
# Extract the calendar year from each Date.
df.select(year(df['Date'])).show()
+----------+ |year(Date)| +----------+ | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| | 2010| +----------+ only showing top 20 rows
# Preview the data with an extra Year column derived from Date.
# The result is only displayed here, not stored in a variable.
df.withColumn("Year",year(df['Date'])).show()
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+----+ | Date| Open| High| Low| Close| Volume| Adj Close|Year| +-------------------+------------------+------------------+------------------+------------------+---------+------------------+----+ |2010-01-04 00:00:00| 213.429998| 214.499996|212.38000099999996| 214.009998|123432400| 27.727039|2010| |2010-01-05 00:00:00| 214.599998| 215.589994| 213.249994| 214.379993|150476200|27.774976000000002|2010| |2010-01-06 00:00:00| 214.379993| 215.23| 210.750004| 210.969995|138040000|27.333178000000004|2010| |2010-01-07 00:00:00| 211.75| 212.000006| 209.050005| 210.58|119282800| 27.28265|2010| |2010-01-08 00:00:00| 210.299994| 212.000006|209.06000500000002|211.98000499999998|111902700| 27.464034|2010| |2010-01-11 00:00:00|212.79999700000002| 213.000002| 208.450005|210.11000299999998|115557400| 27.221758|2010| |2010-01-12 00:00:00|209.18999499999998|209.76999500000002| 206.419998| 207.720001|148614900| 26.91211|2010| |2010-01-13 00:00:00| 207.870005|210.92999500000002| 204.099998| 210.650002|151473000| 27.29172|2010| |2010-01-14 00:00:00|210.11000299999998|210.45999700000002| 209.020004| 209.43|108223500| 27.133657|2010| |2010-01-15 00:00:00|210.92999500000002|211.59999700000003| 205.869999| 205.93|148516900|26.680197999999997|2010| |2010-01-19 00:00:00| 208.330002|215.18999900000003| 207.240004| 215.039995|182501900|27.860484999999997|2010| |2010-01-20 00:00:00| 214.910006| 215.549994| 209.500002| 211.73|153038200| 27.431644|2010| |2010-01-21 00:00:00| 212.079994|213.30999599999998| 207.210003| 208.069996|152038600| 26.957455|2010| |2010-01-22 00:00:00|206.78000600000001| 207.499996| 197.16| 197.75|220441900| 25.620401|2010| |2010-01-25 00:00:00|202.51000200000001| 204.699999| 200.190002| 203.070002|266424900|26.309658000000002|2010| |2010-01-26 00:00:00|205.95000100000001| 213.710005| 202.580004| 205.940001|466777500| 26.681494|2010| |2010-01-27 
00:00:00| 206.849995| 210.58| 199.530001| 207.880005|430642100|26.932840000000002|2010| |2010-01-28 00:00:00| 204.930004| 205.500004| 198.699995| 199.289995|293375600|25.819922000000002|2010| |2010-01-29 00:00:00| 201.079996| 202.199995| 190.250002| 192.060003|311488100| 24.883208|2010| |2010-02-01 00:00:00|192.36999699999998| 196.0|191.29999899999999| 194.729998|187469100| 25.229131|2010| +-------------------+------------------+------------------+------------------+------------------+---------+------------------+----+ only showing top 20 rows
# Store a copy of the data with a Year column derived from Date.
year_column = year(df['Date'])
newdf = df.withColumn("Year", year_column)

# Average the numeric columns within each year, then keep only the averaged
# Year and the average closing price.
yearly_means = newdf.groupBy("Year").mean()
yearly_means.select('avg(Year)', 'avg(Close)').show()
+---------+------------------+ |avg(Year)| avg(Close)| +---------+------------------+ | 2015.0|120.03999980555547| | 2013.0| 472.6348802857143| | 2014.0| 295.4023416507935| | 2012.0| 576.0497195640002| | 2016.0|104.60400786904763| | 2010.0| 259.8424600000002| | 2011.0|364.00432532142867| +---------+------------------+
# Per-year averages, keeping the averaged Year and the average closing price.
result = newdf.groupBy("Year").mean()[['avg(Year)','avg(Close)']]
# Give the averaged year column a readable name.
result = result.withColumnRenamed("avg(Year)","Year")
# Format the mean close to two decimal places under a friendlier column name.
result = result.select('Year',format_number('avg(Close)',2).alias("Mean Close"))
# show() only prints and returns None, so it is called on its own line;
# chaining it onto the assignment above would leave `result` bound to None.
result.show()
+------+----------+ | Year|Mean Close| +------+----------+ |2015.0| 120.04| |2013.0| 472.63| |2014.0| 295.40| |2012.0| 576.05| |2016.0| 104.60| |2010.0| 259.84| |2011.0| 364.00| +------+----------+