本文探讨如何为有状态流式处理选择适当的输出模式。 只有包含聚合的有状态流才需要输出模式配置。
联接仅支持追加输出模式,输出模式不会影响重复数据删除。 任意有状态运算符 mapGroupsWithState 和 flatMapGroupsWithState 使用自己的自定义逻辑发出记录,因此流的输出模式不会影响其行为。
对于无状态流式处理,所有输出模式的行为都相同。
若要正确配置输出模式,必须了解有状态流式处理、水印和触发器。 请参阅以下文章:
什么是输出模式?
结构化流式处理查询的输出模式决定了查询的操作符在每次触发期间发出哪些记录。 可发出下面三种类型的记录:
- 将来处理时不会更改的记录。
- 自上次触发器以来已更改的记录。
- 状态表中的所有记录。
知晓要发出哪些类型的记录对于有状态运算符很重要,因为有状态运算符生成的特定行可能会因触发器而异。 例如,当流式聚合运算符接收到一个特定窗口的多行数据时,该窗口的聚合值可能在多次触发中发生变化。
对于无状态运算符,记录类型之间的区别不会影响运算符的行为。 无状态运算符在触发器期间发出的记录始终是在该触发器期间处理的源记录。
可用输出模式
有三种输出模式可告知操作者在特定触发器期间需要发出的记录:
| 输出模式 | 说明 |
|---|---|
| 追加模式(默认) | 默认情况下,流式处理查询在追加模式下运行。 在此模式下,操作符仅发出在未来的触发中不会变化的行。 有状态运算符使用水印来确定何时发生这种情况。 |
| 更新模式 | 在更新模式下,运算符会发出在触发器期间更改的所有行,即使发出的记录可能会在后续触发器中更改。 |
| 完整模式 | 完整模式仅适用于流式处理聚合。 在完整模式下,运算符生成的所有结果行都会在下游发出。 |
生产注意事项
对于许多有状态流式处理操作,您需要选择追加模式或更新模式。 以下部分概述了可能会影响你做出决定的注意事项。
注意
完整模式有一些应用场景,但随着数据扩展,可能会表现不佳。 Databricks 建议使用物化视图,从而在完整模式下为许多有状态操作提供增量处理的语义保证。 请参阅 具体化视图。
应用语义
应用语义描述了下游应用程序如何使用流数据。
如果下游服务需要对每个下游写入进行单独操作,则在大多数情况下使用附加模式。 例如,如果有下游通知服务为写入接收器的每个新记录发送通知,则追加模式可确保每个记录只写入一次。 每次状态信息更改时,更新模式都会写入记录,这将导致大量更新。
如果下游服务需要最新结果,更新模式可确保数据接收端尽可能保持最新状态。 示例包括实时读取特征的机器学习模型或跟踪实时聚合的分析仪表板。
运算符和接收器兼容性
结构化流式处理不支持 Apache Spark 中提供的所有操作,并且某些流式处理操作在所有输出模式下都不受支持。 有关运算符限制的详细信息,请参阅 OSS 流式处理文档。
并非所有接收器都支持所有输出模式。 Kafka 支持所有输出模式。 支持所有 Unity 目录托管表的 Delta Lake 支持追加和完整模式,但不支持更新模式。 有关与 Delta Lake 接收器的更新模式类似的行为,请参阅 流式处理中的合并。
有关接收器兼容性的详细信息,请参阅 OSS 流式处理文档。
延迟和成本
输出模式会影响写入记录之前必须花费的时间,写入数据的频率和写入量可能会影响与流式处理管道相关的成本。
追加模式强制有状态运算符仅在完成有状态结果后发出结果,延迟时长至少等于水印延迟。 在追加输出模式下水印延迟 1 hour 意味着记录在下游发出之前至少有 1 小时的延迟。
更新模式会导致每个触发器对每个聚合值执行一次写入。 如果接收端对每条记录的每次写入收费,那么如果记录在水印延迟时间结束之前多次更新,此操作可能会非常昂贵。
配置示例
以下代码示例演示如何配置将更新流式传输到 Unity Catalog 表的输出模式:
Python
# Append output mode (default)
(df.writeStream
.toTable("target_table")
)
# Append output mode (same as default behavior)
(df.writeStream
.outputMode("append")
.toTable("target_table")
)
# Update output mode
(df.writeStream
.outputMode("update")
.toTable("target_table")
)
# Complete output mode
(df.writeStream
.outputMode("complete")
.toTable("target_table")
)
Scala(编程语言)
// Append output mode (default)
df.writeStream
.toTable("target_table")
// Append output mode (same as default behavior)
df.writeStream
.outputMode("append")
.toTable("target_table")
// Update output mode
df.writeStream
.outputMode("update")
.toTable("target_table")
// Complete output mode
df.writeStream
.outputMode("complete")
.toTable("target_table")
有关 PySpark DataStreamWriter.outputMode 或 Scala DataStreamWriter.outputMode,请参阅 OSS 文档。
有状态流式传输和输出模式示例
以下示例旨在帮助你推理和理解输出模式如何与有状态流式处理的水印交互。
考虑一个流式聚合,其作用是在水印延迟为15分钟的情况下,计算商店每小时产生的总收入。 第一个微批处理以下记录:
- 下午 2:40,15 美元
- 下午2:30 10美元
- 下午 3:10 30 美元
此时,引擎的水印为 2:55pm,因为它从看到的最大时间 (3:10pm) 中减去 15 分钟(延迟)。 流式处理聚合运算符的状态如下:
-
[2pm, 3pm]:25 美元 -
[3pm, 4pm]:30 美元
下表概述了每个输出模式下会发生什么情况:
| 输出模式 | 结果和原因 |
|---|---|
| 追加 | 流式处理聚合运算符不会向下游发送任何数据。 这是因为这两个窗口可能会随着后续触发器出现新值而更改:下午 2:55 的水印表示下午 2:55 之后的记录可能仍会到达,并且这些记录可能在 [2pm, 3pm] 时窗或 [3pm, 4pm] 时窗范围内。 |
| 更新 | 运算符发出这两条记录,因为它们都收到了更新。 |
| 完成 | 运算符输出所有记录。 |
现在,假设流又接收了一条记录:
- 下午3:20 20美元
水印更新为下午 3:05,因为引擎从下午 3:20 中减去 15 分钟。 此时,流式处理聚合运算符的状态如下:
-
[2pm, 3pm]:25 美元 -
[3pm, 4pm]:50 美元
下表概述了每个输出模式下会发生什么情况:
| 输出模式 | 结果和原因 |
|---|---|
| 追加 | 流式聚合运算符观察到下午 3:05 的水印已经超出了 [2pm, 3pm] 窗口的结束点。 根据水印的定义,该时窗无法再更改,因此它释放出[2pm, 3pm]窗口。 |
| 更新 | 流式聚合运算符发出 [3pm, 4pm] 窗口,因为状态值已从30变为50。 |
| 完成 | 运算符输出所有记录。 |
下面总结了有状态运算符在每个追加模式下的行为方式:
- 在追加模式下,在水印延迟后写入记录一次。
- 在更新模式下,写入自上一个触发器以来已更改的记录。
- 在完整模式下,写入由有状态运算符生成的所有记录。