连接与分组
Reduce 端连接与分组
在许多数据集中,一条记录与另一条记录存在关联是很常见的:关系模型中的外键,文档模型中的文档引用或图模型中的边。当你需要同时访问这一关联的两侧(持有引用的记录与被引用的记录)时,连接就是必须的。(包含引用的记录和被引用的记录),连接就是必需的。在数据库中,如果执行只涉及少量记录的查询,数据库通常会使用索引来快速定位感兴趣的记录,如果查询涉及到连接,则可能涉及到查找多个索引。然而 MapReduce 没有索引的概念,至少在通常意义上没有。
当 MapReduce 作业被赋予一组文件作为输入时,它读取所有这些文件的全部内容;数据库会将这种操作称为全表扫描。如果你只想读取少量的记录,则全表扫描与索引查询相比,代价非常高昂。但是在分析查询中,通常需要计算大量记录的聚合。在这种情况下,特别是如果能在多台机器上并行处理时,扫描整个输入可能是相当合理的事情。当我们在批处理的语境中讨论连接时,我们指的是在数据集中解析某种关联的全量存在。例如我们假设一个作业是同时处理所有用户的数据,而非仅仅是为某个特定用户查找数据(而这能通过索引更高效地完成)。
案例:分析用户活动事件
下图给出了一个批处理作业中连接的典型例子。左侧是事件日志,描述登录用户在网站上做的事情(称为活动事件(activity events)或点击流数据(clickstream data)),右侧是用户数据库。你可以将此示例看作是星型模式的一部分:事件日志是事实表,用户数据库是其中的一个维度。
分析任务可能需要将用户活动与用户简档相关联:例如,如果档案包含用户的年龄或出生日期,系统就可以确定哪些页面更受哪些年龄段的用户欢迎。然而活动事件仅包含用户 ID,而没有包含完整的用户档案信息。在每个活动事件中嵌入这些档案信息很可能会非常浪费。因此,活动事件需要与用户档案数据库相连接。
实现这一连接的最简单方法是,逐个遍历活动事件,并为每个遇到的用户 ID 查询用户数据库(在远程服务器上)。这是可能的,但是它的性能可能会非常差:处理吞吐量将受限于受数据库服务器的往返时间,本地缓存的有效性很大程度上取决于数据的分布,并行运行大量查询可能会轻易压垮数据库。为了在批处理过程中实现良好的吞吐量,计算必须(尽可能)限于单台机器上进行。为待处理的每条记录发起随机访问的网络请求实在是太慢了。而且,查询远程数据库意味着批处理作业变为非确定的(nondeterministic),因为远程数据库中的数据可能会改变。因此,更好的方法是获取用户数据库的副本(例如,使用 ETL 进程从数据库备份中提取数据,参阅“数据仓库”),并将它和用户行为日志放入同一个分布式文件系统中。然后你可以将用户数据库存储在 HDFS 中的一组文件中,而用户活动记录存储在另一组文件中,并能用 MapReduce 将所有相关记录集中到同一个地方进行高效处理。
排序合并连接
Mapper 的目的是从每个输入记录中提取一对键值,这个键就是用户 ID:一组 Mapper 会扫过活动事件(提取用户 ID 作为键,活动事件作为值),而另一组 Mapper 将会扫过用户数据库(提取用户 ID 作为键,用户的出生日期作为值)。这个过程如下图所示。
当 MapReduce 框架通过键对 Mapper 输出进行分区,然后对键值对进行排序时,效果是具有相同 ID 的所有活动事件和用户记录在 Reducer 输入中彼此相邻。Map-Reduce 作业甚至可以也让这些记录排序,使 Reducer 总能先看到来自用户数据库的记录,紧接着是按时间戳顺序排序的活动事件,这种技术被称为二次排序(secondary sort)。然后 Reducer 可以容易地执行实际的连接逻辑:每个用户 ID 都会被调用一次 Reducer 函数,且因为二次排序,第一个值应该是来自用户数据库的出生日期记录。Reducer 将出生日期存储在局部变量中,然后使用相同的用户 ID 遍历活动事件,输出已观看网址和观看者年龄的结果对。随后的 Map-Reduce 作业可以计算每个 URL 的查看者年龄分布,并按年龄段进行聚集。
由于 Reducer 一次处理一个特定用户 ID 的所有记录,因此一次只需要将一条用户记录保存在内存中,而不需要通过网络发出任何请求。这个算法被称为排序合并连接(sort-merge join),因为 Mapper 的输出是按键排序的,然后 Reducer 将来自连接两侧的有序记录列表合并在一起。
把相关数据放在一起
在排序合并连接中,Mapper 和排序过程确保了所有对特定用户 ID 执行连接操作的必须数据都被放在同一个地方:单次调用 Reducer 的地方。预先排好了所有需要的数据,Reducer 可以是相当简单的单线程代码,能够以高吞吐量和与低内存开销扫过这些记录。
这种架构可以看做,Mapper 将“消息”发送给 Reducer。当一个 Mapper 发出一个键值对时,这个键的作用就像值应该传递到的目标地址。即使键只是一个任意的字符串(不是像 IP 地址和端口号那样的实际的网络地址),它表现的就像一个地址:所有具有相同键的键值对将被传递到相同的目标(一次 Reduce 的调用)。
使用 MapReduce 编程模型,能将计算的物理网络通信层面(从正确的机器获取数据)从应用逻辑中剥离出来(获取数据后执行处理)。这种分离与数据库的典型用法形成了鲜明对比,从数据库中获取数据的请求经常出现在应用代码内部。由于 MapReduce 能够处理所有的网络通信,因此它也避免了应用代码去担心部分故障,例如另一个节点的崩溃:MapReduce 在不影响应用逻辑的情况下能透明地重试失败的任务。
GROUP BY
除了连接之外,“把相关数据放在一起”的另一种常见模式是,按某个键对记录分组(如 SQL 中的 GROUP BY 子句)。所有带有相同键的记录构成一个组,而下一步往往是在每个组内进行某种聚合操作,例如:
- 统计每个组中记录的数量(例如在统计 PV 的例子中,在 SQL 中表示为
COUNT(*)
聚合) - 对某个特定字段求和(SQL 中的 SUM(fieldname))
- 按某种分级函数取出排名前 k 条记录。
使用 MapReduce 实现这种分组操作的最简单方法是设置 Mapper,以便它们生成的键值对使用所需的分组键。然后分区和排序过程将所有具有相同分区键的记录导向同一个 Reducer。因此在 MapReduce 之上实现分组和连接看上去非常相似。分组的另一个常见用途是整理特定用户会话的所有活动事件,以找出用户进行的一系列操作(称为会话化(sessionization))。例如,可以使用这种分析来确定显示新版网站的用户是否比那些显示旧版本(A/B 测试)的用户更有购买欲,或者计算某个营销活动是否值得。如果你有多个 Web 服务器处理用户请求,则特定用户的活动事件很可能分散在各个不同的服务器的日志文件中。你可以通过使用会话 cookie,用户 ID 或类似的标识符作为分组键,以将特定用户的所有活动事件放在一起来实现会话化,与此同时,不同用户的事件仍然散步在不同的分区中。
处理倾斜
如果存在与单个键关联的大量数据,则“将具有相同键的所有记录放到相同的位置”这种模式就被破坏了。例如在社交网络中,大多数用户可能会与几百人有连接,但少数名人可能有数百万的追随者。这种不成比例的活动数据库记录被称为关键对象(linchpin object)或热键(hot key)。在单个 Reducer 中收集与某个名流相关的所有活动(例如他们发布内容的回复)可能导致严重的倾斜(也称为热点(hot spot)),也就是说,一个 Reducer 必须比其他 Reducer 处理更多的记录。由于 MapReduce 作业只有在所有 Mapper 和 Reducer 都完成时才完成,所有后续作业必须等待最慢的 Reducer 才能启动。
如果连接的输入存在热点键,可以使用一些算法进行补偿。例如,Pig 中的倾斜连接(skewed join)方法首先运行一个抽样作业来确定哪些键是热键。连接实际执行时,Mapper 会将热键的关联记录随机(相对于传统 MapReduce 基于键哈希的确定性方法)发送到几个 Reducer 之一。对于另外一侧的连接输入,与热键相关的记录需要被复制到所有处理该键的 Reducer 上。这种技术将处理热键的工作分散到多个 Reducer 上,这样可以使其更好地并行化,代价是需要将连接另一侧的输入记录复制到多个 Reducer 上。Crunch 中的分片连接(sharded join)方法与之类似,但需要显式指定热键而不是使用采样作业。
Hive 的偏斜连接优化采取了另一种方法。它需要在表格元数据中显式指定热键,并将与这些键相关的记录单独存放,与其它文件分开。当在该表上执行连接时,对于热键,它会使用 Map 端连接。当按照热键进行分组并聚合时,可以将分组分两个阶段进行。第一个 MapReduce 阶段将记录发送到随机 Reducer,以便每个 Reducer 只对热键的子集执行分组,为每个键输出一个更紧凑的中间聚合结果。然后第二个 MapReduce 作业将所有来自第一阶段 Reducer 的中间聚合结果合并为每个键一个值。
Map 端连接
上一节描述的连接算法在 Reducer 中执行实际的连接逻辑,因此被称为 Reduce 端连接。Mapper 扮演着预处理输入数据的角色:从每个输入记录中提取键值,将键值对分配给 Reducer 分区,并按键排序。Reduce 端方法的优点是不需要对输入数据做任何假设:无论其属性和结构如何,Mapper 都可以对其预处理以备连接。然而不利的一面是,排序,复制至 Reducer,以及合并 Reducer 输入,所有这些操作可能开销巨大。当数据通过 MapReduce 阶段时,数据可能需要落盘好几次,取决于可用的内存缓冲区。
另一方面,如果你能对输入数据作出某些假设,则通过使用所谓的 Map 端连接来加快连接速度是可行的。这种方法使用了一个阉掉 Reduce 与排序的 MapReduce 作业,每个 Mapper 只是简单地从分布式文件系统中读取一个输入文件块,然后将输出文件写入文件系统,仅此而已。
广播哈希连接
适用于执行 Map 端连接的最简单场景是大数据集与小数据集连接的情况。要点在于小数据集需要足够小,以便可以将其全部加载到每个 Mapper 的内存中。用户数据库小到足以放进内存中。在这种情况下,当 Mapper 启动时,它可以首先将用户数据库从分布式文件系统读取到内存中的哈希中。完成此操作后,Map 程序可以扫描用户活动事件,并简单地在哈希表中查找每个事件的用户 ID。
参与连接的较大输入的每个文件块各有一个 Mapper,每个 Mapper 都会将较小输入整个加载到内存中。这种简单有效的算法被称为广播哈希连接(broadcast hash join):广播一词反映了这样一个事实,每个连接较大输入端分区的 Mapper 都会将较小输入端数据集整个读入内存中(所以较小输入实际上“广播”到较大数据的所有分区上),哈希一词反映了它使用一个哈希表。Pig(名为“复制链接(replicated join)”),Hive(“MapJoin”),Cascading 和 Crunch 支持这种连接。它也被诸如 Impala 的数据仓库查询引擎使用。
除了将连接较小输入加载到内存哈希表中,另一种方法是将较小输入存储在本地磁盘上的只读索引中。索引中经常使用的部分将保留在操作系统的页面缓存中,因而这种方法可以提供与内存哈希表几乎一样快的随机查找性能,但实际上并不需要数据集能放入内存中。
分区哈希连接
如果 Map 端连接的输入以相同的方式进行分区,则哈希连接方法可以独立应用于每个分区。譬如你可以根据用户 ID 的最后一位十进制数字来对活动事件和用户数据库进行分区(因此连接两侧各有 10 个分区)。例如,Mapper3 首先将所有具有以 3 结尾的 ID 的用户加载到哈希表中,然后扫描 ID 为 3 的每个用户的所有活动事件。
如果分区正确无误,可以确定的是,所有你可能需要连接的记录都落在同一个编号的分区中。因此每个 Mapper 只需要从输入两端各读取一个分区就足够了。好处是每个 Mapper 都可以在内存哈希表中少放点数据。这种方法只有当连接两端输入有相同的分区数,且两侧的记录都是使用相同的键与相同的哈希函数做分区时才适用。如果输入是由之前执行过这种分组的 MapReduce 作业生成的,那么这可能是一个合理的假设。
分区哈希连接在 Hive 中称为 Map 端桶连接(bucketed map joins)。
Map 端合并连接
如果输入数据集不仅以相同的方式进行分区,而且还基于相同的键进行排序,则可适用另一种 Map 端联接的变体。在这种情况下,输入是否小到能放入内存并不重要,因为这时候 Mapper 同样可以执行归并操作(通常由 Reducer 执行)的归并操作:按键递增的顺序依次读取两个输入文件,将具有相同键的记录配对。
如果能进行 Map 端合并连接,这通常意味着前一个 MapReduce 作业可能一开始就已经把输入数据做了分区并进行了排序。原则上这个连接就可以在前一个作业的 Reduce 阶段进行。但使用独立的仅 Map 作业有时也是合适的,例如,分好区且排好序的中间数据集可能还会用于其他目的。
MapReduce 工作流与 Map 端连接
当下游作业使用 MapReduce 连接的输出时,选择 Map 端连接或 Reduce 端连接会影响输出的结构。Reduce 端连接的输出是按照连接键进行分区和排序的,而 Map 端连接的输出则按照与较大输入相同的方式进行分区和排序(因为无论是使用分区连接还是广播连接,连接较大输入端的每个文件块都会启动一个 Map 任务)。
如前所述,Map 端连接也对输入数据集的大小,有序性和分区方式做出了更多假设。在优化连接策略时,了解分布式文件系统中数据集的物理布局变得非常重要:仅仅知道编码格式和数据存储目录的名称是不够的;你还必须知道数据是按哪些键做的分区和排序,以及分区的数量。
在 Hadoop 生态系统中,这种关于数据集分区的元数据通常在 HCatalog 和 Hive Metastore 中维护。