Chief among the challenges of operating large-scale MapReduce pipelines in production is managing the performance of the underlying distributed file system. When processing more than 30 petabytes of data a day, as we do here at Quantcast, file system performance is critical.
We’ve previously discussed a few things we’ve done to help ensure that our MapReduce jobs perform reliably with respect to I/O, including our development and use of the Quantcast File System (QFS). However, all distributed file systems are at some point limited by the amount of physical disk space available.
For this reason and many more, we take a careful approach to data retention at Quantcast. This post details our approach to this challenge and explains how we got there.
A Flawed Approach to Data Retention
In the early days of Quantcast (pre-2010) we solved this problem the way a lot of the other early big-data pioneers did — via extensive monitoring and alerting to identify when we reached pre-critical thresholds. A triggered alert meant someone in Operations had to identify the largest file directories and follow up with their owners. Not so great.
Unfortunately, the frequent manual deletes were fraught with fat-finger mistakes, and it turns out it’s much easier than you might think to accidentally delete the root file system.
Teams were rolling their own one-off retention policy scripts with varying degrees of robustness, and as you might imagine, files were disappearing without a clear understanding of how, why, or when. Yikes! We knew we needed something better…
Envisioning a Smarter Method
We started by prototyping some automated retention policy scripts to get a sense of the problem scope. One day and more than a dozen special cases later, it was clear that we needed to build a file system excision tool for the long haul.
We needed a tool which would allow developers to define declarative retention policies for their own datasets, delegating execution to a well-tested, centralized mechanism.
The syntax for expressing retention policies needed to be declarative, easy to learn, easy for a mortal human to read and write, and easy for a machine to generate. The configuration files also needed to support programmatic validation.
Our initial prototyping phase had shown us just how many different types of policies and datasets we would need, and we couldn’t simplify our problem by imposing arbitrary constraints on the naming conventions or rules making up these policies.
We had these requirements:

  • Rules needed to be composable.
  • A single policy needed to be able to apply to multiple datasets at different levels within a subtree of the file system.
  • Files within a single directory needed to have different policies (e.g. dir/reach.20160501 vs. dir/indexes.20160501).
  • Date-based policies needed to be able to derive their logical dates from different file system naming conventions, directory hierarchy patterns (e.g. foo/2016/06/08/16 vs. foo-201606081600 vs. foo-1465426974000), metadata properties of the file, and more.
  • Policies must be applicable to the same file synchronized across multiple file systems.
    • Note: This was for operational reasons. Not only were we running multiple distributed file system instances, we were also storing data in both HDFS and QFS and even needed retention policies on some local file systems.
  • A level of abstraction that would enable us to decouple the policy definition from the underlying storage engine such that policies could be written agnostic of file system type.
  • The ability to whitelist files which should never be deleted under any circumstances (even if another rule implied otherwise).

Phew! Now what?
A Domain-Driven Design

We started by defining an object model which expresses the relationship between the primary concepts in our domain:

  • PolicyHost – The URI of a resource on which to apply a set of policies. Note that this is a URI, not necessarily an HDFS NameNode or QFS MetaServer address.
  • RetentionPolicies – A collection of RetentionPolicy objects which apply to the same path relative to the PolicyHost base URI.
  • RetentionPolicy – A single policy defined by a PathHandler’s type, path and optional filter string.
  • PathHandler – A handler for a given directory hierarchy and an associated action to take on files violating the RetentionRule (see below). An optional filename filter can be used to discriminate between different types of files in that hierarchy. The PathHandler represents a specific “type” of file, usually defined by a naming convention, and an associated strategy for defining each file’s logical “age” and “vintage” (start and end times of data contained within this file). There are many different types of PathHandlers:
    • MTimePathHandler – Uses the file system-dependent file modification date to represent the “age” of the file.
    • RegexPathHandler – Uses a special DSL and regular expression to extract date information from the name and/or path of the file.
    • IntervalFile – Represents a Quantcast-specific naming convention containing a start and end time (interval) encoded in the filename.
    • TimeStampPathHandler – Extracts a Unix timestamp from the filename, representing the “age” of the file.
    • DateStampPathHandler – Similar to TimeStampPathHandler but extracts an ISO-8601 Date from the filename, representing the “age” of the file.
  • Action – An action (e.g. DeleteAction, ArchiveAction, etc.) to apply to the resources identified by the PathHandler which violate the retention rule.
  • RetentionRule – Selects objects to exclude from the configured Action (i.e. if the Action is DeleteAction, the selected objects will not be deleted). There are many types of rules, all of which are composable:
    • SinceDate, SinceNDays, SinceNMonths – Retain files newer than some date, where the date may be hard-coded or a symbolic reference to the age of some other resource.
    • SinceOffsetFromDate – Retain files newer than N units (minutes, hours, days, months, etc.) from some date.
    • BeforeDate – Retain files older than some date.
    • LatestN, OldestN – Retain the latest/oldest N files.
    • LargerThan, SmallerThan – Retain any file with a content size larger/smaller than some threshold in bytes.
    • UnionRule – Composite rule containing 2 or more child rules. Retain the union of all files retained by the child rules.
    • IntersectionRule – Composite rule containing 2 or more child rules. Retain the intersection of the files retained by the child rules.
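
The PathHandler variants above differ mainly in how they derive a file’s logical date. As a rough illustration only (Python, not the actual library), the sketch below derives a date from three of the naming conventions mentioned in the requirements. The function names and regular expressions are hypothetical, and treating all timestamps as UTC is an assumption.

```python
import re
from datetime import datetime, timezone

# Hypothetical extractors, one per naming convention. Each returns an
# aware UTC datetime, or None when the path does not match its convention.

def from_hierarchy(path):
    """foo/2016/06/08/16 -> year/month/day/hour directory components."""
    m = re.search(r"/(\d{4})/(\d{2})/(\d{2})/(\d{2})$", path)
    if not m:
        return None
    year, month, day, hour = map(int, m.groups())
    return datetime(year, month, day, hour, tzinfo=timezone.utc)

def from_datestamp(path):
    """foo-201606081600 -> YYYYMMddHHmm suffix."""
    m = re.search(r"-(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})$", path)
    if not m:
        return None
    year, month, day, hour, minute = map(int, m.groups())
    return datetime(year, month, day, hour, minute, tzinfo=timezone.utc)

def from_epoch_millis(path):
    """foo-1465426974000 -> 13-digit Unix timestamp in milliseconds."""
    m = re.search(r"-(\d{13})$", path)
    if not m:
        return None
    return datetime.fromtimestamp(int(m.group(1)) / 1000, tz=timezone.utc)

EXTRACTORS = (from_hierarchy, from_datestamp, from_epoch_millis)

def logical_date(path):
    """Return the first date any extractor can derive, else None."""
    for extract in EXTRACTORS:
        result = extract(path)
        if result is not None:
            return result
    return None

for example in ("foo/2016/06/08/16", "foo-201606081600", "foo-1465426974000"):
    print(example, "->", logical_date(example))
```

Keeping each convention behind the same "path in, date out" shape is what lets the rules stay ignorant of how a dataset happens to be named.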

This object model allowed us to describe sets of files which we want to retain and the implied complementary set of files which we want to delete. QFS, HDFS, and anything else for which an org.apache.hadoop.fs.FileSystem implementation exists are trivially supported, but it’s possible to extend the model to include other resource endpoints such as relational databases, Cassandra, ZooKeeper, and others.
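One way to picture the retain/delete complement is with plain set operations. The sketch below is an illustration in Python, not Quantcast’s implementation: the rule names mirror the object model, but FileInfo, the function signatures, and the boundary semantics (for example, an inclusive cutoff) are assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

@dataclass(frozen=True)
class FileInfo:  # hypothetical stand-in for a file plus its logical date
    path: str
    logical_date: datetime
    size: int

# A rule is a callable: (candidate files, now) -> set of files to RETAIN.

def since_n_days(n):
    def rule(files, now):
        cutoff = now - timedelta(days=n)
        return {f for f in files if f.logical_date >= cutoff}
    return rule

def latest_n(n):
    def rule(files, now):
        newest_first = sorted(files, key=lambda f: f.logical_date, reverse=True)
        return set(newest_first[:n])
    return rule

def larger_than(num_bytes):
    def rule(files, now):
        return {f for f in files if f.size > num_bytes}
    return rule

def union(*rules):
    def rule(files, now):
        retained = set()
        for child in rules:
            retained |= child(files, now)
        return retained
    return rule

def intersection(*rules):
    def rule(files, now):
        retained = set(files)
        for child in rules:
            retained &= child(files, now)
        return retained
    return rule

def to_delete(files, rule, now):
    """Everything the rule does not retain is a candidate for the action."""
    return set(files) - rule(files, now)

now = datetime(2016, 6, 10, tzinfo=timezone.utc)
files = [
    FileInfo(f"data.201606{d:02d}", datetime(2016, 6, d, tzinfo=timezone.utc), 100)
    for d in range(1, 10)
]
keep_recent_or_latest = union(since_n_days(2), latest_n(5))
doomed = to_delete(files, keep_recent_or_latest, now)
print(sorted(f.path for f in doomed))
# ['data.20160601', 'data.20160602', 'data.20160603', 'data.20160604']
```

Because every rule returns a retained set, composite rules need no special cases: they simply combine the sets their children return.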
Declaring Success

New library in hand, we set out to define a declarative policy file syntax which was flexible and precise, yet easy to write.
We were tempted to use a schema-less format like JSON or YAML, because we knew that if the configuration syntax was too clumsy, developers might reject the tool (regardless of its usefulness).
However, correctness was not something we were willing to compromise on, given the circumstances, so we selected XML for its strict typing and ability to be programmatically validated (via XML Schema and DTD). Near-universal developer toolkit support didn’t hurt either.
After heavy dogfooding, we settled on a document model.
The following is an example of a simple policy file:

  <policies>
    <host uri="qfs://">
      <!-- keep everything newer than 5 days (based on the hdfs/qfs last modified times) -->
      <path path="/Users" matchOnAbsolutePath="true" filter="/Users/[a-zA-Z_-]+/tmp/.*" purgeEmptyDirs="true" action="delete">
        <sinceNDays n="5" />
      </path>
    </host>
    <host uri="qfs://">
      <!-- keep everything newer than 2 days (based on the YYYYMMdd[-HHmmss] suffix of these files), preserving the latest 5 files, regardless of how old they may be -->
      <datePath path="/path/to/some/dataset" filter="my_prefix.*" action="delete">
        <any>
          <sinceNDays n="2" />
          <latestN n="5" />
        </any>
      </datePath>
    </host>
    <host uri="file:/">
      <!-- keep the latest 5 files (based on the (YYYY)(MM)(dd)(HH) regex capture groups), unless they are empty -->
      <regexPath path="/var/spool/app/hourly" name="(\d\d\d\d)(\d\d)(\d\d)(\d\d).dat" action="delete">
        <all>
          <latestN n="5" />
          <largerThan bytes="1" />
        </all>
      </regexPath>
    </host>
  </policies>
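
Part of XML’s appeal is that a policy file is easy for a machine to read. As a hedged illustration (Python standard library, not the actual tool), the sketch below parses a trimmed copy of the example and turns each handler’s rules into a nested expression. Reading `<any>` as a union and `<all>` as an intersection is an inference from the comments in the example, and the output dictionary layout is hypothetical.

```python
import xml.etree.ElementTree as ET

POLICY_XML = r"""
<policies>
  <host uri="qfs://">
    <datePath path="/path/to/some/dataset" filter="my_prefix.*" action="delete">
      <any>
        <sinceNDays n="2" />
        <latestN n="5" />
      </any>
    </datePath>
  </host>
  <host uri="file:/">
    <regexPath path="/var/spool/app/hourly" name="(\d\d\d\d)(\d\d)(\d\d)(\d\d).dat" action="delete">
      <all>
        <latestN n="5" />
        <largerThan bytes="1" />
      </all>
    </regexPath>
  </host>
</policies>
"""

# Assumed mapping from composite elements to the object model's composites.
COMPOSITES = {"any": "union", "all": "intersection"}

def parse_rule(elem):
    """Convert a rule element into a (name, payload) tuple, recursively."""
    if elem.tag in COMPOSITES:
        return (COMPOSITES[elem.tag], [parse_rule(child) for child in elem])
    return (elem.tag, dict(elem.attrib))

def parse_policies(xml_text):
    """Flatten a policy document into one record per path handler."""
    root = ET.fromstring(xml_text)
    records = []
    for host in root.findall("host"):
        for handler in host:
            records.append({
                "uri": host.get("uri"),
                "handler": handler.tag,
                "path": handler.get("path"),
                "action": handler.get("action"),
                "rules": [parse_rule(child) for child in handler],
            })
    return records

for record in parse_policies(POLICY_XML):
    print(record["uri"], record["handler"], record["path"], record["rules"])
```

This only checks well-formedness. Schema validation of the kind described above would need an XML Schema or DTD validator, which the Python standard library does not provide.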

Our solution led to relatively quick and painless company-wide adoption, and the periodic emails shaming owners of abusive datasets disappeared. This tool really did offer people a solution to a problem they were experiencing on a regular basis! Teams began building up ever more comprehensive policy files of their own volition and order was restored.
Over time, a number of common patterns emerged, prompting the inclusion of some syntactic sugar to reduce the verbosity of the XML files. We implemented support for macros (simple ant-like variable interpolation along with the ability to define macros both inline as well as externally or on the command line) and re-usable rule definitions:

  <policies>
    <!-- define a reusable pattern, addressable by its property name -->
    <property name='my-pattern' value='^somefile-(\d{4})-(\d{2})-(\d{2})-(\d{2})-(\d{2})-(\d{2})-[\-\w\d]+-[0-9a-fA-F]{32}\.gz-md5[0-9a-fA-F]{32}$' />
    <!-- define a reusable rule, addressable by its id -->
    <defRule id="latest6Since6M">
      <any>
        <latestN n="6" />
        <sinceNDays n="180" />
      </any>
    </defRule>
    <host uri="qfs://">
      <regexPath path="/path/to/some/files" filter="${my-pattern}" action="delete">
        <rule refid="latest6Since6M" />
      </regexPath>
      <regexPath path="/other/path/to/some/files" filter="${my-pattern}" action="delete">
        <rule refid="latest6Since6M" />
      </regexPath>
    </host>
  </policies>
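
The `${...}` references behave like simple text substitution. The sketch below shows one way such ant-like interpolation could work in Python. The `interpolate` function and its fail-on-undefined behavior are assumptions for illustration, not a description of the actual tool.

```python
import re

# Matches ${name}, capturing the property name between the braces.
_VARIABLE = re.compile(r"\$\{([^}]+)\}")

def interpolate(text, properties):
    """Replace every ${name} in text with properties[name].

    A replacement function is used rather than a replacement string so that
    backslashes in property values (common in regex patterns such as \\d)
    are inserted literally instead of being treated as escape sequences.
    """
    def replace(match):
        name = match.group(1)
        if name not in properties:
            raise KeyError(f"undefined property: {name}")
        return properties[name]
    return _VARIABLE.sub(replace, text)

properties = {"my-pattern": r"^somefile-(\d{4})-(\d{2})\.gz$"}
print(interpolate('filter="${my-pattern}"', properties))
```

Failing loudly on an undefined property is a deliberate choice in this sketch: a pattern that silently expands to nothing is a risky thing to hand to a deletion tool.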

Furthermore, we added structured logging to the system, so any policy action performed on a file within the purview of our retention policy system is tracked and easily accessible for review. If a file suddenly goes missing, we can easily determine whether it was intentionally deleted by a retention policy and, if so, which policy was responsible for the deletion and when.
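To make the idea concrete, here is a minimal sketch of what one structured audit record and a lookup over such records might look like, written in Python with one JSON object per line. The field names, the `audit_record` and `find_deletions` helpers, and the policy file name are all hypothetical; the post does not describe the real log format.

```python
import json
from datetime import datetime, timezone

def audit_record(action, uri, path, policy_file, rule_id, when=None):
    """Serialize one policy action as a single JSON line (hypothetical schema)."""
    when = when or datetime.now(timezone.utc)
    return json.dumps({
        "timestamp": when.isoformat(),
        "action": action,
        "uri": uri,
        "path": path,
        "policy_file": policy_file,
        "rule": rule_id,
    }, sort_keys=True)

def find_deletions(log_lines, path):
    """Return every delete record for the given path, oldest first."""
    records = (json.loads(line) for line in log_lines)
    return [r for r in records if r["action"] == "delete" and r["path"] == path]

line = audit_record(
    "delete", "qfs://", "/path/to/some/files/example.gz",
    policy_file="team-policies.xml",  # hypothetical file name
    rule_id="latest6Since6M",
    when=datetime(2016, 6, 8, 16, tzinfo=timezone.utc),
)
for record in find_deletions([line], "/path/to/some/files/example.gz"):
    print(record["timestamp"], record["policy_file"], record["rule"])
```

With records shaped like this, "who deleted my file?" becomes a simple filter over the log rather than an investigation.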
Last Thoughts
In order for the hundreds of MapReduce jobs in Quantcast’s production pipelines to be able to continuously deliver data according to strict SLAs, we had to make efficient use of Quantcast’s QFS storage system.
A well-designed system for implementing retention policies turned out to be essential for running systems at our scale. By investing in a universal retention policy system we have been able to retain more useful data, ease the operational burden of deploying new production pipelines, and help ensure that our file system will not become a processing bottleneck.