本页提供结构化流式处理中实时模式的参考信息,包括支持的环境、语言、源、接收器和运算符。 有关已知限制,请参阅 实时模式限制。
支持的语言
实时模式支持 Scala、Java 和 Python。
计算类型
实时模式支持以下计算类型:
| 计算类型 |
支持 |
| 专用(以前:单个用户) |
✓ |
| 标准(前:共享) |
• (仅Python) |
| Lakeflow Spark 声明性管道 Classic版本 |
不支持 |
| Lakeflow Spark 声明性管道无服务器 |
不支持 |
| Serverless |
不支持 |
执行模式
实时模式仅支持更新模式:
| 执行模式 |
支持 |
| 更新模式 |
✓ |
| 追加模式 |
不支持 |
| 完整模式 |
不支持 |
源和汇
实时模式支持以下源和接收器:
| 源或汇 |
作为源 |
作为接收器 |
| Apache Kafka |
✓ |
✓ |
| 事件中心(使用 Kafka 连接器) |
✓ |
✓ |
| 动动力 |
• (仅 EFO 模式) |
不支持 |
| AWS MSK |
✓ |
不支持 |
| Delta |
不支持 |
不支持 |
| Google Pub/Sub(谷歌发布/订阅消息服务) |
不支持 |
不支持 |
| Apache Pulsar |
不支持 |
不支持 |
任意接收端(使用 forEachWriter) |
不適用 |
✓ |
运营商
实时模式支持大多数结构化流式处理运算符:
无状态操作
UDFs
| Operator |
支持 |
| Scala 用户定义函数 (UDF) |
• (有一些限制) |
| Python UDF |
• (有一些限制) |
集合体
| Operator |
支持 |
| sum |
✓ |
| 计数 |
✓ |
| 最大值 |
✓ |
| 分钟 |
✓ |
| avg |
✓ |
|
聚合函数 |
✓ |
Windowing
| Operator |
支持 |
| Tumbling |
✓ |
| Sliding |
✓ |
| 会期 |
不支持 |
去重
| Operator |
支持 |
| dropDuplicates |
✓(状态无界) |
| 水印范围内删除重复项 |
不支持 |
流式传输到表联接
| Operator |
支持 |
| 广播表联接(表应较小) |
✓ |
| 流到流联接 |
不支持 |
| (平)MapGroupsWithState |
不支持 |
| transformWithState |
• (有一些差异) |
| 并 |
• (有一些限制) |
| forEach |
✓ |
| forEachBatch |
不支持 |
| mapPartitions (映射分区) |
不支持(请参阅限制) |
特殊注意事项
某些运算符和功能在实时模式下使用时具有特定的注意事项或差异。
为了构建自定义有状态应用程序,Databricks 支持 Apache Spark Structured Streaming 中的 transformWithState API。 有关 API 和代码片段的详细信息,请参阅 生成自定义有状态应用程序 。
但是,API 在实时模式下的行为方式和利用微批处理体系结构的传统流式处理查询之间存在一些差异。
- 实时模式为每一行调用
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)方法。
-
inputRows迭代器返回单个值。
微批处理模式为每个密钥调用一次, inputRows 迭代器返回微批处理中某个键的所有值。
- 编写代码时考虑此差异
- 实时模式下不支持事件时间计时器。
- 在实时模式下,计时器因数据到达而延迟触发:
- 如果计时器设置为10:00:00,但没有数据到达,计时器不会立刻触发。
- 如果数据到达 10:00:10,计时器会以 10 秒的延迟触发。
- 如果没有数据到达并且长时间运行的批处理正在终止,计时器会在批处理终止之前触发。
Python UDF 实时模式
Databricks 在实时模式下支持大多数Python用户定义的函数(UDF):
无状态
| UDF 类型 |
支持 |
| Python标量 UDF (用户定义的标量函数 - Python) |
✓ |
| 箭头标量 UDF |
✓ |
| Pandas 标量 UDF(Pandas 用户定义函数) |
✓ |
箭头函数 (mapInArrow) |
✓ |
| Pandas 函数 (地图) |
✓ |
有状态分组(用户定义的聚合函数)
| UDF 类型 |
支持 |
transformWithState (仅 Row 接口) |
✓ |
applyInPandasWithState |
不支持 |
非有状态分组(UDAF)
| UDF 类型 |
支持 |
apply |
不支持 |
applyInArrow |
不支持 |
applyInPandas |
不支持 |
表函数
在实时模式下使用 Python UDF 时,需要考虑以下几点:
- 若要最大程度地减少延迟,请将箭头批大小 (
spark.sql.execution.arrow.maxRecordsPerBatch) 配置为 1。
- 权衡:此配置以牺牲吞吐量为代价优化延迟。 对于大多数工作负荷,建议使用此设置。
- 仅在需要更高的吞吐量来容纳输入量时才增加批处理大小,并接受可能的延迟增加。
- Pandas UDF 和函数在 Arrow 批大小为 1 时性能不佳。
- 如果使用 pandas UDF 或函数,请将箭头批大小设置为更高的值(例如 100 或更高)。
- 这意味着更高的延迟。 Databricks 建议尽可能使用箭头 UDF 或函数。
- 由于 pandas 的性能问题,transformWithState 仅支持
Row 接口。