作业执行
MapReduce 作业执行
MapReduce 是一个编程框架,你可以使用它编写代码来处理 HDFS 等分布式文件系统中的大型数据集。MapReduce 中的数据处理模式与 Unix 中的文件分析非常相似:
- 读取一组输入文件,并将其分解成记录(records)。在 Web 服务器日志示例中,每条记录都是日志中的一行(即
\n
是记录分隔符)。 - 调用 Mapper 函数,从每条输入记录中提取一对键值。在前面的例子中,Mapper 函数是
awk '{print $7}'
:它提取 URL($7
)作为关键字,并将值留空。 - 按键排序所有的键值对。在日志的例子中,这由第一个
sort
命令完成。 - 调用 Reducer 函数遍历排序后的键值对。如果同一个键出现多次,排序使它们在列表中相邻,所以很容易组合这些值而不必在内存中保留很多状态。在前面的例子中,Reducer 是由
uniq -c
命令实现的,该命令使用相同的键来统计相邻记录的数量。
这四个步骤可以作为一个 MapReduce 作业执行。步骤 2(Map)和 4(Reduce)是你编写自定义数据处理代码的地方。步骤 1(将文件分解成记录)由输入格式解析器处理。步骤 3 中的排序步骤隐含在 MapReduce 中,你不必编写它,因为 Mapper 的输出始终在送往 Reducer 之前进行排序。
Mapper 与 Reducer
要创建 MapReduce 作业,你需要实现两个回调函数,Mapper 和 Reducer,其行为如下
-
Mapper:Mapper 会在每条输入记录上调用一次,其工作是从输入记录中提取键值。对于每个输入,它可以生成任意数量的键值对(包括 None)。它不会保留从一个输入记录到下一个记录的任何状态,因此每个记录都是独立处理的。
-
Reducer:MapReduce 框架拉取由 Mapper 生成的键值对,收集属于同一个键的所有值,并使用在这组值列表上迭代调用 Reducer。Reducer 可以产生输出记录(例如相同 URL 的出现次数)。
在 Web 服务器日志的例子中,我们在第 5 步中有第二个 sort 命令,它按请求数对 URL 进行排序。在 MapReduce 中,如果你需要第二个排序阶段,则可以通过编写第二个 MapReduce 作业并将第一个作业的输出用作第二个作业的输入来实现它。这样看来,Mapper 的作用是将数据放入一个适合排序的表单中,并且 Reducer 的作用是处理已排序的数据。
分布式执行 MapReduce
MapReduce 与 Unix 命令管道的主要区别在于,MapReduce 可以在多台机器上并行执行计算,而无需编写代码来显式处理并行问题。Mapper 和 Reducer 一次只能处理一条记录;它们不需要知道它们的输入来自哪里,或者输出去往什么地方,所以框架可以处理在机器之间移动数据的复杂性。在分布式计算中可以使用标准的 Unix 工具作为 Mapper 和 Reducer,但更常见的是,它们被实现为传统编程语言的函数。在 Hadoop MapReduce 中,Mapper 和 Reducer 都是实现特定接口的 Java 类。在 MongoDB 和 CouchDB 中,Mapper 和 Reducer 都是 JavaScript 函数。
下图显示了 Hadoop MapReduce 作业中的数据流。其并行化基于分区:作业的输入通常是 HDFS 中的一个目录,输入目录中的每个文件或文件块都被认为是一个单独的分区,可以单独处理 map 任务。每个输入文件的大小通常是数百兆字节。MapReduce 调度器(图中未显示)试图在其中一台存储输入文件副本的机器上运行每个 Mapper,只要该机器有足够的备用 RAM 和 CPU 资源来运行 Mapper 任务。这个原则被称为将计算放在数据附近:它节省了通过网络复制输入文件的开销,减少网络负载并增加局部性。
在大多数情况下,应该在 Mapper 任务中运行的应用代码在将要运行它的机器上还不存在,所以 MapReduce 框架首先将代码(例如 Java 程序中的 JAR 文件)复制到适当的机器。然后启动 Map 任务并开始读取输入文件,一次将一条记录传入 Mapper 回调函数。Mapper 的输出由键值对组成。计算的 Reduce 端也被分区。虽然 Map 任务的数量由输入文件块的数量决定,但 Reducer 的任务的数量是由作业作者配置的(它可以不同于 Map 任务的数量)。为了确保具有相同键的所有键值对最终落在相同的 Reducer 处,框架使用键的哈希值来确定哪个 Reduce 任务应该接收到特定的键值对。
键值对必须进行排序,但数据集可能太大,无法在单台机器上使用常规排序算法进行排序。相反,分类是分阶段进行的。首先每个 Map 任务都按照 Reducer 对输出进行分区。每个分区都被写入 Mapper 程序的本地磁盘。只要当 Mapper 读取完输入文件,并写完排序后的输出文件,MapReduce 调度器就会通知 Reducer 可以从该 Mapper 开始获取输出文件。Reducer 连接到每个 Mapper,并下载自己相应分区的有序键值对文件。按 Reducer 分区,排序,从 Mapper 向 Reducer 复制分区数据,这一整个过程被称为混洗(shuffle)(一个容易混淆的术语:不像洗牌,在 MapReduce 中的混洗没有随机性)。
Reduce 任务从 Mapper 获取文件,并将它们合并在一起,并保留有序特性。因此,如果不同的 Mapper 生成了键相同的记录,则在 Reducer 的输入中,这些记录将会相邻。Reducer 调用时会收到一个键,和一个迭代器作为参数,迭代器会顺序地扫过所有具有该键的记录(因为在某些情况可能无法完全放入内存中)。Reducer 可以使用任意逻辑来处理这些记录,并且可以生成任意数量的输出记录。这些输出记录会写入分布式文件系统上的文件中(通常是在跑 Reducer 的机器本地磁盘上留一份,并在其他机器上留几份副本)。
MapReduce 工作流
单个 MapReduce 作业可以解决的问题范围很有限。以日志分析为例,单个 MapReduce 作业可以确定每个 URL 的页面浏览次数,但无法确定最常见的 URL,因为这需要第二轮排序。因此将 MapReduce 作业链接成为工作流(workflow)中是极为常见的,例如,一个作业的输出成为下一个作业的输入。Hadoop Map-Reduce 框架对工作流没有特殊支持,所以这个链是通过目录名隐式实现的:第一个作业必须将其输出配置为 HDFS 中的指定目录,第二个作业必须将其输入配置为从同一个目录。从 MapReduce 框架的角度来看,这是是两个独立的作业。
因此,被链接的 MapReduce 作业并没有那么像 Unix 命令管道(它直接将一个进程的输出作为另一个进程的输入,仅用一个很小的内存缓冲区)。它更像是一系列命令,其中每个命令的输出写入临时文件,下一个命令从临时文件中读取。这种设计有利也有弊。
只有当作业成功完成后,批处理作业的输出才会被视为有效的(MapReduce 会丢弃失败作业的部分输出)。因此,工作流中的一项作业只有在先前的作业:即生产其输入的作业:成功完成后才能开始。为了处理这些作业之间的依赖,有很多针对 Hadoop 的工作流调度器被开发出来,包括 Oozie,Azkaban,Luigi,Airflow 和 Pinball 。
这些调度程序还具有管理功能,在维护大量批处理作业时非常有用。在构建推荐系统时,由 50 到 100 个 MapReduce 作业组成的工作流是常见的。而在大型组织中,许多不同的团队可能运行不同的作业来读取彼此的输出。工具支持对于管理这样复杂的数据流而言非常重要。Hadoop 的各种高级工具(如 Pig,Hive,Cascading,Crunch 和 FlumeJava)也能自动布线组装多个 MapReduce 阶段,生成合适的工作流。