Reliably producing fresh batch data is a key component of many products. Fresher data lets the downstream processes that consume it respond more quickly to changes. Reliable delivery lets consumers depend on that data and builds a good reputation with customers. The few times data arrives late are the few times customers will remember.
In order for a batch job to finish as soon as possible, it needs to start as soon as its input data is available. Starting jobs sooner reduces idle compute time and delivers fresher data. This can help produce a better product without having to spend more money on computing.
At Quantcast, our data processing is organized into large, constantly changing, interconnected pipelines composed of hundreds of MapReduce jobs written by developers all over the company. In total we process tens of petabytes per day by running thousands of instances of MapReduce jobs. Ensuring each job reliably starts on schedule is a hard task.
At first we made a job dependency tree that would run all child jobs after a parent job finished. However, we found that the dependencies often didn’t resemble a tree, and figuring out which job produced the file you wanted to read was an unnecessary and error-prone step.
A better system is to declare which datasets certain jobs require and run the jobs when required datasets are updated. The configuration is straightforward since jobs already need to know their inputs. If jobs change over time, they should keep working as long as they read and write the same datasets. This helps us reliably deliver computed data as soon as possible while enabling engineers to develop products.
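As a rough illustration of the idea, here is a minimal Python sketch in which jobs declare only the datasets they read and a lookup returns the jobs to run when a dataset is updated. The class, method, job, and dataset names are all hypothetical and are not taken from our actual system.

```python
from collections import defaultdict


class DatasetTrigger:
    """Maps dataset names to the jobs that declared them as inputs."""

    def __init__(self):
        self._jobs_by_dataset = defaultdict(list)

    def register(self, job_name, input_datasets):
        # A job declares only what it reads; it never names the producing job.
        for dataset in input_datasets:
            self._jobs_by_dataset[dataset].append(job_name)

    def jobs_to_run(self, updated_dataset):
        # Called when a dataset gains new data; returns the jobs that read it.
        return list(self._jobs_by_dataset.get(updated_dataset, []))


# Hypothetical jobs and datasets, for illustration only.
trigger = DatasetTrigger()
trigger.register("sessionize", ["raw_events"])
trigger.register("daily_report", ["sessions", "raw_events"])
```

Replacing the job that writes raw_events requires no change to either registration, as long as the new job writes the same dataset.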
We use the Hive metastore to organize our datasets and dispatch notifications. Hive structures datasets as tables. We add new data to a table by adding a partition. We partition our tables by time intervals, but Hive can partition by all kinds of fields. Hive has a little-known mechanism to send notifications to an ActiveMQ topic when partitions are added or dropped, which we use to start our jobs.
Configuring Hive Notifications
Setting up notifications from Hive is not well documented. We had to read some of the source code to figure out what configuration was needed and where to put it. Assuming a vanilla Hive installation in $HIVE_HOME, set up these configuration files with at least these properties. The official documentation mentions putting ActiveMQ credentials in hive-site.xml, but they really have to go in jndi.properties.
|java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory|
|java.naming.provider.url = tcp://activemq-host:61616|
|connection.ConnectionFactory.userName = hiveuser|
|connection.ConnectionFactory.password = PASSWORD|
Then you need to symlink the jars needed for the NotificationListener class, since they aren’t in the classpath by default.
|ln -s $HIVE_HOME/hcatalog/share/hcatalog/*.jar $HIVE_HOME/lib|
Then add the ActiveMQ jar to the Hive classpath.
|cp $APACHE_ACTIVEMQ/activemq-all-5.11.1.jar $HIVE_HOME/lib|
For ActiveMQ, you need to give hiveuser the admin and write permissions so that it can create topics as new tables are created and publish messages as partitions are added.
We used the STOMP protocol to connect to ActiveMQ from our Job Launcher. To ensure the STOMP client had a healthy connection, we enabled heartbeating. However, ActiveMQ requires heartbeats to arrive exactly on time or it will terminate the connection. This kills healthy connections too aggressively, but you can set transport.hbGracePeriodMultiplier on the STOMP transportConnector to prevent ActiveMQ from closing connections when a heartbeat is a little late.
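To make the heartbeat timing concrete, the following Python sketch computes the heartbeat intervals that the STOMP specification says a client and broker agree on, then applies a grace multiplier to the result. The function names are hypothetical, and the assumption that the broker simply multiplies the interval by the multiplier is ours for illustration; check the ActiveMQ documentation for the exact behavior.

```python
def negotiate_heartbeats(client, server):
    """Negotiate heartbeat intervals as described in the STOMP spec.

    client is (cx, cy) from the CONNECT frame's heart-beat header and
    server is (sx, sy) from the CONNECTED frame, all in milliseconds.
    Returns (client_to_server_ms, server_to_client_ms); 0 means none.
    """
    cx, cy = client
    sx, sy = server
    # Client-to-server beats need the client able to send (cx) and the
    # server willing to receive (sy); the slower of the two rates wins.
    client_to_server = 0 if cx == 0 or sy == 0 else max(cx, sy)
    server_to_client = 0 if sx == 0 or cy == 0 else max(sx, cy)
    return client_to_server, server_to_client


def read_deadline_ms(interval_ms, grace_multiplier=1.0):
    # Assumption: the broker tolerates silence for interval * multiplier
    # before it closes the connection.
    return interval_ms * grace_multiplier
```

Under that assumption, a 10 second interval with a multiplier of 1.5 would tolerate a heartbeat arriving up to 5 seconds late.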
Once Hive is set up to send notifications, new tables will send them automatically. Existing tables, however, need a property set on them. Using HiveMetaStoreClient, you can fetch an existing table, call putToParameters("hcat.msgbus.topic.name", TOPIC_NAME) on it, and save the change with alter_table to set its topic.
The Job Launcher
We made a Job Launcher that listens to messages from ActiveMQ and uses a set of rules to determine what jobs to launch. The basic rule is a cron expression that specifies which time intervals of a dataset should trigger a run. For instance, running a job each time another 2 hours of a dataset become available could be expressed as "0 */2 * * *". These basic rules can be combined to run a job when combinations of datasets are available (e.g. run a job when at least one dataset becomes available, or run a job when all of the datasets become available).
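The rule model can be sketched in Python as follows. This is a simplified illustration rather than the Job Launcher's actual code: the cron matcher handles only `*`, `*/n`, integers, and comma lists, and every class and function name here is hypothetical.

```python
from datetime import datetime


def _field_matches(spec, value, low):
    """Match one cron field. Supports '*', '*/n', integers, comma lists."""
    for part in spec.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            # Steps count from the field's lowest legal value (0 or 1).
            if (value - low) % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr, ts):
    """True if datetime ts satisfies a five-field cron expression.

    Simplification: all five fields must match, whereas real cron treats
    restricted day-of-month and day-of-week fields as alternatives.
    """
    minute, hour, day, month, weekday = expr.split()
    cron_weekday = ts.isoweekday() % 7  # cron uses 0 for Sunday
    return (
        _field_matches(minute, ts.minute, 0)
        and _field_matches(hour, ts.hour, 0)
        and _field_matches(day, ts.day, 1)
        and _field_matches(month, ts.month, 1)
        and _field_matches(weekday, cron_weekday, 0)
    )


class DatasetRule:
    """Satisfied when `dataset` has the interval and the cron matches it."""

    def __init__(self, dataset, cron):
        self.dataset = dataset
        self.cron = cron

    def satisfied(self, interval, available):
        # `available` maps dataset name -> set of interval start datetimes.
        return (cron_matches(self.cron, interval)
                and interval in available.get(self.dataset, set()))


class AnyOf:
    """Satisfied when at least one child rule is satisfied."""

    def __init__(self, *rules):
        self.rules = rules

    def satisfied(self, interval, available):
        return any(r.satisfied(interval, available) for r in self.rules)


class AllOf:
    """Satisfied only when every child rule is satisfied."""

    def __init__(self, *rules):
        self.rules = rules

    def satisfied(self, interval, available):
        return all(r.satisfied(interval, available) for r in self.rules)
```

Because AnyOf and AllOf expose the same satisfied method as DatasetRule, they can be nested to express more involved conditions.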
Missing a message from ActiveMQ can be serious, since it could halt a pipeline for as much as a day before our monitoring and alerting would start complaining. To improve reliability, we used STOMP version 1.2, which allows us to create reliable subscriptions to topics. If the Job Launcher disconnects, ActiveMQ will detect either a closed socket or a lack of heartbeat. It will then store any current and future unacknowledged messages until the Job Launcher reconnects. This guarantees at-least-once delivery, so a message may arrive more than once, and the Job Launcher can deduplicate the repeats.
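One simple way to deduplicate under at-least-once delivery is to remember which events have already been handled. The Python sketch below keys events on a (table, partition) pair and keeps the seen set in memory; both choices, and all names, are assumptions made for illustration. A production launcher would likely need to persist this state across restarts.

```python
class Deduplicator:
    """Remembers which (table, partition) events were already handled."""

    def __init__(self):
        self._seen = set()

    def first_time(self, table, partition):
        # Returns True only the first time a given event is observed.
        key = (table, partition)
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


def handle(events, launch, dedup):
    """Call launch(table, partition) once per distinct event."""
    for table, partition in events:
        if dedup.first_time(table, partition):
            launch(table, partition)
```

With this in place, a redelivered notification for the same partition is silently ignored rather than launching the job twice.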
In summary, using Hive metastore notifications helps to reliably run jobs faster and configure jobs in a more manageable way.
Posted by Eric Culp, Software Engineer.