物化中间状态
物化中间状态
如前所述,每个 MapReduce 作业都独立于其他任何作业。作业与世界其他地方的主要连接点是分布式文件系统上的输入和输出目录。如果希望一个作业的输出成为第二个作业的输入,则需要将第二个作业的输入目录配置为第一个作业输出目录,且外部工作流调度程序必须在第一个作业完成后再启动第二个。如果第一个作业的输出是要在组织内广泛发布的数据集,则这种配置是合理的。在这种情况下,你需要通过名称引用它,并将其重用为多个不同作业的输入(包括由其他团队开发的作业)。将数据发布到分布式文件系统中众所周知的位置能够带来松耦合,这样作业就不需要知道是谁在提供输入或谁在消费输出。
但在很多情况下,你知道一个作业的输出只能用作另一个作业的输入,这些作业由同一个团队维护。在这种情况下,分布式文件系统上的文件只是简单的中间状态(intermediate state):一种将数据从一个作业传递到下一个作业的方式。在一个用于构建推荐系统的,由 50 或 100 个 MapReduce 作业组成的复杂工作流中,存在着很多这样的中间状态。
将这个中间状态写入文件的过程称为物化(materialization),它意味着对某个操作的结果立即求值并写出来,而不是在请求时按需计算。与 Unix 管道相比,MapReduce 完全物化中间状态的方法存在不足之处:
-
MapReduce 作业只有在前驱作业(生成其输入)中的所有任务都完成时才能启动,而由 Unix 管道连接的进程会同时启动,输出一旦生成就会被消费。不同机器上的数据倾斜或负载不均意味着一个作业往往会有一些掉队的任务,比其他任务要慢得多才能完成。必须等待至前驱作业的所有任务完成,拖慢了整个工作流程的执行。
-
Mapper 通常是多余的:它们仅仅是读取刚刚由 Reducer 写入的同样文件,为下一个阶段的分区和排序做准备。在许多情况下,Mapper 代码可能是前驱 Reducer 的一部分:如果 Reducer 和 Mapper 的输出有着相同的分区与排序方式,那么 Reducer 就可以直接串在一起,而不用与 Mapper 相互交织。
-
将中间状态存储在分布式文件系统中意味着这些文件被复制到多个节点,这些临时数据这么搞就比较过分了。
数据流引擎
了解决 MapReduce 的这些问题,几种用于分布式批处理的新执行引擎被开发出来,其中最著名的是 Spark,Tez 和 Flink 。它们的设计方式有很多区别,但有一个共同点:把整个工作流作为单个作业来处理,而不是把它分解为独立的子作业。由于它们将工作流显式建模为 数据从几个处理阶段穿过,所以这些系统被称为数据流引擎(dataflow engines)。像 MapReduce 一样,它们在一条线上通过反复调用用户定义的函数来一次处理一条记录,它们通过输入分区来并行化载荷,它们通过网络将一个函数的输出复制到另一个函数的输入。
与 MapReduce 不同,这些功能不需要严格扮演交织的 Map 与 Reduce 的角色,而是可以以更灵活的方式进行组合。我们称这些函数为算子(operators),数据流引擎提供了几种不同的选项来将一个算子的输出连接到另一个算子的输入:
-
一种选项是对记录按键重新分区并排序,就像在 MapReduce 的混洗阶段一样。这种功能可以用于实现排序合并连接和分组,就像在 MapReduce 中一样。
-
另一种可能是接受多个输入,并以相同的方式进行分区,但跳过排序。当记录的分区重要但顺序无关紧要时,这省去了分区哈希连接的工作,因为构建哈希表还是会把顺序随机打乱。
-
对于广播哈希连接,可以将一个算子的输出,发送到连接算子的所有分区。
这种类型的处理引擎是基于像 Dryad 和 Nephele 这样的研究系统,与 MapReduce 模型相比,它有几个优点:
-
排序等昂贵的工作只需要在实际需要的地方执行,而不是默认地在每个 Map 和 Reduce 阶段之间出现。
-
没有不必要的 Map 任务,因为 Mapper 所做的工作通常可以合并到前面的 Reduce 算子中(因为 Mapper 不会更改数据集的分区)。
-
由于工作流中的所有连接和数据依赖都是显式声明的,因此调度程序能够总览全局,知道哪里需要哪些数据,因而能够利用局部性进行优化。例如,它可以尝试将消费某些数据的任务放在与生成这些数据的任务相同的机器上,从而数据可以通过共享内存缓冲区传输,而不必通过网络复制。
通常,算子间的中间状态足以保存在内存中或写入本地磁盘,这比写入 HDFS 需要更少的 I/O(必须将其复制到多台机器,并将每个副本写入磁盘)。MapReduce 已经对 Mapper 的输出做了这种优化,但数据流引擎将这种思想推广至所有的中间状态。算子可以在输入就绪后立即开始执行;后续阶段无需等待前驱阶段整个完成后再开始。与 MapReduce(为每个任务启动一个新的 JVM)相比,现有 Java 虚拟机(JVM)进程可以重用来运行新算子,从而减少启动开销。
你可以使用数据流引擎执行与 MapReduce 工作流同样的计算,而且由于此处所述的优化,通常执行速度要明显快得多。既然算子是 Map 和 Reduce 的泛化,那么相同的处理代码就可以在任一执行引擎上运行:Pig,Hive 或 Cascading 中实现的工作流可以无需修改代码,可以通过修改配置,简单地从 MapReduce 切换到 Tez 或 Spark。Tez 是一个相当薄的库,它依赖于 YARN shuffle 服务来实现节点间数据的实际复制,而 Spark 和 Flink 则是包含了独立网络通信层,调度器,及用户向 API 的大型框架。我们将简要讨论这些高级 API。
容错
完全物化中间状态至分布式文件系统的一个优点是,它具有持久性,这使得 MapReduce 中的容错相当容易:如果一个任务失败,它可以在另一台机器上重新启动,并从文件系统重新读取相同的输入。Spark,Flink 和 Tez 避免将中间状态写入 HDFS,因此它们采取了不同的方法来容错:如果一台机器发生故障,并且该机器上的中间状态丢失,则它会从其他仍然可用的数据重新计算(在可行的情况下是先前的中间状态,要么就只能是原始输入数据,通常在 HDFS 上)。
为了实现这种重新计算,框架必须跟踪一个给定的数据是如何计算的,使用了哪些输入分区?应用了哪些算子?Spark 使用**弹性分布式数据集(RDD)**的抽象来跟踪数据的谱系,而 Flink 对算子状态存档,允许恢复运行在执行过程中遇到错误的算子。在重新计算数据时,重要的是要知道计算是否是确定性的:也就是说,给定相同的输入数据,算子是否始终产生相同的输出?如果一些丢失的数据已经发送给下游算子,这个问题就很重要。如果算子重新启动,重新计算的数据与原有的丢失数据不一致,下游算子很难解决新旧数据之间的矛盾。对于不确定性算子来说,解决方案通常是杀死下游算子,然后再重跑新数据。
为了避免这种级联故障,最好让算子具有确定性。但需要注意的是,非确定性行为很容易悄悄溜进来:例如,许多编程语言在迭代哈希表的元素时不能对顺序作出保证,许多概率和统计算法显式依赖于使用随机数,以及用到系统时钟或外部数据源,这些都是都不确定性的行为。为了能可靠地从故障中恢复,需要消除这种不确定性因素,例如使用固定的种子生成伪随机数。通过重算数据来从故障中恢复并不总是正确的答案:如果中间状态数据要比源数据小得多,或者如果计算量非常大,那么将中间数据物化为文件可能要比重新计算廉价的多。
关于物化的讨论
回到 Unix 的类比,我们看到,MapReduce 就像是将每个命令的输出写入临时文件,而数据流引擎看起来更像是 Unix 管道。尤其是 Flink 是基于管道执行的思想而建立的:也就是说,将算子的输出增量地传递给其他算子,不待输入完成便开始处理。排序算子不可避免地需要消费全部的输入后才能生成任何输出,因为输入中最后一条输入记录可能具有最小的键,因此需要作为第一条记录输出。因此,任何需要排序的算子都需要至少暂时地累积状态。但是工作流的许多其他部分可以以流水线方式执行。
当作业完成时,它的输出需要持续到某个地方,以便用户可以找到并使用它—— 很可能它会再次写入分布式文件系统。因此,在使用数据流引擎时,HDFS 上的物化数据集通常仍是作业的输入和最终输出。和 MapReduce 一样,输入是不可变的,输出被完全替换。比起 MapReduce 的改进是,你不用再自己去将中间状态写入文件系统了。