流式连接
流式连接
我们讨论了批处理作业如何通过键来连接数据集,以及这种连接是如何成为数据管道的重要组成部分的。由于流处理将数据管道泛化为对无限数据集进行增量处理,因此对流进行连接的需求也是完全相同的。然而,新事件随时可能出现在一个流中,这使得流连接要比批处理连接更具挑战性。为了更好地理解情况,让我们先来区分三种不同类型的连接:流-流连接,流-表连接,与表-表连接。
-
流流连接:两个输入流都由活动事件组成,而连接算子在某个时间窗口内搜索相关的事件。例如,它可能会将同一个用户 30 分钟内进行的两个活动联系在一起。如果你想要找出一个流内的相关事件,连接的两侧输入可能实际上都是同一个流(自连接(self-join))。
-
流表连接:一个输入流由活动事件组成,另一个输入流是数据库变更日志。变更日志保证了数据库的本地副本是最新的。对于每个活动事件,连接算子将查询数据库,并输出一个扩展的活动事件。
-
表表连接:两个输入流都是数据库变更日志。在这种情况下,一侧的每一个变化都与另一侧的最新状态相连接。结果是两表连接所得物化视图的变更流。
流流连接(窗口连接)
假设你的网站上有搜索功能,而你想要找出搜索 URL 的近期趋势。每当有人键入搜索查询时,都会记录下一个包含查询与其返回结果的事件。每当有人点击其中一个搜索结果时,就会记录另一个记录点击事件。为了计算搜索结果中每个 URL 的点击率,你需要将搜索动作与点击动作的事件连在一起,这些事件通过相同的会话 ID 进行连接。广告系统中需要类似的分析。
如果用户丢弃了搜索结果,点击可能永远不会发生,即使它出现了,搜索与点击之间的时间可能是高度可变的:在很多情况下,它可能是几秒钟,但也可能长达几天或几周(如果用户执行搜索,忘掉了这个浏览器页面,过了一段时间后重新回到这个浏览器页面上,并点击了一个结果)。由于可变的网络延迟,点击事件甚至可能先于搜索事件到达。你可以选择合适的连接窗口,例如,如果点击与搜索之间的时间间隔在一小时内,你可能会选择连接两者。
请注意,在点击事件中嵌入搜索详情与事件连接并不一样:这样做的话,只有当用户点击了一个搜索结果时你才能知道,而那些没有点击的搜索就无能为力了。为了衡量搜索质量,你需要准确的点击率,为此搜索事件和点击事件两者都是必要的。为了实现这种类型的连接,流处理器需要维护状态:例如,按会话 ID 索引最近一小时内发生的所有事件。无论何时发生搜索事件或点击事件,都会被添加到合适的索引中,而流处理器也会检查另一个索引是否有具有相同会话 ID 的事件到达。如果有匹配事件就会发出一个表示搜索结果被点击的事件;如果搜索事件直到过期都没看见有匹配的点击事件,就会发出一个表示搜索结果未被点击的事件。
流表连接(流扩展)
一般的用户活动事件分析中,我们看到了连接两个数据集的批处理作业示例:一组用户活动事件和一个用户档案数据库。将用户活动事件视为流,并在流处理器中连续执行相同的连接是很自然的想法:输入是包含用户 ID 的活动事件流,而输出还是活动事件流,但其中用户 ID 已经被扩展为用户的档案信息。这个过程有时被称为 使用数据库的信息来扩充(enriching)活动事件。要执行此联接,流处理器需要一次处理一个活动事件,在数据库中查找事件的用户 ID,并将档案信息添加到活动事件中。数据库查询可以通过查询远程数据库来实现。不过,此类远程查询可能会很慢,并且有可能导致数据库过载。
另一种方法是将数据库副本加载到流处理器中,以便在本地进行查询而无需网络往返。这种技术与我们在“Map 端连接”中讨论的哈希连接非常相似:如果数据库的本地副本足够小,则可以是内存中的哈希表,比较大的话也可以是本地磁盘上的索引。与批处理作业的区别在于,批处理作业使用数据库的时间点快照作为输入,而流处理器是长时间运行的,且数据库的内容可能随时间而改变,所以流处理器数据库的本地副本需要保持更新。这个问题可以通过变更数据捕获来解决:流处理器可以订阅用户档案数据库的更新日志,如同活跃事件流一样。当增添或修改档案时,流处理器会更新其本地副本。因此,我们有了两个流之间的连接:活动事件和档案更新。
流表连接实际上非常类似于流流连接;最大的区别在于对于表的变更日志流,连接使用了一个可以回溯到“时间起点”的窗口(概念上是无限的窗口),新版本的记录会覆盖更早的版本。对于输入的流,连接可能压根儿就没有维护窗口。
表表连接(维护物化视图)
譬如在推特时间线中,当用户想要查看他们的主页时间线时,迭代用户所关注人群的推文并合并它们是一个开销巨大的操作。相反,我们需要一个时间线缓存:一种每个用户的“收件箱”,在发送推文的时候写入这些信息,因而读取时间线时只需要简单地查询即可。物化与维护这个缓存需要处理以下事件:
- 当用户 u 发送新的推文时,它将被添加到每个关注用户 u 的时间线上。
- 用户删除推文时,推文将从所有用户的时间表中删除。
- 当用户$u_1$开始关注用户$u_2$时,$u_2$最近的推文将被添加到$u_1$的时间线上。
- 当用户$u_1$取消关注用户$u_2$时,$u_2$的推文将从$u_1$的时间线中移除。
要在流处理器中实现这种缓存维护,你需要推文事件流(发送与删除)和关注关系事件流(关注与取消关注)。流处理需要为维护一个数据库,包含每个用户的粉丝集合。以便知道当一条新推文到达时,需要更新哪些时间线。观察这个流处理过程的另一种视角是:它维护了一个连接了两个表(推文与关注)的物化视图,如下所示:
SELECT follows.follower_id AS timeline_id,
array_agg(tweets.* ORDER BY tweets.timestamp DESC)
FROM tweets
JOIN follows ON follows.followee_id = tweets.sender_id
GROUP BY follows.follower_id
流连接直接对应于这个查询中的表连接。时间线实际上是这个查询结果的缓存,每当基础表发生变化时都会更新。
连接的时间依赖性
这里描述的三种连接(流流,流表,表表)有很多共通之处:它们都需要流处理器维护连接一侧的一些状态(搜索与点击事件,用户档案,关注列表),然后当连接另一侧的消息到达时查询该状态。用于维护状态的事件顺序是很重要的(先关注然后取消关注,或者其他类似操作)。在分区日志中,单个分区内的事件顺序是保留下来的。但典型情况下是没有跨流或跨分区的顺序保证的。
这就产生了一个问题:如果不同流中的事件发生在近似的时间范围内,则应该按照什么样的顺序进行处理?在流表连接的例子中,如果用户更新了它们的档案,哪些活动事件与旧档案连接(在档案更新前处理),哪些又与新档案连接(在档案更新之后处理)?换句话说:你需要对一些状态做连接,如果状态会随着时间推移而变化,那应当使用什么时间点来连接呢?
这种时序依赖可能出现在很多地方。例如销售东西需要对发票应用适当的税率,这取决于所处的国家/州,产品类型,销售日期(因为税率会随时变化)。当连接销售额与税率表时,你可能期望的是使用销售时的税率参与连接。如果你正在重新处理历史数据,销售时的税率可能和现在的税率有所不同。如果跨越流的事件顺序是未定的,则连接会变为不确定性的,这意味着你在同样输入上重跑相同的作业未必会得到相同的结果:当你重跑任务时,输入流上的事件可能会以不同的方式交织。
在数据仓库中,这个问题被称为缓慢变化的维度(slowly changing dimension, SCD),通常通过对特定版本的记录使用唯一的标识符来解决:例如,每当税率改变时都会获得一个新的标识符,而发票在销售时会带有税率的标识符。这种变化使连接变为确定性的,但也会导致日志压缩无法进行:表中所有的记录版本都需要保留。