通过


实时模式参考

本页提供结构化流式处理中实时模式的参考信息,包括支持的环境、语言、源、接收器和运算符。 有关已知限制,请参阅 实时模式限制

支持的语言

实时模式支持 Scala、Java 和 Python。

计算类型

实时模式支持以下计算类型:

计算类型 支持
专用(以前:单个用户)
标准(前:共享) • (仅Python)
Lakeflow Spark 声明性管道 Classic版本 不支持
Lakeflow Spark 声明性管道无服务器 不支持
Serverless 不支持

执行模式

实时模式仅支持更新模式:

执行模式 支持
更新模式
追加模式 不支持
完整模式 不支持

源和汇

实时模式支持以下源和接收器:

源或汇 作为源 作为接收器
Apache Kafka
事件中心(使用 Kafka 连接器)
动动力 • (仅 EFO 模式) 不支持
AWS MSK 不支持
Delta 不支持 不支持
Google Pub/Sub(谷歌发布/订阅消息服务) 不支持 不支持
Apache Pulsar 不支持 不支持
任意接收端(使用 forEachWriter 不適用

运营商

实时模式支持大多数结构化流式处理运算符:

无状态操作

Operator 支持
选择
投影

UDFs

Operator 支持
Scala 用户定义函数 (UDF) • (有一些限制
Python UDF • (有一些限制

集合体

Operator 支持
sum
计数
最大值
分钟
avg
聚合函数

Windowing

Operator 支持
Tumbling
Sliding
会期 不支持

去重

Operator 支持
dropDuplicates ✓(状态无界)
水印范围内删除重复项 不支持

流式传输到表联接

Operator 支持
广播表联接(表应较小)
流到流联接 不支持
(平)MapGroupsWithState 不支持
transformWithState • (有一些差异
• (有一些限制
forEach
forEachBatch 不支持
mapPartitions (映射分区) 不支持(请参阅限制

特殊注意事项

某些运算符和功能在实时模式下使用时具有特定的注意事项或差异。

transformWithState 实时模式

为了构建自定义有状态应用程序,Databricks 支持 Apache Spark Structured Streaming 中的 transformWithState API。 有关 API 和代码片段的详细信息,请参阅 生成自定义有状态应用程序

但是,API 在实时模式下的行为方式和利用微批处理体系结构的传统流式处理查询之间存在一些差异。

  • 实时模式为每一行调用handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)方法。
    • inputRows迭代器返回单个值。 微批处理模式为每个密钥调用一次, inputRows 迭代器返回微批处理中某个键的所有值。
    • 编写代码时考虑此差异
  • 实时模式下不支持事件时间计时器。
  • 在实时模式下,计时器因数据到达而延迟触发:
    • 如果计时器设置为10:00:00,但没有数据到达,计时器不会立刻触发。
    • 如果数据到达 10:00:10,计时器会以 10 秒的延迟触发。
    • 如果没有数据到达并且长时间运行的批处理正在终止,计时器会在批处理终止之前触发。

Python UDF 实时模式

Databricks 在实时模式下支持大多数Python用户定义的函数(UDF):

无状态

UDF 类型 支持
Python标量 UDF (用户定义的标量函数 - Python
箭头标量 UDF
Pandas 标量 UDF(Pandas 用户定义函数)
箭头函数 (mapInArrow
Pandas 函数 (地图

有状态分组(用户定义的聚合函数)

UDF 类型 支持
transformWithState (仅 Row 接口)
applyInPandasWithState 不支持

非有状态分组(UDAF)

UDF 类型 支持
apply 不支持
applyInArrow 不支持
applyInPandas 不支持

表函数

UDF 类型 支持
UDTF (Python用户定义的表函数 (UDDF) 不支持
UC UDF 不支持

在实时模式下使用 Python UDF 时,需要考虑以下几点:

  • 若要最大程度地减少延迟,请将箭头批大小 (spark.sql.execution.arrow.maxRecordsPerBatch) 配置为 1。
    • 权衡:此配置以牺牲吞吐量为代价优化延迟。 对于大多数工作负荷,建议使用此设置。
    • 仅在需要更高的吞吐量来容纳输入量时才增加批处理大小,并接受可能的延迟增加。
  • Pandas UDF 和函数在 Arrow 批大小为 1 时性能不佳。
    • 如果使用 pandas UDF 或函数,请将箭头批大小设置为更高的值(例如 100 或更高)。
    • 这意味着更高的延迟。 Databricks 建议尽可能使用箭头 UDF 或函数。
  • 由于 pandas 的性能问题,transformWithState 仅支持 Row 接口。