通过


结构化流式处理的生产注意事项

本页包含通过作业在 Azure Databricks 上调度结构化流式处理工作负荷的建议。

Databricks 建议始终执行以下操作:

  • 从返回结果的笔记本中删除不必要的代码,例如 displaycount
  • 不要使用通用计算运行结构化流式处理工作负载。 始终使用作业计算将流作为作业来计划。
  • 使用 Continuous 模式计划作业。 这指的是Azure Databricks作业计划功能,而不是结构化流式处理触发器间隔
  • 不要为结构化流式处理作业的计算启用自动缩放。

某些工作负载会受益于以下功能:

Azure Databricks引入了 Lakeflow Spark 声明性管道,以减少管理结构化流式处理工作负荷的生产基础结构的复杂性。 Databricks 建议对新的结构化流管道使用 Lakeflow Spark 声明式管道。 请参阅 Lakeflow Spark 声明式管道

注意

计算自动缩放在缩减结构化流式处理工作负载的群集大小方面存在限制。 Databricks 建议使用具有增强自动缩放功能的 Lakeflow Spark 声明性管道来处理流式工作负载。 请参阅 使用自动缩放优化 Lakeflow Spark 声明性管道的群集利用率

:::note 无服务器计算

在无服务器计算中,仅 Trigger.AvailableNow() 受支持且 Trigger.Once() 受支持。 Databricks 建议使用Trigger.AvailableNow()

对于无服务器计算上的连续流式处理,请在连续模式下使用触发与连续管道模式。

请参阅 流式处理限制

:::

设计流式处理工作负载来应对失败

Databricks 建议始终将流式处理作业配置为在失败时自动重启。 某些功能(包括架构演变)假定结构化流式处理工作负载配置为自动重试。 请参阅如何配置结构化流式处理作业以在失败时重启流式查询

有些操作(例如 foreachBatch)提供至少一次(而不是恰好一次)保证。 对于这些操作,应确保处理管道是幂等的。 请参阅使用 foreachBatch 将内容写入到任意数据接收器

注意

当查询重启时,将会处理在之前运行中计划的微批处理。 如果您的作业由于内存不足错误导致失败,或者您因微批次过大而手动取消作业,则可能需要升级计算资源,以便成功处理微批次。

如果在运行之间更改了配置,这些配置将应用于计划的第一个新批处理。 请参阅在结构化流式处理查询发生更改后恢复

作业何时重试?

可以将多个任务安排为Azure Databricks作业的一部分。 使用连续触发器配置作业时,无法设置任务之间的依赖项。

可选择使用以下方法之一在单个作业中计划多个流:

  • 多任务:定义一个具有多个任务的作业,这些任务会使用连续触发器运行流式处理工作负载。
  • 多查询:在单个任务的源代码中定义多个流式处理查询。

还可以组合使用这些策略。 下表比较了这些方法。

策略: 多个任务 多个查询
如何共享计算? Databricks 建议为每个流式处理任务部署适当大小的作业计算。 可以选择跨任务共享计算。 所有查询共享相同的计算。 可以选择性地将查询分配给计划程序池
如何处理重试? 在作业重试之前,所有任务都必须失败。 如果任何查询失败,任务将会重试。

将结构化流式处理作业配置为在失败时重启流式处理查询

Databricks 建议使用连续触发器配置所有流式处理工作负载。 请参阅连续运行作业

连续触发器默认提供以下行为:

  • 防止作业同时多次运行。
  • 在上一次运行失败时启动新的运行。
  • 使用指数退避进行重试。

Databricks 建议在计划工作流时始终使用作业计算而不是通用计算。 在作业失败并重试时,将会部署新的计算资源。

注意

Databricks 建议不要使用 streamingQuery.awaitTermination()spark.streams.awaitAnyTermination()。 这些函数不需要包含在代码中,因为 Jobs 服务会在流式查询处于活动状态时自动阻止运行完成。 这两个函数都阻止笔记本单元完成并阻止作业服务跟踪流式处理查询。 此外,流处理积压指标和通知不发送到作业服务,这会中断作业通知。

将计划程序池用于多个流式处理查询

可以通过配置调度池,在从同一源代码运行多个流式查询时,将计算能力分配给查询。

默认情况下,笔记本中启动的所有查询都在同一个公平调度池中运行。 由触发器根据笔记本中的所有流式处理查询生成的 Apache Spark 作业将按照先入先出 (FIFO) 的顺序逐一运行。 这可能会导致查询中产生不必要的延迟,因为它们不能有效地共享群集资源。

计划程序池允许您声明哪些结构化流式处理查询共享计算资源。

以下示例将 query1 分配给专用池,而 query2query3 共享一个计划程序池。

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

注意

本地属性配置必须位于你启动流式处理查询时所在的笔记本单元中。

有关更多详细信息,请参阅 Apache 公平计划程序文档