通过


设置实时模式

本页介绍在结构化流式处理中运行实时模式查询所需的先决条件和配置。 有关分步教程,请参阅 教程:运行实时流式处理工作负荷。 有关实时模式的概念性信息,请参阅 结构化流式处理中的实时模式

先决条件

若要使用实时模式,必须将计算配置为满足以下要求:

  • 在经典计算中使用专用访问模式。 不支持标准访问模式、Lakeflow Spark 声明性管道和无服务器群集。
  • 使用 Databricks Runtime 16.4 LTS 及更高版本。
  • 关闭自动缩放。
  • 关闭 Photon。
  • spark.databricks.streaming.realTimeMode.enabled 设置为 true
  • 关闭spot实例以防止中断。

有关创建和配置经典计算的说明,请参阅 计算配置参考

查询配置

若要以实时模式运行查询,必须启用实时触发器。 实时触发器仅在更新模式下受支持。

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

计算资源评估

如果计算资源有足够的任务槽,则每个计算资源可以运行一个实时作业。

若要在低延迟模式下运行,可用任务槽总数必须大于或等于所有查询阶段的任务数。

槽计算示例

管道类型 配置 所需槽位
单阶段无状态 (Kafka 源 + 接收器) maxPartitions = 8 8 个槽
两阶段有状态 (Kafka 源 + 随机) maxPartitions = 8,随机分区 = 20 28 个槽(8 + 20)
三阶段 (Kafka 源 + 洗牌 + 重新分区) maxPartitions = 8,两个随机阶段,每个阶段为 20 个 48 个插槽(8 + 20 + 20)

如果未设置 maxPartitions,请使用 Kafka 主题中的分区数。

其他资源