通过


DataSourceStreamReader

流式处理数据源读取器的基类。

数据源流读取器负责从流式处理数据源输出数据。 实现此类并返回一个实例 DataSource.streamReader() ,使数据源可读为流式处理源。

Syntax

from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        ...

    def partitions(self, start, end):
        ...

    def read(self, partition):
        ...

方法

方法 说明
initialOffset() 以 .. 的形式 dict返回流式处理数据源的初始偏移量。 新的流式处理查询将从此偏移量开始读取。 必须以 JSON 或 dict 格式返回基元类型的偏移键值对。 如果未实现,则 PySparkNotImplementedError 引发。
latestOffset(start, limit) 返回以起始偏移量和读取限制的形式 dict提供的最新偏移量。 源可能会返回与没有新数据相同的偏移量 start 。 源必须始终尊重给定 limit的 。 必须以 JSON 或 dict 格式返回基元类型的偏移键值对。 如果未实现,则 PySparkNotImplementedError 引发。
partitions(start, end) 返回表示两者之间InputPartitionstart和偏移量的数据的对象end序列。 如果 start 等于 end,则返回空序列。 每个 InputPartition 表示可由一个 Spark 任务处理的数据拆分。
read(partition) 为给定分区生成数据,并返回元组、行或 PyArrow RecordBatch 对象的迭代器。 每个元组或行都转换为最终数据帧中的一行。 此方法是抽象的,必须实现。
commit(end) 通知源 Spark 已完成处理偏移量小于或等于 end的所有数据。 Spark 将仅请求大于 end 将来的偏移量。
stop() 停止源并释放已分配的任何资源。 流式处理查询终止时调用。

备注

  • read() 是静态的和无状态的。 不要访问可变类成员或在不同的调用 read()之间保留内存中状态。
  • 返回 partitions() 的所有分区值都必须是可选取的对象。
  • 偏移量表示为 dict 或递归 dict ,其键和值是基元类型:整数、字符串或布尔值。

示例

实现从索引记录序列中读取的流式读取器:

from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self, start, limit):
        return {"index": start["index"] + 10}

    def partitions(self, start, end):
        return [
            InputPartition(i)
            for i in range(start["index"], end["index"])
        ]

    def read(self, partition):
        yield (partition.value, f"record-{partition.value}")

    def commit(self, end):
        print(f"Committed up to offset {end}")

    def stop(self):
        print("Stopping stream reader")