This notebook shows the MEP quickstart sample, which also exists as a non-notebook version at:

It shows how to use Spark for distributed processing on the PROBA-V Mission Exploitation Platform. The sample intentionally implements a very simple computation: for each PROBA-V tile in a given bounding box and time range, a histogram is computed. The histograms are computed in parallel, then summed and printed.
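The map-and-sum pattern described above can be sketched locally with plain NumPy, no Spark needed — the two random arrays below are made-up stand-ins for PROBA-V tiles:

```python
import numpy as np

# Hypothetical stand-ins for two PROBA-V tiles: random single-band rasters.
rng = np.random.default_rng(0)
tiles = [rng.integers(0, 256, size=(100, 100)) for _ in range(2)]

# "Map": one 256-bin histogram per tile.
hists = [np.histogram(t, bins=256)[0] for t in tiles]

# "Reduce": sum the histograms element-wise into a single result.
total = np.sum(hists, axis=0)
print(total.sum())  # 20000: every pixel of both tiles is counted exactly once
```

Spark replaces the list comprehension with a distributed `map` and the final sum with a `reduce`, but the computation is the same.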

First step: get file paths

A catalog API is available to easily retrieve paths to PROBA-V files:

In [1]:
from catalogclient import catalog

# Connect to the catalog
cat = catalog.Catalog()
In [2]:
date = "2016-01-01"
products = cat.get_products('PROBAV_L3_S1_TOC_333M',
                            startdate=date, enddate=date,
                            min_lon=0, max_lon=10, min_lat=36, max_lat=53)
# Extract NDVI GeoTIFF file paths from the product metadata, stripping the 'file:' prefix
files = [p.file('NDVI')[5:] for p in products]
print('Found ' + str(len(files)) + ' files.')
#check if file exists
!file {files[0]}
Found 2 files.
/data/MTDA/TIFFDERIVED/PROBAV_L3_S1_TOC_333M/2016/20160101/PROBAV_S1_TOC_20160101_333M_V101/PROBAV_S1_TOC_X18Y02_20160101_333M_V101_NDVI.tif: TIFF image data, little-endian

Second step: define function to apply

Define the histogram function. This could also be done inline, which allows a faster feedback loop while writing the code, but here we want to clearly separate the processing 'algorithm' from the parallelization code.

In [3]:
# Calculates the histogram for a given (single band) image file.
def histogram(image_file):
    import numpy as np
    import gdal
    # Open image file
    img = gdal.Open(image_file)
    if img is None:
        raise IOError('Unable to open image file "%s"' % image_file)
    # Open raster band (first band)
    raster = img.GetRasterBand(1)
    xSize = img.RasterXSize
    ySize = img.RasterYSize
    # Read raster data
    data = raster.ReadAsArray(0, 0, xSize, ySize)
    # Calculate histogram
    hist, _ = np.histogram(data, bins=256)
    return hist
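Before wiring this function into Spark, its core can be sanity-checked locally on a synthetic array (the data below is made up; `np.histogram` is what the function delegates to once GDAL has read the band):

```python
import numpy as np

# A synthetic 4x4 "raster" standing in for a single-band image.
data = np.arange(16).reshape(4, 4)
hist, _ = np.histogram(data, bins=256)

print(len(hist), hist.sum())  # 256 16: one count per pixel, spread over 256 bins
```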

Third step: setup Spark

To work on the processing cluster, we need to specify the resources we want:

  • spark.executor.cores: Number of cores per executor. Usually our tasks are single threaded, so 1 is a good default.
  • spark.executor.memory: memory to assign per executor. This is used by the Java/Spark processing, not by the Python part.
  • spark.yarn.executor.memoryOverhead: memory available for Python in each executor.

We set up the SparkConf with these parameters, and create a SparkContext sc, which will be our access point to the cluster.

In [4]:
# ================================================================
# === Calculate the histogram for a given number of files. The ===
# === processing is performed by spreading them over a cluster ===
# === of Spark nodes.                                          ===
# ================================================================

from datetime import datetime
from operator import add
import pyspark
import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.5'
# Setup the Spark cluster
conf = pyspark.SparkConf()
conf.set('spark.yarn.executor.memoryOverhead', '512')
conf.set('spark.executor.memory', '512m')

sc = pyspark.SparkContext(conf=conf)
CPU times: user 225 ms, sys: 15.4 ms, total: 240 ms
Wall time: 15.4 s

Fourth step: compute histograms

We use a couple of Spark functions to run our job on the cluster. Comments are provided in the code.

In [ ]:
# Distribute the local file list over the cluster.
filesRDD = sc.parallelize(files,len(files))

# Apply the 'histogram' function to each filename using 'map', keep the result in memory using 'cache'.
hists = filesRDD.map(histogram).cache()

count = hists.count()

# Combine distributed histograms into a single result
total = list(hists.reduce(lambda h, i: map(add, h, i)))

print( "Sum of %i histograms: %s" % (count, total))
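The combiner passed to `reduce` can be checked with plain Python lists standing in for two per-file histograms (values made up):

```python
from functools import reduce
from operator import add

h1 = [1, 2, 3]
h2 = [4, 5, 6]
# Same combiner as in the Spark job: element-wise sum, materialized with list().
total = list(reduce(lambda h, i: map(add, h, i), [h1, h2]))
print(total)  # [5, 7, 9]
```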
In [ ]:
#stop spark session if we no longer need it
sc.stop()

Fifth step: plot our result

Plot the array of values as a simple line chart using matplotlib, the most basic Python plotting library. More advanced options such as bokeh, mpld3 and seaborn are also available.

In [ ]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.plot(total)
plt.show()