通过


Delta 表流式读取和写入

本页介绍如何将 Delta 表用作 Spark 结构化流式处理 的源和 readStream 以及 writeStream 的汇。 Delta Lake 解决了流系统和文件的常见性能和可靠性问题。 其优势包括:

  • 合并低延迟摄取生成的小型文件,提高性能。
  • 使用多个流(或并发批处理作业)实现"精确一次"处理。
  • 使用文件作为流源时有效地发现新文件。

若要了解如何在 Databricks SQL 中使用流式处理表加载数据,请参阅 Databricks SQL 中使用流式处理表

有关 Delta Lake 的流静态联接,请参阅 stream-static joins

将 Delta 表用作接收器

可以使用结构化流式处理将数据写入 Delta 表。 Delta Lake 事务日志保证精确一次处理,即使有其他流或批处理查询同时针对表运行。

使用结构化流处理接收器写入 Delta 表时,可能会看到带有 epochId = -1 的空提交。 这些是预期的,通常会发生:

  • 在每次流式处理查询运行的第一批(这将在每个批处理中发生Trigger.AvailableNow)。
  • 更改架构时(例如添加列)。

这些空提交是有意的,并不指示错误。 它们不会以任何重要方式影响查询的正确性或性能。

Note

Delta Lake VACUUM 函数会删除所有不由 Delta Lake 管理的文件,但会跳过所有以 _ 开头的目录。 可以使用 <table-name>/_checkpoints 等目录结构将检查点与 Delta 表的其他数据和元数据一起安全地存储。

使用指标监视积压工作

使用以下指标监控 流式查询进程的积压情况:

  • numBytesOutstanding:积压工作中尚未处理的字节数。
  • numFilesOutstanding:积压工作中尚未处理的文件数。
  • numNewListedFiles:列出用于计算此批积压的 Delta Lake 文件数。
  • backlogEndOffset:用于计算积压工作的 Delta 表版本。

在笔记本中,在流式处理查询进度仪表板的 “原始数据 ”选项卡下查看这些指标:

{
  "sources": [
    {
      "description": "DeltaSource[file:/path/to/source]",
      "metrics": {
        "numBytesOutstanding": "3456",
        "numFilesOutstanding": "8"
      }
    }
  ]
}

追加模式

默认情况下,数据流在追加模式下运行,并且只向表中添加新记录。

使用toTable方法来流式传输到表:

Python

(events.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

完整模式

使用具有完整模式的结构化流式处理在每个批处理后替换整个表。 例如,可以按客户持续更新聚合的汇总事件表:

Python

(spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
)

Scala

spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")

对于没有严格延迟要求的应用程序,可以使用一次性触发器(例如 AvailableNow)节省计算资源和成本。 例如,使用此触发器按给定计划更新摘要聚合表,只处理自上次更新以来到达的新数据。 请参阅 AvailableNow:增量批处理

处理源 Delta 表的改动

结构化流式处理以增量方式读取 Delta 表。 当流式查询从 Delta 表读取时,随着新表版本提交到源表,新记录将以幂等方式进行处理。 结构化流式处理仅接受追加输入,如果源 Delta 表发生任何修改,则会引发异常。 例如,如果某个UPDATEDELETEMERGE INTOOVERWRITE操作修改流式处理查询读取的源 Delta 表,则流将失败并显示错误。

根据用例,有四种典型的方法可用于处理源 Delta 表的上游更改。 下面提供了一个参考表以及每项的详细信息。

方法 Pros 缺点
skipChangeCommits 简单,不需要编写复杂的逻辑。 有用的仅追加处理用于单独处理上游变化,或暂时处理错误的记录。 不传递更改,只处理追加。
完全刷新 同样简单,不需要编写复杂的逻辑。 对于具有极少数上游更改的小型数据集非常有用。 大型数据集的成本高昂。 需要重新处理所有下游表。
更改数据流 处理所有更改类型(插入、更新和删除)。 Databricks 建议尽可能从 Delta 表的 CDC 源进行流式处理,而不是直接从表中流式传输。 需要编写更复杂的逻辑来处理每个更改类型。
实例化视图 具有自动更改传播的结构化流式处理简单替代方法。 更高的延迟。 仅在 Lakeflow Spark 声明性管道和 Databricks SQL 中可用。

跳过上游更改提交 skipChangeCommits

设置为 skipChangeCommits 忽略删除或修改现有记录的事务,并仅处理追加。 当不需要通过流传播对现有数据的更改,或者首选单独的逻辑来处理这些更改时,这非常有用。 如果需要暂时忽略一次性更改,可以打开和关闭 skipChangeCommits

Databricks 建议对不使用更改数据馈送的大多数工作负荷使用 skipChangeCommits

Python

(spark.readStream
  .option("skipChangeCommits", "true")
  .table("source_table")
)

Scala

spark.readStream
  .option("skipChangeCommits", "true")
  .table("source_table")

Important

如果在开始针对表进行流式读取后 Delta 表的架构发生更改,查询将会失败。 对于大多数架构更改,可以重启流以解决架构不匹配问题并继续处理。

在 Databricks Runtime 12.2 LTS 及更低版本中,无法从启用了列映射且经历了非累加架构演变(例如重命名或删除列)的 Delta 表进行流式处理。 有关详细信息,请参阅 列映射和流式处理

Note

在 Databricks Runtime 12.2 LTS 及更高版本中, skipChangeCommits 替换 ignoreChanges。 在 Databricks Runtime 11.3 LTS 及更低版本中,ignoreChanges 是唯一受支持的选项。 请查看 旧选项 ignoreChanges 以获取详细信息。

旧选项:ignoreDeletes

ignoreDeletes 是一个旧选项,它仅处理在分区边界处删除数据的事务(即完全分区删除)。 如果需要处理非分区删除、更新或其他修改,请改用 skipChangeCommits

Python
(spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")
)
Scala
spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")

旧选项: ignoreChanges

ignoreChanges 在 Databricks Runtime 11.3 LTS 和更低版本中提供。 在 Databricks Runtime 12.2 LTS 及更高版本中,它将替换为 skipChangeCommits

在启用 ignoreChanges 后,当发生数据修改操作(例如 UPDATEMERGE INTO(在 DELETE 分区内)或 OVERWRITE)之后,将重新发出源表中重写的数据文件。 未更改的行通常与新行一起发布,因此下游数据使用者必须能够处理重复项。 删除不会传播到下游。 ignoreChanges 优先于 ignoreDeletes

相比之下, skipChangeCommits 完全忽略更改文件的操作。 由于数据修改操作(例如 UPDATEMERGE INTODELETEOVERWRITE),源表中的数据文件会被重写,但随之而来的更改将被完全忽略。 若要反映流源表中的更改,必须实现单独的逻辑来传播这些更改。

Databricks 建议对所有新工作负荷使用 skipChangeCommits 。 若要从 ignoreChangesskipChangeCommits迁移工作负荷,请重构流式处理逻辑。

下游表的完全刷新

如果上游更改稀少,并且数据量小到足以重新处理,可以删除流处理检查点和输出表,然后从头开始重新启动数据流。 这会导致流重新处理源表中的所有数据。 请注意,此方法还需要重新处理依赖于此流的输出的所有下游表。

此方法最适合较小的数据集或工作负荷,其中上游更改不频繁,并且完全刷新的成本是可接受的。

使用更改数据馈送

对于处理所有类型更改(插入、更新和删除)的工作负载,请使用 Delta Lake 更改数据流。 更改数据馈送记录对 Delta 表的行级更改,使你可以流式传输这些更改,并编写逻辑来处理下游表中的每个更改类型。 这是最可靠的方法,因为代码显式处理每种类型的更改事件。 请参阅 在 Azure Databricks 上使用 Delta Lake 更改数据流

如果使用 Lakeflow Spark 声明性管道,请参阅 AUTO CDC API:使用管道简化更改数据捕获

Important

在 Databricks Runtime 12.2 LTS 及以下版本中,无法从启用了列映射且经历过非追加性架构演变的 Delta 表进行更改数据馈送的流式传输,例如重命名或删除列。 请参阅 列映射和流式处理

使用具体化视图

具体化视图通过在源数据更改时重新计算结果自动处理上游更改。 如果不需要尽可能低的延迟并且想要避免管理流式处理复杂性,具体化视图可以简化体系结构。 具体化视图在 Lakeflow Spark 声明性管道管道和 Databricks SQL 中可用。 请参阅 具体化视图

Example

例如,假设你有一个表 user_events,其中包含 dateuser_emailaction 列,并按 date 对该表进行了分区。 从 user_events 表向外进行流式处理,由于 GDPR 的原因,需要从中删除数据。

skipChangeCommits允许您删除多个分区中的数据(在此示例中,按照user_email进行筛选)。 使用以下语法:

spark.readStream
  .option("skipChangeCommits", "true")
  .table("user_events")

如果使用 user_email 语句更新 UPDATE,则包含相关 user_email 的文件将被重写。 使用 skipChangeCommits 忽略更改的数据文件。

Databricks 建议使用 skipChangeCommits ,除非您确定删除始终是完整分区删除,否则不推荐使用 ignoreDeletes

请使用 foreachBatch 进行幂等表写入

Note

Databricks 建议为每个需要更新的输出目标配置单独的流式写入,而不是使用 foreachBatch。 向多个接收器写入 foreachBatch 会减少并行化并增加整体延迟,因为向多个表写入在 foreachBatch 中是按序列进行的。

Delta 表支持以下 DataFrameWriter 选项,以使对多个 foreachBatch 表的写入具备幂等性:

  • txnAppId:可以在每次 DataFrame 写入时传递的唯一字符串。 例如,可以使用 StreamingQuery ID 作为 txnAppIdtxnAppId 可以是任何用户生成的唯一字符串,不必与流 ID 相关。
  • txnVersion:充当事务版本的单调递增数字。

Delta Lake 使用 txnAppIdtxnVersion 来识别并忽略重复写入。 例如,在失败中断批处理写入后,可以使用相同的txnAppIdtxnVersion重新运行批处理,以正确识别并忽略重复项。 请参阅使用 foreachBatch 将内容写入到任意数据接收器

Warning

如果删除流式处理检查点并使用新检查点重新启动查询,则必须提供其他 txnAppId。 新检查点以 0 的批 ID 开头。 Delta Lake 使用批 ID 和 txnAppId 作为唯一键,并跳过已见值的批次。

以下代码示例演示了此模式:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}

使用 foreachBatch 从流式处理查询中更新插入

可以使用mergeforeachBatch将流式查询的复杂上插写入 Delta 表。 请参阅使用 foreachBatch 将内容写入到任意数据接收器

此方法有许多应用程序:

Note

  • 验证foreachBatch中的merge语句是否幂等。 否则,重新启动流式查询时可能对同一批数据多次执行操作。 请参阅 “用于 foreachBatch 幂等表写入”。

  • 使用mergeforeachBatch时,输入数据速率指标可能会返回一个是源头数据生成速率倍数的结果。 merge 多次读取输入数据,这将增加度量指标。 若要防止指标乘法,请在之前 merge 缓存批处理数据帧,然后在之后 merge将其取消缓存。

    可通过 StreamingQueryProgress 和笔记本流式处理速率图获取输入数据速率。 请参阅 Azure Databricks 上的结构化流式查询监控

例如,您可以在MERGEforeachBatch中使用SQL语句:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

还可以使用 Delta Lake APIs 进行流式插入和更新:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "table_name")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

设置初始表版本以处理更改

默认情况下,数据流从最新的可用 Delta 表版本开始。 这包括当时表的完整快照以及所有将来的更改。 Databricks 建议对大多数工作负荷使用默认的初始表版本。

可以选择使用以下选项来指定 Delta Lake 流式处理源的开始位置,而无需处理整个表。

  • startingVersion:要从中读取的 Delta 表版本。 在指定版本或之后,流会读取所有已提交的表更改。 如果指定的版本不可用,则流无法启动。

    若要查找可用的提交版本,请运行DESCRIBE HISTORY并检查version。 若仅返回最新更改,请指定 latest。 有关 Delta 表的版本信息,请参阅 使用表历史记录

  • startingTimestamp:开始读取的时间戳。 数据流会读取在指定时间戳或之后提交的所有表更改。 如果提供的时间戳位于所有表提交之前,则流式读取按最早的可用时间戳开始。 设置以下任一项:

    • 时间戳字符串。 例如 "2019-01-01T00:00:00.000Z"
    • 日期字符串。 例如 "2019-01-01"

不能同时设置startingVersionstartingTimestamp。 这些设置仅适用于新的流式处理查询。 如果流式处理查询已启动且进度已记录在其检查点中,则忽略这些设置。

Important

虽然可以从指定的版本或时间戳启动流式处理源,但流式处理源的架构始终是 Delta 表的最新架构。 必须确保在指定版本或时间戳之后,不对 Delta 表进行任何不兼容的架构更改。 否则,在读取架构不正确的数据时,流式处理源可能会返回不正确的结果。

Example

例如,假设你有一个表 user_events。 如果要从版本 5 开始读取更改,请使用:

spark.readStream
  .option("startingVersion", "5")
  .table("user_events")

如果想了解自 2018 年 10 月 18 日以来进行的更改,可使用:

spark.readStream
  .option("startingTimestamp", "2018-10-18")
  .table("user_events")

处理初始快照而不删除数据

此功能在 Databricks Runtime 11.3 LTS 及更高版本上可用。

在有状态流式查询中,当水印已定义时,通过修改时间排序处理文件可能导致记录处理顺序错误。 这可能会导致水印错误地将记录标记为后期事件并删除它们。 仅当按默认顺序处理初始 Delta 快照时,才会发生这种情况。

对于具有 Delta 源表的流,查询首先处理表中存在的所有数据,并创建名为 初始快照的版本。 默认情况下,Delta 表的数据文件将基于上次修改的文件进行处理。 但是,上次修改时间不一定表示记录事件时间顺序。

若要避免在初始快照处理期间删除数据,请启用 withEventTimeOrder 该选项。 withEventTimeOrder 将初始快照数据的事件时间范围划分为时间存储桶。 每个微批处理通过筛选时间范围内的数据来处理存储桶。 maxFilesPerTriggermaxBytesPerTrigger 选项仍适用于控制微批大小,但由于处理方法,这仅能大致控制。

下图显示了此过程:

初始快照

限制条件

  • 如果流查询已启动,并且初始快照正在积极处理,则无法更改 withEventTimeOrder 。 为了应用withEventTimeOrder的更改重新启动,必须删除检查点。
  • 如果 withEventTimeOrder 已启用,则无法在初始快照处理完成之前将流降级到不支持此功能的 Databricks Runtime 版本。 若要降级,请等待初始快照完成,或删除检查点并重启查询。
  • 在以下方案中不支持此功能:
    • 事件时间列是一个生成列,在 Delta 源和水印之间存在非投影转换过程。
    • 流查询中存在具有多个 Delta 源的水印。

性能

如果 withEventTimeOrder 已启用,初始快照处理性能可能会变慢。 每个微批处理扫描初始快照,以筛选相应事件时间范围内的数据。 提高筛选性能:

  • 使用 Delta 源列作为事件时间,以便应用数据跳过功能。 请参阅 数据跳过
  • 沿事件时间列对表进行分区。

使用 Spark UI 查看特定微批任务扫描的 Delta Lake 文件数量。

Example

假设你有一个 user_events 表,其中包含 event_time 列。 流式处理查询是一个聚合查询。 如果要确保在初始快照处理期间不会删除任何数据,可以使用:

spark.readStream
  .option("withEventTimeOrder", "true")
  .table("user_events")
  .withWatermark("event_time", "10 seconds")

可以在群集上使用 Spark 配置进行设置withEventTimeOrder,以将其应用到所有流式处理查询: spark.databricks.delta.withEventTimeOrder.enabled true

限制输入速率以提高处理性能

默认情况下,结构化流式处理在每个微批处理中处理尽可能多的文件。 若要限制每批处理的数据量并管理内存使用量、稳定延迟或降低云存储成本,请使用以下选项:

  • maxFilesPerTrigger:每个微批中需要考虑的新文件数量。 默认值为 1000。
  • maxBytesPerTrigger:在每个微批中处理的数据量。 此选项设置一个“柔性最大值”,这意味着批处理大约处理此数量的数据,并且可能会超过此限制,以便在最小输入单元大于此限制的情况下,继续处理流式查询。 默认情况下,未设置此项。

如果同时使用 maxBytesPerTriggermaxFilesPerTrigger,则微批处理将处理数据,直到达到 maxFilesPerTriggermaxBytesPerTrigger 之一的限制。

Note

默认情况下,如果 logRetentionDuration 清理了源表中的事务,而流式查询尝试处理这些版本,则查询会失败,从而无法防止数据丢失。 可以将选项 failOnDataLoss 设置为 false,忽略丢失的数据并继续处理。 请参阅为“按时间顺序查看”查询配置数据保留

控制云存储成本

流式查询有几个可用的触发模式,可以用于平衡成本和延迟,包括processingTimeavailableNowrealTime。 请参阅 控制云存储成本