作业输出
批处理工作流的输出
我们已经说了很多用于实现 MapReduce 工作流的算法,但却忽略了一个重要的问题:这些处理完成之后的最终结果是什么?我们最开始为什么要跑这些作业?在数据库查询的场景中,我们将事务处理(OLTP)与分析两种目的区分开来,OLTP 查询通常根据键查找少量记录,使用索引,并将其呈现给用户(比如在网页上)。另一方面,分析查询通常会扫描大量记录,执行分组与聚合,输出通常有着报告的形式:显示某个指标随时间变化的图表,或按照某种排位取前 10 项,或一些数字细化为子类。这种报告的消费者通常是需要做出商业决策的分析师或经理。
批处理放哪里合适?它不属于事务处理,也不是分析。它和分析比较接近,因为批处理通常会扫过输入数据集的绝大部分。然而 MapReduce 作业工作流与用于分析目的的 SQL 查询是不同的。批处理过程的输出通常不是报表,而是一些其他类型的结构。
建立搜索索引
Google 最初使用 MapReduce 是为其搜索引擎建立索引,用了由 5 到 10 个 MapReduce 作业组成的工作流实现。虽然 Google 后来也不仅仅是为这个目的而使用 MapReduce,但如果从构建搜索索引的角度来看,更能帮助理解 MapReduce。(直至今日,Hadoop MapReduce 仍然是为 Lucene/Solr 构建索引的好方法)。Lucene 这样的全文搜索索引是如何工作的:它是一个文件(关键词字典),你可以在其中高效地查找特定关键字,并找到包含该关键字的所有文档 ID 列表(文章列表)。这是一种非常简化的看法,实际上,搜索索引需要各种额外数据,以便根据相关性对搜索结果进行排名,纠正拼写错误,解析同义词等等,但这个原则是成立的。
如果需要对一组固定文档执行全文搜索,则批处理是一种构建索引的高效方法:Mapper 根据需要对文档集合进行分区,每个 Reducer 构建该分区的索引,并将索引文件写入分布式文件系统。构建这样的文档分区索引并行处理效果拔群。由于按关键字查询搜索索引是只读操作,因而这些索引文件一旦创建就是不可变的。如果索引的文档集合发生更改,一种选择是定期重跑整个索引工作流,并在完成后用新的索引文件批量替换以前的索引文件。如果只有少量的文档发生了变化,这种方法的计算成本可能会很高。但它的优点是索引过程很容易理解:文档进,索引出。另一个选择是,可以增量建立索引,如果要在索引中添加,删除或更新文档,Lucene 会写新的段文件,并在后台异步合并压缩段文件。
键值存储作为批处理输出
搜索索引只是批处理工作流可能输出的一个例子。批处理的另一个常见用途是构建机器学习系统,例如分类器(比如垃圾邮件过滤器,异常检测,图像识别)与推荐系统(例如,你可能认识的人,你可能感兴趣的产品或相关的搜索)。这些批处理作业的输出通常是某种数据库:例如,可以通过给定用户 ID 查询该用户推荐好友的数据库,或者可以通过产品 ID 查询相关产品的数据库。
这些数据库需要被处理用户请求的 Web 应用所查询,而它们通常是独立于 Hadoop 基础设施的。那么批处理过程的输出如何回到 Web 应用可以查询的数据库中呢?最直接的选择可能是,直接在 Mapper 或 Reducer 中使用你最爱数据库的客户端库,并从批处理作业直接写入数据库服务器,一次写入一条记录。它能工作(假设你的防火墙规则允许从你的 Hadoop 环境直接访问你的生产数据库),但这并不是一个好主意,出于以下几个原因:
-
正如前面在连接的上下文中讨论的那样,为每条记录发起一个网络请求,要比批处理任务的正常吞吐量慢几个数量级。即使客户端库支持批处理,性能也可能很差。
-
MapReduce 作业经常并行运行许多任务。如果所有 Mapper 或 Reducer 都同时写入相同的输出数据库,并以批处理的预期速率工作,那么该数据库很可能被轻易压垮,其查询性能可能变差。这可能会导致系统其他部分的运行问题。
-
通常情况下,MapReduce 为作业输出提供了一个干净利落的“全有或全无”保证:如果作业成功,则结果就是每个任务恰好执行一次所产生的输出,即使某些任务失败且必须一路重试。如果整个作业失败,则不会生成输出。然而从作业内部写入外部系统,会产生外部可见的副作用,这种副作用是不能以这种方式被隐藏的。因此,你不得不去操心部分完成的作业对其他系统可见的结果,并需要理解 Hadoop 任务尝试与预测执行的复杂性。
更好的解决方案是在批处理作业内创建一个全新的数据库,并将其作为文件写入分布式文件系统中作业的输出目录,就像上节中的搜索索引一样。这些数据文件一旦写入就是不可变的,可以批量加载到处理只读查询的服务器中。不少键值存储都支持在 MapReduce 作业中构建数据库文件,包括 Voldemort,Terrapin,ElephantDB 和 HBase 批量加载。构建这些数据库文件是 MapReduce 的一种很好用法的使用方法:使用 Mapper 提取出键并按该键排序,现在已经是构建索引所必需的大量工作。由于这些键值存储大多都是只读的(文件只能由批处理作业一次性写入,然后就不可变),所以数据结构非常简单。比如它们就不需要 WAL。
将数据加载到 Voldemort 时,服务器将继续用旧数据文件服务请求,同时将新数据文件从分布式文件系统复制到服务器的本地磁盘。一旦复制完成,服务器会自动将查询切换到新文件。如果在这个过程中出现任何问题,它可以轻易回滚至旧文件,因为它们仍然存在而且不可变。
批处理输出的哲学
Unix 的设计哲学中鼓励以显式指明数据流的方式进行实验:程序读取输入并写入输出。在这一过程中,输入保持不变,任何先前的输出都被新输出完全替换,且没有其他副作用。这意味着你可以随心所欲地重新运行一个命令,略做改动或进行调试,而不会搅乱系统的状态。MapReduce 作业的输出处理遵循同样的原理。通过将输入视为不可变且避免副作用(如写入外部数据库),批处理作业不仅实现了良好的性能,而且更容易维护:
- 如果在代码中引入了一个错误,而输出错误或损坏了,则可以简单地回滚到代码的先前版本,然后重新运行该作业,输出将重新被纠正。或者,甚至更简单,你可以将旧的输出保存在不同的目录中,然后切换回原来的目录。具有读写事务的数据库没有这个属性:如果你部署了错误的代码,将错误的数据写入数据库,那么回滚代码将无法修复数据库中的数据。(能够从错误代码中恢复的概念被称为人类容错(human fault tolerance))
- 由于回滚很容易,比起在错误意味着不可挽回的伤害的环境,功能开发进展能快很多。这种**最小化不可逆性(minimizing irreversibility)**的原则有利于敏捷软件开发。
- 如果 Map 或 Reduce 任务失败,MapReduce 框架将自动重新调度,并在同样的输入上再次运行它。如果失败是由代码中的错误造成的,那么它会不断崩溃,并最终导致作业在几次尝试之后失败。但是如果故障是由于临时问题导致的,那么故障就会被容忍。因为输入不可变,这种自动重试是安全的,而失败任务的输出会被 MapReduce 框架丢弃。
- 同一组文件可用作各种不同作业的输入,包括计算指标的监控作业可以评估作业的输出是否具有预期的性质(例如,将其与前一次运行的输出进行比较并测量差异)。
- 与 Unix 工具类似,MapReduce 作业将逻辑与布线(配置输入和输出目录)分离,这使得关注点分离,可以重用代码:一个团队可以实现一个专注做好一件事的作业;而其他团队可以决定何时何地运行这项作业。
在这些领域,在 Unix 上表现良好的设计原则似乎也适用于 Hadoop,但 Unix 和 Hadoop 在某些方面也有所不同。例如,因为大多数 Unix 工具都假设输入输出是无类型文本文件,所以它们必须做大量的输入解析工作(本章开头的日志分析示例使用 {print $7}
来提取 URL)。在 Hadoop 上可以通过使用更结构化的文件格式消除一些低价值的语法转换:比如 Avro 和 Parquet 经常使用,因为它们提供了基于模式的高效编码,并允许模式随时间推移而演进。