Condividi tramite


DataSourceStreamReader

Classe di base per i lettori dell'origine dati di streaming.

I lettori del flusso di origine dati sono responsabili dell'output dei dati da un'origine dati di streaming. Implementare questa classe e restituire un'istanza da DataSource.streamReader() per rendere leggibile un'origine dati come origine di streaming.

Sintassi

from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        ...

    def partitions(self, start, end):
        ...

    def read(self, partition):
        ...

Methods

metodo Descrizione
initialOffset() Restituisce l'offset iniziale dell'origine dati di streaming come .dict Una nuova query di streaming inizia a leggere da questo offset. Deve restituire coppie chiave-valore di offset di tipi primitivi in formato o dict JSON. Genera PySparkNotImplementedError se non implementato.
latestOffset(start, limit) Restituisce l'offset più recente disponibile come dict, dato un offset iniziale e un limite di lettura. L'origine può restituire lo stesso offset di se start non sono presenti nuovi dati. L'origine deve sempre rispettare l'oggetto specificato limit. Deve restituire coppie chiave-valore di offset di tipi primitivi in formato o dict JSON. Genera PySparkNotImplementedError se non implementato.
partitions(start, end) Restituisce una sequenza di InputPartition oggetti che rappresentano i dati tra start e end gli offset. Restituisce una sequenza vuota se start è uguale a end. Ognuno InputPartition rappresenta una suddivisione dei dati che può essere elaborata da un'attività Spark.
read(partition) Genera dati per una determinata partizione e restituisce un iteratore di tuple, righe o oggetti PyArrow RecordBatch . Ogni tupla o riga viene convertita in una riga nel dataframe finale. Questo metodo è astratto e deve essere implementato.
commit(end) Informa l'origine che Spark ha completato l'elaborazione di tutti i dati per gli offset minori o uguali a end. Spark richiederà solo offset maggiori di quelli end futuri.
stop() Arresta l'origine e libera tutte le risorse allocate. Richiamato quando la query di streaming termina.

Note

  • read() è statico e senza stato. Non accedere ai membri della classe modificabili o mantenere lo stato in memoria tra chiamate diverse di read().
  • Tutti i valori di partizione restituiti da partitions() devono essere oggetti selezionabili.
  • Gli offset sono rappresentati come un dict oggetto o ricorsivo dict i cui valori e chiavi sono tipi primitivi: integer, string o booleano.

Examples

Implementare un lettore di streaming che legge da una sequenza di record indicizzati:

from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self, start, limit):
        return {"index": start["index"] + 10}

    def partitions(self, start, end):
        return [
            InputPartition(i)
            for i in range(start["index"], end["index"])
        ]

    def read(self, partition):
        yield (partition.value, f"record-{partition.value}")

    def commit(self, end):
        print(f"Committed up to offset {end}")

    def stop(self):
        print("Stopping stream reader")