mrjob is a Python module for writing multi-step MapReduce jobs in Python. In this notebook we're going to run a basic wordcount example.
Find here the mrjob documentation: https://mrjob.readthedocs.io/en/latest/
The file we're going to use is called file.txt
and has a size of 500MB.
%%bash
ls -lh file.txt
-r--r--r-- 1 user123 hadoopusers 429M Apr 17 23:00 file.txt
Write a mrjob file word_count.py
using the Jupyter cell magic %%file
%%file word_count.py
from mrjob.job import MRJob
class MRWordFrequencyCount(MRJob):
def mapper(self, _, line):
yield "chars", len(line)
yield "words", len(line.split())
yield "lines", 1
def reducer(self, key, values):
yield key, sum(values)
if __name__ == '__main__':
MRWordFrequencyCount.run()
Overwriting word_count.py
We're going to use $10$ map and $3$ reduce tasks.
%%bash
DATAFILE=file.txt
STREAMING_JAR=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar
N=10
# N map tasks
python word_count.py --jobconf mapreduce.job.maps=$N --jobconf mapreduce.job.reduces=3 -r hadoop --hadoop-streaming-jar $STREAMING_JAR $DATAFILE
"lines" 24000 "chars" 447935288 "words" 70482885
No configs found; falling back on auto-configuration No configs specified for hadoop runner Looking for hadoop binary in $PATH... Found hadoop binary: /usr/bin/hadoop Using Hadoop version 3.0.0 Creating temp directory /tmp/word_count.x123.20200417.210235.762742 Copying local files to hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210235.762742/files/... Running step 1 of 1... WARNING: Use "yarn jar" to launch YARN applications. packageJobJar: [] [/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/hadoop-streaming-3.0.0-cdh6.3.2.jar] /tmp/streamjob8532801643576016516.jar tmpDir=null Connecting to ResourceManager at c100.local/10.7.0.100:8032 Connecting to ResourceManager at c100.local/10.7.0.100:8032 Disabling Erasure Coding for path: /user/x123/.staging/job_1586332778980_6670 Total input files to process : 1 Adding a new node: /default/10.7.0.101:9866 Adding a new node: /default/10.7.0.111:9866 Adding a new node: /default/10.7.0.110:9866 Adding a new node: /default/10.7.0.114:9866 Adding a new node: /default/10.7.0.118:9866 Adding a new node: /default/10.7.0.105:9866 Adding a new node: /default/10.7.0.102:9866 Adding a new node: /default/10.7.0.115:9866 number of splits:10 yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled Submitting tokens for job: job_1586332778980_6670 Executing with tokens: [] resource-types.xml not found Unable to find 'resource-types.xml'. Submitted application application_1586332778980_6670 The url to track the job: http://c100.local:8088/proxy/application_1586332778980_6670/ Running job: job_1586332778980_6670 Job job_1586332778980_6670 running in uber mode : false map 0% reduce 0% map 100% reduce 0% map 100% reduce 33% map 100% reduce 100% Job job_1586332778980_6670 completed successfully Output directory: hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210235.762742/output Counters: 55 File Input Format Counters Bytes Read=450112396 File Output Format Counters Bytes Written=49 File System Counters FILE: Number of bytes read=266750 FILE: Number of bytes written=3508513 FILE: Number of large read operations=0 FILE: Number of read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=450113846 HDFS: Number of bytes read erasure-coded=0 HDFS: Number of bytes written=49 HDFS: Number of large read operations=0 HDFS: Number of read operations=45 HDFS: Number of write operations=6 Job Counters Data-local map tasks=4 Launched map tasks=10 Launched reduce tasks=3 Rack-local map tasks=6 Total megabyte-milliseconds taken by all map tasks=345584640 Total megabyte-milliseconds taken by all reduce tasks=79928320 Total time spent by all map tasks (ms)=67497 Total time spent by all maps in occupied slots (ms)=3509844 Total time spent by all reduce tasks (ms)=15611 Total time spent by all reduces in occupied slots (ms)=811772 Total vcore-milliseconds taken by all map tasks=67497 Total vcore-milliseconds taken by all reduce tasks=15611 Map-Reduce Framework CPU time spent (ms)=20440 Combine input records=0 Combine output records=0 Failed Shuffles=0 GC time elapsed (ms)=1626 Input split bytes=1450 Map input records=24000 Map output bytes=870866 Map output materialized bytes=293646 Map output records=72000 Merged Map outputs=30 Peak Map Physical memory (bytes)=622194688 Peak Map Virtual memory (bytes)=6293897216 Peak Reduce Physical memory (bytes)=378032128 Peak Reduce Virtual memory (bytes)=6305234944 Physical memory (bytes) snapshot=7270031360 Reduce input groups=3 Reduce input records=72000 Reduce output records=3 Reduce shuffle bytes=293646 Shuffled Maps =30 Spilled Records=144000 Total committed heap usage (bytes)=19748356096 Virtual memory (bytes) snapshot=81836744704 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 job output is in hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210235.762742/output Streaming final output from hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210235.762742/output... Removing HDFS temp directory hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210235.762742... Removing temp directory /tmp/word_count.x123.20200417.210235.762742...
Run the same job but this time with $4$ mappers and keep track of the job duration.
%%bash
START=$(date +%s);
DATAFILE=/home/dataLAB/data/wiki429MB # 429MB
STREAMING_JAR=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar
N=4
# N map tasks
python word_count.py --jobconf mapreduce.job.maps=$N --jobconf mapreduce.job.reduces=3 -r hadoop --hadoop-streaming-jar $STREAMING_JAR $DATAFILE
2>/dev/null
END=$(date +%s);
echo $((END-START)) | awk '{print "Duration: "int($1/60)":"int($1%60)}'
"lines" 24000 "chars" 447935288 "words" 70482885 Duration: 0:56
No configs found; falling back on auto-configuration No configs specified for hadoop runner Looking for hadoop binary in $PATH... Found hadoop binary: /usr/bin/hadoop Using Hadoop version 3.0.0 Creating temp directory /tmp/word_count.x123.20200417.210414.346782 Copying local files to hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210414.346782/files/... Running step 1 of 1... WARNING: Use "yarn jar" to launch YARN applications. packageJobJar: [] [/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/hadoop-streaming-3.0.0-cdh6.3.2.jar] /tmp/streamjob1751647060912723961.jar tmpDir=null Connecting to ResourceManager at c100.local/10.7.0.100:8032 Connecting to ResourceManager at c100.local/10.7.0.100:8032 Disabling Erasure Coding for path: /user/x123/.staging/job_1586332778980_6672 Total input files to process : 1 number of splits:4 yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled Submitting tokens for job: job_1586332778980_6672 Executing with tokens: [] resource-types.xml not found Unable to find 'resource-types.xml'. Submitted application application_1586332778980_6672 The url to track the job: http://c100.local:8088/proxy/application_1586332778980_6672/ Running job: job_1586332778980_6672 Job job_1586332778980_6672 running in uber mode : false map 0% reduce 0% map 25% reduce 0% map 100% reduce 0% map 100% reduce 33% map 100% reduce 100% Job job_1586332778980_6672 completed successfully Output directory: hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210414.346782/output Counters: 54 File Input Format Counters Bytes Read=449811647 File Output Format Counters Bytes Written=49 File System Counters FILE: Number of bytes read=266033 FILE: Number of bytes written=2132040 FILE: Number of large read operations=0 FILE: Number of read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=449812231 HDFS: Number of bytes read erasure-coded=0 HDFS: Number of bytes written=49 HDFS: Number of large read operations=0 HDFS: Number of read operations=27 HDFS: Number of write operations=6 Job Counters Launched map tasks=4 Launched reduce tasks=3 Rack-local map tasks=4 Total megabyte-milliseconds taken by all map tasks=146442240 Total megabyte-milliseconds taken by all reduce tasks=79375360 Total time spent by all map tasks (ms)=28602 Total time spent by all maps in occupied slots (ms)=1487304 Total time spent by all reduce tasks (ms)=15503 Total time spent by all reduces in occupied slots (ms)=806156 Total vcore-milliseconds taken by all map tasks=28602 Total vcore-milliseconds taken by all reduce tasks=15503 Map-Reduce Framework CPU time spent (ms)=11650 Combine input records=0 Combine output records=0 Failed Shuffles=0 GC time elapsed (ms)=485 Input split bytes=584 Map input records=24000 Map output bytes=870866 Map output materialized bytes=278702 Map output records=72000 Merged Map outputs=12 Peak Map Physical memory (bytes)=625303552 Peak Map Virtual memory (bytes)=6294085632 Peak Reduce Physical memory (bytes)=383156224 Peak Reduce Virtual memory (bytes)=6303899648 Physical memory (bytes) snapshot=3602575360 Reduce input groups=3 Reduce input records=72000 Reduce output records=3 Reduce shuffle bytes=278702 Shuffled Maps =12 Spilled Records=144000 Total committed heap usage (bytes)=10420224000 Virtual memory (bytes) snapshot=44077326336 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 job output is in hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210414.346782/output Streaming final output from hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210414.346782/output... Removing HDFS temp directory hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210414.346782... Removing temp directory /tmp/word_count.x123.20200417.210414.346782...