实时模式支持超低延迟流式处理,端到端延迟低至 5 毫秒,因此非常适合欺诈检测和实时个性化等作工作负荷。 本教程指导你使用一个简单的示例设置第一个实时流式处理查询。
有关实时模式的概念性信息、何时使用它和支持的功能,请参阅 结构化流式处理中的实时模式。 有关配置要求,请参阅 “设置实时模式”。
要求
在开始之前,请确保有权创建使用 设置实时模式中指定的配置的经典计算群集。 或者,请与工作区管理员联系,为你创建实时模式群集。
步骤 1:创建笔记本
笔记本提供用于开发和测试流式处理查询的交互式环境。 你可以使用此笔记本来编写实时查询,并观察结果的持续更新。
创建笔记本:
- 在边栏中单击“ 新建 ”,然后单击“
笔记本。
- 在计算下拉菜单中,选择实时模式群集。
- 选择 Python 或 Scala作为默认语言。
步骤 2:运行实时模式查询
将以下代码复制并粘贴到笔记本单元格中,然后运行它。 此示例使用速率源,该源以指定速率生成行,并实时显示结果。
注释
display具有realTime触发器的函数在 Databricks Runtime 17.1 及更高版本中可用。
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())
运行代码后,会看到生成新行时实时更新的表。 该表显示一列 timestamp 和一列 value ,该列随每行一起递增。
了解数据
上面的代码演示实时流式处理查询的基本组件。 下表说明了关键参数及其控制内容:
Python
| 参数 | 说明 |
|---|---|
format("rate") |
使用速率源,这是一个内置的数据源,它以可配置的速率生成数据行。 这对于在没有外部依赖项的情况下进行测试非常有用。 |
numPartitions |
设置生成的数据的分区数。 |
rowsPerSecond |
控制每秒生成多少行。 |
realTime="5 minutes" |
启用实时模式。 间隔指定查询检查点的进度频率。 较长的间隔意味着检查点频率较低,但在发生故障后可能更长的恢复时间。 |
outputMode="update" |
实时模式需要更新输出模式。 |
Scala
| 参数 | 说明 |
|---|---|
format("rate") |
使用速率源,这是一个内置的数据源,它以可配置的速率生成数据行。 这对于在没有外部依赖项的情况下进行测试非常有用。 |
numPartitions |
设置生成的数据的分区数。 |
rowsPerSecond |
控制每秒生成多少行。 |
Trigger.RealTime() |
使用默认的检查点间隔启用实时模式。 还可以指定间隔,例如 Trigger.RealTime("5 minutes")。 |
OutputMode.Update() |
实时模式需要更新输出模式。 |
步骤 3:验证结果
运行查询时,该 display 函数将创建一个表,该表会在速率源生成新行时实时更新。 每行都包含:
- 速率源生成该行时的时间戳。
- 单调递增的计数器,随每个新行一起递增。
表会持续更新,延迟最小,演示实时模式在数据可用后如何处理数据。 这是实时模式的核心优势 - 能够立即查看和处理数据,而不是等待批处理。
其他资源
运行第一个实时查询后,请浏览这些资源,以使用 Kafka、Kinesis 和其他受支持的源生成生产流式处理应用程序: