SimpleDataSourceStreamReader

Eine Basisklasse für vereinfachte Streamingdatenquellenleser.

Im Vergleich zu DataSourceStreamReader, SimpleDataSourceStreamReader erfordert keine Planungsdatenpartitionen. Die read() Methode ermöglicht das Lesen von Daten und die Planung des neuesten Offsets gleichzeitig.

Da SimpleDataSourceStreamReader Datensätze im Spark-Treiber gelesen werden, um den Endoffset jedes Batches ohne Partitionierung zu ermitteln, eignet es sich nur für einfache Anwendungsfälle, in denen eingaberate und Batchgröße klein sind. Wird verwendet DataSourceStreamReader , wenn der Lesedurchsatz hoch ist und von einem einzelnen Prozess nicht verarbeitet werden kann.

Hinzugefügt in Databricks Runtime 15.3

Syntax

from pyspark.sql.datasource import SimpleDataSourceStreamReader

class MyStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"offset": 0}

    def read(self, start):
        ...

    def readBetweenOffsets(self, start, end):
        ...

Methodik

Methode Beschreibung
initialOffset() Gibt den anfänglichen Offset der Streamingdatenquelle zurück. Eine neue Streamingabfrage beginnt mit dem Lesen aus diesem Offset.
read(start) Liest alle verfügbaren Daten aus dem Anfangsoffset und gibt ein Tupel eines Iterators von Datensätzen und den Endoffset für den nächsten Leseversuch zurück.
readBetweenOffsets(start, end) Liest alle verfügbaren Daten zwischen bestimmten Anfangs- und Endoffsets. Wird während der Fehlerwiederherstellung aufgerufen, um einen Batch deterministisch erneut zu lesen.
commit(end) Informiert die Quelle, dass Spark die Verarbeitung aller Daten für Offsets abgeschlossen hat, die kleiner oder gleich sind end.

Beispiele

Definieren Sie einen benutzerdefinierten vereinfachten Streamingdatenquellenleser:

from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

class MyStreamingDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_streaming_source"

    def schema(self):
        return "value STRING"

    def simpleStreamReader(self, schema):
        return MySimpleStreamReader()

class MySimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"partition-1": {"index": 0}}

    def read(self, start):
        end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
        def records():
            yield ("hello",)
        return records(), end

    def readBetweenOffsets(self, start, end):
        def records():
            yield ("hello",)
        return records()

    def commit(self, end):
        pass

spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()