通过


教程:运行实时流式处理工作负荷

实时模式支持超低延迟流式处理,端到端延迟低至 5 毫秒,因此非常适合欺诈检测和实时个性化等作工作负荷。 本教程指导你使用一个简单的示例设置第一个实时流式处理查询。

有关实时模式的概念性信息、何时使用它和支持的功能,请参阅 结构化流式处理中的实时模式。 有关配置要求,请参阅 “设置实时模式”。

要求

在开始之前,请确保有权创建使用 设置实时模式中指定的配置的经典计算群集。 或者,请与工作区管理员联系,为你创建实时模式群集。

步骤 1:创建笔记本

笔记本提供用于开发和测试流式处理查询的交互式环境。 你可以使用此笔记本来编写实时查询,并观察结果的持续更新。

创建笔记本:

  1. 在边栏中单击“ 新建 ”,然后单击“ 笔记本”图标。笔记本
  2. 在计算下拉菜单中,选择实时模式群集。
  3. 选择 PythonScala作为默认语言。

步骤 2:运行实时模式查询

将以下代码复制并粘贴到笔记本单元格中,然后运行它。 此示例使用速率源,该源以指定速率生成行,并实时显示结果。

注释

display具有realTime触发器的函数在 Databricks Runtime 17.1 及更高版本中可用。

Python

inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")

Scala

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())

运行代码后,会看到生成新行时实时更新的表。 该表显示一列 timestamp 和一列 value ,该列随每行一起递增。

了解数据

上面的代码演示实时流式处理查询的基本组件。 下表说明了关键参数及其控制内容:

Python

参数 说明
format("rate") 使用速率源,这是一个内置的数据源,它以可配置的速率生成数据行。 这对于在没有外部依赖项的情况下进行测试非常有用。
numPartitions 设置生成的数据的分区数。
rowsPerSecond 控制每秒生成多少行。
realTime="5 minutes" 启用实时模式。 间隔指定查询检查点的进度频率。 较长的间隔意味着检查点频率较低,但在发生故障后可能更长的恢复时间。
outputMode="update" 实时模式需要更新输出模式。

Scala

参数 说明
format("rate") 使用速率源,这是一个内置的数据源,它以可配置的速率生成数据行。 这对于在没有外部依赖项的情况下进行测试非常有用。
numPartitions 设置生成的数据的分区数。
rowsPerSecond 控制每秒生成多少行。
Trigger.RealTime() 使用默认的检查点间隔启用实时模式。 还可以指定间隔,例如 Trigger.RealTime("5 minutes")
OutputMode.Update() 实时模式需要更新输出模式。

步骤 3:验证结果

运行查询时,该 display 函数将创建一个表,该表会在速率源生成新行时实时更新。 每行都包含:

  • 速率源生成该行时的时间戳。
  • 单调递增的计数器,随每个新行一起递增。

表会持续更新,延迟最小,演示实时模式在数据可用后如何处理数据。 这是实时模式的核心优势 - 能够立即查看和处理数据,而不是等待批处理。

其他资源

运行第一个实时查询后,请浏览这些资源,以使用 Kafka、Kinesis 和其他受支持的源生成生产流式处理应用程序: