Kernel: desc-pyspark
Author: LAL team
Last Verified to Run: 2018-12-20
The goal of this notebook is to take a first 3D look at the latest DC2 extragalactic catalog (v1.1.4) using Apache Spark. Jupyter notebooks at NERSC are (currently) memory limited, so they are not necessarily the best-suited environment for this, but they may be used for local data visualization. It is also an opportunity to help people learn Apache Spark while enjoying the beauty of 3D displays.
Finally, it can be seen as the making-of of the future award-winning COSMO-DC2 movie!
Logistics:
This notebook is intended to be run through the JupyterHub NERSC interface with the desc-pyspark kernel. The kernel is automatically installed in your environment when you use the kernel setup script:
source /global/common/software/lsst/common/miniconda/kernels/setup.sh
Spark is an industry-standard solution for processing large amounts of data on several nodes, i.e. in parallel, without ever writing a single line of MPI! And it is *simple*.
Even better, it allows you to analyze your data interactively (in cache) as if you had a huge amount of RAM available just for you.
You don't have to worry about complicated optimizations; they are performed automatically for you (and probably better than what you would achieve with sweat and tears).
And yes, it is available as a set of Python functions: you can even install it on your laptop to play with. Even better: you can develop on your laptop, and your results will (most certainly) scale when you apply them at NERSC to the full dataset.
By large we mean really Large (typically at the Terabyte scale). But before we get to that amount of data with LSST, why not start learning it now?
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions
Now it is time to start: let's import some basic packages.
from time import time
# For plot
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
# Spark imports
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
# Initialise our Spark session
spark = SparkSession.builder.getOrCreate()
print("spark session started")
# A useful class for benchmarking
class Timer:
    """A simple class for printing the elapsed time (s) since the last call."""
    def __init__(self):
        self.t0 = time()

    def start(self):
        self.t0 = time()

    def split(self):
        t1 = time()
        print("{:2.1f}s".format(t1 - self.t0))

timer = Timer()
Read a file in a format called "parquet" (produced by the Data Access Task Force):
timer.start()
# Path to cosmoDC2 (parquet format)
filename = "/global/cscratch1/sd/plaszczy/CosmoDC2/xyz_v1.1.4.parquet"
# Spark DataFrame
df_all = spark.read.parquet(filename)
# Let's inspect the schema of the data
df_all.printSchema()
timer.split()
Fast, isn't it? But be careful, there is a trick here...
Spark uses lazy evaluation when reading data, so that (at this stage) nothing is actually loaded into memory: only the headers are read. The real loading happens later, for instance when the data are put in cache and an action is triggered. For more details see e.g. https://arxiv.org/abs/1807.03078
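Lazy evaluation may be unfamiliar at first, but it has a close relative in plain Python: generators. As a rough analogy (toy data, not Spark itself), nothing below is computed until the terminal `list()` call consumes the pipeline:

```python
# A generator expression *describes* the computation without running it,
# much like a chain of Spark transformations
data = range(10)
pipeline = (x * x for x in data if x % 2 == 0)  # nothing computed yet

# Consuming the generator plays the role of a Spark "action"
result = list(pipeline)
print(result)  # [0, 4, 16, 36, 64]
```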
Note: Apache Spark has no efficient PySpark connector for reading HDF5 files. Therefore we first converted the cosmoDC2 dataset to parquet (similar to what the DPDD tools offer). For the purpose of this notebook, we only converted a few columns of interest. The file is accessible at NERSC for DESC members.
For a nicer look we reject faint galaxies, which have a negative halo_id, and keep only the spatial positions. Then we load and cache in memory some relevant quantities. Finally we count how many galaxies remain. Note that only the final action (count) triggers the whole computation. The order of the transformations (select, filter, ...) does not matter, as Spark produces an optimized version of the whole chain before performing the action.
timer.start()
# Create a new dataframe selecting only interesting columns
# and keep only entries with positive halo_id.
cols = ["halo_id", "position_x", "position_y", "position_z", "redshift"]
cond = "halo_id > 0"
df = df_all.select(cols).filter(cond)
# We do not need halo_id anymore, so let's drop it
df = df.drop("halo_id")
# Put the data in cache for speeding up later use
# and trigger an action (count).
# This is the longest step, but it must be executed only once!
N = df.cache().count()
print("Number of galaxies={} M".format(N / 1e6))
timer.split()
DataFrames have built-in tools to quickly inspect the data distribution. See the ones available in https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameStatFunctions
Here we just display some summary statistics:
timer.start()
# describe() returns a DataFrame
stat = df.describe()
stat.show()
timer.split()
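For intuition, describe() reports, for each column, the same quantities as this plain-Python sketch (the redshift values here are toy numbers, just for illustration):

```python
import statistics

# Toy redshift values (hypothetical)
z = [0.1, 0.4, 0.7, 1.2, 2.0]

summary = {
    "count": len(z),
    "mean": statistics.mean(z),
    "stddev": statistics.stdev(z),  # sample standard deviation, as in describe()
    "min": min(z),
    "max": max(z),
}
print(summary)
```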
Let's look at the distribution of redshifts. This is slightly more technical (for details see: https://arxiv.org/abs/1807.03078 ), but it is extremely efficient for large datasets and will directly scale when (much) more data become available, outperforming traditional tools. The idea, to do this efficiently (and in parallel), is to map each galaxy to a bin index, group by that index and count, so that only the reduced histogram (and not the raw data) is sent back to the driver.
matplotlib.rcParams.update({'font.size': 17})
timer.start()
# Get min/max redshift values
m = df.select(F.min("redshift"), F.max("redshift")).first()
zmin, zmax = m[0], m[1]
# Number of bins and bin width for our histogram
Nbins = 50
dz = (zmax - zmin) / Nbins
step = dz  # also used as the bar width when plotting
# Add a column of bin indices
zbin = df.select(
df["redshift"],
((df["redshift"] - zmin - dz / 2) / dz).cast(IntegerType()).alias('bin'))
# GroupBy this column + count how many in each group (= histogram)
h = zbin.groupBy("bin").count().orderBy(F.asc("bin"))
# Add column of bin center locations + drop bin number
h = h.select("bin",(zmin + dz / 2 + h['bin'] * dz).alias('loc'), "count").drop("bin")
# Data is now reduced to a histogram so we can
# safely transfer back the data to the driver, in the pandas world
hp = h.toPandas()
plt.figure(figsize=(10, 7))
plt.bar(hp['loc'].values, hp['count'].values, step, color='w', edgecolor='k')
plt.xlabel("redshift", fontsize=20)
timer.split()
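To see the arithmetic of the binning step without Spark, here is a plain-Python sketch of the same idea (toy redshift values, for intuition): each value maps to a bin index with the same formula as above, and a Counter plays the role of groupBy + count.

```python
from collections import Counter

# Toy redshift values (hypothetical)
z = [0.05, 0.12, 0.18, 0.31, 0.33, 0.47]
zmin, zmax, Nbins = min(z), max(z), 4
dz = (zmax - zmin) / Nbins

# Same bin formula as the Spark code; Counter = groupBy("bin").count()
hist = Counter(int((v - zmin - dz / 2) / dz) for v in z)
print(sorted(hist.items()))  # [(0, 3), (1, 1), (2, 1), (3, 1)]
```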
Although you do not see it explicitly, this is really distributed computing: you send the algorithm to the data (on each node) and recover the reduced values (the histogram).
You are probably more used to working the other way around: you load the data into memory and then run your algorithm on it.
This can be emulated with the following piece of code, where the collect function fills the driver memory and one then calls the classical hist function:
timer.start()
z_values = [_[0] for _ in df.select('redshift').collect()]
plt.hist(z_values, range=(min(z_values), max(z_values)), bins=Nbins)
plt.xlabel('redshift')
timer.split()
Like any comparison it is very unfair, since the driver here uses a single thread, but it is also unfair to use Spark on such a small amount of data (42 million galaxies)! Still, you get the idea of (painless) distributed computing, which will prove its superiority for > 100 GB of data.
Since Jupyter notebooks are not well suited to combining intensive computation (CPU!) with intensive visualization (GPU!), we illustrate how the result can be displayed with matplotlib, but there are definitely more powerful tools for this (which we will discuss next).
In the following we make a 30 Mpc zoom centered on the mean position of the dataset, where we begin to see the cosmic web.
matplotlib.rcParams.update({'font.size': 17})
# Size of the box to display
Mpc = 30.
# Get the mean positions from the previous stat DataFrame
means = stat.collect()[1]
m_x = float(means['position_x'])
m_y = float(means['position_y'])
m_z = float(means['position_z'])
dfcut = df.filter(
(F.abs(df.position_x - m_x) < Mpc) &
(F.abs(df.position_y - m_y) < Mpc) &
(F.abs(df.position_z - m_z) < Mpc))
# Visualise it!
p = dfcut.toPandas()
fig = plt.figure(figsize=(10, 10))
ax = fig.add_subplot(111, projection='3d')
ax.scatter(p.position_x, p.position_y, p.position_z, s=1)
ax.set_title('A long time ago in a galaxy far,\nfar away... (z $\\sim$ {:.2f})'.format(p.redshift.mean()))
ax.set_xlabel(r'x [Mpc/h]', labelpad=20)
ax.set_ylabel(r'y [Mpc/h]', labelpad=20)
ax.set_zlabel(r'z [Mpc/h]', labelpad=20)
plt.show()
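The spatial cut above is just an axis-aligned box selection. In plain Python, with toy positions (hypothetical values), the same predicate reads:

```python
# Box half-width and box center (toy values)
Mpc = 30.0
center = (100.0, 100.0, 100.0)
galaxies = [(95.0, 110.0, 102.0), (200.0, 100.0, 100.0), (80.0, 90.0, 135.0)]

# Keep galaxies whose every coordinate is within Mpc of the center,
# i.e. the same condition as the three chained F.abs() filters
inside = [g for g in galaxies
          if all(abs(c - m) < Mpc for c, m in zip(g, center))]
print(inside)  # [(95.0, 110.0, 102.0)]
```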
By copying the data to the LAL cluster, using more sophisticated visualization tools and projecting the data onto our wall of screens, we obtained the following MOVIE. Note that we also colored the data according to redshift (with a colormap that is kept as secret as the Coca-Cola recipe).
from IPython.display import HTML
# Display video from Youtube
HTML('<iframe width="700" height="600" src="https://www.youtube.com/embed/HC_CQA5cOag" frameborder="0" allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>')