This notebook shows the MEP quickstart sample, which also exists as a non-notebook version at: https://bitbucket.org/vitotap/python-spark-quickstart
It shows how to use Spark (http://spark.apache.org/) for distributed processing on the PROBA-V Mission Exploitation Platform (https://proba-v-mep.esa.int/). The sample intentionally implements a very simple computation: for each PROBA-V tile in a given bounding box and time range, a histogram is computed. The histograms are computed in parallel; the results are then summed and printed.
A catalog API is available to easily retrieve paths to PROBA-V files: https://readthedocs.org/projects/mep-catalogclient/
from catalogclient import catalog
cat=catalog.Catalog()
cat.get_producttypes()
['BioPar_ALB_BHV_V1_Tiles', 'BioPar_ALB_DHV_V1_Tiles', 'BioPar_ALBH_V1_Global', 'BioPar_BA_V1_Tiles', 'BioPar_DMP_Tiles', 'BioPar_DMP300_V1_Global', 'BioPar_FAPAR_V1_Tiles', 'BioPar_FAPAR_V1_Global', 'BioPar_FAPAR_V2_Global', 'BioPar_FCOVER_V1_Tiles', 'BioPar_FCOVER_V1_Global', 'BioPar_FCOVER_V2_Global', 'BioPar_LAI_V1_Tiles', 'BioPar_LAI_V1_Global', 'BioPar_LAI_V2_Global', 'BioPar_NDVI300_V1_Global', 'BioPar_NDVI300_V1_Global_GTIFF', 'BioPar_BA300_V1_Global', 'BioPar_FCOVER300_V1_Global', 'BioPar_FAPAR300_V1_Global', 'BioPar_LAI300_V1_Global', 'BioPar_NDVI_V1_Tiles', 'BioPar_NDVI_V2_Tiles', 'BioPar_NDVI_V2_Global', 'BioPar_SWI', 'BioPar_SWI10_V3_Global', 'BioPar_TOCR_Tiles', 'BioPar_VCI_Tiles', 'BioPar_VPI_Tiles', 'BioPar_WB_V1_Tiles', 'BioPar_WB_V2_Tiles', 'BioPar_WB_V2_Global', 'BioPar_WB300_V1_Global', 'PROBAV_L3_S1_TOC_1KM', 'PROBAV_L3_S1_TOC_333M', 'PROBAV_L3_S10_TOC_333M', 'PROBAV_L3_S5_TOC_100M', 'PROBAV_L3_S1_TOC_100M', 'PROBAV_L3_S10_TOC_1KM', 'PROBAV_L3_S1_TOA_1KM', 'PROBAV_L3_S1_TOA_333M', 'PROBAV_L3_S5_TOA_100M', 'PROBAV_L3_S1_TOA_100M', 'PROBAV_L1C', 'PROBAV_L2A_1KM', 'PROBAV_L2A_333M', 'PROBAV_L2A_100M', 'PROBAV_L3_S10_TOC_NDVI_1KM', 'PROBAV_L3_S10_TOC_NDVI_333M', 'PROBAV_L3_S1_TOC_NDVI_100M', 'PROBAV_L3_S5_TOC_NDVI_100M', 'PROBAV_L2A_1KM_ANTAR', 'PROBAV_L2A_333M_ANTAR', 'PROBAV_L2A_100M_ANTAR', 'CGS_S2_FAPAR', 'CGS_S2_FAPAR_10M', 'CGS_S2_FAPAR_20M', 'CGS_S2_NDVI', 'CGS_S2_NDVI_10M', 'CGS_S2_LAI', 'CGS_S2_LAI_10M', 'CGS_S2_LAI_20M', 'CGS_S2_FCOVER', 'CGS_S2_FCOVER_10M', 'CGS_S2_FCOVER_20M', 'CGS_S2_CCC_10M', 'CGS_S2_CCC_20M', 'CGS_S2_CWC_10M', 'CGS_S2_CWC_20M', 'CGS_S2_RADIOMETRY', 'CGS_S2_RADIOMETRY_10M', 'CGS_S2_RADIOMETRY_20M', 'CGS_S2_RADIOMETRY_60M', 'CGS_S1_GRD_SIGMA0_L1', 'CGS_S1_SLC_L1', 'CGS_S1_GRD_L1', 'NEXTGEOSS_SENTINEL2_FAPAR', 'NEXTGEOSS_SENTINEL2_NDVI', 'NEXTGEOSS_SENTINEL2_LAI', 'NEXTGEOSS_SENTINEL2_FCOVER', 'NEXTGEOSS_SENTINEL2_RADIOMETRY', 'FSTEP_SENTINEL2_FAPAR', 'FSTEP_SENTINEL2_NDVI', 'FSTEP_SENTINEL2_LAI', 
'FSTEP_SENTINEL2_FCOVER', 'FSTEP_SENTINEL2_RADIOMETRY', 'SPOTVEGETATION_L3_S1', 'SPOTVEGETATION_L3_S10']
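With this many product types, a simple substring filter helps narrow the list. A minimal sketch over a hardcoded subset (in practice the full list comes from `cat.get_producttypes()`):

```python
# A hardcoded subset of the product types listed above
producttypes = ['PROBAV_L3_S1_TOC_333M', 'PROBAV_L3_S10_TOC_NDVI_1KM',
                'CGS_S2_NDVI', 'CGS_S1_GRD_L1']

# Keep only the NDVI-related product types
ndvi_types = [p for p in producttypes if 'NDVI' in p]
print(ndvi_types)  # → ['PROBAV_L3_S10_TOC_NDVI_1KM', 'CGS_S2_NDVI']
```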
date = "2016-01-01"
products = cat.get_products('PROBAV_L3_S1_TOC_333M',
                            fileformat='GEOTIFF',
                            startdate=date,
                            enddate=date,
                            min_lon=0, max_lon=10, min_lat=36, max_lat=53)
# Extract the NDVI GeoTIFF files from the product metadata,
# stripping the leading 'file:' scheme from each URI
files = [p.file('NDVI')[5:] for p in products]
print('Found '+str(len(files)) + ' files.')
print(files[0])
#check if file exists
!file {files[0]}
Found 2 files.
/data/MTDA/TIFFDERIVED/PROBAV_L3_S1_TOC_333M/2016/20160101/PROBAV_S1_TOC_20160101_333M_V101/PROBAV_S1_TOC_X18Y02_20160101_333M_V101_NDVI.tif
/data/MTDA/TIFFDERIVED/PROBAV_L3_S1_TOC_333M/2016/20160101/PROBAV_S1_TOC_20160101_333M_V101/PROBAV_S1_TOC_X18Y02_20160101_333M_V101_NDVI.tif: TIFF image data, little-endian
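The `[5:]` slice above drops the leading `'file:'` scheme from the catalog URIs. `urllib.parse` does the same thing more explicitly; a small sketch using a made-up URI:

```python
from urllib.parse import urlparse

# Hypothetical catalog URI, for illustration only
uri = 'file:/data/MTDA/example/PROBAV_NDVI.tif'

# urlparse splits off the 'file:' scheme; .path is the filesystem path
path = urlparse(uri).path
print(path)  # → /data/MTDA/example/PROBAV_NDVI.tif
```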
Define the histogram function. This could also be done inline, which allows a faster feedback loop while writing the code, but here we want to clearly separate the processing 'algorithm' from the parallelization code.
# Calculates the histogram for a given (single-band) image file.
def histogram(image_file):
    import numpy as np
    import gdal

    # Open the image file
    img = gdal.Open(image_file)
    if img is None:
        raise IOError('-ERROR- Unable to open image file "%s"' % image_file)
    # Open the raster band (first band)
    raster = img.GetRasterBand(1)
    xSize = img.RasterXSize
    ySize = img.RasterYSize
    # Read the raster data
    data = raster.ReadAsArray(0, 0, xSize, ySize)
    # Calculate the histogram
    hist, _ = np.histogram(data, bins=256)
    return hist
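Before distributing the work, the `np.histogram` call at the heart of this function can be sanity-checked locally on synthetic data (numpy only; the random array stands in for a raster band read via GDAL):

```python
import numpy as np

# Synthetic 8-bit 'raster band' instead of a GDAL file
data = np.random.randint(0, 256, size=(100, 100), dtype=np.uint8)

# Same call as in the histogram() function above
hist, edges = np.histogram(data, bins=256)
print(hist.shape, int(hist.sum()))  # 256 bins; counts sum to the pixel count
```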
To work on the processing cluster, we need to specify the resources we want: the executor memory and the YARN memory overhead. We set up a SparkConf with these parameters and create a SparkContext sc, which will be our access point to the cluster.
%%time
# ================================================================
# === Calculate the histogram for a given number of files. The ===
# === processing is performed by spreading them over a cluster ===
# === of Spark nodes. ===
# ================================================================
from datetime import datetime
from operator import add
import pyspark
import os
# Setup the Spark cluster
conf = pyspark.SparkConf()
conf.set('spark.yarn.executor.memoryOverhead', 512)
conf.set('spark.executor.memory', '512m')
sc = pyspark.SparkContext(conf=conf)
CPU times: user 225 ms, sys: 15.4 ms, total: 240 ms Wall time: 15.4 s
We use a couple of Spark functions to run our job on the cluster. Comments are provided in the code.
%%time
# Distribute the local file list over the cluster.
filesRDD = sc.parallelize(files, len(files))
# Apply the 'histogram' function to each filename using 'map', keep the result in memory using 'cache'.
hists = filesRDD.map(histogram).cache()
count = hists.count()
# Combine distributed histograms into a single result
total = list(hists.reduce(lambda h, i: map(add, h, i)))
hists.unpersist()
print( "Sum of %i histograms: %s" % (count, total))
# Stop the Spark session once we no longer need it
sc.stop()
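The reduce step above sums the histograms element-wise with `map(add, h, i)`. The same combination can be illustrated without Spark using `functools.reduce` on a few toy lists:

```python
from functools import reduce
from operator import add

# Three toy 'histograms' standing in for the per-file results
hists = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

# Same lambda as in the Spark reduce: element-wise sum of two histograms
total = list(reduce(lambda h, i: map(add, h, i), hists))
print(total)  # → [12, 15, 18]
```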
Plot the array of values as a simple line chart using matplotlib, the most widely used Python plotting library. More advanced options such as bokeh, mpld3 and seaborn are also available.
%matplotlib inline
import matplotlib.pyplot as plt
plt.plot(total)
plt.show()