This notebook runs a simple trend analysis on the time series shown in the time series viewer: https://proba-v-mep.esa.int/applications/time-series-viewer/app/app.html. It demonstrates how Spark can be used on the MEP Hadoop cluster, from within a notebook.
This notebook is not scientifically validated; it is only meant as a technical demonstration on a real-world use case.
First, some context about the data we are going to manipulate.
Our goal is simple: we want to detect whether there is a trend for each zone.
We will proceed as follows:
1. Read the statistics from the Parquet file on HDFS.
2. Filter out rows without a valid FAPAR value.
3. Group the data by zone.
4. For each zone, combine the per-landcover FAPAR series into a single yearly series, weighted by area.
5. Fit a linear trend (OLS) to each yearly series.
6. Plot the zones with the strongest positive and negative trends.
We will detail each step along the way.
If you run this notebook in JupyterLab on Terrascope, make sure to shut down all kernels before reconnecting and running it, as the last cell uses a lot of memory.
import pyspark
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
import datetime, time
%matplotlib inline
Let's read the Parquet file containing all our data. First, we need to configure and create a Spark context:
from pyspark.conf import SparkConf
conf = SparkConf()
conf.set('spark.yarn.executor.memoryOverhead', 1024)
conf.set('spark.executor.memory', '4g')
The SparkContext 'sc' is our entry point to the Hadoop cluster. The SparkConf above defines some parameters for it, such as the executor memory.
sc = pyspark.SparkContext(conf=conf)
sqlCtx = pyspark.SQLContext(sc)
Open a terminal, run kinit, and supply your password to request a Kerberos ticket.
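You can verify that a ticket was obtained by listing your Kerberos credentials (an optional check; klist is the standard Kerberos tool for this):
# Optional: list the current Kerberos tickets to verify that kinit succeeded
!klist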
!hdfs dfs -ls /tapdata/tsviewer/combined-stats/stats.parquet
data = sqlCtx.read.parquet("hdfs:/tapdata/tsviewer/combined-stats/stats.parquet")
data
Next, we want to register this data in a temporary table so we can query it using Spark's SQLContext.
data.registerTempTable("data")
Now although we instructed Spark where the data is and told it to register the table, it did not actually read anything yet. This is because Spark is lazy: it postpones doing any work until the last moment. Requesting data forces it to load it, as follows:
data.take(1)
Using Spark's SQLContext, we can query our data with SQL-like expressions. Here we are interested in the zone, landcover, date, FAPAR and area columns.
We will also filter out unwanted rows: we drop the rows that have no FAPAR value at all.
# Note: the area column is assumed here to be called 'area'; adjust the name below
# if the Parquet schema uses a different one.
data = sqlCtx.sql("SELECT zone, landcover, date date, BIOPAR_FAPAR_V1 fapar, area " +
                  "FROM data " +
                  "WHERE BIOPAR_FAPAR_V1 is not null").cache()
data.take(1)
As you can see, although null values have been filtered out, some FAPAR values are still NaN. We can filter them out using Spark's filter method. For that, we can either use the DataFrame's filter method, or use the DataFrame as a regular RDD.
The difference is that the DataFrame contains Rows, whose columns we can refer to by name, whereas the RDD contains tuples, where we have to select columns by index. However, with a regular RDD we can pass any function we want to filter, map, reduce, and so on, whereas with a DataFrame we have to use pre-defined functions.
def fapar_not_nan(row):
    # row[3] is the fapar column; drop rows where it is missing or NaN
    return row[3] is not None and not np.isnan(row[3])
withoutNaNs = data.rdd.filter(fapar_not_nan).cache()
withoutNaNs.take(1)
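For comparison, here is a minimal sketch of how the same NaN filtering could be expressed on the DataFrame itself with pre-defined column functions (assuming the fapar column is numeric); we continue with the RDD version below:
# Sketch: the same filter expressed on the DataFrame, referring to columns by name
from pyspark.sql.functions import col, isnan
withoutNaNs_df = data.filter(col("fapar").isNotNull() & ~isnan(col("fapar")))
withoutNaNs_df.take(1)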
For grouping, we could use Spark's groupBy method, or we could first map our data to a (Key, Value) pair and use the groupByKey method. We will use the latter, as it gives us finer control over what goes into the Value.
def by_zone(row):
    # Key by zone; keep (landcover, date, fapar, area) as the value
    return (row[0], (row[1], row[2], row[3], row[4]))
by_zone = withoutNaNs.map(by_zone).groupByKey().cache()
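For reference, the plain groupBy alternative mentioned above would look roughly like this (a sketch; it keeps the complete rows as values, which is why we prefer the (Key, Value) approach):
# Sketch: RDD.groupBy keeps the complete rows as values for each zone
by_zone_alt = withoutNaNs.groupBy(lambda row: row[0])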
by_zone.take(1)
The result is a list of pairs (Zone, Values) where Values is a list containing all the values for the specific Zone we grouped by.
Spark will combine the different values into a ResultIterable object, which is not really suited for the analysis we will be doing. Instead, we are going to use pandas and NumPy, so the first thing we want to do is convert our ResultIterable into an actual pandas DataFrame.
def iterable_to_pd(row):
    # Turn the ResultIterable of value tuples into a pandas DataFrame per zone
    return (row[0], pd.DataFrame(list(row[1]), columns=["landcover", "date", "fapar", "area"]))
by_zone = by_zone.map(iterable_to_pd).cache()
by_zone.take(1)
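To get a feel for the per-zone pandas DataFrame, you can also pull the first group to the driver and inspect it locally (purely optional):
# Optional: look at the pandas DataFrame of the first zone
zone_id, zone_df = by_zone.first()
print(zone_id)
print(zone_df.head())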
Now that we are ready to write our combine step, let's detail the algorithm used to process each zone:
def combine_fapar(zone):
    series = zone[1]
    series["date"] = pd.to_datetime(series["date"])
    series = series.set_index("date")
    # Group the dataset by landcover, and for each:
    by_landcover = series.groupby("landcover")
    # Resample the data monthly (instead of 10-daily) with padding in order to fill voids if there are any
    by_landcover = by_landcover.resample("1m", how="mean").fillna(method='pad')
    series = by_landcover.reset_index()
    # Reweight the FAPAR values by the area the landcover covers relative to the total area
    total_area = by_landcover["area"].mean()
    series["fapar"] *= series["area"] / total_area
    series = series.drop("area", axis=1)
    # Regroup the dataset by date and take the (monthly) sum
    series = series.groupby("date").sum()
    # Resample the data yearly and remove the first and last year of data (to only include full years)
    yearly_sum = pd.DataFrame(series["fapar"].resample("12m", "mean")[1:-1])
    return (zone[0], series, yearly_sum)
combined = by_zone.map(combine_fapar).cache()
combined.take(1)
Finally, we can compute the trend using a linear regression (OLS).
def min_years(datas):
    # Keep only zones with at least 5 full years of data
    return len(datas[2]) >= 5

def ols(datas):
    # Ordinary least squares fit of the yearly FAPAR sums against time
    yearly_series = datas[2]
    x, y = yearly_series.index.astype(int) // (10**9 * 60 * 24 * 30), yearly_series["fapar"]
    X = np.c_[np.ones(len(x)), x]
    return (datas[0], datas[1], datas[2], np.linalg.pinv(X).dot(y))
trends = combined.filter(min_years).map(ols).cache()
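As a quick sanity check of the pseudo-inverse least squares used above, here is a toy example on synthetic data (purely illustrative, not part of the pipeline):
# Toy check: recover intercept and slope of y = 2.0 + 0.5 * x with the same formulation
x_toy = np.arange(10, dtype=float)
y_toy = 2.0 + 0.5 * x_toy
X_toy = np.c_[np.ones(len(x_toy)), x_toy]
print(np.linalg.pinv(X_toy).dot(y_toy))  # expect approximately [2.0, 0.5]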
Next we will plot the trends. We are interested in the highest and lowest trends in our data.
Let's define our plotting function and plot the trends in descending order.
#copy auxiliary data from hdfs
!hdfs dfs -copyToLocal /tapdata/GAUL2013/zonecodes.csv ./
zonecodes = pd.read_csv("./zonecodes.csv")
def plotTopTrends(trendsCollected):
    for zone, series, yearly_series, trend in trendsCollected:
        if not np.isnan(trend[0]) and not np.isnan(trend[1]):
            # Look up the country and region names for this zone and print them with the slope
            codes = zonecodes[zonecodes['ADM1_CODE'] == zone][["ADM0_NAME", "ADM1_NAME"]]
            codes["Trend"] = trend[1]
            print(codes)
            # Plot the monthly FAPAR series together with the fitted trend line
            pdf = series.copy()
            pdf["ols"] = pdf.index.astype(int) / (10**9 * 60 * 24 * 30) * trend[1] + trend[0]
            pdf[["fapar", "ols"]].plot(figsize=(16, 4))
            #sns.regplot(x="unix_date", y="fapar", data=series, scatter=False)
            plt.show()
plotTopTrends(trends.sortBy(lambda x: x[3][1], ascending=False).take(10))
plotTopTrends(trends.sortBy(lambda x: x[3][1], ascending=True).take(10))
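If you prefer a tabular overview over plots, the slopes can also be collected into a pandas DataFrame (a sketch, relying on the (zone, series, yearly_sum, trend) layout produced above):
# Sketch: collect (zone, slope) pairs and sort them to see the extremes
slopes = sorted(trends.map(lambda t: (t[0], t[3][1])).collect(), key=lambda z: z[1])
slopes_df = pd.DataFrame(slopes, columns=["zone", "slope"])
print(slopes_df.head(10))  # strongest negative trends
print(slopes_df.tail(10))  # strongest positive trends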