# Map-Reduce and Apache Spark

(C) 2015 Steve Phelps

## Overview

1. Recap of functional programming in Python
2. Python's map and reduce functions
3. Writing parallel code using map
4. The Map-Reduce programming model
5. Using Apache Spark with Python

• Barney, B. Introduction to Parallel Computing. Lawrence Livermore National Laboratory.

• Dean, J., & Ghemawat, S. (2008). MapReduce: Simplified Data Processing on Large Clusters. Communications of the ACM, 51(1), 107–113.

• Apache Spark. Spark Programming Guide.

• Karau, H., Wendell, P., & Zaharia, M. (2015). Learning Spark: Lightning-Fast Big Data Analysis. O’Reilly. (Chapters 1 and 3.)

## History

• The Map-Reduce programming model was popularised by Google (Dean and Ghemawat 2008).

• The first popular open-source implementation was Apache Hadoop, first released in 2006 (version 1.0 followed in 2011).

• Apache Spark was first released in 2014.

• It was originally developed by Matei Zaharia as a class project, and later a PhD dissertation, at the University of California, Berkeley.

• In contrast to Hadoop, Apache Spark:

• is easier to install and configure;
• provides a much more natural iterative workflow.

## Resilient distributed datasets

• The fundamental abstraction of Apache Spark is a read-only, parallel, distributed, fault-tolerant collection called a resilient distributed dataset (RDD).

• When working with Apache Spark we iteratively apply functions to every element of these collections in parallel to produce new RDDs.

## Functional programming

Consider the following code:

In :
def double_everything_in(data):
    result = []
    for i in data:
        result.append(2 * i)
    return result

def quadruple_everything_in(data):
    result = []
    for i in data:
        result.append(4 * i)
    return result

In :
double_everything_in([1, 2, 3, 4, 5])

Out:
[2, 4, 6, 8, 10]
In :
quadruple_everything_in([1, 2, 3, 4, 5])

Out:
[4, 8, 12, 16, 20]
• The above code violates the "do not repeat yourself" principle of good software engineering practice.

• How can we rewrite the code so that it avoids duplication?

In :
def multiply_by_x_everything_in(x, data):
    result = []
    for i in data:
        result.append(x * i)
    return result

In :
multiply_by_x_everything_in(2, [1, 2, 3, 4, 5])

Out:
[2, 4, 6, 8, 10]
In :
multiply_by_x_everything_in(4, [1, 2, 3, 4, 5])

Out:
[4, 8, 12, 16, 20]
• Now consider the following code:
In :
def squared(x):
    return x*x

def double(x):
    return x*2

def square_everything_in(data):
    result = []
    for i in data:
        result.append(squared(i))
    return result

def double_everything_in(data):
    result = []
    for i in data:
        result.append(double(i))
    return result

In :
square_everything_in([1, 2, 3, 4, 5])

Out:
[1, 4, 9, 16, 25]
In :
double_everything_in([1, 2, 3, 4, 5])

Out:
[2, 4, 6, 8, 10]
• The above code violates the "do not repeat yourself" principle of good software engineering practice.

• How can we rewrite the code so that it avoids duplication?

## Using functions as values

In :
def apply_f_to_everything_in(f, data):
    result = []
    for x in data:
        result.append(f(x))
    return result

In :
apply_f_to_everything_in(squared, [1, 2, 3, 4, 5])

Out:
[1, 4, 9, 16, 25]
In :
apply_f_to_everything_in(double, [1, 2, 3, 4, 5])

Out:
[2, 4, 6, 8, 10]

### Lambda expressions

• We can use anonymous functions (lambda expressions) to avoid having to define a named function each time we want to map over a collection.
In :
apply_f_to_everything_in(lambda x: x*x, [1, 2, 3, 4, 5])

Out:
[1, 4, 9, 16, 25]

# Python's map function

• Python has a built-in function map, which is typically faster than our version. In Python 3, map returns a lazy iterator, so we wrap the call in list() to see the results.
In :
list(map(lambda x: x*x, [1, 2, 3, 4, 5]))

Out:
[1, 4, 9, 16, 25]
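In Python 3 the object returned by map is a lazy iterator: nothing is computed until elements are requested, and it can only be consumed once. A minimal illustration using only built-ins (the variable names are illustrative):

```python
# map returns an iterator; no squares are computed at this point.
squares = map(lambda x: x * x, [1, 2, 3, 4, 5])

first = next(squares)      # computes only the first element: 1
rest = list(squares)       # consumes the remaining elements: [4, 9, 16, 25]
leftover = list(squares)   # the iterator is now exhausted: []

print(first, rest, leftover)
```

This is why the call above is wrapped in list(): without it, we would only see the iterator object rather than its elements.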

## Implementing reduce

• The reduce function is an example of a fold.

• There are different ways we can fold data.

• The following implements a left fold.

In :
def foldl(f, data, z):
    if (len(data) == 0):
        print(z)
        return z
    else:
        head = data[0]
        tail = data[1:]
        print("Folding", head, "with", tail, "using", z)
        partial_result = f(z, head)
        print("Partial result is", partial_result)
        return foldl(f, tail, partial_result)

In :
def add(x, y):
    return x + y

foldl(add, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is 1
Folding 2 with [3, 4, 5] using 1
Partial result is 3
Folding 3 with [4, 5] using 3
Partial result is 6
Folding 4 with [5] using 6
Partial result is 10
Folding 5 with [] using 10
Partial result is 15
15

Out:
15
In :
foldl(lambda x, y: x + y, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is 1
Folding 2 with [3, 4, 5] using 1
Partial result is 3
Folding 3 with [4, 5] using 3
Partial result is 6
Folding 4 with [5] using 6
Partial result is 10
Folding 5 with [] using 10
Partial result is 15
15

Out:
15
In :
foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is -1
Folding 2 with [3, 4, 5] using -1
Partial result is -3
Folding 3 with [4, 5] using -3
Partial result is -6
Folding 4 with [5] using -6
Partial result is -10
Folding 5 with [] using -10
Partial result is -15
-15

Out:
-15
In :
(((((0 - 1) - 2) - 3) - 4) - 5)

Out:
-15
In :
(1 - (2 - (3 - (4 - (5 - 0)))))

Out:
3
In :
def foldr(f, data, z):
    if (len(data) == 0):
        return z
    else:
        return f(data[0], foldr(f, data[1:], z))

In :
foldl(lambda x, y: x - y,  [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is -1
Folding 2 with [3, 4, 5] using -1
Partial result is -3
Folding 3 with [4, 5] using -3
Partial result is -6
Folding 4 with  using -6
Partial result is -10
Folding 5 with [] using -10
Partial result is -15
-15

Out:
-15
In :
foldr(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

Out:
3

## Python's reduce function

• Python's reduce function is a left fold. In Python 3 it is not a built-in; it must be imported from the functools module.
In :
from functools import reduce

reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])

Out:
15
In :
reduce(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

Out:
-15
In :
foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is -1
Folding 2 with [3, 4, 5] using -1
Partial result is -3
Folding 3 with [4, 5] using -3
Partial result is -6
Folding 4 with  using -6
Partial result is -10
Folding 5 with [] using -10
Partial result is -15
-15

Out:
-15
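The effect of the initial value, and of the direction of the fold, is easiest to see with subtraction, which is neither associative nor commutative. A short sketch using functools.reduce (the variable names are illustrative); the right fold is obtained by reducing over the reversed list with the arguments of f swapped, which reproduces what foldr computes:

```python
from functools import reduce

values = [1, 2, 3, 4, 5]

# With an initial value of 0: (((((0 - 1) - 2) - 3) - 4) - 5)
with_initial = reduce(lambda x, y: x - y, values, 0)

# Without an initial value, the first element is the starting accumulator:
# ((((1 - 2) - 3) - 4) - 5)
without_initial = reduce(lambda x, y: x - y, values)

# Right fold via reduce: walk the list backwards and compute x - acc,
# giving 1 - (2 - (3 - (4 - (5 - 0))))
right_fold = reduce(lambda acc, x: x - acc, reversed(values), 0)

print(with_initial, without_initial, right_fold)  # -15 -13 3
```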

# Functional programming and parallelism

• Functional programming lends itself to parallel programming.

• The map function can easily be parallelised through data-level parallelism,

• provided that the function we supply as an argument is free from side-effects
• (which is why we avoid working with mutable data).
• We can see this by rewriting map as follows:

In :
def perform_computation(f, result, data, i):
    print("Computing the ", i, "th result...")
    # This could be scheduled on a different CPU
    result[i] = f(data[i])

def my_map(f, data):
    result = [None] * len(data)
    for i in range(len(data)):
        perform_computation(f, result, data, i)
    # Wait for other CPUs to finish, and then..
    return result

In :
my_map(lambda x: x * x, [1, 2, 3, 4, 5])

Computing the  0 th result...
Computing the  1 th result...
Computing the  2 th result...
Computing the  3 th result...
Computing the  4 th result...

Out:
[1, 4, 9, 16, 25]

## A multi-threaded map function

In :
from threading import Thread

def schedule_computation_threaded(f, result, data, threads, i):
    # Each function evaluation runs in its own thread.
    def my_job():
        print("Processing data:", data[i], "... ")
        result[i] = f(data[i])
        print("Finished job #", i)
        print("Result was", result[i])
    threads[i] = Thread(target=my_job)

def my_map_multithreaded(f, data):
    n = len(data)
    result = [None] * n
    threads = [None] * n
    print("Scheduling jobs.. ")
    for i in range(n):
        schedule_computation_threaded(f, result, data, threads, i)
    print("Starting jobs.. ")
    for i in range(n):
        threads[i].start()
    print("Waiting for jobs to finish.. ")
    for i in range(n):
        threads[i].join()
    print("All done.")
    return result

In :
my_map_multithreaded(lambda x: x*x, [1, 2, 3, 4, 5])

Scheduling jobs..
Starting jobs..
Processing data: 1 ...
Finished job # 0
Result was 1
Processing data: 2 ...
Finished job # 1
Result was 4
Processing data: 3 ...
Finished job # 2
Result was 9
Processing data:Processing data: 5 ...
Finished job # 4
Result was 25
Waiting for jobs to finish.. 4 ...
Finished job # 3
Result was 16

All done.

Out:
[1, 4, 9, 16, 25]
In :
from numpy.random import uniform
from time import sleep

def a_function_which_takes_a_long_time(x):
    sleep(uniform(2, 10))  # Simulate some long computation
    return x*x

my_map_multithreaded(a_function_which_takes_a_long_time, [1, 2, 3, 4, 5])

Scheduling jobs..
Starting jobs..
Processing data:Processing data: 2 ...
1 ...
Processing data: 3 ...
Processing data: 4 ...
Processing data:Waiting for jobs to finish..
5 ...
Finished job # 2
Result was 9
Finished job # 4
Result was 25
Finished job # 0
Result was 1
Finished job # 3
Result was 16
Finished job # 1
Result was 4
All done.

Out:
[1, 4, 9, 16, 25]
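For comparison, the standard library's concurrent.futures module provides a thread pool whose map method plays the same role as my_map_multithreaded. In the sketch below, the wrapper name my_map_pooled, the default of four workers and the slow_square test function are our own illustrative choices. A bounded pool reuses a few threads instead of creating one per element, and executor.map returns results in input order even when later calls finish first:

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def my_map_pooled(f, data, max_workers=4):
    # Hypothetical helper: a thread-pool counterpart to my_map_multithreaded.
    # executor.map yields results in the order of the inputs.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(f, data))

def slow_square(x):
    # Later inputs sleep for less time, so they tend to finish first.
    sleep(0.1 * (5 - x))
    return x * x

print(my_map_pooled(slow_square, [1, 2, 3, 4, 5]))  # [1, 4, 9, 16, 25]
```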

## Map Reduce

• Map Reduce is a programming model for scalable parallel processing.
• Scalable here means that it can work on big data with very large compute clusters.
• There are many implementations: e.g. Apache Hadoop and Apache Spark.
• We can use Map-Reduce with any programming language:
• Hadoop is written in Java
• Spark is written in Scala, but has a Python interface.
• Languages that support functional programming, such as Python or Scala, fit very well with the Map Reduce model.
• However, we don't have to use functional programming.
• A MapReduce implementation will take care of the low-level functionality so that you don't have to worry about:
• network I/O
• network and disk transfer optimisation
• handling of machine failures
• serialization of data
• etc.
• The model is designed to move the processing to where the data resides.

## Typical steps in a Map Reduce Computation

1. ETL (extract, transform, load) a big data set.
2. Map operation: extract something you care about from each row.
3. "Shuffle and Sort": group the intermediate results by key and allocate each group to a task/node.
4. Reduce operation: aggregate, summarise, filter or transform.
5. Write the results.

## Callbacks for Map Reduce

• The data set, and the state of each stage of the computation, is represented as a set of key-value pairs.

• The programmer provides a map function:

$\operatorname{map}(k, v) \rightarrow \; \left< k', v' \right>*$

• and a reduce function:

$\operatorname{reduce}(k', \left< k', v'\right> *) \rightarrow \; \left< k', v'' \right> *$

• The $*$ refers to a collection of values.

• These collections are not ordered.
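The two callbacks can be made concrete with a small sequential sketch in plain Python. The driver run_map_reduce and the callbacks word_count_map and word_count_reduce are hypothetical names chosen for illustration; the callbacks solve the word-count problem described in the next sections. Here reduce_fn receives a key together with the list of values grouped under it, and the grouping step between the two phases stands in for "shuffle and sort":

```python
from collections import defaultdict

def run_map_reduce(map_fn, reduce_fn, records):
    """Sequential sketch: map phase, grouping by key, reduce phase."""
    # Map phase: each input pair (k, v) yields zero or more (k', v') pairs.
    intermediate = []
    for k, v in records:
        intermediate.extend(map_fn(k, v))

    # Grouping ("shuffle") phase: collect every v' that shares the same key k'.
    groups = defaultdict(list)
    for k2, v2 in intermediate:
        groups[k2].append(v2)

    # Reduce phase: each key with its values yields (k', v'') pairs.
    output = []
    for k2, values in groups.items():
        output.extend(reduce_fn(k2, values))
    return output

def word_count_map(url, contents):
    # Emit <word, 1> for every word in the document.
    for word in contents.split():
        yield (word, 1)

def word_count_reduce(word, counts):
    # Sum the 1s emitted for this word.
    yield (word, sum(counts))

result = run_map_reduce(word_count_map, word_count_reduce,
                        [("document1", "to be or not to be")])
print(sorted(result))  # [('be', 2), ('not', 1), ('or', 1), ('to', 2)]
```

The output is sorted before printing because, as noted above, the collections are not ordered.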

## Resilient Distributed Data

• In a Map-Reduce computation these collections are resilient distributed data-sets (RDDs):

• The data is distributed across nodes in a cluster of computers.
• No data is lost if a single node fails.
• Data is typically stored in HBase tables, or HDFS files.
• The map and reduce functions can work in parallel across different keys, or different elements of the collection.
• The underlying framework (e.g. Hadoop or Apache Spark) allocates data and processing to different nodes, without any intervention from the programmer.

## Word Count Example

• In this simple example, the input is a set of records, each keyed by a URL, whose value is the contents of a document.

• Problem: compute how many times each word occurs across the data set.

## Word Count: Map

• The input to $\operatorname{map}$ is a mapping:
• Key: URL
• Value: Contents of document

$\left< document1, to \; be \; or \; not \; to \; be \right>$

• In this example, our $\operatorname{map}$ function processes a given URL and produces one mapping for each word in the document:
• Key: word
• Value: 1
• So our original data-set will be transformed to:

$\left< to, 1 \right>$ $\left< be, 1 \right>$ $\left< or, 1 \right>$ $\left< not, 1 \right>$ $\left< to, 1 \right>$ $\left< be, 1 \right>$

## Word Count: Reduce

• The reduce operation groups values according to their key, and then performs a reduce on each key.

• The collections are partitioned across different storage units. Therefore:

• Map-Reduce will fold the data in such a way that it minimises data-copying across the cluster.

• Data in different partitions are reduced separately in parallel.

• The final result is a reduce of the reduced data in each partition.

• For the final result not to depend on how the data was partitioned, our operator must be both commutative and associative.

• In our case the function is the + operator, which gives:

$\left< be, 2 \right>$
$\left< not, 1 \right>$
$\left< or, 1 \right>$
$\left< to, 2 \right>$
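To see why these properties matter, the sketch below reduces each partition separately and then reduces the partial results, mirroring the description above. The helper reduce_partitions and the particular split of the values into three partitions are hypothetical. With + the answer is the same however the data is partitioned; with - it is not:

```python
from functools import reduce

def add(x, y):
    return x + y

def sub(x, y):
    return x - y

def reduce_partitions(f, partitions):
    # Reduce each partition independently (this could happen on separate nodes) ...
    partials = [reduce(f, p) for p in partitions]
    # ... then reduce the partial results into the final answer.
    return reduce(f, partials)

# Hypothetical values for a single key, split across three partitions.
partitions = [[1, 2], [3], [4, 5]]
flat = [v for p in partitions for v in p]

print(reduce(add, flat), reduce_partitions(add, partitions))  # 15 15
print(reduce(sub, flat), reduce_partitions(sub, partitions))  # -13 -3
```

Subtraction fails here because it is not associative; if the partial results could also be combined in any order, commutativity would be required as well.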

## Map and Reduce compared with Python

• Notice that these functions are formulated differently from the standard Python functions of the same name.

• The reduce function works with key-value pairs.

• It would be more apt to call it something like reduceByKey.

## MiniMapReduce

• To illustrate how the Map-Reduce programming model works, we can implement our own Map-Reduce framework in Python.

• This illustrates how a problem can be written in terms of map and reduce operations.

• Note that these are illustrative functions; this is not how Hadoop or Apache Spark actually implement them.

In :
##########################################################
#
#   MiniMapReduce
#
# A non-parallel, non-scalable Map-Reduce implementation
##########################################################

from functools import reduce

def groupByKey(data):
    result = dict()
    for key, value in data:
        if key in result:
            result[key].append(value)
        else:
            result[key] = [value]
    return result

def reduceByKey(f, data):
    key_values = groupByKey(data)
    return list(map(lambda key:
                    (key, reduce(f, key_values[key])),
                    key_values))


## Word-count using MiniMapReduce

In :
data = list(map(lambda x: (x, 1), "to be or not to be".split()))
data

Out:
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]
In :
groupByKey(data)

Out:
{'to': [1, 1], 'be': [1, 1], 'or': [1], 'not': [1]}
In :
reduceByKey(lambda x, y: x + y, data)

Out:
[('to', 2), ('be', 2), ('or', 1), ('not', 1)]

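## Parallelising MiniMapReduce

• We can easily turn our Map-Reduce implementation into a parallel, multi-threaded framework by using the my_map_multithreaded function we defined earlier.

• In principle, this lets us perform map-reduce computations that exploit parallel processing on a single computer. Note, however, that in the standard CPython interpreter the global interpreter lock (GIL) allows only one thread to execute Python bytecode at a time, so threads mainly speed up work that spends its time waiting (on I/O, or sleeping as in a_function_which_takes_a_long_time); CPU-bound functions would need multiple processes to use multiple cores.

In :
def reduceByKey_multithreaded(f, data):
    key_values = groupByKey(data)
    return my_map_multithreaded(
        lambda key: (key, reduce(f, key_values[key])), [key for key in key_values])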

In :
reduceByKey_multithreaded(lambda x, y: x + y, data)

Scheduling jobs..
Starting jobs..
Processing data: to ...
Finished job # 0
Result was ('to', 2)
Processing data: be ...
Finished job # 1
Result was ('be', 2)
Processing data: or ...
Finished job # 2
Result was ('or', 1)
Processing data: not ...
Finished job # 3
Result was ('not', 1)
Waiting for jobs to finish..
All done.

Out:
[('to', 2), ('be', 2), ('or', 1), ('not', 1)]

## Parallelising the reduce step

• Provided that our operator is both associative and commutative we can also parallelise the reduce operation.

• We partition the data into approximately equal subsets.

• We then reduce each subset independently on a separate core.

• The results can be combined in a final reduce step.

### Partitioning the data

In :
def split_data(data, split_points):
    partitions = []
    n = 0
    for i in split_points:
        partitions.append(data[n:i])
        n = i
    partitions.append(data[n:])
    return partitions

data = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
partitioned_data = split_data(data, [3])
partitioned_data

Out:
[['a', 'b', 'c'], ['d', 'e', 'f', 'g']]
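In the example above the split point is supplied by hand. One way to compute approximately equal partitions automatically is sketched below; partition_evenly is a hypothetical helper, not part of the original code. It places the boundaries at k·n // p for k = 0, ..., p, so the partition sizes differ by at most one:

```python
def partition_evenly(data, num_partitions):
    """Split data into num_partitions contiguous slices of near-equal size."""
    n = len(data)
    # Boundaries at k * n // num_partitions for k = 0 .. num_partitions.
    bounds = [k * n // num_partitions for k in range(num_partitions + 1)]
    return [data[bounds[k]:bounds[k + 1]] for k in range(num_partitions)]

print(partition_evenly(['a', 'b', 'c', 'd', 'e', 'f', 'g'], 2))
# [['a', 'b', 'c'], ['d', 'e', 'f', 'g']]
print(partition_evenly(list(range(10)), 3))
# [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```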

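### Reducing across partitions in parallel

In :
from functools import reduce
from threading import Thread

def parallel_reduce(f, partitions):

    n = len(partitions)
    results = [None] * n
    threads = [None] * n

    def job(i):
        # Reduce one partition; each job writes to its own slot.
        results[i] = reduce(f, partitions[i])

    for i in range(n):
        threads[i] = Thread(target=job, args=(i,))
        threads[i].start()

    for i in range(n):
        threads[i].join()

    # Combine the partial results in partition order.
    return reduce(f, results)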

parallel_reduce(lambda x, y: x + y, partitioned_data)

Out:
'abcdefg'

## Map-Reduce on a cluster of computers

• The code we have written so far will not allow us to exploit parallelism from multiple computers in a cluster.

• Developing such a framework would be a very large software engineering project.

• There are existing frameworks we can use, e.g. Apache Hadoop and Apache Spark.

• In this lecture we will cover Apache Spark.

## Apache Spark

• Apache Spark provides an object-oriented library for processing data on the cluster.

• It provides objects which represent resilient distributed datasets (RDDs).

• RDDs behave a bit like Python collections (e.g. lists).

• However:

• the underlying data is distributed across the nodes in the cluster, and
• the collections are immutable.

## Apache Spark and Map-Reduce

• We process the data by using higher-order functions to map RDDs onto new RDDs.

• Each instance of an RDD has at least two methods corresponding to the Map-Reduce workflow:

• map
• reduceByKey
• These methods work in the same way as the corresponding functions we defined earlier to work with the standard Python collections.

• There are also additional RDD methods in the Apache Spark API;

• Apache Spark is a super-set of Map-Reduce.

## Word-count in Apache Spark

In :
words = "to be or not to be".split()
words

Out:
['to', 'be', 'or', 'not', 'to', 'be']

### The SparkContext class

• When working with Apache Spark we invoke methods on an object which is an instance of the pyspark.context.SparkContext class.

• Typically, an instance of this object will be created automatically for you (for example, by the pyspark shell) and assigned to the variable sc.

• The parallelize method in SparkContext can be used to turn any ordinary Python collection into an RDD;

• normally we would create an RDD from a large file or an HBase table.
In :
words_rdd = sc.parallelize(words)
words_rdd

Out:
ParallelCollectionRDD at parallelize at PythonRDD.scala:195

### Mapping an RDD

• Now when we invoke the map or reduceByKey methods on words_rdd we can set up a parallel processing computation across the cluster.
In :
word_tuples_rdd = words_rdd.map(lambda x: (x, 1))
word_tuples_rdd

Out:
PythonRDD at RDD at PythonRDD.scala:53
• Notice that we do not have a result yet.

• The computation is not performed until we request the final result to be collected.

• We do this by invoking the collect() method:

In :
word_tuples_rdd.collect()

Out:
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]

### Reducing an RDD

• However, we still need to sum the counts for each word, which requires additional processing:
In :
word_counts_rdd = word_tuples_rdd.reduceByKey(lambda x, y: x + y)
word_counts_rdd

Out:
PythonRDD at RDD at PythonRDD.scala:53
• Now we request the final result:
In :
word_counts = word_counts_rdd.collect()
word_counts

Out:
[('to', 2), ('be', 2), ('or', 1), ('not', 1)]

### Lazy evaluation

• It is only when we invoke collect() that the processing is performed on the cluster.

• Invoking collect() will cause both the map and reduceByKey operations to be performed.

• Because collect() returns every element of the result to the driver program, it can be an expensive operation if the resulting collection is very large.
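Lazy evaluation can be loosely imitated in plain Python with generators: building the pipeline does no work, and computation happens only when the result is requested. This is an analogy only, not how Spark schedules work on a cluster; the function tag and the list calls are hypothetical names used to record when work actually happens:

```python
calls = []

def tag(word):
    # Record each call so we can see when work actually happens.
    calls.append(word)
    return (word, 1)

words = "to be or not to be".split()

# Building the pipeline: a generator expression does no work yet.
pipeline = (tag(w) for w in words)
print(len(calls))          # 0

# "Collecting" the result forces every element to be computed.
collected = list(pipeline)
print(len(calls))          # 6
print(collected[:2])       # [('to', 1), ('be', 1)]
```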

### The head of an RDD

• The take method is similar to collect, but only returns the first $n$ elements.

• This can be very useful for testing.

In :
word_counts_rdd.take(2)

Out:
[('to', 2), ('be', 2)]

### The complete word-count example

In :
text = "to be or not to be".split()
rdd = sc.parallelize(text)
counts = rdd.map(lambda word: (word, 1)) \
            .reduceByKey(lambda x, y: x + y)
counts.collect()

Out:
[('to', 2), ('be', 2), ('or', 1), ('not', 1)]

• Apache Spark offers many more methods for operating on collections of tuples over and above the standard Map-Reduce framework:

• Sorting: sortByKey, sortBy, takeOrdered
• Mapping: flatMap
• Filtering: filter
• Counting: count
• Set-theoretic: intersection, union
• Many others: see the Transformations section of the programming guide

## Creating an RDD from a text file

• In the previous example, we created an RDD from a Python collection.

• This is not typically how we would work with big data.

• More commonly we would create an RDD corresponding to data in an HBase table, or an HDFS file.

• The following example creates an RDD from a text file on the native filesystem (ext4);

• With bigger data, you would use an HDFS file, but the principle is the same.
• Each element of the RDD corresponds to a single line of text.

In :
genome = sc.textFile('../../../data/genome.txt')
genome.take(5)

Out:
['TTGGCCATGCTGCCCACTCACCTAGAGCGCACAGCTGACACTGAGTCCTCTTCTGAACCTCATCCATGAA',
'CATATTTATGAAATCTTTCCTGGCCCCAAGTGGAAATGCCCCCTCATTTGGGTCCTCACTGAACCCCAGT',
'ACACAACTCTTTTGTACTACTCTATTATGCTGGGGTGTTTTTTTATTGTCTCACCTGATAAACCGTAAGC',
'CCCTTGAAGACAGCAACTCGTTTTTAAGCTCTTTATAACCCCAGAGCCTCGCACAGTACCTGGACCAGAT',
'TAAGGGGTACTTAACAGATGCTTAGTGAAGGAAGGAATGGATTTCTCACCTGGTTGCTTATCTTCTAGAC']

## Genome example

• We will use this RDD to calculate the frequencies of sequences of five bases, and then sort the sequences into descending order ranked by their frequency.

• First we will define some functions to split the bases into sequences of a certain size:

In :
def group_characters(line, n=5):
    result = ''
    i = 0
    for ch in line:
        result = result + ch
        i = i + 1
        if (i % n) == 0:
            yield result
            result = ''

def group_and_split(line):
    return [sequence for sequence in group_characters(line)]

In :
group_and_split('abcdefghijklmno')

Out:
['abcde', 'fghij', 'klmno']
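The same grouping can be written more compactly with string slicing. The function group_and_split_sliced below is a hypothetical alternative, not the version used in the rest of this example; like group_characters, it drops any trailing characters that do not fill a complete group of n:

```python
def group_and_split_sliced(line, n=5):
    # Take consecutive, non-overlapping slices of length n.
    # The range stops early enough that every slice is complete.
    return [line[i:i + n] for i in range(0, len(line) - n + 1, n)]

print(group_and_split_sliced('abcdefghijklmno'))  # ['abcde', 'fghij', 'klmno']
print(group_and_split_sliced('abcdefgh'))         # ['abcde']
```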
• Now we will transform the original text RDD into an RDD containing key-value pairs where the key is the sequence and the value is 1, as per the word-count example.

• Notice that if we simply map each line of text, we will obtain multi-dimensional data:

In :
genome.map(group_and_split).take(2)

Out:
[['TTGGC',
'CATGC',
'TGCCC',
'ACTCA',
'CCTAG',
'AGCGC',
'ACAGC',
'TGACA',
'CTGAG',
'TCCTC',
'TTCTG',
'AACCT',
'CATCC',
'ATGAA'],
['CATAT',
'TTATG',
'AAATC',
'TTTCC',
'TGGCC',
'CCAAG',
'TGGAA',
'ATGCC',
'CCCTC',
'ATTTG',
'GGTCC',
'TCACT',
'GAACC',
'CCAGT']]

### Flattening an RDD using flatMap

• We will need to flatten this data in order to turn it into a list of base-sequences.

• We can use the flatMap method:

In :
sequences = genome.flatMap(group_and_split)
sequences.take(3)

Out:
['TTGGC', 'CATGC', 'TGCCC']
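The difference between map and flatMap can be reproduced locally on ordinary lists, where itertools.chain.from_iterable does the flattening. This is a plain-Python analogy, not Spark code; split_into_groups is a hypothetical stand-in for group_and_split so that the block runs on its own, and the two input lines are the first ten bases of the first two genome lines shown earlier:

```python
from itertools import chain

def split_into_groups(line, n=5):
    # Hypothetical stand-in for group_and_split: complete groups of n characters.
    return [line[i:i + n] for i in range(0, len(line) - n + 1, n)]

lines = ['TTGGCCATGC', 'CATATTTATG']

# map: one output element (a list) per input line, giving nested lists.
mapped = list(map(split_into_groups, lines))
# [['TTGGC', 'CATGC'], ['CATAT', 'TTATG']]

# flatMap: the per-line lists are concatenated into one flat list.
flat_mapped = list(chain.from_iterable(map(split_into_groups, lines)))
# ['TTGGC', 'CATGC', 'CATAT', 'TTATG']
```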
In :
counts = \
sequences.map(
lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)
counts.take(10)

Out:
[('TTGGC', 587),
('CATGC', 647),
('TGCCC', 599),
('ACTCA', 775),
('TGACA', 831),
('TTCTG', 1257),
('AACCT', 726),
('TTATG', 819),
('AAATC', 996),
('TGGCC', 718)]
• We want to rank each sequence according to its count.

• The key (first element) of each tuple should therefore be the count.

• To achieve this, we swap the two elements of each tuple.

In :
def reverse_tuple(key_value_pair):
    return (key_value_pair[1], key_value_pair[0])

In :
sequences = counts.map(reverse_tuple)
sequences.take(10)

Out:
[(587, 'TTGGC'),
(647, 'CATGC'),
(599, 'TGCCC'),
(775, 'ACTCA'),
(831, 'TGACA'),
(1257, 'TTCTG'),
(726, 'AACCT'),
(819, 'TTATG'),
(996, 'AAATC'),
(718, 'TGGCC')]

### Sorting an RDD

• Now we can sort the RDD in descending order of key:
In :
sequences_sorted = sequences.sortByKey(False)
top_ten_sequences = sequences_sorted.take(10)
top_ten_sequences

Out:
[(37137, 'NNNNN'),
(4653, 'AAAAA'),
(4223, 'TTTTT'),
(2788, 'AAAAT'),
(2658, 'ATTTT'),
(2283, 'AAATA'),
(2276, 'TAAAA'),
(2197, 'TTTTA'),
(2196, 'TATTT'),
(2185, 'AGAAA')]

## Calculating $\pi$ using Spark

• We can estimate an approximate value for $\pi$ using the following Monte-Carlo method:
1. Inscribe a circle in a square
2. Randomly generate points in the square
3. Determine the number of points in the square that are also in the circle
4. Let $r$ be the number of points in the circle divided by the number of points in the square, then $\pi \approx 4 r$.
• Note that the more points generated, the better the approximation

See this tutorial.

In :
import numpy as np

def sample(p):
    x, y = np.random.random(), np.random.random()
    return 1 if x*x + y*y < 1 else 0

NUM_SAMPLES = 5000000

count = sc.parallelize(range(0, NUM_SAMPLES)).map(sample) \
.reduce(lambda a, b: a + b)
r = float(count) / float(NUM_SAMPLES)
print("Pi is approximately %.3f" % (4.0 * r))

Pi is approximately 3.141
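The code above draws x and y from [0, 1), so the points fall in the unit square and the "circle" is the quarter circle $x^2 + y^2 < 1$. Its area is $\pi/4$ relative to the square's area of 1, which is why $\pi \approx 4r$. For sanity-checking before running on a cluster, the same estimate can be computed locally with vectorised NumPy; estimate_pi and its seed parameter are our own illustrative choices, and the seed only makes the result reproducible:

```python
import numpy as np

def estimate_pi(num_samples, seed=0):
    # Draw points uniformly in the unit square [0, 1) x [0, 1).
    rng = np.random.default_rng(seed)
    x = rng.random(num_samples)
    y = rng.random(num_samples)
    # Fraction of points inside the quarter circle x^2 + y^2 < 1.
    r = np.mean(x * x + y * y < 1.0)
    # The quarter circle covers pi/4 of the unit square.
    return 4.0 * r

print("Pi is approximately %.3f" % estimate_pi(1_000_000))
```

The standard error of the estimate shrinks roughly in proportion to $1/\sqrt{N}$ for $N$ samples, which is why more points give a better approximation.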