图与迭代处理
图与迭代处理
在《图数据模型》中,我们讨论了使用图来建模数据,并使用图查询语言来遍历图中的边与点。批处理上下文中的图也很有趣,其目标是在整个图上执行某种离线处理或分析。这种需求经常出现在机器学习应用(如推荐引擎)或排序系统中。例如,最着名的图形分析算法之一是 PageRank,它试图根据链接到某个网页的其他网页来估计该网页的流行度。它作为配方的一部分,用于确定网络搜索引擎呈现结果的顺序。
注意,像 Spark,Flink 和 Tez 这样的数据流引擎通常将算子作为**有向无环图(DAG)**的一部分安排在作业中。但是这与图处理不一样:在数据流引擎中,从一个算子到另一个算子的数据流被构造成一个图,而数据本身通常由关系型元组构成。在图处理中,数据本身具有图的形式。又一个不幸的命名混乱。
许多图算法是通过一次遍历一条边来表示的,将一个顶点与近邻的顶点连接起来,以传播一些信息,并不断重复,直到满足一些条件为止。例如,直到没有更多的边要跟进,或直到一些指标收敛;通过重复跟进标明地点归属关系的边,生成了数据库中北美包含的所有地点列表(这种算法被称为闭包传递(transitive closure))。
可以在分布式文件系统中存储图(包含顶点和边的列表的文件),但是这种“重复至完成”的想法不能用普通的 MapReduce 来表示,因为它只扫过一趟数据。这种算法因此经常以迭代的风格实现:
- 外部调度程序运行批处理来计算算法的一个步骤。
- 当批处理过程完成时,调度器检查它是否完成(基于完成条件:例如,没有更多的边要跟进,或者与上次迭代相比的变化低于某个阈值)。
- 如果尚未完成,则调度程序返回到步骤 1 并运行另一轮批处理。
这种方法是有效的,但是用 MapReduce 实现它往往非常低效,因为 MapReduce 没有考虑算法的迭代性质:它总是读取整个输入数据集并产生一个全新的输出数据集,即使与上次迭代相比,改变的仅仅是图中的一小部分。
Pregel 处理模型
针对图批处理的优化,批量同步并行(BSP)计算模型已经开始流行起来。其中,Apache Giraph,Spark 的 GraphX API 和 Flink 的 Gelly API 实现了它。它也被称为 Pregel 模型,因为 Google 的 Pregel 论文推广了这种处理图的方法。回想一下在 MapReduce 中,Mapper 在概念上向 Reducer 的特定调用“发送消息”,因为框架将所有具有相同键的 Mapper 输出集中在一起。Pregel 背后有一个类似的想法:一个顶点可以向另一个顶点“发送消息”,通常这些消息是沿着图的边发送的。
在每次迭代中,为每个顶点调用一个函数,将所有发送给它的消息传递给它,就像调用 Reducer 一样。与 MapReduce 的不同之处在于,在 Pregel 模型中,顶点在一次迭代到下一次迭代的过程中会记住它的状态,所以这个函数只需要处理新的传入消息。如果图的某个部分没有被发送消息,那里就不需要做任何工作。
这与 Actor 模型有些相似,除了顶点状态和顶点之间的消息具有容错性和耐久性,且通信以固定的方式进行:在每次迭代中,框架递送上次迭代中发送的所有消息。Actor 通常没有这样的时间保证。
容错
顶点只能通过消息传递进行通信(而不是直接相互查询)的事实有助于提高 Pregel 作业的性能,因为消息可以成批处理,且等待通信的次数也减少了。唯一的等待是在迭代之间:由于 Pregel 模型保证所有在一轮迭代中发送的消息都在下轮迭代中送达,所以在下一轮迭代开始前,先前的迭代必须完全完成,而所有的消息必须在网络上完成复制。
即使底层网络可能丢失,重复或任意延迟消息,Pregel 的实现能保证在后续迭代中消息在其目标顶点恰好处理一次。像 MapReduce 一样,框架能从故障中透明地恢复,以简化在 Pregel 上实现算法的编程模型。这种容错是通过在迭代结束时,定期存档所有顶点的状态来实现的,即将其全部状态写入持久化存储。如果某个节点发生故障并且其内存中的状态丢失,则最简单的解决方法是将整个图计算回滚到上一个存档点,然后重启计算。如果算法是确定性的,且消息记录在日志中,那么也可以选择性地只恢复丢失的分区(就像之前讨论过的数据流引擎)。
并行执行
顶点不需要知道它在哪台物理机器上执行;当它向其他顶点发送消息时,它只是简单地将消息发往某个顶点 ID。图的分区取决于框架。即,确定哪个顶点运行在哪台机器上,以及如何通过网络路由消息,以便它们到达正确的地方。由于编程模型一次仅处理一个顶点(有时称为“像顶点一样思考”),所以框架可以以任意方式对图分区。理想情况下如果顶点需要进行大量的通信,那么它们最好能被分区到同一台机器上。然而找到这样一种优化的分区方法是很困难的,在实践中,图经常按照任意分配的顶点 ID 分区,而不会尝试将相关的顶点分组在一起。
因此,图算法通常会有很多跨机器通信的额外开销,而中间状态(节点之间发送的消息)往往比原始图大。通过网络发送消息的开销会显著拖慢分布式图算法的速度。出于这个原因,如果你的图可以放入一台计算机的内存中,那么单机(甚至可能是单线程)算法很可能会超越分布式批处理。图比内存大也没关系,只要能放入单台计算机的磁盘,使用 GraphChi 等框架进行单机处理是就一个可行的选择。如果图太大,不适合单机处理,那么像 Pregel 这样的分布式方法是不可避免的。高效的并行图算法是一个进行中的研究领域。