CHAPTER 2. MAPREDUCE BASICS

and Chapter 2.6). The situation is similar for the reduce phase: a reducer object is initialized for each reduce task, and the Reduce method is called once per intermediate key. In contrast with the number of map tasks, the programmer can precisely specify the number of reduce tasks. We will return to the details of Hadoop job execution in Chapter 2.6, which depends on an understanding of the distributed file system (covered in Chapter 2.5). To reiterate: although the presentation of algorithms in this book closely mirrors the way they would be implemented in Hadoop, our focus is on algorithm design and conceptual understanding, not actual Hadoop programming. For that, we would recommend Tom White’s book [107].

What are the restrictions on mappers and reducers? Mappers and reducers can express arbitrary computations over their inputs. However, one must generally be careful about the use of external resources, since multiple mappers or reducers would be contending for those resources. For example, it may be unwise for a mapper to query an external SQL database, since that would introduce a scalability bottleneck on the number of map tasks that could be run in parallel (they might all be querying the database simultaneously). In general, mappers can emit an arbitrary number of intermediate key-value pairs, and they need not be of the same type as the input key-value pairs. Similarly, reducers can emit an arbitrary number of final key-value pairs, and they can differ in type from the intermediate key-value pairs. Although not permitted in functional programming, mappers and reducers can have side effects. This is a powerful and useful feature: for example, preserving state across multiple inputs is central to the design of many MapReduce algorithms (see Chapter 3). Such algorithms can be understood as having side effects that only change state that is internal to the mapper or reducer.
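As a rough illustration of a mapper whose side effects are confined to its own internal state, consider the following Python sketch. The class name and the `map` and `close` methods are hypothetical names chosen for this example; they are not the Hadoop API. The mapper accumulates word counts across all of its inputs and emits them only at the end, and its output types (string, integer) differ from its input types (integer, string).

```python
from collections import defaultdict


class StatefulWordCountMapper:
    """Hypothetical mapper that keeps private state across calls to map()."""

    def __init__(self):
        # Internal state: visible only to this mapper instance.
        self.counts = defaultdict(int)

    def map(self, doc_id, text):
        # Side effect: update internal state instead of emitting right away.
        for word in text.split():
            self.counts[word] += 1
        return []  # nothing is emitted per input record

    def close(self):
        # Emit the accumulated state once all inputs have been seen.
        return sorted(self.counts.items())


mapper = StatefulWordCountMapper()
for doc_id, text in [(1, "a b a"), (2, "b c")]:
    mapper.map(doc_id, text)
pairs = mapper.close()
print(pairs)
```

Because `counts` belongs to a single mapper instance, no other mapper can observe or modify it, which is why this kind of side effect does not require coordination between tasks.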
While the correctness of such algorithms may be more difficult to guarantee (since the function’s behavior depends not only on the current input but also on previous inputs), most potential synchronization problems are avoided since internal state is private to individual mappers and reducers. In other cases (see Chapter 4.3 and Chapter 6.5), it may be useful for mappers or reducers to have external side effects, such as writing files to the distributed file system. Since many mappers and reducers are run in parallel, and the distributed file system is a shared global resource, special care must be taken to ensure that such operations avoid synchronization conflicts. One strategy is to write a temporary file that is renamed upon successful completion of the mapper or reducer [31].

In addition to the “canonical” MapReduce processing flow, other variations are also possible. MapReduce programs can contain no reducers, in which case mapper output is directly written to disk (one file per mapper). For embarrassingly parallel problems, e.g., parsing a large text collection or independently analyzing a large number of images, this would be a common pattern. The converse, a MapReduce program with no mappers, is not possible, although in some cases it is useful for the mapper to implement the identity function and simply pass input key-value pairs to the reducers. This
has the effect of sorting and regrouping the input for reduce-side processing. Similarly, in some cases it is useful for the reducer to implement the identity function, in which case the program simply sorts and groups mapper output. Finally, running identity mappers and reducers has the effect of regrouping and resorting the input data (which is sometimes useful).

2.3 THE EXECUTION FRAMEWORK

To be precise, MapReduce can refer to three distinct but related concepts. First, MapReduce is a programming model, which is the sense discussed above. Second, MapReduce can refer to the execution framework (i.e., the “runtime”) that coordinates the execution of programs written in this particular style. Finally, MapReduce can refer to the software implementation of the programming model and the execution framework: for example, Google’s proprietary implementation vs. the open-source Hadoop implementation.⁶ For the most part, usage should be clear from context, but for additional clarity we refer specifically to Hadoop when discussing features of the open-source implementation that may or may not be shared by other implementations.

So what exactly does the MapReduce execution framework do? In short, it takes over where the programmer leaves off. A MapReduce program consists of code packaged with configuration parameters (such as input and output paths), and is referred to as a job. The programmer submits the MapReduce job to the submission node of a cluster (in Hadoop, this is called the jobtracker) and waits for the job to run. The runtime transparently handles all other aspects of execution, on clusters ranging from a few to a few thousand nodes. Specific responsibilities include:

Scheduling. Each MapReduce job is divided into smaller units called tasks (see Chapter 2.6 for more details).
For example, a map task may be responsible for processing a certain block of input key-value pairs (called an input split in Hadoop); similarly, a reduce task may handle a portion of the intermediate key space. It is not uncommon for MapReduce jobs to have thousands of individual tasks that need to be assigned to nodes in the cluster. In many cases, the number of tasks exceeds the capacity of the cluster, making some amount of sequential processing inevitable. Another aspect of scheduling involves coordination among tasks belonging to different jobs (e.g., from different users): how can a large, shared resource support several users simultaneously in a predictable, transparent, policy-driven fashion?

⁶ And in fact, there are many implementations of MapReduce, e.g., targeted specifically for multi-core processors [90], for GPGPUs [49], for the CELL architecture [89], etc.

Data/code co-location. The phrase data distribution is misleading, since one of the key ideas behind MapReduce is to move the code, not the data. However, the more general point remains: in order for computation to occur, we need to somehow feed
data to the code. In MapReduce, this issue is inextricably intertwined with scheduling and relies heavily on the design of the underlying distributed file system. To achieve data locality, the scheduler starts tasks on the machine that holds a particular block of data (i.e., on its local drive) needed by the task. This has the effect of moving code to the data. If this is not possible (e.g., a machine is already running too many tasks), new tasks will be started elsewhere, and the necessary data will be streamed over the network. An important optimization here is to prefer machines that are on the same rack (in the data center) as the machine with the relevant data block, since inter-rack bandwidth is significantly less than intra-rack bandwidth.

Synchronization. In general, synchronization refers to the mechanisms by which multiple concurrently running processes “join up”, for example, to share intermediate results or otherwise exchange state information. In MapReduce, synchronization is accomplished by a barrier between the map and reduce phases of processing. Intermediate key-value pairs must be grouped by key, which is accomplished by a large distributed sort involving all the nodes that executed map tasks and all the nodes that will execute reduce tasks. This necessarily involves copying intermediate data over the network, and therefore the process is commonly known as “shuffle and sort”. A MapReduce job with m mappers and r reducers involves up to m × r distinct copy operations, since each mapper may have intermediate output going to every reducer.

Note that reducers cannot start until all the mappers have finished, since the execution framework cannot otherwise guarantee that all values associated with the same key have been gathered.
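The grouping step can be sketched in a few lines of Python. This is a single-process toy under stated assumptions, not how any real framework is implemented: the function name `shuffle_and_sort` is hypothetical, and routing keys with `zlib.crc32` modulo the number of reducers is just one convenient, run-to-run stable choice. The function returns only after every mapper's output has been consumed, which mirrors the barrier, and it records which (mapper, reducer) transfers actually occurred.

```python
import zlib
from collections import defaultdict


def shuffle_and_sort(mapper_outputs, num_reducers):
    """Group intermediate pairs by key and route each key to one reducer.

    mapper_outputs: one list of (key, value) pairs per mapper.
    Returns (reducer_inputs, copies): reducer_inputs[j] is a key-sorted list
    of (key, [values]); copies is the set of (mapper, reducer) transfers.
    """
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    copies = set()
    for i, output in enumerate(mapper_outputs):
        for key, value in output:
            # Assumed routing rule: stable hash of the key, mod r.
            j = zlib.crc32(key.encode("utf-8")) % num_reducers
            buckets[j][key].append(value)
            copies.add((i, j))
    # Each reducer sees its keys in sorted order, with all values gathered.
    reducer_inputs = [sorted(b.items()) for b in buckets]
    return reducer_inputs, copies


mapper_outputs = [
    [("a", 1), ("b", 1)],
    [("a", 1), ("c", 1)],
    [("b", 1)],
]
reducer_inputs, copies = shuffle_and_sort(mapper_outputs, num_reducers=2)
print(reducer_inputs)
print(len(copies), "copy operations, at most", len(mapper_outputs) * 2)
```

The number of recorded transfers never exceeds m × r, and it is smaller whenever some mapper produces no keys for some reducer.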
This requirement is an important departure from functional programming: in a fold operation, the aggregation function g is a function of the intermediate value and the next item in the list, which means that values can be lazily generated and aggregation can begin as soon as values are available. In contrast, the reducer in MapReduce receives all values associated with the same key at once. Although reducers must wait for all mappers to finish before they start processing, it is possible to start copying intermediate key-value pairs over the network to the nodes running the reducers before the mappers have finished; this is a common optimization and is implemented in Hadoop.

Error and fault handling. The MapReduce execution framework must accomplish all the tasks above in an environment where errors and faults are the norm, not the exception. Since MapReduce was explicitly designed around low-end commodity servers, the runtime must be especially resilient. In large clusters, disk failures are common [87] and RAM experiences more errors than one might expect [93]. Large datacenters suffer from both planned outages (e.g., system maintenance and hardware upgrades) and unexpected outages (e.g., power failure, connectivity loss, etc.).

And that’s just hardware. No software is bug-free: exceptions must be appropriately trapped, logged, and recovered from. Large-data problems have a penchant for
uncovering obscure corner cases in code that is otherwise thought to be “bulletproof”.⁷ Furthermore, any sufficiently large dataset will contain corrupted data or records that are mangled beyond a programmer’s imagination, resulting in errors that one would never think to check for or trap. The MapReduce execution framework must thrive in this hostile environment.

2.4 PARTITIONERS AND COMBINERS

We have thus far presented a simplified view of MapReduce. There are two additional elements that complete the programming model: partitioners and combiners.

Partitioners are responsible for dividing up the intermediate key space and assigning intermediate key-value pairs to reducers. In other words, the partitioner specifies the node to which an intermediate key-value pair must be copied. Within each reducer, keys are processed in sorted order (which is how the “group by” is implemented). The simplest partitioner involves computing the hash value of the key and then taking that value modulo the number of reducers. This assigns approximately the same number of keys to each reducer (dependent on the quality of the hash function). Note, however, that the partitioner only considers the key and ignores the value; therefore, a roughly even partitioning of the key space may nevertheless yield large differences in the number of key-value pairs sent to each reducer (since different keys may have different numbers of associated values).

Combiners are an optimization in MapReduce that allow for local aggregation before the shuffle and sort phase. We can motivate the need for combiners by considering the word count algorithm in Figure 2.3, which emits a key-value pair for each word in the collection. Furthermore, all these key-value pairs need to be copied across the network, and so the amount of intermediate data will be larger than the input collection itself. This is clearly inefficient.
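Both observations above, that a hash partitioner balances keys rather than key-value pairs and that the basic word count emits one pair per token, can be checked on a toy collection. The following Python sketch is illustrative only: `hash_partition` and `word_count_map` are hypothetical names, the three documents are made up, and `zlib.crc32` stands in for "the hash value of the key" because Python's built-in `hash()` for strings varies between runs.

```python
import zlib
from collections import Counter


def hash_partition(key, num_reducers):
    # Hash of the key, modulo the number of reducers; the value is ignored.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def word_count_map(text):
    # Basic word count mapper: one (word, 1) pair per token.
    return [(word, 1) for word in text.split()]


docs = ["the cat sat on the mat", "the dog saw the cat", "the end"]
num_reducers = 3

pairs = [pair for doc in docs for pair in word_count_map(doc)]
unique_words = {word for word, _ in pairs}

# Distinct keys assigned to each reducer.
keys_per_reducer = Counter(hash_partition(w, num_reducers) for w in unique_words)
# Key-value pairs each reducer would actually receive.
pairs_per_reducer = Counter(hash_partition(w, num_reducers) for w, _ in pairs)

print("intermediate pairs:", len(pairs), "unique words:", len(unique_words))
print("keys per reducer:  ", dict(keys_per_reducer))
print("pairs per reducer: ", dict(pairs_per_reducer))
```

Whichever reducer receives the frequent word "the" gets five pairs for that single key, so the pair counts can be uneven even when the key counts are close.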
One solution to this inefficiency is to perform local aggregation on the output of each mapper, i.e., to compute a local count for a word over all the documents processed by the mapper. With this modification, the number of intermediate key-value pairs will be at most the number of unique words in the collection times the number of mappers (and typically far smaller because each mapper may not encounter every word).

⁷ For example, Hadoop has unearthed several bugs in Sun’s JVM.

The combiner in MapReduce supports such an optimization. One can think of combiners as “mini-reducers” that take place on the output of the mappers, prior to the shuffle and sort phase. Each combiner operates in isolation and therefore does not have access to intermediate output from other mappers. Like the reducer, the combiner is provided keys and all values associated with each key. It can emit any number of key-value pairs, but the keys and values must be of the same types as those used by the reducer (i.e., the combiner and reducer must have the exact same method signature). In cases where an operation is both associative and commutative (e.g., addition or multiplication),
reducers can directly serve as combiners. In general, however, reducers and combiners are not interchangeable.

In many cases, proper use of combiners can spell the difference between an impractical algorithm and an efficient algorithm. This topic will be discussed in Chapter 3.1, which focuses on various techniques for local aggregation. It suffices to say for now that a combiner can significantly reduce the amount of data that needs to be copied over the network, resulting in much faster algorithms.⁸

The complete MapReduce model is shown in Figure 2.4. Output of the mappers is processed by the combiners, which perform local aggregation to cut down on the number of intermediate key-value pairs. The partitioner determines which reducer will be responsible for processing a particular key, and the execution framework uses this information to copy the data to the right location during the shuffle and sort phase. Therefore, a complete MapReduce job consists of code for the mapper, reducer, combiner, and partitioner, along with job configuration parameters. The execution framework handles everything else.

2.5 THE DISTRIBUTED FILE SYSTEM

So far, we have mostly focused on the processing aspect of data-intensive processing, but it is important to recognize that without data, there is nothing to compute on. In high-performance computing (HPC) and many traditional cluster architectures, storage is viewed as a distinct and separate component from computation. Implementations vary widely, but network-attached storage (NAS) and storage area networks (SAN) are common; supercomputers often have dedicated subsystems for handling storage (separate nodes, and often even separate networks). Regardless of the details, the processing cycle remains the same at a high level: the compute nodes fetch input from storage, load it into memory, process the data, and then write back the results (with perhaps intermediate checkpointing for long-running processes).
⁸ A note on the implementation of combiners in Hadoop: by default, the execution framework reserves the right to use combiners at its discretion. In reality, this means that a combiner may be invoked one, two, or multiple times. In addition, combiners in Hadoop may actually be invoked in the reduce phase, i.e., after key-value pairs have been copied over to the reducer, but before the user reducer code runs. As a result, reducers must be carefully written so that they can be executed in these different environments. Chapter 3 discusses this in more detail.

As dataset sizes increase, more compute capacity is required for processing. But as compute capacity grows, the link between the compute nodes and the storage becomes a bottleneck. At that point, one could invest in higher-performance and necessarily more expensive networks (e.g., 10 gigabit Ethernet), or special-purpose interconnects such as InfiniBand (also expensive). In most cases, this is not a cost-effective solution, as the price of networking equipment increases non-linearly with performance (e.g., a switch with ten times the capacity is more than ten times as expensive). Alternatively, one