Hello World!
For this tutorial, we are going to download the core Hadoop distribution and run Hadoop in local standalone mode:
❝ By default, Hadoop is configured to run in a non-distributed mode, as a single Java process. ❞
(see https://hadoop.apache.org/docs/stable/.../Standalone_Operation)
We are going to run a MapReduce job using the Hadoop streaming utility. This is not to be confused with real-time streaming:
❝ Hadoop streaming is a utility that comes with the Hadoop distribution. The utility allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer. ❞
MapReduce streaming defaults to using IdentityMapper and IdentityReducer, thus eliminating the need to explicitly specify a mapper or a reducer. Finally, we show how to run a map-only job by setting mapreduce.job.reduces to 0.
Both input and output are regular files, since Hadoop's default filesystem is the local filesystem, as specified by the fs.defaultFS property in core-default.xml.
HADOOP_URL = "https://dlcdn.apache.org/hadoop/common/stable/hadoop-3.3.6.tar.gz"

import requests
import os
import tarfile

def download_and_extract_targz(url):
    filename = url.rsplit('/', 1)[-1]
    HADOOP_HOME = filename[:-7]  # strip the trailing ".tar.gz"
    # set HADOOP_HOME environment variable
    os.environ['HADOOP_HOME'] = HADOOP_HOME
    if os.path.isdir(HADOOP_HOME):
        print("Not downloading, Hadoop folder {} already exists".format(HADOOP_HOME))
        return
    # download only when the folder is not already there
    response = requests.get(url)
    if response.status_code == 200:
        with open(filename, 'wb') as file:
            file.write(response.content)
        with tarfile.open(filename, 'r:gz') as tar_ref:
            tar_ref.extractall(path='.')
            # Get the names of all members (files and directories) in the archive
            all_members = tar_ref.getnames()
            # If there is a top-level directory, get its name
            if all_members:
                top_level_directory = all_members[0]
                print(f"Archive downloaded and extracted successfully. Contents saved at: {top_level_directory}")
    else:
        print(f"Failed to download archive. Status code: {response.status_code}")

download_and_extract_targz(HADOOP_URL)
Not downloading, Hadoop folder hadoop-3.3.6 already exists
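As an optional safeguard, we can verify the integrity of the downloaded archive: Apache publishes a .sha512 file next to each release artifact. Below is a minimal sketch of such a check; it assumes the published digest appears after an '=' or ':' in that file (the exact layout varies across releases), and it only runs when the tarball is still present locally.
import hashlib
import os
import requests

def verify_sha512(targz_path, url):
    # Fetch the published digest and strip the filename prefix and whitespace
    text = requests.get(url + ".sha512").text
    digest_part = text.split("=")[-1] if "=" in text else text.split(":")[-1]
    published = "".join(digest_part.split()).lower()
    # Hash the local tarball in chunks to keep memory usage small
    sha512 = hashlib.sha512()
    with open(targz_path, 'rb') as f:
        for chunk in iter(lambda: f.read(1 << 20), b''):
            sha512.update(chunk)
    return sha512.hexdigest() == published

targz = HADOOP_URL.rsplit('/', 1)[-1]
if os.path.exists(targz):
    print("Checksum OK" if verify_sha512(targz, HADOOP_URL) else "Checksum MISMATCH")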
HADOOP_HOME and PATH
# HADOOP_HOME was set earlier when downloading the Hadoop distribution
print("HADOOP_HOME is {}".format(os.environ['HADOOP_HOME']))
os.environ['PATH'] = ':'.join([os.path.join(os.environ['HADOOP_HOME'], 'bin'), os.environ['PATH']])
print("PATH is {}".format(os.environ['PATH']))
HADOOP_HOME is hadoop-3.3.6 PATH is hadoop-3.3.6/bin:/opt/hostedtoolcache/Python/3.8.18/x64/bin:/opt/hostedtoolcache/Python/3.8.18/x64:/snap/bin:/home/runner/.local/bin:/opt/pipx_bin:/home/runner/.cargo/bin:/home/runner/.config/composer/vendor/bin:/usr/local/.ghcup/bin:/home/runner/.dotnet/tools:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin
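As a quick sanity check, the hadoop launcher should now be discoverable through the updated PATH. A minimal sketch (shutil.which resolves a command name against PATH, including the relative entry we just prepended):
import shutil
# should print something like hadoop-3.3.6/bin/hadoop
print(shutil.which("hadoop"))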
JAVA_HOME
While Java is readily available on Google Colab, we consider the broader scenario of a generic Ubuntu machine. In that case, we ensure compatibility by installing Java ourselves, specifically opting for the openjdk-19-jre-headless package.
import shutil
import subprocess

# set variable JAVA_HOME (install Java if necessary)
def is_java_installed():
    java_path = shutil.which("java")
    if java_path is None:
        return False
    # resolve symlinks such as /usr/bin/java -> /usr/lib/jvm/<version>/bin/java
    os.environ['JAVA_HOME'] = os.path.realpath(java_path).split('/bin')[0]
    return os.environ['JAVA_HOME']

def install_java():
    # Uncomment and modify the desired version
    # java_version = 'openjdk-11-jre-headless'
    # java_version = 'default-jre'
    # java_version = 'openjdk-17-jre-headless'
    # java_version = 'openjdk-18-jre-headless'
    java_version = 'openjdk-19-jre-headless'
    print(f"Java not found. Installing {java_version} ... (this might take a while)")
    try:
        cmd = f"apt install -y {java_version}"
        subprocess_output = subprocess.run(cmd, shell=True, check=True,
                                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        stdout_result = subprocess_output.stdout
        # Process the results as needed
        print("Done installing Java {}".format(java_version))
        os.environ['JAVA_HOME'] = os.path.realpath(shutil.which("java")).split('/bin')[0]
        print("JAVA_HOME is {}".format(os.environ['JAVA_HOME']))
    except subprocess.CalledProcessError as e:
        # Handle the error if the command returns a non-zero exit code
        print("Command failed with return code {}".format(e.returncode))
        print("stdout: {}".format(e.stdout))

# Install Java if not available
if is_java_installed():
    print("Java is already installed: {}".format(os.environ['JAVA_HOME']))
else:
    print("Installing Java")
    install_java()
Java is already installed: /usr/lib/jvm/temurin-11-jdk-amd64
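With Java in place, a quick sanity check in the notebook's own subprocess style. One gotcha worth knowing: java -version writes its output to stderr rather than stdout, so we capture stderr.
import subprocess
# `java -version` prints to stderr, not stdout
print(subprocess.run(["java", "-version"], capture_output=True, text=True).stderr)
# with Java and PATH set up, hadoop can report its version too
print(subprocess.run(["hadoop", "version"], capture_output=True, text=True).stdout)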
Write the string "Hello, World!" to a local file. Note: you will be writing to the file ./hello.txt in your current directory (denoted by ./).
!echo "Hello, World!">./hello.txt
Since the default filesystem is the local filesystem (as opposed to HDFS), we do not need to upload the local file hello.txt to HDFS.
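We can confirm this from the running configuration: hdfs getconf prints the effective value of a configuration key, and in standalone mode fs.defaultFS resolves to file:///.
import subprocess
# prints file:/// in standalone mode, i.e. the local filesystem
print(subprocess.run(["hdfs", "getconf", "-confKey", "fs.defaultFS"],
                     capture_output=True, text=True).stdout)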
Run a MapReduce job with /bin/cat as the mapper and without specifying a reducer.
Note: removing the output directory first is necessary because, by design, MapReduce does not overwrite existing output directories.
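Since the default filesystem is local, the same cleanup could also be done from Python; a minimal sketch equivalent to the hdfs dfs -rm -r my_output step below:
import os
import shutil
# delete the output directory if present; MapReduce will not overwrite it
if os.path.isdir("my_output"):
    shutil.rmtree("my_output")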
%%bash
hdfs dfs -rm -r my_output
mapred streaming \
-input hello.txt \
-output my_output \
-mapper '/bin/cat'
rm: `my_output': No such file or directory 2024-03-11 15:45:58,688 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties 2024-03-11 15:45:58,770 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s). 2024-03-11 15:45:58,770 INFO impl.MetricsSystemImpl: JobTracker metrics system started 2024-03-11 15:45:58,783 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized! 2024-03-11 15:45:58,931 INFO mapred.FileInputFormat: Total input files to process : 1 2024-03-11 15:45:58,944 INFO mapreduce.JobSubmitter: number of splits:1 2024-03-11 15:45:59,069 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local370782050_0001 2024-03-11 15:45:59,070 INFO mapreduce.JobSubmitter: Executing with tokens: [] 2024-03-11 15:45:59,164 INFO mapreduce.Job: The url to track the job: http://localhost:8080/ 2024-03-11 15:45:59,166 INFO mapreduce.Job: Running job: job_local370782050_0001 2024-03-11 15:45:59,172 INFO mapred.LocalJobRunner: OutputCommitter set in config null 2024-03-11 15:45:59,175 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter 2024-03-11 15:45:59,180 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2 2024-03-11 15:45:59,181 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 2024-03-11 15:45:59,212 INFO mapred.LocalJobRunner: Waiting for map tasks 2024-03-11 15:45:59,217 INFO mapred.LocalJobRunner: Starting task: attempt_local370782050_0001_m_000000_0 2024-03-11 15:45:59,237 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2 2024-03-11 15:45:59,237 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 2024-03-11 15:45:59,253 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ] 2024-03-11 15:45:59,258 INFO mapred.MapTask: Processing split: file:/home/runner/work/big_data/big_data/hello.txt:0+14 2024-03-11 15:45:59,265 INFO mapred.MapTask: numReduceTasks: 1 2024-03-11 15:45:59,281 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584) 2024-03-11 15:45:59,281 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100 2024-03-11 15:45:59,281 INFO mapred.MapTask: soft limit at 83886080 2024-03-11 15:45:59,281 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600 2024-03-11 15:45:59,281 INFO mapred.MapTask: kvstart = 26214396; length = 6553600 2024-03-11 15:45:59,284 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 2024-03-11 15:45:59,288 INFO streaming.PipeMapRed: PipeMapRed exec [/bin/cat] 2024-03-11 15:45:59,292 INFO Configuration.deprecation: mapred.work.output.dir is deprecated. Instead, use mapreduce.task.output.dir 2024-03-11 15:45:59,294 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir 2024-03-11 15:45:59,294 INFO Configuration.deprecation: map.input.file is deprecated. Instead, use mapreduce.map.input.file 2024-03-11 15:45:59,294 INFO Configuration.deprecation: map.input.length is deprecated. Instead, use mapreduce.map.input.length 2024-03-11 15:45:59,295 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id 2024-03-11 15:45:59,295 INFO Configuration.deprecation: mapred.task.partition is deprecated. 
Instead, use mapreduce.task.partition 2024-03-11 15:45:59,296 INFO Configuration.deprecation: map.input.start is deprecated. Instead, use mapreduce.map.input.start 2024-03-11 15:45:59,296 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap 2024-03-11 15:45:59,296 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id 2024-03-11 15:45:59,297 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id 2024-03-11 15:45:59,297 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords 2024-03-11 15:45:59,303 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name 2024-03-11 15:45:59,319 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s] 2024-03-11 15:45:59,319 INFO streaming.PipeMapRed: Records R/W=1/1 2024-03-11 15:45:59,320 INFO streaming.PipeMapRed: MRErrorThread done 2024-03-11 15:45:59,321 INFO streaming.PipeMapRed: mapRedFinished 2024-03-11 15:45:59,323 INFO mapred.LocalJobRunner: 2024-03-11 15:45:59,323 INFO mapred.MapTask: Starting flush of map output 2024-03-11 15:45:59,323 INFO mapred.MapTask: Spilling map output 2024-03-11 15:45:59,323 INFO mapred.MapTask: bufstart = 0; bufend = 15; bufvoid = 104857600 2024-03-11 15:45:59,323 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214396(104857584); length = 1/6553600 2024-03-11 15:45:59,328 INFO mapred.MapTask: Finished spill 0 2024-03-11 15:45:59,338 INFO mapred.Task: Task:attempt_local370782050_0001_m_000000_0 is done. And is in the process of committing 2024-03-11 15:45:59,340 INFO mapred.LocalJobRunner: Records R/W=1/1 2024-03-11 15:45:59,340 INFO mapred.Task: Task 'attempt_local370782050_0001_m_000000_0' done. 2024-03-11 15:45:59,346 INFO mapred.Task: Final Counters for attempt_local370782050_0001_m_000000_0: Counters: 17 File System Counters FILE: Number of bytes read=141437 FILE: Number of bytes written=781694 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=1 Map output records=1 Map output bytes=15 Map output materialized bytes=23 Input split bytes=102 Combine input records=0 Spilled Records=1 Failed Shuffles=0 Merged Map outputs=0 GC time elapsed (ms)=0 Total committed heap usage (bytes)=314572800 File Input Format Counters Bytes Read=14 2024-03-11 15:45:59,347 INFO mapred.LocalJobRunner: Finishing task: attempt_local370782050_0001_m_000000_0 2024-03-11 15:45:59,350 INFO mapred.LocalJobRunner: map task executor complete. 2024-03-11 15:45:59,355 INFO mapred.LocalJobRunner: Waiting for reduce tasks 2024-03-11 15:45:59,355 INFO mapred.LocalJobRunner: Starting task: attempt_local370782050_0001_r_000000_0 2024-03-11 15:45:59,360 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2 2024-03-11 15:45:59,360 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 2024-03-11 15:45:59,360 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ] 2024-03-11 15:45:59,363 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@9ffba29 2024-03-11 15:45:59,368 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized! 
2024-03-11 15:45:59,380 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=2933076736, maxSingleShuffleLimit=733269184, mergeThreshold=1935830784, ioSortFactor=10, memToMemMergeOutputsThreshold=10 2024-03-11 15:45:59,382 INFO reduce.EventFetcher: attempt_local370782050_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events 2024-03-11 15:45:59,407 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local370782050_0001_m_000000_0 decomp: 19 len: 23 to MEMORY 2024-03-11 15:45:59,410 INFO reduce.InMemoryMapOutput: Read 19 bytes from map-output for attempt_local370782050_0001_m_000000_0 2024-03-11 15:45:59,412 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 19, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->19 2024-03-11 15:45:59,415 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning 2024-03-11 15:45:59,416 INFO mapred.LocalJobRunner: 1 / 1 copied. 2024-03-11 15:45:59,416 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs 2024-03-11 15:45:59,421 INFO mapred.Merger: Merging 1 sorted segments 2024-03-11 15:45:59,421 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 3 bytes 2024-03-11 15:45:59,422 INFO reduce.MergeManagerImpl: Merged 1 segments, 19 bytes to disk to satisfy reduce memory limit 2024-03-11 15:45:59,435 INFO reduce.MergeManagerImpl: Merging 1 files, 23 bytes from disk 2024-03-11 15:45:59,436 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce 2024-03-11 15:45:59,436 INFO mapred.Merger: Merging 1 sorted segments 2024-03-11 15:45:59,438 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 3 bytes 2024-03-11 15:45:59,439 INFO mapred.LocalJobRunner: 1 / 1 copied. 2024-03-11 15:45:59,444 INFO mapred.Task: Task:attempt_local370782050_0001_r_000000_0 is done. And is in the process of committing 2024-03-11 15:45:59,445 INFO mapred.LocalJobRunner: 1 / 1 copied. 2024-03-11 15:45:59,445 INFO mapred.Task: Task attempt_local370782050_0001_r_000000_0 is allowed to commit now 2024-03-11 15:45:59,447 INFO output.FileOutputCommitter: Saved output of task 'attempt_local370782050_0001_r_000000_0' to file:/home/runner/work/big_data/big_data/my_output 2024-03-11 15:45:59,447 INFO mapred.LocalJobRunner: reduce > reduce 2024-03-11 15:45:59,447 INFO mapred.Task: Task 'attempt_local370782050_0001_r_000000_0' done. 2024-03-11 15:45:59,448 INFO mapred.Task: Final Counters for attempt_local370782050_0001_r_000000_0: Counters: 24 File System Counters FILE: Number of bytes read=141515 FILE: Number of bytes written=781744 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Combine input records=0 Combine output records=0 Reduce input groups=1 Reduce shuffle bytes=23 Reduce input records=1 Reduce output records=1 Spilled Records=1 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=12 Total committed heap usage (bytes)=314572800 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Output Format Counters Bytes Written=27 2024-03-11 15:45:59,448 INFO mapred.LocalJobRunner: Finishing task: attempt_local370782050_0001_r_000000_0 2024-03-11 15:45:59,448 INFO mapred.LocalJobRunner: reduce task executor complete. 
2024-03-11 15:46:00,171 INFO mapreduce.Job: Job job_local370782050_0001 running in uber mode : false 2024-03-11 15:46:00,172 INFO mapreduce.Job: map 100% reduce 100% 2024-03-11 15:46:00,173 INFO mapreduce.Job: Job job_local370782050_0001 completed successfully 2024-03-11 15:46:00,178 INFO mapreduce.Job: Counters: 30 File System Counters FILE: Number of bytes read=282952 FILE: Number of bytes written=1563438 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=1 Map output records=1 Map output bytes=15 Map output materialized bytes=23 Input split bytes=102 Combine input records=0 Combine output records=0 Reduce input groups=1 Reduce shuffle bytes=23 Reduce input records=1 Reduce output records=1 Spilled Records=2 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=12 Total committed heap usage (bytes)=629145600 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=14 File Output Format Counters Bytes Written=27 2024-03-11 15:46:00,179 INFO streaming.StreamJob: Output directory: my_output
If the job executed successfully, an empty file named _SUCCESS should be present in the output directory my_output. Verify that the MapReduce job succeeded by checking for the presence of the _SUCCESS file.
%%bash
echo "Check if MapReduce job was successful"
hdfs dfs -test -e my_output/_SUCCESS
if [ $? -eq 0 ]; then
echo "_SUCCESS exists!"
fi
Check if MapReduce job was successful _SUCCESS exists!
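Because the output directory is a plain local folder, an equivalent check can be written in Python:
import os
# _SUCCESS is an empty marker file written by the FileOutputCommitter on success
print("_SUCCESS exists!" if os.path.exists("my_output/_SUCCESS") else "_SUCCESS missing")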
Note: hdfs dfs -ls gives the same result as ls since the default filesystem is the local filesystem.
!hdfs dfs -ls my_output
Found 2 items -rw-r--r-- 1 runner docker 0 2024-03-11 15:45 my_output/_SUCCESS -rw-r--r-- 1 runner docker 15 2024-03-11 15:45 my_output/part-00000
!ls -l my_output
total 4 -rw-r--r-- 1 runner docker 0 Mar 11 15:45 _SUCCESS -rw-r--r-- 1 runner docker 15 Mar 11 15:45 part-00000
The actual output of the MapReduce job is contained in the file part-00000 inside the output directory.
!cat my_output/part-00000
Hello, World!
In the previous example, we saw how to run a MapReduce job without specifying any reducer. Since the only required options for mapred streaming are -input and -output, we can also run a MapReduce job without specifying a mapper.
!mapred streaming -h
2024-03-11 15:46:03,162 ERROR streaming.StreamJob: Unrecognized option: -h Usage: $HADOOP_HOME/bin/hadoop jar hadoop-streaming.jar [options] Options: -input <path> DFS input file(s) for the Map step. -output <path> DFS output directory for the Reduce step. -mapper <cmd|JavaClassName> Optional. Command to be run as mapper. -combiner <cmd|JavaClassName> Optional. Command to be run as combiner. -reducer <cmd|JavaClassName> Optional. Command to be run as reducer. -file <file> Optional. File/dir to be shipped in the Job jar file. Deprecated. Use generic option "-files" instead. -inputformat <TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName> Optional. The input format class. -outputformat <TextOutputFormat(default)|JavaClassName> Optional. The output format class. -partitioner <JavaClassName> Optional. The partitioner class. -numReduceTasks <num> Optional. Number of reduce tasks. -inputreader <spec> Optional. Input recordreader spec. -cmdenv <n>=<v> Optional. Pass env.var to streaming commands. -mapdebug <cmd> Optional. To run this script when a map task fails. -reducedebug <cmd> Optional. To run this script when a reduce task fails. -io <identifier> Optional. Format to use for input to and output from mapper/reducer commands -lazyOutput Optional. Lazily create Output. -background Optional. Submit the job and don't wait till it completes. -verbose Optional. Print verbose output. -info Optional. Print detailed usage. -help Optional. Print help message. Generic options supported are: -conf <configuration file> specify an application configuration file -D <property=value> define a value for a given property -fs <file:///|hdfs://namenode:port> specify default filesystem URL to use, overrides 'fs.defaultFS' property from configurations. -jt <local|resourcemanager:port> specify a ResourceManager -files <file1,...> specify a comma-separated list of files to be copied to the map reduce cluster -libjars <jar1,...> specify a comma-separated list of jar files to be included in the classpath -archives <archive1,...> specify a comma-separated list of archives to be unarchived on the compute machines The general command line syntax is: command [genericOptions] [commandOptions] For more details about these options: Use $HADOOP_HOME/bin/hadoop jar hadoop-streaming.jar -info Try -help for more information Streaming Command Failed!
%%bash
hdfs dfs -rm -r my_output
mapred streaming \
-input hello.txt \
-output my_output
2024-03-11 15:46:04,240 INFO Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
Deleted my_output
2024-03-11 15:46:05,316 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties 2024-03-11 15:46:05,402 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s). 2024-03-11 15:46:05,402 INFO impl.MetricsSystemImpl: JobTracker metrics system started 2024-03-11 15:46:05,414 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized! 2024-03-11 15:46:05,551 INFO mapred.FileInputFormat: Total input files to process : 1 2024-03-11 15:46:05,565 INFO mapreduce.JobSubmitter: number of splits:1 2024-03-11 15:46:05,710 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1336272806_0001 2024-03-11 15:46:05,710 INFO mapreduce.JobSubmitter: Executing with tokens: [] 2024-03-11 15:46:05,839 INFO mapreduce.Job: The url to track the job: http://localhost:8080/ 2024-03-11 15:46:05,840 INFO mapreduce.Job: Running job: job_local1336272806_0001 2024-03-11 15:46:05,846 INFO mapred.LocalJobRunner: OutputCommitter set in config null 2024-03-11 15:46:05,848 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter 2024-03-11 15:46:05,853 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2 2024-03-11 15:46:05,853 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 2024-03-11 15:46:05,881 INFO mapred.LocalJobRunner: Waiting for map tasks 2024-03-11 15:46:05,885 INFO mapred.LocalJobRunner: Starting task: attempt_local1336272806_0001_m_000000_0 2024-03-11 15:46:05,904 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2 2024-03-11 15:46:05,904 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 2024-03-11 15:46:05,920 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ] 2024-03-11 15:46:05,926 INFO mapred.MapTask: Processing split: file:/home/runner/work/big_data/big_data/hello.txt:0+14 2024-03-11 15:46:05,933 INFO mapred.MapTask: numReduceTasks: 1 2024-03-11 15:46:05,949 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584) 2024-03-11 15:46:05,949 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100 2024-03-11 15:46:05,949 INFO mapred.MapTask: soft limit at 83886080 2024-03-11 15:46:05,949 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600 2024-03-11 15:46:05,949 INFO mapred.MapTask: kvstart = 26214396; length = 6553600 2024-03-11 15:46:05,953 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 2024-03-11 15:46:05,957 INFO mapred.LocalJobRunner: 2024-03-11 15:46:05,957 INFO mapred.MapTask: Starting flush of map output 2024-03-11 15:46:05,957 INFO mapred.MapTask: Spilling map output 2024-03-11 15:46:05,957 INFO mapred.MapTask: bufstart = 0; bufend = 22; bufvoid = 104857600 2024-03-11 15:46:05,957 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214396(104857584); length = 1/6553600 2024-03-11 15:46:05,962 INFO mapred.MapTask: Finished spill 0 2024-03-11 15:46:05,970 INFO mapred.Task: Task:attempt_local1336272806_0001_m_000000_0 is done. And is in the process of committing 2024-03-11 15:46:05,972 INFO mapred.LocalJobRunner: file:/home/runner/work/big_data/big_data/hello.txt:0+14 2024-03-11 15:46:05,972 INFO mapred.Task: Task 'attempt_local1336272806_0001_m_000000_0' done. 
2024-03-11 15:46:05,976 INFO mapred.Task: Final Counters for attempt_local1336272806_0001_m_000000_0: Counters: 17 File System Counters FILE: Number of bytes read=141437 FILE: Number of bytes written=782672 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=1 Map output records=1 Map output bytes=22 Map output materialized bytes=30 Input split bytes=102 Combine input records=0 Spilled Records=1 Failed Shuffles=0 Merged Map outputs=0 GC time elapsed (ms)=0 Total committed heap usage (bytes)=337641472 File Input Format Counters Bytes Read=14 2024-03-11 15:46:05,976 INFO mapred.LocalJobRunner: Finishing task: attempt_local1336272806_0001_m_000000_0 2024-03-11 15:46:05,977 INFO mapred.LocalJobRunner: map task executor complete. 2024-03-11 15:46:05,979 INFO mapred.LocalJobRunner: Waiting for reduce tasks 2024-03-11 15:46:05,980 INFO mapred.LocalJobRunner: Starting task: attempt_local1336272806_0001_r_000000_0 2024-03-11 15:46:05,990 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2 2024-03-11 15:46:05,990 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 2024-03-11 15:46:05,990 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ] 2024-03-11 15:46:06,001 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@6bc81e34 2024-03-11 15:46:06,007 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized! 2024-03-11 15:46:06,021 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=2933076736, maxSingleShuffleLimit=733269184, mergeThreshold=1935830784, ioSortFactor=10, memToMemMergeOutputsThreshold=10 2024-03-11 15:46:06,022 INFO reduce.EventFetcher: attempt_local1336272806_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events 2024-03-11 15:46:06,042 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1336272806_0001_m_000000_0 decomp: 26 len: 30 to MEMORY 2024-03-11 15:46:06,044 INFO reduce.InMemoryMapOutput: Read 26 bytes from map-output for attempt_local1336272806_0001_m_000000_0 2024-03-11 15:46:06,046 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 26, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->26 2024-03-11 15:46:06,048 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning 2024-03-11 15:46:06,049 INFO mapred.LocalJobRunner: 1 / 1 copied. 2024-03-11 15:46:06,049 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs 2024-03-11 15:46:06,053 INFO mapred.Merger: Merging 1 sorted segments 2024-03-11 15:46:06,054 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 16 bytes 2024-03-11 15:46:06,056 INFO reduce.MergeManagerImpl: Merged 1 segments, 26 bytes to disk to satisfy reduce memory limit 2024-03-11 15:46:06,057 INFO reduce.MergeManagerImpl: Merging 1 files, 30 bytes from disk 2024-03-11 15:46:06,058 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce 2024-03-11 15:46:06,058 INFO mapred.Merger: Merging 1 sorted segments 2024-03-11 15:46:06,060 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 16 bytes 2024-03-11 15:46:06,062 INFO mapred.LocalJobRunner: 1 / 1 copied. 
2024-03-11 15:46:06,067 INFO mapred.Task: Task:attempt_local1336272806_0001_r_000000_0 is done. And is in the process of committing 2024-03-11 15:46:06,068 INFO mapred.LocalJobRunner: 1 / 1 copied. 2024-03-11 15:46:06,068 INFO mapred.Task: Task attempt_local1336272806_0001_r_000000_0 is allowed to commit now 2024-03-11 15:46:06,069 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1336272806_0001_r_000000_0' to file:/home/runner/work/big_data/big_data/my_output 2024-03-11 15:46:06,071 INFO mapred.LocalJobRunner: reduce > reduce 2024-03-11 15:46:06,071 INFO mapred.Task: Task 'attempt_local1336272806_0001_r_000000_0' done. 2024-03-11 15:46:06,072 INFO mapred.Task: Final Counters for attempt_local1336272806_0001_r_000000_0: Counters: 24 File System Counters FILE: Number of bytes read=141529 FILE: Number of bytes written=782730 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Combine input records=0 Combine output records=0 Reduce input groups=1 Reduce shuffle bytes=30 Reduce input records=1 Reduce output records=1 Spilled Records=1 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=6 Total committed heap usage (bytes)=337641472 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Output Format Counters Bytes Written=28 2024-03-11 15:46:06,073 INFO mapred.LocalJobRunner: Finishing task: attempt_local1336272806_0001_r_000000_0 2024-03-11 15:46:06,073 INFO mapred.LocalJobRunner: reduce task executor complete. 2024-03-11 15:46:06,845 INFO mapreduce.Job: Job job_local1336272806_0001 running in uber mode : false 2024-03-11 15:46:06,846 INFO mapreduce.Job: map 100% reduce 100% 2024-03-11 15:46:06,847 INFO mapreduce.Job: Job job_local1336272806_0001 completed successfully 2024-03-11 15:46:06,852 INFO mapreduce.Job: Counters: 30 File System Counters FILE: Number of bytes read=282966 FILE: Number of bytes written=1565402 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=1 Map output records=1 Map output bytes=22 Map output materialized bytes=30 Input split bytes=102 Combine input records=0 Combine output records=0 Reduce input groups=1 Reduce shuffle bytes=30 Reduce input records=1 Reduce output records=1 Spilled Records=2 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=6 Total committed heap usage (bytes)=675282944 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=14 File Output Format Counters Bytes Written=28 2024-03-11 15:46:06,852 INFO streaming.StreamJob: Output directory: my_output
%%bash
echo "Check if MapReduce job was successful"
hdfs dfs -test -e my_output/_SUCCESS
if [ $? -eq 0 ]; then
echo "_SUCCESS exists!"
fi
Check if MapReduce job was successful _SUCCESS exists!
Show the output:
!cat my_output/part-00000
0 Hello, World!
What happened here is that, since we did not define any mapper or reducer, the identity mapper (IdentityMapper) and identity reducer (IdentityReducer) were used by default (see the streaming command options). Not specifying a mapper and reducer in the MapReduce job submission does not mean that MapReduce skips the map and reduce steps; it simply uses the identity mapper and reducer. The leading 0 in the output is the key generated by TextInputFormat (the byte offset of the line within the input file), which the identity mapper and reducer pass through unchanged.
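To make that data flow concrete, here is a minimal pure-Python sketch (an illustration only, not Hadoop code): TextInputFormat emits (byte offset, line) records, the identity mapper and reducer pass them through, and TextOutputFormat writes key, a tab, then value.
# illustration of the identity map/reduce data flow on hello.txt
def text_input_format(path):
    """Yield (byte offset, line) records, as TextInputFormat does."""
    offset = 0
    with open(path, "rb") as f:
        for raw in f:
            yield offset, raw.rstrip(b"\n").decode()
            offset += len(raw)

identity = lambda key, value: [(key, value)]  # stands in for IdentityMapper/IdentityReducer

records = [kv for pair in text_input_format("hello.txt") for kv in identity(*pair)]
for key, value in sorted(records):  # the shuffle & sort phase orders records by key
    print(f"{key}\t{value}")        # TextOutputFormat writes key <TAB> value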
To run a MapReduce job without a reducer, one needs to use the generic option -D mapreduce.job.reduces=0 (see specifying map-only jobs).
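As a side note, the usage listing above also shows a streaming-specific option, -numReduceTasks <num>; setting it to 0 should be equivalent, but we stick with the generic -D form here.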
%%bash
hdfs dfs -rm -r my_output
mapred streaming \
-D mapreduce.job.reduces=0 \
-input hello.txt \
-output my_output
2024-03-11 15:46:08,861 INFO Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
Deleted my_output
2024-03-11 15:46:09,908 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties 2024-03-11 15:46:09,985 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s). 2024-03-11 15:46:09,986 INFO impl.MetricsSystemImpl: JobTracker metrics system started 2024-03-11 15:46:09,997 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized! 2024-03-11 15:46:10,132 INFO mapred.FileInputFormat: Total input files to process : 1 2024-03-11 15:46:10,145 INFO mapreduce.JobSubmitter: number of splits:1 2024-03-11 15:46:10,293 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1539286825_0001 2024-03-11 15:46:10,293 INFO mapreduce.JobSubmitter: Executing with tokens: [] 2024-03-11 15:46:10,409 INFO mapreduce.Job: The url to track the job: http://localhost:8080/ 2024-03-11 15:46:10,411 INFO mapreduce.Job: Running job: job_local1539286825_0001 2024-03-11 15:46:10,417 INFO mapred.LocalJobRunner: OutputCommitter set in config null 2024-03-11 15:46:10,420 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter 2024-03-11 15:46:10,426 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2 2024-03-11 15:46:10,426 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 2024-03-11 15:46:10,457 INFO mapred.LocalJobRunner: Waiting for map tasks 2024-03-11 15:46:10,460 INFO mapred.LocalJobRunner: Starting task: attempt_local1539286825_0001_m_000000_0 2024-03-11 15:46:10,477 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2 2024-03-11 15:46:10,478 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 2024-03-11 15:46:10,497 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ] 2024-03-11 15:46:10,504 INFO mapred.MapTask: Processing split: file:/home/runner/work/big_data/big_data/hello.txt:0+14 2024-03-11 15:46:10,511 INFO mapred.MapTask: numReduceTasks: 0 2024-03-11 15:46:10,520 INFO mapred.LocalJobRunner: 2024-03-11 15:46:10,532 INFO mapred.Task: Task:attempt_local1539286825_0001_m_000000_0 is done. And is in the process of committing 2024-03-11 15:46:10,533 INFO mapred.LocalJobRunner: 2024-03-11 15:46:10,533 INFO mapred.Task: Task attempt_local1539286825_0001_m_000000_0 is allowed to commit now 2024-03-11 15:46:10,535 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1539286825_0001_m_000000_0' to file:/home/runner/work/big_data/big_data/my_output 2024-03-11 15:46:10,536 INFO mapred.LocalJobRunner: file:/home/runner/work/big_data/big_data/hello.txt:0+14 2024-03-11 15:46:10,536 INFO mapred.Task: Task 'attempt_local1539286825_0001_m_000000_0' done. 
2024-03-11 15:46:10,543 INFO mapred.Task: Final Counters for attempt_local1539286825_0001_m_000000_0: Counters: 15 File System Counters FILE: Number of bytes read=141437 FILE: Number of bytes written=782636 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=1 Map output records=1 Input split bytes=102 Spilled Records=0 Failed Shuffles=0 Merged Map outputs=0 GC time elapsed (ms)=7 Total committed heap usage (bytes)=314572800 File Input Format Counters Bytes Read=14 File Output Format Counters Bytes Written=28 2024-03-11 15:46:10,543 INFO mapred.LocalJobRunner: Finishing task: attempt_local1539286825_0001_m_000000_0 2024-03-11 15:46:10,544 INFO mapred.LocalJobRunner: map task executor complete. 2024-03-11 15:46:11,420 INFO mapreduce.Job: Job job_local1539286825_0001 running in uber mode : false 2024-03-11 15:46:11,422 INFO mapreduce.Job: map 100% reduce 0% 2024-03-11 15:46:11,423 INFO mapreduce.Job: Job job_local1539286825_0001 completed successfully 2024-03-11 15:46:11,427 INFO mapreduce.Job: Counters: 15 File System Counters FILE: Number of bytes read=141437 FILE: Number of bytes written=782636 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=1 Map output records=1 Input split bytes=102 Spilled Records=0 Failed Shuffles=0 Merged Map outputs=0 GC time elapsed (ms)=7 Total committed heap usage (bytes)=314572800 File Input Format Counters Bytes Read=14 File Output Format Counters Bytes Written=28 2024-03-11 15:46:11,427 INFO streaming.StreamJob: Output directory: my_output
!hdfs dfs -test -e my_output/_SUCCESS && cat my_output/part-00000
0 Hello, World!
The advantage of a map-only job is that the sorting and shuffling phases are skipped entirely, so if you do not need sorted output, remember to specify -D mapreduce.job.reduces=0.
On the other hand, a MapReduce job, even with the default IdentityReducer, delivers sorted results, because the data passed from the mapper to the reducer always gets sorted by key.
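One caveat about that sorting: streaming keys are treated as plain text, so by default the order is lexicographic rather than numeric, which can surprise you with numeric keys. A quick Python illustration:
# streaming compares keys as strings, so "10" sorts before "9"
keys = ["9", "10", "100", "2"]
print(sorted(keys))           # ['10', '100', '2', '9']  -- lexicographic
print(sorted(keys, key=int))  # ['2', '9', '10', '100'] -- numeric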
Taking the previous considerations into account, here is a more efficient version of the 'Hello, World!' application that bypasses the shuffle and sort step.
%%bash
hdfs dfs -rm -r my_output
mapred streaming \
-D mapreduce.job.reduces=0 \
-input hello.txt \
-output my_output \
-mapper '/bin/cat'
2024-03-11 15:46:13,342 INFO Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
Deleted my_output
2024-03-11 15:46:14,418 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties 2024-03-11 15:46:14,499 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s). 2024-03-11 15:46:14,500 INFO impl.MetricsSystemImpl: JobTracker metrics system started 2024-03-11 15:46:14,510 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized! 2024-03-11 15:46:14,625 INFO mapred.FileInputFormat: Total input files to process : 1 2024-03-11 15:46:14,639 INFO mapreduce.JobSubmitter: number of splits:1 2024-03-11 15:46:14,784 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1621052802_0001 2024-03-11 15:46:14,784 INFO mapreduce.JobSubmitter: Executing with tokens: [] 2024-03-11 15:46:14,906 INFO mapreduce.Job: The url to track the job: http://localhost:8080/ 2024-03-11 15:46:14,910 INFO mapreduce.Job: Running job: job_local1621052802_0001 2024-03-11 15:46:14,916 INFO mapred.LocalJobRunner: OutputCommitter set in config null 2024-03-11 15:46:14,921 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter 2024-03-11 15:46:14,927 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2 2024-03-11 15:46:14,927 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 2024-03-11 15:46:14,958 INFO mapred.LocalJobRunner: Waiting for map tasks 2024-03-11 15:46:14,962 INFO mapred.LocalJobRunner: Starting task: attempt_local1621052802_0001_m_000000_0 2024-03-11 15:46:14,981 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2 2024-03-11 15:46:14,983 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false 2024-03-11 15:46:15,002 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ] 2024-03-11 15:46:15,008 INFO mapred.MapTask: Processing split: file:/home/runner/work/big_data/big_data/hello.txt:0+14 2024-03-11 15:46:15,017 INFO mapred.MapTask: numReduceTasks: 0 2024-03-11 15:46:15,024 INFO streaming.PipeMapRed: PipeMapRed exec [/bin/cat] 2024-03-11 15:46:15,029 INFO Configuration.deprecation: mapred.work.output.dir is deprecated. Instead, use mapreduce.task.output.dir 2024-03-11 15:46:15,032 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir 2024-03-11 15:46:15,032 INFO Configuration.deprecation: map.input.file is deprecated. Instead, use mapreduce.map.input.file 2024-03-11 15:46:15,033 INFO Configuration.deprecation: map.input.length is deprecated. Instead, use mapreduce.map.input.length 2024-03-11 15:46:15,035 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id 2024-03-11 15:46:15,036 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition 2024-03-11 15:46:15,038 INFO Configuration.deprecation: map.input.start is deprecated. Instead, use mapreduce.map.input.start 2024-03-11 15:46:15,039 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap 2024-03-11 15:46:15,040 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id 2024-03-11 15:46:15,041 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id 2024-03-11 15:46:15,042 INFO Configuration.deprecation: mapred.skip.on is deprecated. 
Instead, use mapreduce.job.skiprecords 2024-03-11 15:46:15,043 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name 2024-03-11 15:46:15,056 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s] 2024-03-11 15:46:15,058 INFO streaming.PipeMapRed: MRErrorThread done 2024-03-11 15:46:15,061 INFO streaming.PipeMapRed: Records R/W=1/1 2024-03-11 15:46:15,063 INFO streaming.PipeMapRed: mapRedFinished 2024-03-11 15:46:15,065 INFO mapred.LocalJobRunner: 2024-03-11 15:46:15,070 INFO mapred.Task: Task:attempt_local1621052802_0001_m_000000_0 is done. And is in the process of committing 2024-03-11 15:46:15,073 INFO mapred.LocalJobRunner: 2024-03-11 15:46:15,073 INFO mapred.Task: Task attempt_local1621052802_0001_m_000000_0 is allowed to commit now 2024-03-11 15:46:15,076 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1621052802_0001_m_000000_0' to file:/home/runner/work/big_data/big_data/my_output 2024-03-11 15:46:15,077 INFO mapred.LocalJobRunner: Records R/W=1/1 2024-03-11 15:46:15,078 INFO mapred.Task: Task 'attempt_local1621052802_0001_m_000000_0' done. 2024-03-11 15:46:15,084 INFO mapred.Task: Final Counters for attempt_local1621052802_0001_m_000000_0: Counters: 15 File System Counters FILE: Number of bytes read=141437 FILE: Number of bytes written=785612 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=1 Map output records=1 Input split bytes=102 Spilled Records=0 Failed Shuffles=0 Merged Map outputs=0 GC time elapsed (ms)=0 Total committed heap usage (bytes)=314572800 File Input Format Counters Bytes Read=14 File Output Format Counters Bytes Written=27 2024-03-11 15:46:15,084 INFO mapred.LocalJobRunner: Finishing task: attempt_local1621052802_0001_m_000000_0 2024-03-11 15:46:15,085 INFO mapred.LocalJobRunner: map task executor complete. 2024-03-11 15:46:15,914 INFO mapreduce.Job: Job job_local1621052802_0001 running in uber mode : false 2024-03-11 15:46:15,916 INFO mapreduce.Job: map 100% reduce 0% 2024-03-11 15:46:15,918 INFO mapreduce.Job: Job job_local1621052802_0001 completed successfully 2024-03-11 15:46:15,922 INFO mapreduce.Job: Counters: 15 File System Counters FILE: Number of bytes read=141437 FILE: Number of bytes written=785612 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=1 Map output records=1 Input split bytes=102 Spilled Records=0 Failed Shuffles=0 Merged Map outputs=0 GC time elapsed (ms)=0 Total committed heap usage (bytes)=314572800 File Input Format Counters Bytes Read=14 File Output Format Counters Bytes Written=27 2024-03-11 15:46:15,922 INFO streaming.StreamJob: Output directory: my_output
!hdfs dfs -test -e my_output/_SUCCESS && cat my_output/part-00000
Hello, World!
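Note that, unlike the identity-mapper runs above, the output no longer starts with a 0: when the mapper is an external command, streaming feeds only the values (the text lines) to its standard input, so the byte-offset keys produced by TextInputFormat are dropped rather than passed through.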