Batch Processing from the Ground Up: Part II

Sajeed Syed Bakht
15 min read · Oct 24, 2020

We last left off with Unix pipelines and how their philosophy can help us scale up batch processing across a distributed network, and we introduced MapReduce as a viable solution for distributed batch processing. Let's dive into MapReduce.

Similar to a single Unix job, a MapReduce job takes one or more inputs and produces one or more outputs. Another similarity is that MapReduce does not modify its input and has no side effects other than producing the output.

MapReduce reads and writes files on a distributed filesystem. This could be a storage service such as Amazon S3 or, in Hadoop's implementation of MapReduce, the Hadoop Distributed File System (HDFS). We'll use HDFS as a running example throughout this series of blog posts.

HDFS is also based on a shared-nothing architecture, where all we need are computers connected by a conventional data center network. This lets us scale our applications horizontally.

HDFS is conceptually one big filesystem that can use space on all disks within the network. This is done by a daemon process running on each machine, allowing other nodes in the network to access files stored on the machine. A central server called a NameNode keeps track of which file blocks are stored on which machine. These file blocks are also usually replicated across different machines to tolerate machine and disk failures.

Example

Now let's look back at our web server example from Batch Processing from the Ground Up: Part I. How exactly does MapReduce return the top 5 most visited web pages? In our Unix example, we first mapped each line in the log file to a record. Second, we extracted just the web page from each record. Third, we sorted the records by web page (alphabetically). Lastly, we iterated over the sorted web pages and counted the occurrences of each unique page. Because the data was sorted, the job did not need to maintain much state in memory.

These four steps can be combined into a single MapReduce job. Step 1 is handled by the input format parser. Steps 2 (map) and 4 (reduce) are where you write your custom logic. Step 3, the sort step, is implicit in MapReduce, because the output of the mapper is always sorted before it is given to the reducer.

Again, we have to supply the custom logic for Steps 2 and 4: the mapper function and the reducer function. Let's dive into these concepts.

Mapper

A mapper is called once for every input record. It extracts a key and a value from the input record and can produce zero or more key-value pairs. A mapper does not keep any state from one input record to the next, so each record is handled independently.
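
For concreteness, here is a minimal sketch of such a mapper in the Hadoop Streaming style (a script that reads raw lines on stdin and writes tab-separated key-value pairs on stdout). The log format, and in particular the position of the URL field, is an assumption carried over from the web server example, not something the framework dictates:

```python
#!/usr/bin/env python3
# Minimal Hadoop Streaming-style mapper: reads raw log lines from stdin and
# emits one tab-separated (url, 1) pair per line. The field index is an
# assumption about the log format (here, the URL is the 7th whitespace-
# separated field, as in a typical access log).
import sys

for line in sys.stdin:
    fields = line.split()
    if len(fields) > 6:            # skip malformed lines
        url = fields[6]            # requested URL (assumed position)
        print(f"{url}\t1")         # key <TAB> value
```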

Reducer

The MapReduce framework takes the key-value pairs produced by the mappers, collects all the values belonging to the same key, and passes them to the reducer. The reducer can then iterate over those values and produce an output.
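
A matching reducer for the page-view count, again sketched in the Hadoop Streaming style, could look roughly like this; it relies only on the fact that its input arrives sorted by key, so all lines for the same URL are adjacent:

```python
#!/usr/bin/env python3
# Minimal Hadoop Streaming-style reducer: stdin is already sorted by key, so
# all lines for the same URL arrive consecutively. Sum the counts and emit
# one (url, total) pair per distinct URL.
import sys

current_url, current_count = None, 0

for line in sys.stdin:
    url, count = line.rstrip("\n").split("\t")
    if url != current_url:
        if current_url is not None:
            print(f"{current_url}\t{current_count}")
        current_url, current_count = url, 0
    current_count += int(count)

if current_url is not None:
    print(f"{current_url}\t{current_count}")
```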

The last step in our Unix example was another sort, which ordered URLs by the number of visits, highest first. In MapReduce, we can achieve this with a second job that takes the output of the first job and sorts it by the number of visits.
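
As a rough illustration, the mapper of that second job could simply swap key and value so that the framework's implicit sort orders records by view count instead of by URL. The zero-padding trick below is an assumption for this sketch; a real job would more likely configure a numeric (and descending) key comparator:

```python
#!/usr/bin/env python3
# Hypothetical mapper for the second job: swap key and value so the
# framework's implicit sort orders records by view count rather than URL.
# Zero-padding makes the lexicographic sort behave numerically.
import sys

for line in sys.stdin:
    url, count = line.rstrip("\n").split("\t")
    print(f"{int(count):010d}\t{url}")   # key = padded count, value = url
```

With a single reducer (or one final pass over the sorted output), taking the last five keys then yields the top 5 most visited pages.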

In conclusion, we can think of the mapper as preparing the data in a form suitable for sorting, and of the reducer as processing the sorted data.

Distributed Execution of MapReduce

Since the mapper and the reducer each operate on one record at a time, the work lends itself naturally to parallelism, with multiple mappers and reducers. We can therefore distribute these tasks across multiple machines and effectively scale our computation horizontally. The mapper and reducer don't need to worry about where the input comes from or where the output goes; the framework is in charge of moving data between machines.

Let’s attempt to understand how this works through an example. Examine the figure below.

So what exactly is going on here? First, the data is partitioned, potentially across multiple machines, on HDFS. The input to a job is typically a directory within HDFS, and each file or file block is considered a separate partition that can be handled by a separate map task. We can consider m1, m2, and m3 as separate file blocks, potentially placed on different machines. The MapReduce scheduler attempts to run each map task on one of the machines that stores a replica of that block. This principle is known as putting the computation near the data.

As shown before, the mapper outputs key-value pairs. The next step is to distribute the reduce tasks, which are also partitioned, potentially across multiple machines. An interesting point is that while the number of map tasks is determined by the number of input file blocks, the number of reduce tasks is configured by the job author. To ensure that all key-value pairs with the same key end up at the same reducer, the framework uses a hash of the key to determine which reduce task should receive a particular key-value pair.

Also, remember that MapReduce implicitly sorts the key-value pairs. It could prove infeasible to sort all the data in one go, so MapReduce does this in stages. Each map task partitions its output by the hash of the key, and each of these partitions is written to a sorted file on the mapper's local disk, similar to SSTables/LSM-trees.
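
The following is a deliberately simplified illustration of that map-side output path, not Hadoop's actual implementation (which uses in-memory buffers, spill files, and optional combiners). File names and the hash function are my own choices for the sketch:

```python
# Simplified illustration of a map task's output path: each key-value pair is
# routed to a reduce partition by hashing the key, and every partition is then
# written out as a sorted file, much like an SSTable segment.
import hashlib

def partition_for(key: str, num_reducers: int) -> int:
    # A stable hash of the key decides which reduce task receives the pair.
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_reducers

def spill_map_output(pairs, num_reducers, path_prefix="map-output"):
    buckets = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        buckets[partition_for(key, num_reducers)].append((key, value))
    for r, bucket in enumerate(buckets):
        bucket.sort()                         # sorted by key before hitting disk
        with open(f"{path_prefix}-part-{r}.txt", "w") as f:
            for key, value in bucket:
                f.write(f"{key}\t{value}\n")
```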

Once the sorted files are on the mapper's disk, the MapReduce scheduler notifies the reducers that they can start fetching output files from that mapper. Each reducer connects to each of the mappers and downloads the key-value pairs for its partition. This process of partitioning by reducer, sorting, and copying data from the mappers to the reducers is known as the shuffle.

Finally, the reducer takes the files fetched during the shuffle and merges them together, preserving the sort order. The output of the reducer is written to a file on the distributed filesystem.
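
Since the files arriving from the mappers are already sorted, the merge can be done lazily while keeping the sort order, and values can then be grouped per key for the reducer function. A small sketch of that idea, with file names and a tab-separated format assumed purely for illustration:

```python
# Simplified sketch of the reduce-side merge: files fetched from the mappers
# are already sorted by key, so merge them lazily (preserving order) and then
# group the values belonging to each key.
import heapq
from itertools import groupby
from operator import itemgetter

def read_sorted_file(path):
    with open(path) as f:
        for line in f:
            key, value = line.rstrip("\n").split("\t")
            yield key, value

def grouped_reducer_input(paths):
    merged = heapq.merge(*(read_sorted_file(p) for p in paths), key=itemgetter(0))
    for key, pairs in groupby(merged, key=itemgetter(0)):
        yield key, [value for _, value in pairs]
```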

MapReduce Workflows

One MapReduce job can solve only a limited problem, such as figuring out the number of views per web page. Finding the top 5 most viewed web pages requires another MapReduce job, since it needs a second sort. It is therefore very common for MapReduce jobs to be chained together into workflows. This chaining is done implicitly: the output of one MapReduce job is written to a designated directory, and the next MapReduce job reads that directory as its input. From the MapReduce framework's point of view, these are two independent jobs.

This is where the Unix approach and MapReduce diverge. Unix pipes the output of one process directly into the input of another using a small in-memory buffer, whereas chained MapReduce jobs behave more like a sequence of commands in which the output of each job is written to a temporary file that is then read by the next job. This design choice has consequences that we will examine later in the blog series.

In our web page example, the second job can only run once the first job has finished. To handle these dependencies between jobs, various workflow schedulers have been built, such as Airflow, Luigi, Azkaban, and Oozie. High-level tools for Hadoop such as Pig and Hive set up multiple MapReduce stages and wire them together automatically.
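
To make the idea of a workflow concrete, here is a minimal Airflow 2-style sketch of wiring the two jobs together. The jar name, class names, and HDFS paths are made-up placeholders; a real deployment might use dedicated Hadoop operators or a different scheduler entirely:

```python
# Hypothetical Airflow DAG chaining the two jobs: the "top pages" job only
# runs after the "count views" job has written its output directory.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="top_pages",
         start_date=datetime(2020, 10, 1),
         schedule_interval="@daily") as dag:
    count_views = BashOperator(
        task_id="count_views",
        bash_command="hadoop jar analytics.jar CountViews /logs/access /tmp/view-counts",
    )
    top_pages = BashOperator(
        task_id="top_pages",
        bash_command="hadoop jar analytics.jar TopPages /tmp/view-counts /reports/top-pages",
    )
    count_views >> top_pages   # dependency: second job waits for the first
```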

Reduce-Side Joins and Grouping

It's very common for a record in one table to be associated with a record in another table: a foreign key in the relational model, a document reference in the document model, or an edge in a graph model. Such joins are helpful in many situations discussed in a previous article. An index is usually used to locate the records of interest, which avoids scanning every record in the database. MapReduce, however, has no concept of indexes, at least not in the usual sense.

When a MapReduce job is given a set of files as input, it reads the entire content of every file. This operation is known as a full table scan. When you only need a small number of records, a full table scan is outrageously expensive compared to an index lookup. For analytical queries, however, it is reasonable, especially since the scan is parallelized across multiple machines.

When we talk about joins in the context of batch processing, we mean resolving all occurrences of some association within a dataset. For example, we assume a job is processing the data for all users simultaneously, not merely looking up the data for one user.

Example: analysis of user activity events

Let’s say we want to join our log of events with the user database as shown below.

The left table can be considered our fact table and the right table one of the dimension tables. Such a join is useful, for example, for correlating age groups with the most visited web pages, as well as other statistics.

The simplest way to achieve this would be to go over the events one by one and query the user database on a remote server for every event. This is possible but would perform terribly: the processing throughput would be limited by the round-trip time to the database server, the effectiveness of a local cache would depend heavily on the distribution of the data, and the database might not be able to handle the large number of parallel queries.

To achieve better throughput, the computation should be as local to one machine as possible. Making a request over the network for every record is too slow, and if the database changes while the job runs, the process becomes nondeterministic.

A better approach is to take a copy of the user database and put it in the same distributed filesystem as the log of user activity events. The user database is then in one set of files in HDFS and the user activity events in another, and we can use MapReduce to bring all the relevant records together in the same place and process them efficiently.

Sort-Merge Join

The mapper's job is to extract a key and a value from each input record. In the preceding example, the key would be the userID: one set of mappers goes over the activity events, while another set of mappers goes over the user database. Consider the figure below.

Recall also that the MapReduce framework partitions the key-value pairs by key and sorts them. This effectively places all activity events and the user record with the same userID adjacent to one another in the reducer's input. The framework can even arrange the sort so that the user record comes first, followed by the activity events; this technique is known as a secondary sort.

With this in place, the join is easy to perform: the reducer function is called once for each userID, and the first value it reads is the user database record, in this case the date of birth. The reducer stores the date of birth in a local variable and then iterates over the activity events with the same userID, outputting pairs of (viewed URL, viewer age in years). Another MapReduce job can then calculate the distribution of viewer ages for each URL and cluster by age group.

Also, since the reducer processes all records for a particular userID in one go, it only needs to keep one user record in memory at any time and never needs to make requests over the network. This algorithm is known as a sort-merge join, since the mapper output is sorted by key and the reducers then merge the sorted lists of records from both sides of the join.
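
A rough sketch of that reduce-side logic for a single userID is shown below. It assumes the secondary sort has placed the single user record first, followed by that user's activity events; the "U"/"E" tags used to tell the two record types apart are a made-up convention for this sketch, not part of the framework:

```python
# Rough sketch of the reduce-side logic of a sort-merge join for one userID.
from datetime import date

def age_in_years(date_of_birth: str) -> int:
    born = date.fromisoformat(date_of_birth)
    today = date.today()
    return today.year - born.year - ((today.month, today.day) < (born.month, born.day))

def join_user_with_events(user_id, sorted_values):
    date_of_birth = None
    for tag, payload in sorted_values:
        if tag == "U":                       # user record arrives first (secondary sort)
            date_of_birth = payload          # e.g. "1990-05-14"
        elif tag == "E" and date_of_birth is not None:
            viewed_url = payload
            yield viewed_url, age_in_years(date_of_birth)
```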

Handling Skew

The idea of bringing all records with the same key together in the same place breaks down if some keys occur much more frequently than others. These hot keys might, for example, belong to a very active social media user or a celebrity followed by millions of people. Since a MapReduce job is only complete once all of its mappers and reducers have completed, any subsequent MapReduce jobs must wait for the reducer stuck with the hot key to finish.

There are solutions for hot keys, such as the skewed join method in Pig, which first runs a sampling job to identify the hot keys. When the actual join is performed, records relating to a hot key are spread across several reducers instead of all going to the same one. Spreading the work over more reducers allows the join to parallelize better, at the cost of having to replicate data to multiple reducers.
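
One common way to express this spreading, sometimes called key salting, is sketched below. The hot-key set, the number of salt buckets, and the "#" separator are all assumptions for illustration; Pig's skewed join is configured declaratively rather than written by hand like this:

```python
# Illustration of the idea behind skewed joins: records for a hot key are
# scattered across several reducers by appending a random "salt", while
# normal keys keep a single reducer. The hot-key set would normally come
# from a prior sampling job; here it is hard-coded for the sketch.
import random

HOT_KEYS = {"celebrity_user_42"}   # assumed output of a sampling job
SALT_BUCKETS = 10

def salted_key(key: str) -> str:
    if key in HOT_KEYS:
        return f"{key}#{random.randrange(SALT_BUCKETS)}"   # spread over 10 reducers
    return key
```

The other side of the join for that hot key then has to be sent to every salt bucket, which is the replication cost mentioned above.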

Map-Side Joins

The join algorithms described in the last section perform the actual join logic in the reducers, so they are known as reduce-side joins. The mappers take the role of preparing the input data: extracting a key and a value from each input record, assigning the key-value pair to a reducer partition, and sorting by key.

The main advantage of this approach is that it makes no assumptions about the input data: the mappers prepare the data for joining no matter what its underlying properties or structure. However, the downside is that all that sorting, copying to reducers, and merging of reducer inputs can be expensive.

If we can make certain assumptions about the input data, however, we can perform map-side joins instead. The key difference is that there are no reducers and no sorting: each mapper simply reads one input file block from HDFS and writes one output file to the filesystem.

Broadcast Hash Joins

One of the simplest ways to perform a map-side join is a broadcast hash join. The underlying assumption is that a large dataset is being joined with a small dataset, and in particular that the small dataset is small enough to be loaded entirely into memory.

Let's assume the user database is small enough to fit into memory. When a mapper starts up, it can load the entire user database from the distributed filesystem into an in-memory hash table. The mapper can then scan through the user activity events and simply look up the userID of each event in the hash table.
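
A minimal sketch of that idea inside a single map task is shown below. The file paths and the tab-separated record layouts are illustrative assumptions, not a prescribed format:

```python
# Minimal sketch of a broadcast hash join inside one map task: the small user
# table is loaded into an in-memory dict once at startup, and each activity
# event is then joined with a simple dictionary lookup.
def load_users(path):
    users = {}
    with open(path) as f:
        for line in f:
            user_id, date_of_birth = line.rstrip("\n").split("\t")
            users[user_id] = date_of_birth
    return users

def map_events(events_path, users):
    with open(events_path) as f:
        for line in f:
            user_id, viewed_url = line.rstrip("\n").split("\t")
            if user_id in users:
                yield viewed_url, users[user_id]   # joined record
```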

Alternatively, the smaller dataset can be stored in a read-only index on the local disk. The frequently used parts of this index remain in the operating system's page cache, providing nearly the speed of an in-memory lookup without requiring the dataset to actually fit in memory.

Partitioned Hash Join

A partitioned hash join is similar to a broadcast hash join, except that both the user activity events and the user database are partitioned in the same way. For example, one can partition by the userID modulo the number of partitions, so that activity events and user records for the same userID end up at the same mapper: mapper 1 might then handle all records for userIDs ending in 1, mapper 2 those ending in 2, and so on. This means the whole user database no longer needs to be broadcast to every mapper; each mapper only requires its own subset of the user database.
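
The whole technique rests on both datasets agreeing on one partitioning rule, which can be as small as this (the partition count is an arbitrary choice for the sketch):

```python
# The shared partitioning rule behind a partitioned hash join: applying the
# same function of the userID to both datasets guarantees that matching
# records land in the same partition, so each mapper only needs to load its
# own slice of the user database into memory.
NUM_PARTITIONS = 10

def partition_of(user_id: int) -> int:
    return user_id % NUM_PARTITIONS   # same rule for events and user records
```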

The Output of Batch Workflows

One important question comes to mind: what is the output of a batch processing job? It is not necessarily a report, like the result of an SQL query. Let's look at one common use case of batch processing.

Key-Value stores as batch process output

One common use case of batch processing is building machine learning systems such as classifiers and recommendation engines. The output of these batch jobs is often some kind of database: for example, a database that can be queried by userID to find blog recommendations for that user.

These databases need to be queried from a web application that handles user requests. The goal is therefore to get the output of our batch processing jobs into a database that the web application can query.

A naive approach would be to use a client library for a database such as MySQL directly inside our mappers or reducers, connecting to the database server and writing records to it one at a time. This is a bad idea for the following reasons:

  1. Making a network request for every single record is orders of magnitude slower than the normal throughput of a batch process.
  2. MapReduce runs many tasks in parallel, which could overwhelm the database with a high volume of requests.
  3. MapReduce normally retries failed tasks and produces no side effects until a job completes successfully. When writing to an external database, however, one has to worry about the output of partially completed jobs being visible to other systems.

A better solution is to build a brand-new database inside the batch job and write it as files to the job's output directory in the distributed filesystem. Those files can then be bulk-loaded into servers that handle read-only queries. Key-value stores that support building database files in MapReduce include Voldemort and HBase (via bulk loading), amongst others.

While data is being loaded into Voldemort, the server continues serving requests from the old data files while the new data files are copied from the distributed filesystem to the server's local disk. Once the copying is complete, Voldemort switches over to reading from the new data files. If anything goes wrong, it can easily switch back to the old data files, since they are immutable and still there.

Philosophy of batch process outputs

The MapReduce philosophy of handling output data is similar to the Unix philosophy: experimentation is encouraged, since the input is left unchanged, any previous output can be completely replaced by new output, and there are no side effects. Let's see how treating the input as immutable and avoiding side effects helps you maintain your system.

  1. If a bug is introduced into the code and the output is wrong or corrupted, you can simply roll back to a previous version of the code and run the job again. This ability to recover from buggy code has been called human fault tolerance.
  2. Since rolling back is easy, feature development can proceed quickly, as opposed to an environment where rollbacks are difficult. This principle of minimizing irreversibility is beneficial to agile software development.
  3. If a map or reduce task fails, the MapReduce framework automatically retries it with the same input. If the failure is due to buggy code, the task will keep failing on every retry; but if the failure is transient, the fault is tolerated.
  4. The same set of files can be used as input to various different jobs.
  5. There is a separation of concerns between wiring and logic, which allows potentially reusable code: one team can focus on the logic of the job, while another team can decide where and when that job will be run.

Comparing Hadoop to Distributed Databases

You might have heard of Massively Parallel Processing (MPP) databases such as Teradata and NonStop SQL. All of the parallel processing and join algorithms discussed in the past few sections can also be performed by an MPP database. So what are the differences between MPP databases and MapReduce? The differences come down to MapReduce's diversity of storage and processing models, and to how each system designs for faults.

Diversity of Storage

Hadoop lets you indiscriminately dump data into the distributed filesystem, while a database requires you to structure the data according to a particular model, such as relational or document. This difference has tradeoffs: careful modeling and upfront work may mean that users have better-quality data to work with, but in many cases simply making the data available quickly is more valuable, even if it is a bit quirky to work with.

Another advantage of indiscriminately dumping data is that different teams can build different views of the data rather than all adhering to one specific data model. This has been called the sushi principle: raw data is better.

Diversity of processing models

MPP databases are monolithic and tightly integrated: the same piece of software is responsible for storage layout on disk, query planning, scheduling, and execution. Since these components can all be tuned and optimized together, an MPP database can achieve very good performance on the queries it is designed for. Moreover, SQL allows expressive queries without the need to write code, making the database accessible to graphical tools used by business analysts, such as Tableau.

However, not every kind of processing can be sensibly expressed as an SQL query. Building a recommendation engine, for example, needs a more general-purpose model of data processing. Such processing is often specific to a particular application, such as the risk estimation function in fraud analytics or feature engineering for recommendation engines.

Hadoop and MapReduce can also support a SQL query execution engine on top of them, such as Hive. However, people found MapReduce too limiting for some kinds of processing, so other processing models were built on top of Hadoop. Thanks to the open nature of Hadoop, it can accommodate a whole range of approaches that are not possible within a monolithic MPP database.

The Hadoop ecosystem includes both random-access OLTP systems such as HBase and MPP-style analytic databases such as Impala. Neither HBase nor Impala uses MapReduce, but both use HDFS for storage.

Designing for frequent faults

If a node crashes while a query is executing, an MPP database aborts the entire query and possibly retries it automatically. MPP databases also avoid writing to disk where they can, trying to keep as much data as possible in memory, using techniques such as hash joins.

MapReduce, by contrast, can retry work at the granularity of an individual task rather than re-running the entire job, so a whole job doesn't need to restart if a single task fails. MapReduce also eagerly writes data to disk, partly for fault tolerance and partly on the assumption that the data is too big to fit in memory anyway.

In environments where tasks are rarely terminated, this design makes less sense. However, in an environment such as Google's, with clusters of thousands of nodes running tasks of different priorities, it makes a great deal of sense: Google will terminate lower-priority tasks to free resources for higher-priority ones, so tasks are constantly being killed, and restarting an entire job each time would be wasteful.

Thank you for making it this far. In the next blog post, we will cover moving beyond MapReduce.
