In this notebook we're going to use bash
to write a mapper and a reducer to count words in a file. This example will serve to illustrate the main features of Hadoop's MapReduce framework.
MapReduce is a computing paradigm designed to allow parallel distributed processing of massive amounts of data.
Data is split across several computer nodes, there it is processed by one or more mappers. The results emitted by the mappers are first sorted and then passed to one or more reducers that process and combine the data to return the final result.
With Hadoop Streaming it is possible to use any programming language to define a mapper and/or a reducer. Here we're going to use the Unix bash
scripting language (here is the official documentation for the language).
Let's write a mapper script called map.sh
. The mapper splits each input line into words and for each word it outputs a line containing the word and 1
separated by a tab.
Example: for the input
apple orange banana apple peach
map.sh
outputs:
apple 1 orange 1 banana 1 apple 1 peach 1
The cell magic %%writefile
allows us to write the contents of the cell to a file.
%%writefile map.sh
#!/bin/bash
while read line
do
for word in $line
do
if [ -n "$word" ]
then
echo -e ${word}"\t1"
fi
done
done
Overwriting map.sh
After running the cell above, you should have a new file map.sh
in your current directory.
The file can be seen in the left panel of JupyterLab or by using a list command on the bash command-line.
Note: you can execute a single bash command in a Jupyter notebook cell by prepending an exclamation point to the command.
!ls -hl map.sh
-rwx------ 1 datalab hadoopusers 126 Nov 18 08:49 map.sh
We're going to test the mapper on on the command line with a small text file fruits.txt
by first creating the text file.
In this file apple
for instance appears two times, that's what we want our mapreduce job to compute.
%%writefile fruits.txt
apple banana
peach orange peach peach
pineapple peach apple
Overwriting fruits.txt
!cat fruits.txt
apple banana peach orange peach peach pineapple peach apple
Test the mapper
!cat fruits.txt|./map.sh
apple 1 banana 1 peach 1 orange 1 peach 1 peach 1 pineapple 1 peach 1 apple 1
If the script map.sh
does not have the executable bit set, you need to set the correct permissions.
!chmod 700 map.sh
Let us now run a MapReduce job with Hadoop Streaming.
Hadoop Streaming is a library included in the Hadoop distribution that enables you to develop MapReduce executables in languages other than Java.
Mapper and/or reducer can be any sort of executables that read the input from stdin and emit the output to stdout. By default, input is read line by line and the prefix of a line up to the first tab character is the key; the rest of the line (excluding the tab character) will be the value.
If there is no tab character in the line, then the entire line is considered as key and the value is null. The default input format is specified in the class TextInputFormat
(see the API documentation) but this can can be customized for instance by defining another field separator (see the Hadoop Streaming documentation.
This is an example of MapReduce streaming invocation syntax:
mapred streaming \ -input myInputDirs \ -output myOutputDir \ -mapper /bin/cat \ -reducer /usr/bin/wc
You can find the full official documentation for Hadoop Streaming from Apache Hadoop here: https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html.
All options for the Hadoop Streaming command are described here: Streaming Command Options and can be listed with the command
!mapred streaming --help
Usage: $HADOOP_HOME/bin/hadoop jar hadoop-streaming.jar [options] Options: -input <path> DFS input file(s) for the Map step. -output <path> DFS output directory for the Reduce step. -mapper <cmd|JavaClassName> Optional. Command to be run as mapper. -combiner <cmd|JavaClassName> Optional. Command to be run as combiner. -reducer <cmd|JavaClassName> Optional. Command to be run as reducer. -file <file> Optional. File/dir to be shipped in the Job jar file. Deprecated. Use generic option "-files" instead. -inputformat <TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName> Optional. The input format class. -outputformat <TextOutputFormat(default)|JavaClassName> Optional. The output format class. -partitioner <JavaClassName> Optional. The partitioner class. -numReduceTasks <num> Optional. Number of reduce tasks. -inputreader <spec> Optional. Input recordreader spec. -cmdenv <n>=<v> Optional. Pass env.var to streaming commands. -mapdebug <cmd> Optional. To run this script when a map task fails. -reducedebug <cmd> Optional. To run this script when a reduce task fails. -io <identifier> Optional. Format to use for input to and output from mapper/reducer commands -lazyOutput Optional. Lazily create Output. -background Optional. Submit the job and don't wait till it completes. -verbose Optional. Print verbose output. -info Optional. Print detailed usage. -help Optional. Print help message. Generic options supported are: -conf <configuration file> specify an application configuration file -D <property=value> define a value for a given property -fs <file:///|hdfs://namenode:port> specify default filesystem URL to use, overrides 'fs.defaultFS' property from configurations. -jt <local|resourcemanager:port> specify a ResourceManager -files <file1,...> specify a comma-separated list of files to be copied to the map reduce cluster -libjars <jar1,...> specify a comma-separated list of jar files to be included in the classpath -archives <archive1,...> specify a comma-separated list of archives to be unarchived on the compute machines The general command line syntax is: command [genericOptions] [commandOptions] For more details about these options: Use $HADOOP_HOME/bin/hadoop jar hadoop-streaming.jar -info
Now in order to run a mapreduce job that we need to "upload" the input file to the Hadoop file system.
With the command hdfs dfs -l
you can view the content of your HDFS home directory.
hdfs dfs
you can run a filesystem command on the Hadoop fileystem. The complete list of commands can be found in the System Shell Guide.
!hdfs dfs -ls
Found 8 items drwx------ - datalab supergroup 0 2019-11-18 08:34 .Trash drwxr-xr-x - datalab supergroup 0 2019-11-17 20:56 .sparkStaging drwx------ - datalab supergroup 0 2019-11-18 08:49 .staging -rw-r--r-- 3 datalab supergroup 2623070 2019-11-17 20:53 FME_BaumdatenBearbeitet_OGD_20190205.csv drwxr-xr-x - datalab supergroup 0 2019-11-16 12:14 data drwxr-xr-x - datalab supergroup 0 2019-11-13 17:26 test -rw-r--r-- 3 datalab supergroup 71041024 2019-11-15 11:50 wiki_sample._COPYING_ drwxr-xr-x - datalab supergroup 0 2019-11-18 08:49 wordcount
Now create a directory wordcount
with a subdirectory input
on the Hadoop filesystem.
%%bash
hdfs dfs -mkdir -p wordcount
Copy the file fruits.txt to Hadoop in the folder wordcount/input
.
Why do we need this step? Because the file fruits.txt
needs to reside on the Hadoop filesystem in order to enjoy of all of the features of Hadoop (data partitioning, distributed processing, fault tolerance).
%%bash
hdfs dfs -rm -r wordcount/input 2>/dev/null
hdfs dfs -mkdir wordcount/input
hdfs dfs -put fruits.txt wordcount/input
Let's check if the file is there now.
Note: it is convenient use the option -h
for ls
to show file sizes in human-readable form (showing sizes in Kilobytes, Megabytes, Gigabytes, etc.)
!hdfs dfs -ls -h -R wordcount/input
-rw-r--r-- 3 datalab supergroup 60 2019-11-18 08:49 wordcount/input/fruits.txt
Let's try to run the mapper using a dummy reducer (/bin/cat
does nothing else than echoing the data it receives).
Warning: mapreduce tends to produce a verbose output, so be ready to see a long output. What you should look for is a message of the kind
"INFO mapreduce.Job: Job ... completed successfully"
Note: at the beginning of next cell you'll see a command hadoop fs -rmr wordcount/output 2>/dev/null
. This is needed because when you run a job several times mapreduce will give an error if you try to overwrite the same output directory.
%%bash
hdfs dfs -rm -r wordcount/output 2>/dev/null
mapred streaming \
-files map.sh \
-input wordcount/input \
-output wordcount/output \
-mapper map.sh \
-reducer /bin/cat
packageJobJar: [] [/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/hadoop-streaming-3.0.0-cdh6.3.0.jar] /tmp/streamjob6050042463478216172.jar tmpDir=null
19/11/18 08:49:53 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm472 19/11/18 08:49:53 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/datalab/.staging/job_1571899345744_0251 19/11/18 08:49:53 INFO mapred.FileInputFormat: Total input files to process : 1 19/11/18 08:49:54 INFO mapreduce.JobSubmitter: number of splits:2 19/11/18 08:49:54 INFO Configuration.deprecation: yarn.resourcemanager.zk-address is deprecated. Instead, use hadoop.zk.address 19/11/18 08:49:54 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled 19/11/18 08:49:54 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1571899345744_0251 19/11/18 08:49:54 INFO mapreduce.JobSubmitter: Executing with tokens: [] 19/11/18 08:49:54 INFO conf.Configuration: resource-types.xml not found 19/11/18 08:49:54 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'. 19/11/18 08:49:54 INFO impl.YarnClientImpl: Submitted application application_1571899345744_0251 19/11/18 08:49:54 INFO mapreduce.Job: The url to track the job: http://c101.local:8088/proxy/application_1571899345744_0251/ 19/11/18 08:49:54 INFO mapreduce.Job: Running job: job_1571899345744_0251 19/11/18 08:50:03 INFO mapreduce.Job: Job job_1571899345744_0251 running in uber mode : false 19/11/18 08:50:03 INFO mapreduce.Job: map 0% reduce 0% 19/11/18 08:50:09 INFO mapreduce.Job: map 100% reduce 0% 19/11/18 08:50:14 INFO mapreduce.Job: map 100% reduce 100% 19/11/18 08:50:15 INFO mapreduce.Job: Job job_1571899345744_0251 completed successfully 19/11/18 08:50:15 INFO mapreduce.Job: Counters: 54 File System Counters FILE: Number of bytes read=79 FILE: Number of bytes written=687174 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=312 HDFS: Number of bytes written=78 HDFS: Number of read operations=11 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 HDFS: Number of bytes read erasure-coded=0 Job Counters Launched map tasks=2 Launched reduce tasks=1 Rack-local map tasks=2 Total time spent by all maps in occupied slots (ms)=474032 Total time spent by all reduces in occupied slots (ms)=140348 Total time spent by all map tasks (ms)=9116 Total time spent by all reduce tasks (ms)=2699 Total vcore-milliseconds taken by all map tasks=9116 Total vcore-milliseconds taken by all reduce tasks=2699 Total megabyte-milliseconds taken by all map tasks=46673920 Total megabyte-milliseconds taken by all reduce tasks=13818880 Map-Reduce Framework Map input records=3 Map output records=9 Map output bytes=78 Map output materialized bytes=108 Input split bytes=222 Combine input records=0 Combine output records=0 Reduce input groups=5 Reduce shuffle bytes=108 Reduce input records=9 Reduce output records=9 Spilled Records=18 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=139 CPU time spent (ms)=2630 Physical memory (bytes) snapshot=1531457536 Virtual memory (bytes) snapshot=18885754880 Total committed heap usage (bytes)=4521459712 Peak Map Physical memory (bytes)=571957248 Peak Map Virtual memory (bytes)=6292967424 Peak Reduce Physical memory (bytes)=388386816 Peak Reduce Virtual memory (bytes)=6301384704 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=90 File Output Format Counters Bytes Written=78 19/11/18 08:50:15 INFO streaming.StreamJob: Output directory: wordcount/output
The output of the mapreduce job is in the output
subfolder of the input directory. Let's check what's inside it.
!hdfs dfs -ls wordcount/output
Found 2 items -rw-r--r-- 3 datalab supergroup 0 2019-11-18 08:50 wordcount/output/_SUCCESS -rw-r--r-- 3 datalab supergroup 78 2019-11-18 08:50 wordcount/output/part-00000
If output
contains a file named _SUCCESS
that means that the mapreduce job completed successfully.
Note: when dealing with Big Data it's always advisable to pipe the output of cat
commands to head
(or tail
).
!hdfs dfs -cat wordcount/output/part*|head
apple 1 apple 1 banana 1 orange 1 peach 1 peach 1 peach 1 peach 1 pineapple 1
We have gotten as expected all the output from the mapper. Something worth of notice is that the data outputted from the mapper has been sorted. We haven't asked for that but this step is automatically performed by the mapper as soon as the number of reducers is $\gt 0$.
The following picture illustrates the concept of shuffling and sorting that is automatically performed by Hadoop after each map before passing the output to reduce. In the picture the outputs of the two mapper tasks are shown. The arrows represent shuffling and sorting done before delivering the data to one reducer (rightmost box). The shuffling and sorting phase is often one of the most costly in a MapReduce job.
Note: the job ran with two mappers because $2$ is the default number of mappers in Hadoop.
Let's now write a reducer script called reduce.sh
.
%%writefile reduce.sh
#!/bin/bash
currkey=""
currcount=0
while IFS=$'\t' read -r key val
do
if [[ $key == $currkey ]]
then
currcount=$(( currcount + val ))
else
if [ -n "$currkey" ]
then
echo -e ${currkey} "\t" ${currcount}
fi
currkey=$key
currcount=1
fi
done
# last one
echo -e ${currkey} "\t" ${currcount}
Overwriting reduce.sh
Set permission for the reducer script.
!chmod 700 reduce.sh
Test map and reduce on the shell
!cat fruits.txt|./map.sh|sort|./reduce.sh
apple 2 banana 1 orange 1 peach 4 pineapple 1
Once we've made sure that the reducer script runs correctly on the shell, we can run it on the cluster.
%%bash
hdfs dfs -rm -r wordcount/output 2>/dev/null
mapred streaming \
-file map.sh \
-file reduce.sh \
-input wordcount/input \
-output wordcount/output \
-mapper map.sh \
-reducer reduce.sh
packageJobJar: [map.sh, reduce.sh] [/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/hadoop-streaming-3.0.0-cdh6.3.0.jar] /tmp/streamjob4511756596880012363.jar tmpDir=null
19/11/18 08:50:23 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead. 19/11/18 08:50:25 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm472 19/11/18 08:50:25 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/datalab/.staging/job_1571899345744_0252 19/11/18 08:50:26 INFO mapred.FileInputFormat: Total input files to process : 1 19/11/18 08:50:26 INFO mapreduce.JobSubmitter: number of splits:2 19/11/18 08:50:26 INFO Configuration.deprecation: yarn.resourcemanager.zk-address is deprecated. Instead, use hadoop.zk.address 19/11/18 08:50:26 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled 19/11/18 08:50:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1571899345744_0252 19/11/18 08:50:26 INFO mapreduce.JobSubmitter: Executing with tokens: [] 19/11/18 08:50:26 INFO conf.Configuration: resource-types.xml not found 19/11/18 08:50:26 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'. 19/11/18 08:50:26 INFO impl.YarnClientImpl: Submitted application application_1571899345744_0252 19/11/18 08:50:26 INFO mapreduce.Job: The url to track the job: http://c101.local:8088/proxy/application_1571899345744_0252/ 19/11/18 08:50:26 INFO mapreduce.Job: Running job: job_1571899345744_0252 19/11/18 08:50:36 INFO mapreduce.Job: Job job_1571899345744_0252 running in uber mode : false 19/11/18 08:50:36 INFO mapreduce.Job: map 0% reduce 0% 19/11/18 08:50:43 INFO mapreduce.Job: map 100% reduce 0% 19/11/18 08:50:49 INFO mapreduce.Job: map 100% reduce 100% 19/11/18 08:50:50 INFO mapreduce.Job: Job job_1571899345744_0252 completed successfully 19/11/18 08:50:50 INFO mapreduce.Job: Counters: 54 File System Counters FILE: Number of bytes read=79 FILE: Number of bytes written=687864 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=312 HDFS: Number of bytes written=56 HDFS: Number of read operations=11 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 HDFS: Number of bytes read erasure-coded=0 Job Counters Launched map tasks=2 Launched reduce tasks=1 Rack-local map tasks=2 Total time spent by all maps in occupied slots (ms)=484692 Total time spent by all reduces in occupied slots (ms)=241904 Total time spent by all map tasks (ms)=9321 Total time spent by all reduce tasks (ms)=4652 Total vcore-milliseconds taken by all map tasks=9321 Total vcore-milliseconds taken by all reduce tasks=4652 Total megabyte-milliseconds taken by all map tasks=47723520 Total megabyte-milliseconds taken by all reduce tasks=23818240 Map-Reduce Framework Map input records=3 Map output records=9 Map output bytes=78 Map output materialized bytes=108 Input split bytes=222 Combine input records=0 Combine output records=0 Reduce input groups=5 Reduce shuffle bytes=108 Reduce input records=9 Reduce output records=5 Spilled Records=18 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=158 CPU time spent (ms)=2550 Physical memory (bytes) snapshot=1526378496 Virtual memory (bytes) snapshot=18883096576 Total committed heap usage (bytes)=4490002432 Peak Map Physical memory (bytes)=580694016 Peak Map Virtual memory (bytes)=6292299776 Peak Reduce Physical memory (bytes)=368877568 Peak Reduce Virtual memory (bytes)=6298894336 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=90 File Output Format Counters Bytes Written=56 19/11/18 08:50:50 INFO streaming.StreamJob: Output directory: wordcount/output
Let's check the output on the HDFS filesystem
!hdfs dfs -cat wordcount/output/part*|head
apple 2 banana 1 orange 1 peach 4 pineapple 1
Let's create a datafile by downloading some real data, for instance from a Web page. This example will be used to introduce some advanced configurations.
Next, we download a URL with wget
and filter out HTML tags with a sed
regular expression.
%%bash
URL=https://www.derstandard.at/story/2000110819049/und-wo-warst-du-beim-fall-der-mauer
wget -qO- $URL | sed -e 's/<[^>]*>//g;s/^ //g' >sample_article.txt
!cat sample_article.txt|./map.sh|head
1 1 1 1 Und 1 wo 1 warst 1 du 1 beim 1 Fall 1
As usual, with real data there's some more work to do. Here we see that the mapper script doesn't skip empty lines. Let's modify it so that empty lines are skipped.
%%writefile map.sh
#!/bin/bash
while read line
do
for word in $line
do
if [[ "$line" =~ [^[:space:]] ]]
then
if [ -n "$word" ]
then
echo -e ${word} "\t1"
fi
fi
done
done
Overwriting map.sh
!cat sample_article.txt|./map.sh|head
Und 1 wo 1 warst 1 du 1 beim 1 Fall 1 der 1 Mauer? 1 - 1 1
Now the output of map.sh
looks better!
Note: when working with real data we need in general some more preprocessing in order to remove control characters or invalid unicode.
Time to run MapReduce again with the new data, but first we need to "put" the data on HDFS.
%%bash
hdfs dfs -rm -r wordcount/input 2>/dev/null
hdfs dfs -put sample_article.txt wordcount/input
# check that the folder wordcount/input on HDFS only contains sample_article.txt
!hdfs dfs -ls -h wordcount/input
-rw-r--r-- 3 datalab supergroup 4.1 K 2019-11-18 08:50 wordcount/input
Check the reducer
!cat sample_article.txt|./map.sh|./reduce.sh|head
Und 1 wo 1 warst 1 du 1 beim 1 Fall 1 der 1 Mauer? 1 - 1 2
%%bash
hadoop fs -rmr wordcount/output 2>/dev/null
mapred streaming \
-file map.sh \
-file reduce.sh \
-input wordcount/input \
-output wordcount/output \
-mapper map.sh \
-reducer reduce.sh
packageJobJar: [map.sh, reduce.sh] [/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/hadoop-streaming-3.0.0-cdh6.3.0.jar] /tmp/streamjob1458624376542077747.jar tmpDir=null
19/11/18 08:51:03 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead. 19/11/18 08:51:04 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm472 19/11/18 08:51:04 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/datalab/.staging/job_1571899345744_0253 19/11/18 08:51:05 INFO mapred.FileInputFormat: Total input files to process : 1 19/11/18 08:51:05 INFO mapreduce.JobSubmitter: number of splits:2 19/11/18 08:51:05 INFO Configuration.deprecation: yarn.resourcemanager.zk-address is deprecated. Instead, use hadoop.zk.address 19/11/18 08:51:05 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled 19/11/18 08:51:05 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1571899345744_0253 19/11/18 08:51:05 INFO mapreduce.JobSubmitter: Executing with tokens: [] 19/11/18 08:51:06 INFO conf.Configuration: resource-types.xml not found 19/11/18 08:51:06 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'. 19/11/18 08:51:06 INFO impl.YarnClientImpl: Submitted application application_1571899345744_0253 19/11/18 08:51:06 INFO mapreduce.Job: The url to track the job: http://c101.local:8088/proxy/application_1571899345744_0253/ 19/11/18 08:51:06 INFO mapreduce.Job: Running job: job_1571899345744_0253 19/11/18 08:51:14 INFO mapreduce.Job: Job job_1571899345744_0253 running in uber mode : false 19/11/18 08:51:14 INFO mapreduce.Job: map 0% reduce 0% 19/11/18 08:51:21 INFO mapreduce.Job: map 100% reduce 0% 19/11/18 08:51:27 INFO mapreduce.Job: map 100% reduce 100% 19/11/18 08:51:28 INFO mapreduce.Job: Job job_1571899345744_0253 completed successfully 19/11/18 08:51:28 INFO mapreduce.Job: Counters: 54 File System Counters FILE: Number of bytes read=2056 FILE: Number of bytes written=691913 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=6428 HDFS: Number of bytes written=2273 HDFS: Number of read operations=11 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 HDFS: Number of bytes read erasure-coded=0 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=482924 Total time spent by all reduces in occupied slots (ms)=209040 Total time spent by all map tasks (ms)=9287 Total time spent by all reduce tasks (ms)=4020 Total vcore-milliseconds taken by all map tasks=9287 Total vcore-milliseconds taken by all reduce tasks=4020 Total megabyte-milliseconds taken by all map tasks=47549440 Total megabyte-milliseconds taken by all reduce tasks=20582400 Map-Reduce Framework Map input records=191 Map output records=211 Map output bytes=2492 Map output materialized bytes=2180 Input split bytes=200 Combine input records=0 Combine output records=0 Reduce input groups=164 Reduce shuffle bytes=2180 Reduce input records=211 Reduce output records=164 Spilled Records=422 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=146 CPU time spent (ms)=2820 Physical memory (bytes) snapshot=1466261504 Virtual memory (bytes) snapshot=18885009408 Total committed heap usage (bytes)=4507828224 Peak Map Physical memory (bytes)=549105664 Peak Map Virtual memory (bytes)=6292787200 Peak Reduce Physical memory (bytes)=372985856 Peak Reduce Virtual memory (bytes)=6300979200 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=6228 File Output Format Counters Bytes Written=2273 19/11/18 08:51:28 INFO streaming.StreamJob: Output directory: wordcount/output
Check the output on HDFS
!hdfs dfs -ls wordcount/output
Found 2 items -rw-r--r-- 3 datalab supergroup 0 2019-11-18 08:51 wordcount/output/_SUCCESS -rw-r--r-- 3 datalab supergroup 2273 2019-11-18 08:51 wordcount/output/part-00000
This job took a few seconds and this is quite some time for such a small file (4KB). This is due to the overhead of distributing the data and running the Hadoop framework. The advantage of Hadoop can be appreciated only for large datasets.
!hdfs dfs -cat wordcount/output/part-00000|head
& 1 (Herder-Verlag) 1 - 1 / 2 1950 1 24 1 30 1 30-Jährige 1 <path 1 AGB 1
sort
¶We've obtained a list of tokens that appear in the file followed by their frequencies.
The output of the reducer is sorted by key (the word) because that's the ordering that the reducer becomes from the mapper. If we're interested in sorting the data by frequency, we can use the Unix sort
command (with the options k2
, n
, r
respectively "by field 2", "numeric", "reverse").
!hdfs dfs -cat wordcount/output/part-00000|sort -k2nr|head
die 8 der 6 Cookies 4 und 4 derStandard.at 3 Fall 3 ich 3 in 3 kann 3 ohne 3
The most common word appears to be "die" (the German for the definite article "the").
If we wanted to sort the output of the reducer using the mapreduce framework, we could employ a simple trick: create a mapper that interchanges words with their frequency values. Since by construction mappers sort their output by key, we get the desired sorting as a side-effect.
Call the new mapper swap_keyval.sh
.
%%writefile swap_keyval.sh
#!/bin/bash
# This script will read one line at a time and swap key/value
# For instance, the line "word 100" will become "100 word"
while read key val
do
printf "%s\t%s\n" "$val" "$key"
done
Overwriting swap_keyval.sh
We are going to run the swap mapper script on the output of the previous mapreduce job. Note that in the below cell we are not deleting the previous output but instead we're saving the output from the current job in a new folder output_sorted
.
Nice thing about running a job on the output of a preceding job is that we do not need to upload files to HDFS because the data is already on HDFS. Not so nice: writing data to disk at each step of a data transformation pipeline takes time and this can be costly for longer data pipelines. This is one of the shortcomings of MapReduce that are addressed by Apache Spark.
%%bash
hdfs dfs -rm -r wordcount/output2 2>/dev/null
mapred streaming \
-file swap_keyval.sh \
-input wordcount/output \
-output wordcount/output2 \
-mapper swap_keyval.sh
packageJobJar: [swap_keyval.sh] [/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/hadoop-streaming-3.0.0-cdh6.3.0.jar] /tmp/streamjob8599577742606945962.jar tmpDir=null
19/11/18 08:51:38 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead. 19/11/18 08:51:39 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm472 19/11/18 08:51:40 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/datalab/.staging/job_1571899345744_0254 19/11/18 08:51:40 INFO mapred.FileInputFormat: Total input files to process : 1 19/11/18 08:51:40 INFO mapreduce.JobSubmitter: number of splits:2 19/11/18 08:51:40 INFO Configuration.deprecation: yarn.resourcemanager.zk-address is deprecated. Instead, use hadoop.zk.address 19/11/18 08:51:40 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled 19/11/18 08:51:41 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1571899345744_0254 19/11/18 08:51:41 INFO mapreduce.JobSubmitter: Executing with tokens: [] 19/11/18 08:51:41 INFO conf.Configuration: resource-types.xml not found 19/11/18 08:51:41 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'. 19/11/18 08:51:41 INFO impl.YarnClientImpl: Submitted application application_1571899345744_0254 19/11/18 08:51:41 INFO mapreduce.Job: The url to track the job: http://c101.local:8088/proxy/application_1571899345744_0254/ 19/11/18 08:51:41 INFO mapreduce.Job: Running job: job_1571899345744_0254 19/11/18 08:51:50 INFO mapreduce.Job: Job job_1571899345744_0254 running in uber mode : false 19/11/18 08:51:50 INFO mapreduce.Job: map 0% reduce 0% 19/11/18 08:51:56 INFO mapreduce.Job: map 100% reduce 0% 19/11/18 08:52:03 INFO mapreduce.Job: map 100% reduce 100% 19/11/18 08:52:04 INFO mapreduce.Job: Job job_1571899345744_0254 completed successfully 19/11/18 08:52:04 INFO mapreduce.Job: Counters: 54 File System Counters FILE: Number of bytes read=1833 FILE: Number of bytes written=688625 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=3634 HDFS: Number of bytes written=1945 HDFS: Number of read operations=11 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 HDFS: Number of bytes read erasure-coded=0 Job Counters Launched map tasks=2 Launched reduce tasks=1 Rack-local map tasks=2 Total time spent by all maps in occupied slots (ms)=482976 Total time spent by all reduces in occupied slots (ms)=236028 Total time spent by all map tasks (ms)=9288 Total time spent by all reduce tasks (ms)=4539 Total vcore-milliseconds taken by all map tasks=9288 Total vcore-milliseconds taken by all reduce tasks=4539 Total megabyte-milliseconds taken by all map tasks=47554560 Total megabyte-milliseconds taken by all reduce tasks=23239680 Map-Reduce Framework Map input records=164 Map output records=164 Map output bytes=1945 Map output materialized bytes=1920 Input split bytes=224 Combine input records=0 Combine output records=0 Reduce input groups=6 Reduce shuffle bytes=1920 Reduce input records=164 Reduce output records=164 Spilled Records=328 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=156 CPU time spent (ms)=2570 Physical memory (bytes) snapshot=1477533696 Virtual memory (bytes) snapshot=18889134080 Total committed heap usage (bytes)=4451205120 Peak Map Physical memory (bytes)=558252032 Peak Map Virtual memory (bytes)=6294482944 Peak Reduce Physical memory (bytes)=372002816 Peak Reduce Virtual memory (bytes)=6301200384 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=3410 File Output Format Counters Bytes Written=1945 19/11/18 08:52:04 INFO streaming.StreamJob: Output directory: wordcount/output2
Check the output on HDFS
!hdfs dfs -ls wordcount/output2
Found 2 items -rw-r--r-- 3 datalab supergroup 0 2019-11-18 08:52 wordcount/output2/_SUCCESS -rw-r--r-- 3 datalab supergroup 1945 2019-11-18 08:52 wordcount/output2/part-00000
!hdfs dfs -cat wordcount/output2/part-00000|head
1 an 1 – 1 überraschen. 1 Über 1 zustimmungspflichtige 1 zustimmen 1 zum 1 zu. 1 widerrufen. 1 werden.
Mapper uses by default ascending order to sort by key. We could have changed that with an option but for now let's look at the end of the file.
!hdfs dfs -cat wordcount/output2/part-00000|tail
3 kann 3 in 3 derStandard.at 3 ich 3 Fall 3 Sie 4 Cookies 4 und 6 der 8 die
KeyFieldBasedComparator
¶In general, we can determine how mappers are going to sort their output by configuring the comparator directive to use the special class KeyFieldBasedComparator
-D mapreduce.job.output.key.comparator.class=\ org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
This class has some options similar to the Unix sort
(-n
to sort numerically, -r
for reverse sorting, -k pos1[,pos2]
for specifying fields to sort by).
Let us see the comparator in action on our data to get the desired result. Note that this time we are removing output2
because we're running the second mapreduce job again.
%%bash
hdfs dfs -rmr wordcount/output2 2>/dev/null
comparator_class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
mapred streaming \
-D mapreduce.job.output.key.comparator.class=$comparator_class \
-D mapreduce.partition.keycomparator.options=-nr \
-file swap_keyval.sh \
-input wordcount/output \
-output wordcount/output2 \
-mapper swap_keyval.sh
packageJobJar: [swap_keyval.sh] [/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/hadoop-streaming-3.0.0-cdh6.3.0.jar] /tmp/streamjob1477140608741397366.jar tmpDir=null
19/11/18 08:52:14 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead. 19/11/18 08:52:16 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm472 19/11/18 08:52:16 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/datalab/.staging/job_1571899345744_0255 19/11/18 08:52:16 INFO mapred.FileInputFormat: Total input files to process : 1 19/11/18 08:52:17 INFO mapreduce.JobSubmitter: number of splits:2 19/11/18 08:52:17 INFO Configuration.deprecation: yarn.resourcemanager.zk-address is deprecated. Instead, use hadoop.zk.address 19/11/18 08:52:17 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled 19/11/18 08:52:17 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1571899345744_0255 19/11/18 08:52:17 INFO mapreduce.JobSubmitter: Executing with tokens: [] 19/11/18 08:52:17 INFO conf.Configuration: resource-types.xml not found 19/11/18 08:52:17 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'. 19/11/18 08:52:17 INFO impl.YarnClientImpl: Submitted application application_1571899345744_0255 19/11/18 08:52:17 INFO mapreduce.Job: The url to track the job: http://c101.local:8088/proxy/application_1571899345744_0255/ 19/11/18 08:52:17 INFO mapreduce.Job: Running job: job_1571899345744_0255 19/11/18 08:52:25 INFO mapreduce.Job: Job job_1571899345744_0255 running in uber mode : false 19/11/18 08:52:25 INFO mapreduce.Job: map 0% reduce 0% 19/11/18 08:52:32 INFO mapreduce.Job: map 100% reduce 0% 19/11/18 08:52:39 INFO mapreduce.Job: map 100% reduce 100% 19/11/18 08:52:40 INFO mapreduce.Job: Job job_1571899345744_0255 completed successfully 19/11/18 08:52:40 INFO mapreduce.Job: Counters: 54 File System Counters FILE: Number of bytes read=1827 FILE: Number of bytes written=689819 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=3634 HDFS: Number of bytes written=1945 HDFS: Number of read operations=11 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 HDFS: Number of bytes read erasure-coded=0 Job Counters Launched map tasks=2 Launched reduce tasks=1 Rack-local map tasks=2 Total time spent by all maps in occupied slots (ms)=483288 Total time spent by all reduces in occupied slots (ms)=238472 Total time spent by all map tasks (ms)=9294 Total time spent by all reduce tasks (ms)=4586 Total vcore-milliseconds taken by all map tasks=9294 Total vcore-milliseconds taken by all reduce tasks=4586 Total megabyte-milliseconds taken by all map tasks=47585280 Total megabyte-milliseconds taken by all reduce tasks=23480320 Map-Reduce Framework Map input records=164 Map output records=164 Map output bytes=1945 Map output materialized bytes=1929 Input split bytes=224 Combine input records=0 Combine output records=0 Reduce input groups=6 Reduce shuffle bytes=1929 Reduce input records=164 Reduce output records=164 Spilled Records=328 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=163 CPU time spent (ms)=2630 Physical memory (bytes) snapshot=1525448704 Virtual memory (bytes) snapshot=18887892992 Total committed heap usage (bytes)=4505206784 Peak Map Physical memory (bytes)=578318336 Peak Map Virtual memory (bytes)=6292914176 Peak Reduce Physical memory (bytes)=370286592 Peak Reduce Virtual memory (bytes)=6302941184 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=3410 File Output Format Counters Bytes Written=1945 19/11/18 08:52:40 INFO streaming.StreamJob: Output directory: wordcount/output2
!hdfs dfs -ls wordcount/output2
Found 2 items -rw-r--r-- 3 datalab supergroup 0 2019-11-18 08:52 wordcount/output2/_SUCCESS -rw-r--r-- 3 datalab supergroup 1945 2019-11-18 08:52 wordcount/output2/part-00000
!hdfs dfs -cat wordcount/output2/part-00000|head
8 die 6 der 4 und 4 Cookies 3 Fall 3 Sie 3 ich 3 kann 3 warst 3 in
Now we get the output in the desired order.
With the -D
option it is possible to override options set in the default configuration file mapred_default.xml
(see the Apache Hadoop documentation).
One option that might come handy when dealing with out-of-memory issues in the sorting phase is the size in MB of the memory reserved for sorting mapreduce.task.io.sort.mb
:
-D mapreduce.task.io.sort.mb=512
Note: the maximum value for mapreduce.task.io.sort.mb
is 2047.
Counting the frequencies of words is at the basis of indexing and it facilitates the retrieval of relevant documents in search engines.