Hadoop’s sluggish command line

We at Quantcast started using Hadoop’s file system, HDFS, in 2006. We’ve run its administration tool, hadoop fs, hundreds of thousands of times and were never happy with its perceptible lag when we hit the enter key. Stat a file, and there goes half a second. Run hadoop fs -dus, another second. Your grandmother might not understand how we could get worked up over half a second, but when building high-performance systems, half a second is a constant reminder that our system still has a little flab.

And even grandma will complain when she runs hadoop fs in a bash loop. She’ll be waiting seconds or minutes to get her directories checked, and…well…she’s not getting any younger.

The problem is that every single time you perform a file operation you start a new Java virtual machine (JVM). This takes hundreds of milliseconds and uses hundreds of milliseconds of CPU time as well. Clearly a waste of time and resources.
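To see the per-invocation cost for yourself, you can time repeated launches of a command. The sketch below is a minimal harness, not the benchmark we ran; the function name `time_command` and the stand-in command are assumptions chosen so it runs on any machine with Python.

```python
import subprocess
import sys
import time


def time_command(argv, repeats=5):
    """Return the mean wall-clock seconds needed to launch argv and wait for it."""
    elapsed = []
    for _ in range(repeats):
        start = time.perf_counter()
        # Output is discarded; only process start-up plus run time is measured.
        subprocess.run(argv, stdout=subprocess.DEVNULL,
                       stderr=subprocess.DEVNULL, check=False)
        elapsed.append(time.perf_counter() - start)
    return sum(elapsed) / len(elapsed)


if __name__ == "__main__":
    # Stand-in command so the sketch runs anywhere. On a machine with Hadoop
    # installed, pass e.g. `hadoop fs -stat /some/path` (hypothetical path).
    argv = sys.argv[1:] or [sys.executable, "-c", "pass"]
    print(f"{' '.join(argv)}: {time_command(argv) * 1000:.1f} ms per call")
```

Any command that starts a fresh runtime on every call pays this launch cost again on each iteration of a shell loop.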

QFS: Far-flung storage, up-close responsiveness

In 2011 we switched entirely from HDFS to QFS, the open-source Quantcast File System. Reliability and performance went up dramatically. Yet although QFS is implemented in C++ and is lightning fast, all our command line interaction with it was still sluggish. We were leveraging QFS’s Hadoop compatibility and going through Hadoop’s command-line tools. In other words, we were still starting a fresh JVM every time we typed hadoop fs.

We recently completed development of a command line tool called (appropriately) qfs, a C++ rewrite of hadoop fs for QFS, which makes QFS much quicker than HDFS from the command line. We compared timings accessing QFS via the HDFS command-line tool and via the new QFS native tool. The stat test timed a single hadoop fs -stat command. The script test timed a simple script that used the -stat, -touchz and -ls commands to check ten directory paths, create missing ones and list them.

The differences are vast:

The new qfs tool speeds up operations by two orders of magnitude. It finishes almost before it starts.

Note that you can also use QFS+FUSE (Filesystem in Userspace) to access your files through known UNIX commands, such as cat, grep, ls or find. The FUSE code is entirely C++ and thus does not require a JVM, while HDFS+FUSE does.

If you’d like to use the qfs tool yourself, you can download it from GitHub, build it and run it. What you can’t do is park this new sports car in your old garage. Although QFS’s back end supports HDFS’s client, the reverse isn’t true. The high-performance qfs tool requires the high-performance QFS file system. Here’s another reason to give it a try.

Written by Silvius Rus, Jim Kelly and Michael Ovsiannikov on behalf of the Big Data Storage Team at Quantcast

We’ve submitted a patch to the Ganglia Monitoring System project to add support for compressing a batch of metrics before sending them to the collector service. Since we collect around 140,000 metrics per second (almost 12 billion metrics per day), even a small reduction in size adds up. This, however, is no small reduction; this patch reduces the amount of data transmitted by up to 92%.

This blog post will discuss the nature of the change and why we needed it, and talk a little bit about the technical details.

And That’s The Way It Is

Ganglia operates by running a Ganglia monitor daemon (gmond) on each machine whose metrics you’re interested in collecting, and then running a Ganglia collector daemon (gmetad) to retrieve those metrics and persist them in a central location. The metrics are transmitted as a big XML tree:

GANGLIA_XML
|
- CLUSTER
  |
  - HOST
  | |
  | - METRIC
  | - METRIC
  | - METRIC
  |
  - HOST
  | |
  | - METRIC
  | - METRIC
  ...

This structure is easy to analyze and debug, but it’s not particularly space-efficient, especially when sending a lot of metrics over a long-distance link very frequently. Compressing the XML tree seemed like an easy improvement to make with a large potential benefit.

Motivation: WANs Are Expensive

Quantcast has datacenters located all over the world. Using Ganglia to collect metrics from these datacenters means sending a lot of traffic across WAN connections, which are usually pretty expensive (and often slow and high-latency). Adding the ability to compress these metrics before sending them reduces the amount of raw data we send over these links, which saves us money. Also, transmitting less data over a high-latency link gets the metrics into the collector daemon that much faster, which improves our capability to monitor our global infrastructure in real time.

Technical Details

The patch has two main components: changing gmond to optionally gzip its metrics before sending them, and changing gmetad to automatically detect a compressed stream and decompress it before processing.

gmond

We added an option to gmond (-z) that causes it to compress its XML tree with gzip before emitting it. Since gmond was already using APR, we implemented this change by storing the compressed data in the per-socket data structure APR provides. This enabled us to make a very clean patch that makes as few changes as possible to the existing gmond code paths.

gmetad

We patched gmetad (both the C and the Python versions) to automatically detect a compressed stream. Upon receiving the data stream, gmetad looks for the gzip header bytes as defined in RFC 1952; if those bytes are detected, gmetad attempts to decompress the stream and then resumes processing as normal. This patch also involved very little change; if those bytes are not detected, gmetad behaves exactly as it did before.
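The detection step can be illustrated in a few lines. This is a sketch of the idea, not the code in our patch: the function name `maybe_decompress` and the sample XML attributes are made up for illustration. The two magic bytes (0x1f, 0x8b) are the ID1 and ID2 fields defined in RFC 1952.

```python
import gzip

GZIP_MAGIC = b"\x1f\x8b"  # ID1 and ID2 header bytes from RFC 1952


def maybe_decompress(payload: bytes) -> bytes:
    """Return the XML payload, gunzipping it first if it starts with the gzip magic."""
    if payload[:2] == GZIP_MAGIC:
        return gzip.decompress(payload)
    # No gzip header: treat it as plain XML, exactly as before.
    return payload


if __name__ == "__main__":
    # Illustrative, repetitive XML; real gmond output has more attributes.
    host = b'<HOST NAME="h"><METRIC NAME="load_one" VAL="0.1"/></HOST>'
    xml = b"<GANGLIA_XML><CLUSTER>" + host * 1000 + b"</CLUSTER></GANGLIA_XML>"
    packed = gzip.compress(xml)
    assert maybe_decompress(packed) == xml
    assert maybe_decompress(xml) == xml
    print(f"plain: {len(xml)} bytes, gzipped: {len(packed)} bytes")
```

Because the check only looks at the first two bytes, an uncompressed stream passes through untouched, which is why old and new gmond daemons can report to the same collector.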

Conclusion

We’ve made several other changes to Ganglia, and we’re planning to continue cleaning them up and submitting them back to the community. We’re hopeful that these changes will make Ganglia a more useful tool for everybody.

Did this post increase your interest in Quantcast by 92% as well? Come work with us and see what improvements you can make, both internally and shared with the world.

Posted by Adam Compton, Platform Operations Engineer


We recently ran into a slightly tricky problem with our Puppet configuration (a loop in the directed acyclic graph in the catalog), and I’d like to talk a little bit about what the problem was and how we fixed it.

Quick Overview: Puppet and Graph Theory

Graph Theory?!

When Puppet compiles a manifest file (or files), it builds a data structure that relates each resource to every other resource it either depends on or is needed by. This kind of data structure is called a “directed acyclic graph” (DAG). Puppet uses this structure to decide where to begin applying changes and in what order to apply them; this is how it can explicitly order even complex dependency trees (install a package, edit these three files, then create this directory, and once all of those things are done, start this service).

The “Acyclic” Part of “Directed Acyclic Graph”

One of the primary reasons Puppet goes to all this trouble is to make sure that there are no loops (or “cycles”) in the DAG. A loop is created when a resource both is depended on and depends on some other resource (no matter how indirectly), and it means Puppet has no idea where to start applying changes. If resource A depends on resource B, which depends on resource A, then neither ordering is correct (doing either A or B first violates the DAG), and in those cases Puppet will throw up its hands and refuse to take any action at all.

Graph Loops and You

For a simple example of what happens when Puppet encounters a graph loop, the following manifest:

    file { '/dir1':
      ensure => directory
    }
    file { '/dir1/file1':
      require => File['/dir1']
    }

creates the following data structure:

    File[/dir1] -> File[/dir1/file1]

In this example, Puppet will make sure to create dir1 as a directory before it creates file1 inside of it.

If we added the following resource:

    file { '/file2':
      notify => File['/dir1'],
      require => File['/dir1/file1']
    }

a loop would be created, like the following:

    File[/dir1] -> File[/dir1/file1] -> File[/file2] ~> File[/dir1]

This causes Puppet’s hair to catch on fire, and it immediately complains:

    $ puppet apply --noop test.pp
    Error: Could not apply complete catalog: Found 1 dependency cycle:
    (File[/dir1/file1] => File[/file2] => File[/dir1] => File[/dir1/file1])
    Try the --graph option and opening the resulting .dot file in OmniGraffle or GraphViz
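The kind of check that produces this error can be sketched with a depth-first search. This is not Puppet’s implementation, just a minimal illustration. The `find_cycle` helper is hypothetical, and each edge is written as (applied first, applied after), treating the notify (~>) relationship as an ordinary ordering edge.

```python
def find_cycle(edges):
    """Return one cycle as a list of nodes (first node repeated last), or None."""
    graph = {}
    for src, dst in edges:
        graph.setdefault(src, []).append(dst)
        graph.setdefault(dst, [])

    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    color = {node: WHITE for node in graph}
    path = []

    def visit(node):
        color[node] = GRAY
        path.append(node)
        for nxt in graph[node]:
            if color[nxt] == GRAY:
                # nxt is already on the current path: we have closed a loop.
                return path[path.index(nxt):] + [nxt]
            if color[nxt] == WHITE:
                found = visit(nxt)
                if found:
                    return found
        path.pop()
        color[node] = BLACK
        return None

    for node in graph:
        if color[node] == WHITE:
            found = visit(node)
            if found:
                return found
    return None


if __name__ == "__main__":
    edges = [
        ("File[/dir1]", "File[/dir1/file1]"),   # require
        ("File[/dir1/file1]", "File[/file2]"),  # require
        ("File[/file2]", "File[/dir1]"),        # notify
    ]
    print(find_cycle(edges))
```

Dropping the third edge makes `find_cycle` return None, which corresponds to the two-resource manifest that Puppet applies without complaint.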

Be Vewwy Vewwy Quiet, We’re Hunting Graph Loops

In the simple example above, it’s pretty easy to see where the graph loop is and how to resolve it. However, once you start adding custom resources, virtual resources, classes included based on Facter facts, etc., it can get tricky real fast. For instance, here’s a real error message we ran into recently after a change:

    err: Could not apply complete catalog: Found dependency cycles in the following relationships: File[/var/svc/nsca-cluster/log/run] => Service[nsca-cluster], File[/var/svc/nsca-cluster/run] => Service[nsca-cluster], File[/var/log/qc/nagios-cluster] => Service[nsca-cluster], File[/var/svc/nsca-cluster/log] => File[/var/svc/nsca-cluster/log/run], File[/var/svc/nsca-cluster/log/log.run] => File[/var/svc/nsca-cluster/log/run], File[/var/log/qc/nagios-cluster] => File[/var/svc/nsca-cluster/log/run], File[/var/svc/nsca-cluster/log] => File[/var/svc/nsca-cluster/log/log.run], File[/var/log/qc/nagios-cluster] => File[/var/svc/nsca-cluster/log/log.run], File[/var/svc/nsca-cluster/env] => File[/var/svc/nsca-cluster/env/RUNCONFIG], File[/var/log/qc/nagios-cluster] => File[/var/svc/nsca-cluster/svc.run], File[/var/svc/nsca-cluster] => File[/var/svc/nsca-cluster/svc.run], File[/var/log/qc] => File[/var/log/qc/nsca-cluster], File[/var/log/qc/nagios-cluster] => File[/var/log/qc/nsca-cluster], File[/var/log/qc/nagios-cluster] => File[/var/svc/nsca-cluster/env], File[/var/svc/nsca-cluster] => File[/var/svc/nsca-cluster/env], File[/var/svc/nsca-cluster/env] => File[/var/svc/nsca-cluster/env/USER], File[/var/log/qc/nagios-cluster] => File[/var/svc/nsca-cluster/env/USER], File[/var/log/qc/nagios-cluster] => File[/var/log/qc], File[/var/log/qc/nagios-cluster] => File[/var/svc/nsca-cluster/log], File[/var/svc/nsca-cluster] => File[/var/svc/nsca-cluster/log], File[/var/log/qc] => File[/var/log/qc/nagios-cluster], File[/var/log/qc/nagios-cluster] => File[/var/svc/nsca-cluster], File[/var/svc/nsca-cluster/env] => File[/var/svc/nsca-cluster/env/LOGDIR], File[/var/log/qc/nagios-cluster] => File[/var/svc/nsca-cluster/env/LOGDIR], File[/var/svc/nsca-cluster/svc.run] => File[/var/svc/nsca-cluster/run], File[/var/log/qc/nagios-cluster] => File[/var/svc/nsca-cluster/run], File[/var/svc/nsca-cluster] => File[/var/svc/nsca-cluster/run]; try using the --graph option and open the .dot files in 
    OmniGraffle or GraphViz

(It’s probably obvious that this configuration deals with our Nagios infrastructure.)

Looking through this log message, it definitely wasn’t immediately obvious to me where the problem was. It seemed like all of the services both depended on and were required by their log directories, but a close inspection of the manifests where these resources are defined yielded no fruit.

The --graph Option, You Say?

Puppet supports creating GraphViz-format files that detail the DAG it builds while compiling the manifests. These files can be rendered in a variety of graphing tools to get a visual picture of the relationships between various resources. For example, here’s a rendering of this very configuration:


It turns out that for complicated configurations, looking at a rendering of the DAG is not necessarily helpful.

Actual Analysis (as opposed to squinting)

While researching this problem, I found a blog post that addressed it specifically. It includes a simple Python script that will analyze a GraphViz file and display just the graph loops to make them easier to find.

Running this script against our configuration gave the following output:

    $ ./dot_find_cycles.py expanded_relationships.dot
    [File[/var/log/qc/nagios-cluster], File[/var/log/qc], File[/var/log/qc/nagios-cluster]]

This showed me immediately that the problem was with the resource that sets up log directories; somehow, the creation of the root logdir had become dependent on the creation of one of its children.

Armed with that information, I was able to quickly track down a recent change to our custom service setup resource where I’d added a subscribe metaparameter that wound up eventually watching for itself and thus creating a loop. I reverted that change and everything was happy once more.

Lessons Learned

In this specific case, of course I learned that this custom resource didn’t work exactly as I thought it did. However, I also learned some valuable troubleshooting techniques for the next time we run into this problem. I’m going to try to figure out a way to incorporate the loop-finding script with our continuous integration system; whenever a change is made to the Puppet configuration, one of the tests it runs should be to check for this sort of loop and to reject the change if one is found.
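One possible shape for such a CI check, sketched with Python’s standard `graphlib` module (Python 3.9 or later) rather than the script from the blog post mentioned above. The regular expression assumes edges are written as quoted node names joined by `->`, which is a simplification of the GraphViz format. The helper names are hypothetical.

```python
import re
from graphlib import CycleError, TopologicalSorter

# Simplified edge syntax: "source" -> "target". Real .dot files can be richer.
EDGE = re.compile(r'"([^"]+)"\s*->\s*"([^"]+)"')


def check_dot(dot_text):
    """Return None if the graph is acyclic, otherwise one cycle as a list of nodes."""
    sorter = TopologicalSorter()
    for src, dst in EDGE.findall(dot_text):
        sorter.add(dst, src)  # dst must come after src
    try:
        sorter.prepare()      # raises CycleError if any loop exists
    except CycleError as err:
        return err.args[1]    # graphlib puts the offending cycle here
    return None


def exit_code_for(dot_text):
    """Exit status a CI step could use: 0 for a clean graph, 1 for a loop."""
    return 0 if check_dot(dot_text) is None else 1
```

A CI job could render the graph with the --graph option, read the resulting .dot file, and fail the build when `exit_code_for` returns 1.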

Also, I learned exactly how long it takes my laptop to open a 19862×21525 pixel image (82 seconds).

Did this post make your heart race at the thought of spelunking through a maze of twisty little Puppet manifests (all alike)? Come work with us and see how it’s done!

Posted by Adam Compton, Platform Operations Engineer

Since Quantcast started in 2006, we’ve always had big data processing challenges relative to our budget and patience, so we pay a lot of attention to MapReduce performance and efficiency. One of the bottlenecks we ran into related to broadcasting job configuration data, so below we describe what we learned and how you can avoid this bottleneck in your own MapReduce jobs.

By job configuration data, we mean a blob of application-specific data that mappers or reducers need in order to process the main data set. For example, you might write a web log analysis that should skip a handful of URLs. Rather than hard-code the URLs in your map workers, the typical practice with Hadoop is to construct a list of URLs on your central launch node, write it to a file in the distributed file system, and have the map workers read it from there at startup before they start processing the web logs.

At small scale this works beautifully—your code stays clean, the job runs smoothly, and adding new URLs to the list is easy. But as your data volume grows, your configuration data will tend to grow as well, and you’ll want to use more map and reduce workers to process it. By the time you’re trying to send a 100 MB configuration file to thousands of workers, the broadcast will be a severe bottleneck, and jobs will take forever to start (if they start at all), while the cluster sits mostly idle.

Your Own Hadoop Botnet

What causes the slowdown? Your job becomes a victim of your computing power. With a few thousand Hadoop workers at your command, you can slice through enormous computing problems quickly. Or you can launch a potent denial-of-service attack on your file system. While you’re waiting for your job to start doing the first, it’s actually doing the second.

Here’s how that happens. Suppose you’re sending a 100 MB configuration file to 16,000 workers on 1,000 machines. When Hadoop writes the configuration file to its distributed file system (HDFS), it provisions ten replicas of it rather than the usual three to spread read requests across more data nodes. Multiple workers running on a single machine make only one request for the configuration file and share it among themselves, so each of 10 data nodes will have to serve 100 MB to 100 clients. On a gigabit network that takes 80 seconds, during which workers will sit idle waiting for data. That’s over a week of CPU wasted.
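The arithmetic behind those numbers, as a small sketch. It assumes decimal units (1 GB = 1,000 MB), a perfectly even spread of machines across replicas, and one CPU per worker; the function names are ours, for illustration only.

```python
def broadcast_seconds(file_mb, machines, replicas, link_gbps=1.0):
    """Seconds for each replica holder to serve the file to its share of machines."""
    clients_per_replica = machines / replicas
    megabits_served = file_mb * 8 * clients_per_replica
    return megabits_served / (link_gbps * 1000)


def idle_cpu_days(workers, seconds):
    """CPU time spent waiting, assuming one CPU per worker."""
    return workers * seconds / 86400


if __name__ == "__main__":
    wait = broadcast_seconds(file_mb=100, machines=1000, replicas=10)
    print(f"broadcast time: {wait:.0f} s")
    print(f"idle CPU: {idle_cpu_days(16000, wait):.1f} days")
```

Changing `replicas` to 3 shows why Hadoop raises the replica count for job files: the same model gives a wait more than three times as long.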

And that’s a best-case analysis. In fact the read requests will generally not be spread evenly. Hadoop will write the first three replicas synchronously, then start the workers, and then write the remaining seven. Many workers will start before all 10 replicas are available, so the first replicas will bear an even heavier load. Replication, then, is not the answer. The more replicas you make, the fewer machines each will have to serve, but the longer the replication process itself will take.

Options for Faster Broadcasts

The broadcast bottleneck became a big problem for us once our jobs and cluster got big enough. At Quantcast many production jobs have large configuration sets and do run on 1,000 machines, so we were seeing many slow (and even failing) jobs. We had tried increasing the replication factor beyond 10x, but that didn’t work well, as most workers still pulled from the first few replicas. We also considered building a completely different broadcast mechanism, perhaps using network broadcast protocols or self-establishing trees, but those sounded operationally complicated. They would leave us with yet another system to qualify, benchmark, deploy, log, audit, maintain, upgrade, monitor and debug.

After studying the problem ourselves and talking with HDFS’s developers, we realized the most leverage lies in making the initial write spread its data to more places, but more efficiently than simple replication does. We identified a series of simpler improvements that do just that and deliver progressively faster broadcasts:

1. Decrease the block size in HDFS. If you turn it down to 1 MB, HDFS will write a 100 MB file as 100 physical files on many different data servers. Each data server will have to serve as many clients as before, but because the file size is much smaller, it will put much less stress on its network link. You can also turn down the replication count. In our performance test (Figure 1), we used 1 MB blocks and a replication count of three and got broadcasts to run 82% faster than default Hadoop.

2. Use the Quantcast File System’s (QFS) striping feature to achieve a similar data distribution and even better performance. For our broadcasts, we use QFS with 128-way striping and 2x replication, which spreads files across 256 machines and provides some resilience to data loss. This ran another 49% faster than #1.

3. Run QFS in RAM for even faster performance. Since the broadcast data won’t be needed after the last worker reads it, persisting it to disk isn’t necessary and slows the job down. 2x replication still provides fault tolerance, should a machine go down while the job is running. This ran yet another 44% faster than #2.

Try It Yourself

The good news is, this is a high-quality problem. If you’re noticing broadcast bottlenecks, your business is probably booming, your data processing infrastructure is up and running at scale, and you’re able to focus on improving efficiency. The even better news is we’re making it easy to improve. The HDFS change just requires a parameter adjustment, and QFS is easy to download, set up as an additional file system, and use for broadcasts.

Written by Silvius Rus, Jim Kelly and Michael Ovsiannikov on behalf of the Cluster R&D Team at Quantcast

In a previous post, we described Hadoop’s architectural limitations for large jobs of terabytes and beyond. The more tasks a job uses, the less efficient its disk I/O becomes. Fewer, larger tasks keep the I/O efficient but create operational and scheduling problems that get more painful the larger the job gets.

Quantcast developed an improved architecture called Quantsort and has been running it internally for three years. Here’s how it works.

Quantsort Design

Quantsort guarantees a large disk I/O size throughout all map-reduce stages, pushing disk I/O, the now constrained resource, to its limits. The number of map tasks does not directly affect the I/O size at any step of the map-reduce algorithm, so jobs can be broken down into tens or hundreds of thousands of short tasks. When a task fails, it takes seconds or minutes to redo it even for 100 TB sort jobs. Tasks fill up the available task slots nicely, and task preemption is unnecessary. With task duration of three minutes, on a cluster with 12,000 worker slots you get about 4000 free slots per minute that can be given to higher-priority jobs.

Quantsort’s algorithm is altogether different from Hadoop’s. Instead of creating sorted runs early and then shuffling them later, it shuffles map output immediately into about 1024 partition files (a fixed number), in a process we call fanout. Because this number is fixed, independent of the number of map or reduce tasks, it keeps the seeks required in the merge to a manageable number.

Fanout requires many map processes to write to the same file concurrently—often 10,000 or more on our large production jobs. It works thanks to the concurrent append feature of the QFS file system, which uses RAM buffering and multiple physical machines to ensure disk writes are large (1 MB or more) and efficient.

Fanout also leverages QFS’s replication feature. Because a large job will put the entire cluster to work for several hours collecting and sorting data, a disk failure affecting a fanout file is not unlikely and can be particularly expensive. Re-creating the file involves re-running thousands of map tasks. We therefore configure QFS to make a second copy of each fanout file, so the job is more likely to complete.

Separate sorter tasks then split each partition file into a run for each reduce task and sort the runs. The resulting runs, grouped by reduce task and sorted, are also significantly larger than 1 MB and can be read efficiently.

Quantsort Advantage

The following table compares the four main bottlenecks of a theoretical 10 TB uncompressed sort job, using the Hadoop and Quantsort algorithms. It assumes 10,000 tasks in both map and reduce phases and 2500 disk drives in the cluster and excludes the I/O associated with map input and reduce output.

Disk Seeks: Our hypothetical cluster has 2500 drives and can therefore execute 125,000 seeks per second. Quantsort needs 8 seconds; Hadoop needs 800 seconds.

Disk I/O: Our 2500 drives can deliver about 1 Tbps of disk I/O. Quantsort requires 55 TB of disk I/O: 20 TB for fanout (replication 2), 10 TB for sort input, 15 TB for sort output (encoded Reed-Solomon), and 10 TB for the merge (reading directly from QFS). Hadoop requires at least 60 TB of disk I/O: 10 TB for sort spills, 10 TB for merge input, 10 TB for merge output, 10 TB of shuffle input, 10 TB of shuffle output, and 10 TB of reduce-side merge input. When two passes are needed, an extra 20 TB is needed. 55 TB at 1 Tbps disk I/O takes 440 seconds. 60 TB takes 480 seconds and 80 TB takes 640 seconds.

Core switch I/O: Using commodity network hardware, disk I/O can be matched efficiently to core network 1:1 up to hundreds of 12-disk servers, so our hypothetical network can keep up with our 2500 hypothetical disks. Quantsort will load it more heavily than Hadoop. Assuming 1 Tbps core switch bandwidth, the Hadoop sort would fully load the network for 80 seconds, Quantsort for 184 seconds.

Task failure recovery time: This is perhaps the most critical differentiator. The fact that Quantsort failures take three minutes to recover leads to dramatic improvement in practice. It means jobs complete faster, pleasing users. And it saves the organization from preemptive scheduling and the lost capacity it implies, since cluster resources are constantly becoming available to the scheduler to service higher-priority jobs.
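The disk seek and disk I/O figures above can be re-derived from the stated cluster capacity. The sketch below uses only numbers from the text: 125,000 seeks per second, 1 Tbps of disk bandwidth, and the per-stage terabyte counts. The Hadoop seek count of 10,000 × 10,000 runs is our assumption, taken from the task counts given for this example.

```python
CLUSTER_SEEKS_PER_SECOND = 125_000  # 2500 drives, as stated in the text
CLUSTER_DISK_TBPS = 1.0             # terabits per second, as stated in the text

QUANTSORT_TB = {
    "fanout (replication 2)": 20,
    "sort input": 10,
    "sort output (Reed-Solomon)": 15,
    "merge": 10,
}
HADOOP_TB = {
    "sort spills": 10,
    "merge input": 10,
    "merge output": 10,
    "shuffle input": 10,
    "shuffle output": 10,
    "reduce-side merge input": 10,
}


def io_seconds(terabytes, tbps=CLUSTER_DISK_TBPS):
    """Seconds to move the given terabytes at the cluster's disk bandwidth."""
    return terabytes * 8 / tbps


def seek_seconds(seeks, seeks_per_second=CLUSTER_SEEKS_PER_SECOND):
    """Seconds the whole cluster needs to perform the given number of seeks."""
    return seeks / seeks_per_second


if __name__ == "__main__":
    for name, stages in (("Quantsort", QUANTSORT_TB), ("Hadoop", HADOOP_TB)):
        total = sum(stages.values())
        print(f"{name}: {total} TB of disk I/O, {io_seconds(total):.0f} s")
    # Assumption: one seek per (map task, reduce task) pair for Hadoop.
    print(f"Hadoop seeks: {seek_seconds(10_000 * 10_000):.0f} s")
```

Read the other way, the 8 seconds quoted for Quantsort corresponds to roughly one million seeks at this cluster’s seek rate.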

Quantsort in Practice

We would share a 20 TB sort benchmark comparing Quantsort to Hadoop on our production cluster, but our impatience got the better of us. One hour into our benchmark, Hadoop was just restarting some failed map tasks, and we ran out of dedicated experiment time. Quantsort had been finished for half an hour.

We can say Quantsort runs well. We’ve had it live for over three years, ever since its inaugural 100 TB sort in 2009. Today it sorts over a petabyte per day, including dozens of jobs of 10 TB or more. Its record on a production job is 971 TB, which took 6.4 hours.

Written by Silvius Rus, Jim Kelly, and Michael Ovsiannikov on behalf of the Cluster R&D Team at Quantcast.

Why is Hadoop Slow on Large Jobs?

Map-reduce is perhaps the most commonly used big data technology. Invented by Google, made popular by Hadoop, it takes an immensely complex parallel computing challenge and simplifies it down to writing a “map” function and a “reduce” function. No wonder it has caught on.

Between the “map” and the “reduce,” Hadoop does a great deal of heavy lifting. The output of the map must be collected, partitioned into groups, sorted, and delivered to the reduce processes that will handle it next. We call this the “sort” phase, and it’s of critical importance. As we’ll demonstrate, how efficiently the framework sorts in large part determines how much work a cluster can get done.

We at Quantcast have some familiarity with this topic. We were an early Hadoop adopter. We began using it six years ago and quickly bumped against its scalability limits. Sort operations of a terabyte often failed, and 100 TB sorts would always fail, as they kept hitting broken disks and restarting multi-hour map tasks. Hadoop has improved since then but still has difficulty scaling up to jobs of many terabytes.

Figure 1 shows how Hadoop’s sort algorithm works and why it struggles at scale. Each of N map tasks executes, collecting output into a RAM buffer. Every time the buffer fills, the map task sorts and partitions its contents into M partitions (one per reduce task) and writes them to a local spill file. After the map task finishes, it merges the spill files into a single file containing M runs of sorted data, each run destined for a single reduce task. During the shuffle phase, each reduce task retrieves its run of data from each merged map output file and merges the runs into a single file that will serve as the reduce input.

What’s not to like? Well, the more tasks you use, the less efficient this algorithm gets. Each of the N map output files contains M sorted runs, so the total map output ends up in N×M pieces on disk. More map or reduce tasks mean smaller pieces and less efficient reads. For example, suppose you run a job that produces 10 TB of map output, parallelized across 10,000 map tasks and 10,000 reduce tasks. The resulting merged map output files will contain a total of 100M runs, averaging 100 KB long. The reduce tasks will request each individually and in random order, causing about 100M disk seeks. Since a disk drive takes about 20 msec to seek and can read data at 50 MB/s, the shuffle phase will spend 23 disk-days seeking and only 2.3 disk-days reading—wasting 91% of your cluster’s theoretical I/O. Good thing you didn’t use twice as many tasks, because that would waste 97.6%.

“Aha,” you say, “I will instead use fewer tasks, each processing more data, and thereby keep disk I/O up.” You are wise to do so, but you may find long tasks difficult to live with. The task is Hadoop’s unit of “retryability;” longer ones are more likely to fail and are more painful to restart from the beginning when they do. Ten thousand map tasks reading 10 TB of data will mean tasks typically run in about five minutes. Using only 1000 mappers will reduce waste to 9% but will mean tasks will run an hour or two. If a task fails toward the end of the job and gets re-run, you’ll have to wait an extra hour or two for your job to finish, rather than an extra five minutes.
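The waste percentages follow from a simple model: one 20 msec seek per run, plus sequential reads at 50 MB/s. The sketch below reproduces them under two assumptions of ours: decimal units (10 TB = 10^13 bytes), and, for the 9% figure, 1,000 tasks on both the map and reduce sides. The function name is illustrative.

```python
SEEK_SECONDS = 0.020            # about 20 msec per seek, from the text
READ_BYTES_PER_SECOND = 50e6    # 50 MB/s sequential read, from the text


def shuffle_waste(total_bytes, map_tasks, reduce_tasks):
    """Fraction of disk time spent seeking rather than reading during the shuffle."""
    runs = map_tasks * reduce_tasks          # N x M pieces on disk
    seek_s = runs * SEEK_SECONDS             # one seek per run
    read_s = total_bytes / READ_BYTES_PER_SECOND
    return seek_s / (seek_s + read_s)


if __name__ == "__main__":
    ten_tb = 10e12
    for tasks in (1_000, 10_000, 20_000):
        waste = shuffle_waste(ten_tb, tasks, tasks)
        print(f"{tasks} map x {tasks} reduce tasks: {waste:.1%} of disk time seeking")
```

Because the number of runs grows with the product of map and reduce tasks while the bytes to read stay fixed, doubling both task counts quadruples the seek time.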

It gets worse, because the task is also Hadoop’s unit of time-sharing. While cluster nodes are head-down running your long tasks, they don’t request new instructions from the scheduler and can’t be interrupted for higher-priority work. Your colleagues needing to run shorter, more time-critical jobs will grow impatient waiting hours for cluster time. They will agitate for scheduling preemption, which will either prevent your long tasks from starting (even on an idle cluster) or kill them midstream—either way delaying you and wasting the cluster’s compute capacity. Long tasks alleviate Hadoop’s scalability bottleneck but create fresh problems in the areas of scheduling, operations, and office politics.

The Times Have Changed: Networks Are Faster; Disks Are Not

Hadoop’s inefficiency used to matter less. The core map-reduce algorithm was designed ten years ago, when network core switches (i.e., switches between racks in a cluster) could carry no more than 10% of the aggregate disk throughput. There was little point optimizing disk I/O further, since networks imposed a tighter bottleneck.

Since then, the bottleneck has shifted. Networks have become much faster and cheaper, while disk speed has not increased. With 10 Gbps networks, multi-hundred Gbps core switches, and 12-disk servers, core networks can now keep up with disk I/O, so improving disk I/O can pay off in better system performance.

Hadoop’s sort algorithm is therefore showing its age. It’s missing optimization opportunities afforded by modern networks, underusing disks, and therefore struggling to process large jobs gracefully. While large Hadoop benchmark runs have been performed, they usually run with custom settings that use fewer, larger tasks in order to produce reasonably large disk I/O operations. This is a sensible workaround for a benchmark but is impractical for regular operations.

Quantcast’s Solution

At Quantcast, we run our own stack for map-reduce processing based on Hadoop but leveraging custom-built technologies as well. An important one is our improved sort architecture called Quantsort. It interacts closely with the Quantcast File System to permit larger, better-behaved jobs. Stay tuned for a future blog post describing it.

Written by Silvius Rus, Jim Kelly, and Michael Ovsiannikov on behalf of the Cluster R&D Team at Quantcast.

Quantcast was processing big data before big data was cool. Behind our audience measurement and ad delivery systems lies a mountain of data, and over the years we’ve developed a vast trove of technology and expertise for handling it.

So we’re excited to announce that we’re sharing an important piece of that technology with the community. As of today, the Quantcast File System (QFS) is available to other Hadoop users (and the world in general) as an open source project.

Does the world need another file system? Yes, it does. If you’re using Hadoop’s HDFS, you’re probably storing three copies of all your data for fault tolerance. And you’re buying extra disk drives to store those copies, and servers to hold the drives, and racks for the servers, and power for the racks, and cooling to counterbalance the power. If you process enough data, those costs can total five, six, or even seven figures per month.

QFS can help. Rather than triple replication, it uses Reed-Solomon encoding, the same error-correction technique used since the 1980s in many technologies including CDs, DVDs, DSL, and more recently Mars rovers. Reed-Solomon provides even better fault tolerance, at a cost of only 50% additional storage space. In other words, where HDFS needs 3x the disk space, QFS needs only 1.5x. It halves your costs for disks and everything else needed to keep them spinning.

Did we mention QFS makes Hadoop jobs run faster, too? It does. Writing goes faster because jobs have to write only half as much physical data. Reading goes faster because QFS does all reads in parallel across multiple drives, making better use of drives that would otherwise be idle.

Finally, it really works. It has been live at Quantcast for four years while we’ve been steadily improving it. A year ago we went all in and switched all our map-reduce processing to use it. Four exabytes of I/O later, we’re confident it’s solid and ready for other organizations’ mission-critical workloads.

We invite you to read more about it, try it out, and perhaps join us in its ongoing development. It’s available for download here.

Posted by Jim Kelly, Vice President of Research & Development

Turbo Mapreduce

(Or How to Run Sawzall on 1 Terabyte of Text in 7 Seconds)

By Silvius Rus, on behalf of the Cluster Team at Quantcast

A few weeks ago I was talking to our CTO.  I was looking for wisdom on how to convince Java users to start programming in a new language.  We had just ported Sawzall, a highly productive mapreduce language from Google, to run on our proprietary Mapreduce cluster based loosely on Hadoop.  Even though Sawzall gives a clear productivity boost, old habits take extra convincing to change.  The advice I got was “make it run in seconds rather than minutes and everyone will jump at it”.

Being able to run Sawzall in seconds on large data sets can certainly be a game changer.  Instead of using it to run batches of jobs, it can be run interactively to “find the needle in the haystack”.  It can give engineers, data modelers and business analysts a handle on petabytes of data, quickly.  It’s empowering.  It makes you feel like you have the data in front of you.  Once you have it, it’s hard to remember how you lived without it.

Ingredients

  1. Sawzall language and runtime (open-sourced by Google). Expressive, productive.
  2. Proprietary distributed file system. As fast as (the hardware) can be.
  3. Simplified mapreduce implementation. Unnervingly simple and efficient, unorthodox.
  4. Tornado web server (open-sourced by Facebook). Scalable, easy to use.
  5. New HTTP-based job management mechanism. Lightweight, as it should be.

Preparation

The first thing you need in order to run Sawzall programs is, well, a Sawzall implementation.  We picked up Google’s open source implementation.  Next you need to get Sawzall to read and write your data.  At Quantcast we store data in a proprietary format.  We decided to convert it on the fly to protocol buffers, feed it to Sawzall, and then translate it back from Sawzall output to Quantcast’s format.

We got Sawzall to run on our Mapreduce cluster, but job latency on medium data sets was high, as the whole system was optimized only for overall cluster throughput.  It was taking minutes, not seconds, to run a Mapreduce on 1 TB of text starting from disk.  Caching data in RAM would not help, because users select their input set from a much larger data repository and we don’t know the selection before the job is launched.

Performance analysis showed the slowness came from starting many tasks at once, from task stragglers caused by disk or network bottlenecks, and from the separation of the map-sort-reduce phases.  We could spend months on a thorough solution to these problems, but we’re a nimble startup, so we set out to do it in weeks.  My bet was that we could quickly build a Sawzall Mapreduce implementation that is extremely fast, at the cost of absolute accuracy and repeatability.

First we had to get user buy-in.  My promise was that we could speed it up 10x by dropping the slowest 10% of work units.  Users believed this would be great for research, prototyping and debugging, all of which take up a large part of the development cycle.  Essentially, it’s what “grep” or “find” is to Unix programmers.

The Main Course

The 1st design decision was to drop stragglers instead of dealing with them.  This simplifies the design greatly. No worries about the long latency of a transient distributed file system failure, an overcommitted CPU, a thrashing OS or a nuked network link. Drop, drop, drop.
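
A minimal Python sketch of the straggler-dropping idea, not our production code: start every work unit, return as soon as a chosen fraction has finished, and ignore the rest. The function name, the `keep_fraction` parameter and the thread pool are assumptions made for illustration.

```python
import concurrent.futures as cf
import time


def run_dropping_stragglers(tasks, keep_fraction=0.9, max_workers=8):
    """Run all tasks; return once keep_fraction of them have succeeded.

    Results of the remaining (slowest) tasks are simply ignored.
    """
    needed = max(1, int(len(tasks) * keep_fraction))
    results = []
    pool = cf.ThreadPoolExecutor(max_workers=max_workers)
    futures = [pool.submit(task) for task in tasks]
    try:
        for future in cf.as_completed(futures):
            if future.exception() is None:
                results.append(future.result())
            if len(results) >= needed:
                break
    finally:
        # Do not wait for stragglers; cancel anything not yet started.
        pool.shutdown(wait=False, cancel_futures=True)
    return results


def slow():
    time.sleep(0.5)  # stand-in for a work unit stuck on a bad disk
    return "late"


# Nine quick work units plus one straggler (all hypothetical).
tasks = [lambda i=i: i for i in range(9)] + [slow]
results = run_dropping_stragglers(tasks, keep_fraction=0.9, max_workers=10)
print(sorted(results))
```

The price is the one named earlier: which units get dropped depends on timing, so two runs over the same input may not return identical results.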

The 2nd design decision was to drop the sort phase of mapreduce and to run the reducer concurrently with the mapper.  We keep a single reducer per Mapreduce job.  The reducer keeps all the reduce groups in a RAM hash table. It uses at most 1 GB-ish of RAM, which is usually enough as Sawzall is very good at combining records within the mapper.  Mappers are very simple.  Each mapper launches a given Sawzall program on a given file and sends Sawzall table output to the reducer.
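
The sketch below illustrates the shape of this design in Python, under loud assumptions: a word count stands in for the Sawzall program, threads stand in for mapper processes, and an in-process queue stands in for the network. Each mapper combines its own records first, and the single reducer merges tables into one hash table as they arrive, with no sort phase.

```python
import queue
import threading
from collections import Counter

DONE = object()  # sentinel a mapper sends when it has finished its file


def mapper(lines, out):
    """Count words in one input and send the pre-combined table to the reducer."""
    combined = Counter()
    for line in lines:
        combined.update(line.split())
    out.put(combined)
    out.put(DONE)


def reducer(inbox, num_mappers):
    """Merge mapper tables into one in-memory hash table as they arrive."""
    totals = {}
    finished = 0
    while finished < num_mappers:
        item = inbox.get()
        if item is DONE:
            finished += 1
            continue
        for key, value in item.items():
            totals[key] = totals.get(key, 0) + value
    return totals


inputs = [["a b a"], ["b c"], ["a c c"]]  # three tiny stand-in "files"
inbox = queue.Queue()
threads = [threading.Thread(target=mapper, args=(lines, inbox)) for lines in inputs]
for t in threads:
    t.start()
totals = reducer(inbox, len(threads))  # runs while mappers are still producing
for t in threads:
    t.join()
print(totals)
```

Because the reducer never sorts, its memory use is bounded by the number of distinct keys rather than the number of records, which is why combining inside the mapper matters so much.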

The 3rd design decision was to rewrite the job management mechanism so it can start many tasks in a second.  We built a single master on top of the Tornado single-threaded asynchronous HTTP server. The workers themselves, one per core, are babysat locally by daemontools.  They open long-lived HTTP connections to the master and wait for work. The master holds all these connections until a job gets posted by a user (over HTTP). Once it gets the job input list, it immediately replies to all the waiting workers.  It does so within a second, and uses less than a second of CPU during the execution of a Sawzall query on 1 TB of text.
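
The hold-then-reply pattern can be sketched with Python’s standard asyncio library. This is an illustration only: asyncio futures stand in for the held HTTP connections, no Tornado or real networking is involved, and the `Master` class, its method names and the round-robin split of inputs are all assumptions.

```python
import asyncio


class Master:
    """Holds one pending future per idle worker until a job is posted."""

    def __init__(self):
        self.waiting = []  # futures standing in for held HTTP connections

    async def wait_for_work(self):
        """Called by a worker: parks until the master replies with inputs."""
        future = asyncio.get_running_loop().create_future()
        self.waiting.append(future)
        return await future

    def post_job(self, inputs):
        """Called by a user: reply to every waiting worker at once."""
        workers, self.waiting = self.waiting, []
        for i, future in enumerate(workers):
            # Assumed split: worker i takes every len(workers)-th input.
            future.set_result(inputs[i::len(workers)])
        return len(workers)


async def worker(master, done):
    share = await master.wait_for_work()
    done.extend(share)  # a real worker would run the mapper on each input


async def main():
    master = Master()
    done = []
    workers = [asyncio.create_task(worker(master, done)) for _ in range(4)]
    await asyncio.sleep(0)  # let every worker register with the master
    replied = master.post_job([f"file-{i}" for i in range(6)])
    await asyncio.gather(*workers)
    return replied, sorted(done)


replied, done = asyncio.run(main())
print(replied, done)
```

Since the workers are already connected and waiting, starting a job costs the master one reply per worker and no process launches, which is what keeps startup under a second.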

As with all good recipes, there are secret ingredients.  More about them in a future post.

Implementation Notes

1st week: Code furiously. Learn as I go. Get a Sawzall mapreduce to run on a single machine.  Add nice features like optional combiner spills on SIGALRM, or early exit for “find first” jobs.  Exhilarating.  Adrenaline rush.  Feels like success is near.

2nd week: Realize that most engineers don’t really care for results in Sawzall format. Write a binary record converter. Technicalities sneak in. The Quantcast file writer is in Java. Should I throw a JVM in the mix? Bite the bullet and rewrite it in C++, so Sawzall results can be written directly into Quantcast files. This is starting to look good, but now users have heard rumors and would like to try it out. Ouch, it needs a job management system. We have Hadoop, but Hadoop seems to prefer minutes to seconds and right now I only care for seconds. What to do, what to do?

3rd week: Write a job management mechanism that holds its own at scale. Tornado saves the day.  Impressive.  Try out a reasonably large end-to-end test (1 TB uncompressed). Nervousness is warranted. The largest I tried in early tests was 100 GB.  Now I’m running on a reasonably large data set and there are other unrelated jobs, some larger than mine, running at the same time, competing for disk, network and CPU as well.  Eyes looking over my shoulder, mental drum roll.  It’s short and sweet: it simply works and completes in 7 seconds. Prompt to prompt, from clicking “go” to having results on the local machine in a Quantcast format file.

Draw positive expletives from colleagues. Priceless.

The Not So Small Print

All this would not have been possible without the Cluster Team’s earlier experience of getting Sawzall to run over Hadoop.  That took longer than three weeks and it’s worth a blog entry by itself.

All this would not have been possible without Quantcast’s amazing file system, which didn’t have to be tweaked in any way for this new development. It just worked well.

All this would not have been possible without Quantcast’s Ops engineers, who produced a sound and scalable deployment mechanism so I could push the system to production the very hour I finished development.

Take Away

In case the diagram above isn’t clear, let’s spell it out.  We built a lightweight, scalable, fast (though approximate) Sawzall Mapreduce implementation. It ran a simple Sawzall program on 1 TB of text in 7 seconds starting from disk, and it doesn’t rely on any prior setup (no Hadoop required).

Do you think you can beat the 7-second mark?  Bring it on!  We’re looking to hire bright cluster engineers to push the state of the art in large-scale computing on commodity hardware.