SimpleDataDataSourceStreamReader

Uma classe base para leitores simplificados de fontes de dados em streaming.

Comparado com DataSourceStreamReader, SimpleDataSourceStreamReader não requer planear partições de dados. O read() método permite ler dados e planear o deslocamento mais recente ao mesmo tempo.

Como SimpleDataSourceStreamReader lê registos no driver Spark para determinar o deslocamento final de cada lote sem particionamento, é adequado apenas para casos de uso leve onde a taxa de entrada e o tamanho do lote são pequenos. Use DataSourceStreamReader quando o débito de leitura for elevado e não pode ser gerido por um único processo.

Adicionado no Databricks Runtime 15.3

Sintaxe

from pyspark.sql.datasource import SimpleDataSourceStreamReader

class MyStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"offset": 0}

    def read(self, start):
        ...

    def readBetweenOffsets(self, start, end):
        ...

Methods

Método Descrição
initialOffset() Devolve o deslocamento inicial da fonte de dados em streaming. Uma nova consulta de streaming começa a ler a partir deste deslocamento.
read(start) Lê todos os dados disponíveis do offset inicial e devolve uma tupla de um iterador de registos e o deslocamento final para a próxima tentativa de leitura.
readBetweenOffsets(start, end) Lê todos os dados disponíveis entre os deslocamentos específicos de início e fim. Invocado durante a recuperação de falhas para reler um lote deterministicamente.
commit(end) Informa a fonte que o Spark completou o processamento de todos os dados para deslocamentos menores ou iguais a end.

Exemplos

Defina um leitor de fonte de dados simplificado personalizado:

from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

class MyStreamingDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_streaming_source"

    def schema(self):
        return "value STRING"

    def simpleStreamReader(self, schema):
        return MySimpleStreamReader()

class MySimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"partition-1": {"index": 0}}

    def read(self, start):
        end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
        def records():
            yield ("hello",)
        return records(), end

    def readBetweenOffsets(self, start, end):
        def records():
            yield ("hello",)
        return records()

    def commit(self, end):
        pass

spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()