Condividi tramite


SimpleDataSourceStreamReader

Classe di base per i lettori semplificati dell'origine dati di streaming.

Rispetto a DataSourceStreamReader, SimpleDataSourceStreamReader non richiede la pianificazione delle partizioni di dati. Il read() metodo consente la lettura dei dati e la pianificazione dell'offset più recente contemporaneamente.

Poiché SimpleDataSourceStreamReader legge i record nel driver Spark per determinare l'offset finale di ogni batch senza partizionamento, è adatto solo per i casi d'uso leggeri in cui la velocità di input e le dimensioni del batch sono ridotte. Usare DataSourceStreamReader quando la velocità effettiva di lettura è elevata e non può essere gestita da un singolo processo.

Aggiunta in Databricks Runtime 15.3

Sintassi

from pyspark.sql.datasource import SimpleDataSourceStreamReader

class MyStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"offset": 0}

    def read(self, start):
        ...

    def readBetweenOffsets(self, start, end):
        ...

Methods

metodo Descrizione
initialOffset() Restituisce l'offset iniziale dell'origine dati di streaming. Una nuova query di streaming inizia a leggere da questo offset.
read(start) Legge tutti i dati disponibili dall'offset iniziale e restituisce una tupla di un iteratore di record e l'offset finale per il tentativo di lettura successivo.
readBetweenOffsets(start, end) Legge tutti i dati disponibili tra offset di inizio e fine specifici. Richiamato durante il ripristino degli errori per rileggere un batch in modo deterministico.
commit(end) Informa l'origine che Spark ha completato l'elaborazione di tutti i dati per gli offset minori o uguali a end.

Examples

Definire un lettore di origine dati di streaming semplificato personalizzato:

from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

class MyStreamingDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_streaming_source"

    def schema(self):
        return "value STRING"

    def simpleStreamReader(self, schema):
        return MySimpleStreamReader()

class MySimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"partition-1": {"index": 0}}

    def read(self, start):
        end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
        def records():
            yield ("hello",)
        return records(), end

    def readBetweenOffsets(self, start, end):
        def records():
            yield ("hello",)
        return records()

    def commit(self, end):
        pass

spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()