Why is Hadoop Slow on Large Jobs?
Map-reduce is perhaps the most commonly used big data technology. Invented by Google, made popular by Hadoop, it takes an immensely complex parallel computing challenge and simplifies it down to writing a “map” function and a “reduce” function. No wonder it has caught on.
Between the “map” and the “reduce,” Hadoop does a great deal of heavy lifting. The output of the map must be collected, partitioned into groups, sorted, and delivered to the reduce processes that will handle it next. We call this the “sort” phase, and it’s of critical importance. As we’ll demonstrate, how efficiently the framework sorts in large part determines how much work a cluster can get done.
We at Quantcast have some familiarity with this topic. We were an early Hadoop adopter. We began using it six years ago and quickly bumped against its scalability limits. Sort operations of a terabyte often failed, and 100 TB sorts would always fail, as they kept hitting broken disks and restarting multi-hour map tasks. Hadoop has improved since then but still has difficulty scaling up to jobs of many terabytes.
Figure 1 shows how Hadoop’s sort algorithm works and why it struggles at scale. Each of N map tasks executes, collecting output into a RAM buffer. Every time the buffer fills, the map task sorts and partitions its contents into M partitions (one per reduce task) and writes them to a local spill file. After the map task finishes, it merges the spill files into a single file containing M runs of sorted data, each run destined for a single reduce task. During the shuffle phase, each reduce task retrieves its run of data from each merged map output file and merges the runs into a single file that will serve as the reduce input.
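To make the data flow concrete, here is a minimal in-memory sketch of the spill, merge, and shuffle steps just described. It is a toy model, not Hadoop's code: the function names (`partition_of`, `run_map_task`, `run_reduce_task`), the CRC32-based partitioner, and the use of Python lists in place of disk files are all assumptions made for illustration.

```python
import heapq
import zlib


def partition_of(key, num_reducers):
    # Hypothetical partitioner: a stable hash of the key picks the reduce task.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def run_map_task(records, num_reducers, buffer_size):
    """Simulate one map task; return its merged output as M sorted runs."""
    spills = []   # each spill holds M sorted runs, standing in for a spill file
    buffer = []   # stands in for the RAM buffer

    def spill():
        # Sort the buffer, then split it into one run per reduce task.
        runs = [[] for _ in range(num_reducers)]
        for key, value in sorted(buffer):
            runs[partition_of(key, num_reducers)].append((key, value))
        spills.append(runs)
        buffer.clear()

    for record in records:
        buffer.append(record)
        if len(buffer) >= buffer_size:
            spill()
    if buffer:
        spill()

    # Merge the spills partition by partition into a single output "file".
    return [
        list(heapq.merge(*(s[p] for s in spills)))
        for p in range(num_reducers)
    ]


def run_reduce_task(partition, map_outputs):
    """Shuffle: fetch this partition's run from every map output and merge."""
    return list(heapq.merge(*(out[partition] for out in map_outputs)))
```

With N map outputs and M reduce tasks, the list of map outputs holds N×M separate runs, and each call to `run_reduce_task` touches one run in every map output. On real disks, each of those fetches is a separate read.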
What’s not to like? Well, the more tasks you use, the less efficient this algorithm gets. Each of the N map output files contains M sorted runs, so the total map output ends up in N×M pieces on disk. More map or reduce tasks mean smaller pieces and less efficient reads. For example, suppose you run a job that produces 10 TB of map output, parallelized across 10,000 map tasks and 10,000 reduce tasks. The resulting merged map output files will contain a total of 100 million runs, averaging 100 KB each. The reduce tasks will request each run individually and in random order, causing about 100 million disk seeks. Since a disk drive takes about 20 ms to seek and can read data at 50 MB/s, the shuffle phase will spend 23 disk-days seeking and only 2.3 disk-days reading, wasting 91% of your cluster’s theoretical I/O. Good thing you didn’t use twice as many map and reduce tasks, because that would waste 97.6%.
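The arithmetic behind those figures can be reproduced with a few lines of Python. This is a back-of-the-envelope model using only the numbers above (one seek per run, 20 ms per seek, 50 MB/s sequential reads, decimal units); the function name `shuffle_waste` and its parameters are made up for this sketch.

```python
SECONDS_PER_DAY = 86_400


def shuffle_waste(map_output_bytes, map_tasks, reduce_tasks,
                  seek_seconds=0.020, read_bytes_per_second=50e6):
    """Estimate shuffle-phase disk time, assuming one seek per sorted run."""
    runs = map_tasks * reduce_tasks                  # N x M pieces on disk
    seek_time = runs * seek_seconds                  # time spent seeking
    read_time = map_output_bytes / read_bytes_per_second  # time spent reading
    return {
        "runs": runs,
        "avg_run_bytes": map_output_bytes / runs,
        "seek_disk_days": seek_time / SECONDS_PER_DAY,
        "read_disk_days": read_time / SECONDS_PER_DAY,
        "waste": seek_time / (seek_time + read_time),
    }


# The example above: 10 TB of map output, 10,000 mappers, 10,000 reducers.
base = shuffle_waste(10e12, 10_000, 10_000)
# Doubling both task counts quadruples the number of runs.
doubled = shuffle_waste(10e12, 20_000, 20_000)

print(f"{base['waste']:.1%} wasted with 10,000 x 10,000 tasks")
print(f"{doubled['waste']:.1%} wasted with 20,000 x 20,000 tasks")
```

Because the number of runs grows with the product of the two task counts while the read time stays fixed, the wasted fraction climbs quickly as either count increases.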
“Aha,” you say, “I will instead use fewer tasks, each processing more data, and thereby keep disk I/O up.” You are wise to do so, but you may find long tasks difficult to live with. The task is Hadoop’s unit of “retryability”; longer tasks are more likely to fail and more painful to restart from the beginning when they do. With ten thousand map tasks reading 10 TB of data, tasks typically run for about five minutes. Using only 1000 mappers and 1000 reducers will cut the waste to 9% but will mean tasks run for an hour or two. If a task fails toward the end of the job and gets re-run, you’ll have to wait an extra hour or two for your job to finish, rather than an extra five minutes.
It gets worse, because the task is also Hadoop’s unit of time-sharing. While cluster nodes are head-down running your long tasks, they don’t request new instructions from the scheduler and can’t be interrupted for higher-priority work. Your colleagues needing to run shorter, more time-critical jobs will grow impatient waiting hours for cluster time. They will agitate for scheduling preemption, which will either prevent your long tasks from starting (even on an idle cluster) or kill them midstream. Either way, you are delayed and the cluster’s compute capacity is wasted. Long tasks alleviate Hadoop’s scalability bottleneck but create fresh problems in the areas of scheduling, operations, and office politics.
The Times Have Changed: Networks Are Faster; Disks Are Not
Hadoop’s inefficiency used to matter less. The core map-reduce algorithm was designed ten years ago, when network core switches (i.e., the switches between racks in a cluster) could carry no more than 10% of the cluster’s aggregate disk throughput. There was little point in optimizing disk I/O further, since the network imposed a tighter bottleneck.
Since then, the bottleneck has shifted. Networks have become much faster and cheaper, while disk speed has not increased. With 10 Gbps networks, multi-hundred Gbps core switches, and 12-disk servers, core networks can now keep up with disk I/O, so improving disk I/O can pay off in better system performance.
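A quick per-server calculation illustrates the shift. The sketch below assumes the 50 MB/s sequential read rate used in the earlier shuffle example applies to each of the 12 disks, and it ignores protocol overhead; the function name `server_disk_gbps` is hypothetical and real hardware will vary.

```python
def server_disk_gbps(disks, disk_mb_per_second=50.0):
    """Aggregate sequential disk throughput of one server, in gigabits/s."""
    # MB/s -> megabits/s (x8) -> gigabits/s (/1000), using decimal units.
    return disks * disk_mb_per_second * 8 / 1000


NIC_GBPS = 10.0                      # one 10 Gbps network link per server
disk_gbps = server_disk_gbps(12)     # 12-disk server from the text

print(f"disks: {disk_gbps:.1f} Gbps, network link: {NIC_GBPS:.1f} Gbps")
```

Under these assumptions a server's disks deliver about half of what its network link can carry, so the disks, not the network, set the ceiling.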
Hadoop’s sort algorithm is therefore showing its age. It misses optimization opportunities afforded by modern networks, underuses disks, and as a result struggles to process large jobs gracefully. While large Hadoop benchmark runs have been performed, they usually run with custom settings that use fewer, larger tasks in order to produce reasonably large disk I/O operations. This is a sensible workaround for a benchmark but is impractical for regular operations.
At Quantcast, we run our own stack for map-reduce processing based on Hadoop but leveraging custom-built technologies as well. An important one is our improved sort architecture called Quantsort. It interacts closely with the Quantcast File System to permit larger, better-behaved jobs. Stay tuned for a future blog post describing it.
Written by Silvius Rus, Jim Kelly, and Michael Ovsiannikov on behalf of the Cluster R&D Team at Quantcast.