How I Taught the Old PostgreSQL Horse the New Trick of Running Analytics on 1,000 Cluster Nodes

On a Friday afternoon in April 2012, I was sitting at my desk across from Yerba Buena Gardens, watching terabytes go by, when I saw Jim Kelly, our VP of R&D, walking towards me carrying two beers. We had had a number of conversations about large-scale analytics over the previous weeks. I could tell there was more than beer coming my way.

Big Data Equals Big Value – If You Can Make Sense of It

Quantcast has no shortage of data. Every day we ingest about 50 terabytes of new semi-structured data. More data is good. It makes our predictive models better. It gives us confidence in our research, development and operations, on both the engineering and business sides. However, before we get any value out of it, 50 terabytes simply means more bits to take care of, index, process and organize. 50 terabytes a day means 4.5 petabytes a quarter, a scale at which making sense of the data becomes a big problem in itself. Big data is certain to add process complexity simply because of its scale, but it is not certain to be valuable unless you can make sense of it quickly and at that gigantic scale, either through machine learning or as a human. Jim was coming to talk about the “making sense to a human” part.

SQL Helps Change Big Data into Big Value

At Quantcast we all want to make sense of big data, but in very different ways. Sales wants to know our reach in a geographic location in Brazil. Client Services wants to double-check that campaign delivery matches the configuration. Business Intelligence and Product want to find the next big business opportunity. Modeling Scientists want to validate model quality. Engineers want to research and validate assumptions about value distributions and invariants. One way to do all this is to write Java or C++ programs and run them over MapReduce. That works technically for engineers, but it is out of reach for users outside Engineering. Most of these users came from other companies, where they had used SQL to mine data, and that’s what they’re used to. For non-engineers, SQL is great. It gives them direct access to data. Without SQL they need to work with an engineer, and a task that can be done with SQL in minutes may take a week of back and forth. SQL is good for engineers as well. While I can write a MapReduce job in minutes, it’s tedious to write and deploy, and it’s easy to make mistakes. SQL makes mistakes much harder to make and is quicker to write, so it helps even engineers become more productive. These are all good things – except we did not have a way to run SQL over our petabytes of data.

SQL over MapReduce?

I was already an SQL convert, and so was Jim. We were not about to discuss whether it’s good to get SQL to run over our data, but how. “It would take several quarters to get SQL to run over our data,” I said. Jim’s face turned to a grimace. “Don’t say quarters. Is there any way you can get us access to data through SQL quickly?” I thought about it over several sips of an IPA. I like challenges but, seriously, SQL on petabytes, and quickly? Something had to give. I started to think critically about the shortest path to a somewhat usable solution. We had already been running MapReduce at petabyte scale, though we wrote Java code for every job. Could we write a generic Java MapReduce job that could execute any SQL query? Grouping and ordering could be layered over the MapReduce sorter mechanism. Joins against dimension tables could be implemented by broadcasting them to all the MapReduce workers. Joins between fact tables could be layered on the join mechanism we had previously developed atop MapReduce. There was still the matter of SQL syntax, which is quite rich and hard to replicate in Java. And, to complicate matters, we also wanted to make it possible to run our Java code as User Defined Functions (UDFs), because our data is semi-structured and needs to be parsed by our Java libraries. There were some other issues that I decided we should just punt on. None of these seemed like showstoppers, so I said, “okay.”

Old Horse, New Tricks

I came up with the fastest hack that would let us run SQL at scale on our data. It took me five weeks of coding on my own. It supported almost arbitrary SQL expression syntax. It supported grouping, ordering and hash joins. (Later we added merge joins for fact tables.) Here is what came out.

It was essentially a generic MapReduce job that streamed input data into a PostgreSQL server running on the same machine. Each map/reduce process streamed its input into PostgreSQL and then executed a lightly modified version of the query, so that execution of the complex SQL syntax was delegated entirely to PostgreSQL. From a development perspective I wrote six modules, each sketched briefly after this list:

  1. Query parser. A quick and dirty regular expression parser that only recognized anchors such as SELECT, FROM, GROUP BY and a few more. It was somewhat fragile, but it worked fine with care.
  2. Grouping and sorting translation to MapReduce grouping. This simply required translating SQL grouping and sorting expressions to our MapReduce sort key formulas.
  3. Schema translation. Our data is stored in a proprietary binary or text format described by Java classes and custom parsers. I used Java class reflection to infer the field names and types and bind them to the query. To infer the names and types of the SQL query output, the preflight routine executes a stripped-down version of the query on an empty table on a local PostgreSQL server.
  4. Split and rewrite the query for the map/combine and reduce phases. Some parts of the query cannot be run in both phases. For instance, the map phase should run the GROUP BY part of the query in order to benefit from a combining effect, but it must not run the HAVING clause or it would produce incorrect results.
  5. Generic mapper and generic reducer. Every mapper inserts up to 4 MB of data into table T1 in the PostgreSQL instance on the same hardware host. It then continues to insert values, though into a different table T2, and at the same time starts executing the query on table T1, emitting the query result rows (possibly grouped and aggregated) to the distributed MapReduce sorter. This pipelining of insertion and execution continues until the end of the input. The reducer works similarly, except that it may run the combiner query repeatedly until all data for a reduce group fits in 4 MB. (If this is not possible the query gets aborted; it has not happened in practice.) Once data has been combined, the final query is executed, including the HAVING clause. For most queries the input to the reducer already fits in 4 MB per group, so there is only one stage in the reducer.
  6. User defined functions. I used Java reflection to recognize and invoke arbitrary static functions on fields. The Java calls are made between the time rows are read from a file and the point at which they are sent to PostgreSQL. This has some limitations, but it worked in many cases. One example is extracting a particular value from a serialized protocol buffer passed as a base64-encoded string field.
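
For item 1, a single regular expression over a handful of keyword anchors goes a surprisingly long way. The sketch below is illustrative rather than the actual code, and it shares the fragility the prototype had: a recognized keyword appearing inside a string literal will confuse it.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical anchor-based parser: it recognizes one fixed query shape and
// hands the clause bodies back as raw strings for later rewriting.
final class AnchorQueryParser {
    private static final Pattern QUERY = Pattern.compile(
        "SELECT\\s+(?<select>.+?)\\s+FROM\\s+(?<from>\\S+)"
            + "(?:\\s+WHERE\\s+(?<where>.+?))?"
            + "(?:\\s+GROUP\\s+BY\\s+(?<groupby>.+?))?"
            + "(?:\\s+HAVING\\s+(?<having>.+?))?"
            + "(?:\\s+ORDER\\s+BY\\s+(?<orderby>.+?))?\\s*;?\\s*$",
        Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

    // Returns a matcher whose named groups ("select", "from", "groupby", ...)
    // hold the clause text, or throws if the query shape is not recognized.
    static Matcher parse(String sql) {
        Matcher m = QUERY.matcher(sql.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported query shape: " + sql);
        }
        return m;
    }
}
```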
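
Item 2 amounts to building a composite sort key out of the grouping and ordering columns, so the framework’s distributed sorter does the grouping work. A minimal sketch, with a plain map standing in for our row representation:

```java
import java.util.List;
import java.util.Map;

// Hypothetical sort key builder: rows with equal keys land in the same reduce
// group, which is exactly what GROUP BY needs.
final class SortKeyBuilder {
    static String keyFor(Map<String, String> row, List<String> keyColumns) {
        StringBuilder key = new StringBuilder();
        for (String column : keyColumns) {
            key.append(row.get(column)).append('\u0001');  // field separator
        }
        return key.toString();
    }
}
```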
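
For item 3, the input side of the translation boils down to walking the record class with reflection and mapping Java types to PostgreSQL column types. The sketch below is a simplification under assumed names; the real code also drives the custom parsers and the preflight run against an empty table.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical schema inference via reflection. The type mapping is
// deliberately coarse; anything unrecognized becomes TEXT.
final class SchemaFromReflection {
    static Map<String, String> columnsFor(Class<?> recordClass) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (Field field : recordClass.getDeclaredFields()) {
            columns.put(field.getName(), sqlTypeFor(field.getType()));
        }
        return columns;
    }

    private static String sqlTypeFor(Class<?> javaType) {
        if (javaType == int.class || javaType == long.class) return "BIGINT";
        if (javaType == float.class || javaType == double.class) return "DOUBLE PRECISION";
        if (javaType == boolean.class) return "BOOLEAN";
        return "TEXT";
    }

    // Build the CREATE TABLE statement for the scratch table the mapper streams into.
    static String createTableSql(String table, Class<?> recordClass) {
        StringBuilder sql = new StringBuilder("CREATE TABLE " + table + " (");
        String separator = "";
        for (Map.Entry<String, String> column : columnsFor(recordClass).entrySet()) {
            sql.append(separator).append(column.getKey()).append(' ').append(column.getValue());
            separator = ", ";
        }
        return sql.append(")").toString();
    }
}
```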
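
As an illustration of item 4, here is how a made-up query might be split. The partial sums produced by the map/combine phase can still grow once the reducer sees the rest of a group, which is why HAVING must wait until the reduce phase. The exact rewriting rules in the prototype may differ in detail.

```java
// Illustration only: a hypothetical query and one way it could be split.
final class QuerySplitExample {
    // The query as the user wrote it.
    static final String USER_QUERY =
        "SELECT country, SUM(events) AS total FROM logs "
            + "GROUP BY country HAVING SUM(events) > 1000";

    // Map/combine phase: pre-aggregate per group over each batch, but never
    // apply HAVING -- a partial sum below the threshold could still grow.
    static final String MAP_QUERY =
        "SELECT country, SUM(events) AS total FROM t1 GROUP BY country";

    // Reduce phase: re-aggregate the partial sums, and only now apply HAVING.
    static final String REDUCE_QUERY =
        "SELECT country, SUM(total) AS total FROM t1 "
            + "GROUP BY country HAVING SUM(total) > 1000";
}
```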
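
A heavily simplified sketch of the generic mapper in item 5, using JDBC for clarity. It batches roughly 4 MB of rows into a scratch table, runs the rewritten map-phase query over the batch, and hands each result row to the distributed sorter. For brevity it truncates a single table between batches instead of pipelining inserts into T2 while T1 is being queried, and all names are illustrative rather than the actual implementation.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

final class GenericSqlMapper {
    private static final int BATCH_BYTES = 4 * 1024 * 1024;  // roughly 4 MB per batch

    // inputRows are assumed to arrive already formatted as SQL value lists,
    // e.g. "'US', 'mobile', 12"; the real system binds parsed fields instead.
    void run(Iterable<String> inputRows, String mapQuery, RowEmitter emitter)
            throws Exception {
        try (Connection db = DriverManager.getConnection(
                     "jdbc:postgresql://localhost/scratch");
             Statement stmt = db.createStatement()) {
            long batchedBytes = 0;
            for (String row : inputRows) {
                stmt.executeUpdate("INSERT INTO t1 VALUES (" + row + ")");
                batchedBytes += row.length();
                if (batchedBytes >= BATCH_BYTES) {
                    flushBatch(stmt, mapQuery, emitter);
                    batchedBytes = 0;
                }
            }
            flushBatch(stmt, mapQuery, emitter);  // final, possibly partial batch
        }
    }

    // Run the map-phase query over the current batch, emit every result row
    // (possibly grouped and pre-aggregated), then clear the scratch table.
    private void flushBatch(Statement stmt, String mapQuery, RowEmitter emitter)
            throws Exception {
        try (ResultSet rs = stmt.executeQuery(mapQuery)) {
            int columns = rs.getMetaData().getColumnCount();
            while (rs.next()) {
                String[] values = new String[columns];
                for (int i = 0; i < columns; i++) {
                    values[i] = rs.getString(i + 1);
                }
                emitter.emit(values);
            }
        }
        stmt.executeUpdate("TRUNCATE t1");
    }

    // Stand-in for the hook that feeds rows to the distributed MapReduce sorter.
    interface RowEmitter {
        void emit(String[] values);
    }
}
```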
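
Item 6 needs nothing more exotic than java.lang.reflect. A minimal sketch, with a hypothetical UDF name in the comment:

```java
import java.lang.reflect.Method;

// Hypothetical UDF hook: "udf" names a public static method such as
// "com.example.LogParsers.campaignIdFromProto", applied to the raw field
// value after the row is read and before it is handed to PostgreSQL.
final class StaticUdfInvoker {
    static Object apply(String udf, String fieldValue) throws Exception {
        int split = udf.lastIndexOf('.');
        Class<?> holder = Class.forName(udf.substring(0, split));
        Method method = holder.getMethod(udf.substring(split + 1), String.class);
        return method.invoke(null, fieldValue);  // null receiver: static method
    }
}
```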

Results

At the end of the five weeks the code was functional at scale. We did not have a PostgreSQL specialist on staff, so we treated it as a black box. As we used it only as a block streaming processor, we set it up with a limited RAM budget and made sure it never had to touch a spinning disk. We did not have to change our MapReduce implementation at all. Our cluster SQL was implemented as yet another MapReduce pipeline.

We tested it all on a job that ran over hundreds of terabytes on a 1,000-node cluster with only twenty terabytes of RAM. This ensured that we pushed data through all the pipes and that it made it through correctly and consistently. All data was read from the distributed file system and written back to it. The job ran on our unmodified data and produced output in our common format, ready for use by all our other tools. Jim was happy, and the IPA was well spent.

Today

We received great value from the five-week prototype. While there were certainly unpolished spots, such as syntax limitations and lack of custom aggregators, it bought us time, helped determine the business value by sifting through scores of petabytes, and helped us learn what our users wanted.

This initial prototype puts us well on our way to producing a full-fledged SQL-based analytics solution. There will be more about that soon, so stay tuned!

By Silvius Rus, Director of Big Data Platforms, and Jim Kelly, Vice President of R&D