Spill, in the context of MapReduce, means writing data from memory to disk. Spilling occurs whenever there is not enough memory to hold all of the mapper output during the sort phase. The amount of memory available for this is set by mapreduce.task.io.sort.mb.
A record emitted from a map will be serialized into a buffer, and its metadata will be stored in accounting buffers. When either the serialization buffer or the metadata exceeds a threshold, the contents of the buffers are sorted and written to disk in the background while the map continues to output records. If either buffer fills up completely while the spill is in progress, the map thread blocks. When the map is finished, any remaining records are written to disk and all on-disk segments are merged into a single file. Minimizing the number of spills to disk can decrease map time, but a larger buffer also decreases the memory available to the mapper.
The value of mapreduce.task.io.sort.mb should clearly be smaller than mapreduce_map_memory_mb, since the mapper needs memory for more than just sorting.
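As a rough illustration of that constraint, the three settings can be sized together so that the sort buffer stays well inside the map task's JVM heap, which in turn stays inside the container requested from the scheduler. The sketch below uses the new MapReduce API; the class name and the concrete numbers (2 GiB container, roughly 80% of it as heap, 512 MiB sort buffer) are illustrative assumptions, not recommendations.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SortBufferSizing {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Container memory requested from the scheduler for each map task.
        conf.setInt("mapreduce.map.memory.mb", 2048);
        // JVM heap of the map task; must fit inside the container.
        conf.set("mapreduce.map.java.opts", "-Xmx1638m");
        // Sort buffer; it is carved out of the heap above, so keep it well below -Xmx
        // to leave room for the map logic itself.
        conf.setInt("mapreduce.task.io.sort.mb", 512);

        Job job = Job.getInstance(conf, "spill-aware job");
        // ... set mapper, reducer, input and output paths as usual, then submit ...
    }
}
```

The properties involved in spilling are summarized below.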
PROPERTY | VALUE | DESCRIPTION |
mapreduce.task.io.sort.factor | 10 | The number of streams to merge at the same time while sorting files, i.e. the number of sort heads used during the merge; it also determines the number of open file handles. Merging more files in parallel reduces the number of merge passes and improves run time by reducing disk I/O, but it also uses more memory. If io.sort.factor is set too high or the maximum JVM heap is set too low, excessive garbage collection will occur. The Hadoop default is 10, but Cloudera recommends a higher value. Will be part of generated client configuration. |
mapreduce.task.io.sort.mb | 100 | The total amount of buffer memory, in megabytes, to use while sorting files. Note that this memory comes out of the user JVM heap (total user JVM heap minus this amount of memory equals the heap space left for user code). Note that Cloudera's default differs from Hadoop's default; Cloudera uses a bigger buffer by default because modern machines often have more RAM. The smallest value across all TaskTrackers will be part of generated client configuration. |
mapreduce.map.sort.spill.percent | 0.80 | The soft limit on either the serialization buffer or the record-metadata (accounting) buffer. Once this limit is reached, a thread begins to spill the contents to disk in the background. Note that this does not imply any chunking of data to the spill. A value less than 0.5 is not recommended. The value is a decimal fraction; the default of 80% is written 0.80. Will be part of generated client configuration. |
mapreduce_map_memory_mb | 512 | The amount of memory to request from the scheduler for each map task. |
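As a quick sanity check, the point at which a background spill starts can be computed from the two buffer-related properties above. The self-contained sketch below plugs in the table's defaults (a 100 MB buffer and a 0.80 threshold); the class name is only for illustration.

```java
public class SpillThreshold {
    public static void main(String[] args) {
        int ioSortMb = 100;          // mapreduce.task.io.sort.mb (default from the table)
        double spillPercent = 0.80;  // mapreduce.map.sort.spill.percent (default from the table)

        long bufferBytes = ioSortMb * 1024L * 1024L;
        long softLimit = (long) (bufferBytes * spillPercent);

        System.out.printf("Serialization buffer: %d bytes%n", bufferBytes);
        System.out.printf("Background spill starts at ~%d bytes (%.0f MiB)%n",
                softLimit, softLimit / (1024.0 * 1024.0));
        // With the defaults: 104857600 bytes in total, spilling begins around 83886080 bytes (~80 MiB).
    }
}
```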
Here are some excerpts from the logs of MapReduce jobs where spilling occurred.
2019-04-25 09:55:22,949 INFO [SpillThread] org.apache.hadoop.mapred.MapTask: Finished spill 67
2019-04-25 09:55:22,950 INFO [main] org.apache.hadoop.mapred.MapTask: (RESET) equator 239326784 kv 59831692(239326768) kvi 57688416(230753664)
2019-04-25 09:55:26,118 INFO [main] org.apache.hadoop.mapred.MapTask: Spilling map output
2019-04-25 09:55:26,130 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 239326784; bufend = 151336447; bufvoid = 268435456
2019-04-25 09:55:26,130 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 59831692(239326768); kvend = 51255868(205023472); length = 8575825/16777216
2019-04-25 09:55:26,130 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 159926367 kvi 39981584(159926336)
2019-04-25 09:55:34,315 INFO [SpillThread] org.apache.hadoop.mapred.MapTask: Finished spill 68
2019-04-25 09:55:34,315 INFO [main] org.apache.hadoop.mapred.MapTask: (RESET) equator 159926367 kv 39981584(159926336) kvi 37838320(151353280)
2019-04-25 09:55:37,877 INFO [main] org.apache.hadoop.mapred.MapTask: Spilling map output
2019-04-25 09:55:37,878 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 159926367; bufend = 71936148; bufvoid = 268435456
2019-04-25 09:55:37,878 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 39981584(159926336); kvend = 31405800(125623200); length = 8575785/16777216
2019-04-25 09:55:37,878 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 80526068 kvi 20131512(80526048)
2019-04-25 09:55:46,365 INFO [SpillThread] org.apache.hadoop.mapred.MapTask: Finished spill 69
2020-04-17 09:40:14,513 INFO [main] org.apache.hadoop.mapred.MapTask: Record too large for in-memory buffer: 268435452 bytes
2020-04-17 09:41:24,960 INFO [main] org.apache.hadoop.mapred.MapTask: Spilling map output
2020-04-17 09:41:24,960 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 180996067; bufvoid = 268435456
2020-04-17 09:41:24,960 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 67108860(268435440); kvend = 67108860(268435440); length = 1/16777216
2020-04-17 09:41:24,960 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 180996067 kvi 45249012(180996048)
2020-04-17 09:41:25,474 INFO [SpillThread] org.apache.hadoop.mapred.MapTask: Finished spill 1036
2020-04-17 09:41:25,474 INFO [main] org.apache.hadoop.mapred.MapTask: (RESET) equator 180996067 kv 45249012(180996048) kvi 45249012(180996048)
2020-04-17 09:41:25,690 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 0 kvi 67108860(268435440)
2020-04-17 09:41:25,690 INFO [main] org.apache.hadoop.mapred.MapTask: Record too large for in-memory buffer: 268435452 bytes
2020-04-17 09:42:45,023 INFO [main] org.apache.hadoop.mapred.MapTask: Spilling map output
2020-04-17 09:42:45,023 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 180996067; bufvoid = 268435456
2020-04-17 09:42:45,023 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 67108860(268435440); kvend = 67108860(268435440); length = 1/16777216
2020-04-17 09:42:45,023 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 180996067 kvi 45249012(180996048)
2020-04-17 09:42:45,419 INFO [SpillThread] org.apache.hadoop.mapred.MapTask: Finished spill 1038
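The second excerpt shows a different situation. There, bufvoid = 268435456 corresponds to a 256 MiB sort buffer, yet single records of 268435452 bytes are reported as too large for the in-memory buffer, and the length = 1/16777216 lines show that each spill carries just one record. Records nearly as large as the whole buffer cannot be collected in memory at all, so no tuning short of shrinking the records (or raising io.sort.mb beyond the record size) will avoid a spill per record. The tiny sketch below (illustrative class name) just makes those sizes explicit.

```java
public class RecordTooLarge {
    public static void main(String[] args) {
        long bufvoid = 268435456L;     // sort buffer size reported in the logs: 256 MiB (io.sort.mb = 256)
        long recordSize = 268435452L;  // "Record too large for in-memory buffer: 268435452 bytes"

        System.out.printf("Sort buffer:   %d MiB%n", bufvoid / (1024 * 1024));
        System.out.printf("Single record: %.2f MiB%n", recordSize / (1024.0 * 1024.0));
        // The record is essentially the size of the entire buffer, so it can never
        // fit under the spill threshold and ends up being written out on its own.
    }
}
```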
Spilling reduces the performance of MapReduce jobs because of the overhead of writing data to disk and because the map thread blocks whenever a buffer fills up while a spill is still in progress; it should therefore be avoided as much as possible. Ideally, one should aim for a single spill per mapper: the final flush when the map finishes, which cannot be avoided since the map output must eventually reach disk.
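A convenient way to check how much a finished job spilled is to compare its SPILLED_RECORDS and MAP_OUTPUT_RECORDS counters. Below is a minimal sketch assuming a handle to a completed org.apache.hadoop.mapreduce.Job; keep in mind that SPILLED_RECORDS also includes reduce-side spills, so this is a rough signal rather than an exact per-mapper count.

```java
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

public class SpillCheck {
    // Call this after job.waitForCompletion(true) has returned.
    static void reportSpilling(Job job) throws Exception {
        Counters counters = job.getCounters();
        long mapOutputRecords = counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
        long spilledRecords   = counters.findCounter(TaskCounter.SPILLED_RECORDS).getValue();

        System.out.println("Map output records: " + mapOutputRecords);
        System.out.println("Spilled records:    " + spilledRecords);

        // If spilled records clearly exceed map output records, map outputs were written
        // to disk more than once, i.e. there were extra spills and merge passes.
        if (spilledRecords > mapOutputRecords) {
            System.out.println("Extra spilling detected: consider increasing mapreduce.task.io.sort.mb.");
        }
    }
}
```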
To prevent spilling, one needs to make sure that the data emitted by each mapper is smaller than io.sort.mb (more precisely, smaller than the fraction of it given by mapreduce.map.sort.spill.percent). This can be achieved either by increasing mapreduce.task.io.sort.mb (while making sure that enough memory remains available for the rest of the map task), or by decreasing the amount of data each mapper emits so that it fits within io.sort.mb, for instance by splitting the input further so as to increase the number of mappers. Maximizing parallelization, however, is not always the best choice, both because of the overhead of starting each individual map task and because splitting the data this way does not take the HDFS block size into account. Since HDFS stores files in blocks and distributes them across the datanodes of the cluster, it is generally convenient to split the data into chunks whose size is a multiple of the cluster's HDFS block size.
The default HDFS block size is dfs.block.size = 128 MiB, so, for instance, for a file of size $1$ GB (note: $1$ GB $\approx 953.674$ MiB) a good choice would be $953.674/128 \approx 7.45$, i.e. about $8$ map tasks of roughly one block each.
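One way to keep the splits aligned with the block size is to pin the minimum and maximum split size to the block size (or a multiple of it) when configuring the job. Below is a minimal sketch using FileInputFormat from the new MapReduce API; the class name and the hard-coded 128 MiB are illustrative and should match the cluster's actual dfs.block.size.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class BlockAlignedSplits {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "block-aligned splits");

        long blockSize = 128L * 1024 * 1024;  // the cluster's HDFS block size (128 MiB here)

        // Force every split to be exactly one block, so a 1 GB file yields about 8 map tasks.
        FileInputFormat.setMinInputSplitSize(job, blockSize);
        FileInputFormat.setMaxInputSplitSize(job, blockSize);

        // ... add input paths, set mapper/reducer and output path, then submit ...
    }
}
```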
See also: Hadoop Wiki: Partitioning your job into maps and reduces https://wiki.apache.org/hadoop/HowManyMapsAndReduces