Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Eine Basisklasse für vereinfachte Streamingdatenquellenleser.
Im Vergleich zu DataSourceStreamReader, SimpleDataSourceStreamReader erfordert keine Planungsdatenpartitionen. Die read() Methode ermöglicht das Lesen von Daten und die Planung des neuesten Offsets gleichzeitig.
Da SimpleDataSourceStreamReader Datensätze im Spark-Treiber gelesen werden, um den Endoffset jedes Batches ohne Partitionierung zu ermitteln, eignet es sich nur für einfache Anwendungsfälle, in denen eingaberate und Batchgröße klein sind. Wird verwendet DataSourceStreamReader , wenn der Lesedurchsatz hoch ist und von einem einzelnen Prozess nicht verarbeitet werden kann.
Hinzugefügt in Databricks Runtime 15.3
Syntax
from pyspark.sql.datasource import SimpleDataSourceStreamReader
class MyStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"offset": 0}
def read(self, start):
...
def readBetweenOffsets(self, start, end):
...
Methodik
| Methode | Beschreibung |
|---|---|
initialOffset() |
Gibt den anfänglichen Offset der Streamingdatenquelle zurück. Eine neue Streamingabfrage beginnt mit dem Lesen aus diesem Offset. |
read(start) |
Liest alle verfügbaren Daten aus dem Anfangsoffset und gibt ein Tupel eines Iterators von Datensätzen und den Endoffset für den nächsten Leseversuch zurück. |
readBetweenOffsets(start, end) |
Liest alle verfügbaren Daten zwischen bestimmten Anfangs- und Endoffsets. Wird während der Fehlerwiederherstellung aufgerufen, um einen Batch deterministisch erneut zu lesen. |
commit(end) |
Informiert die Quelle, dass Spark die Verarbeitung aller Daten für Offsets abgeschlossen hat, die kleiner oder gleich sind end. |
Beispiele
Definieren Sie einen benutzerdefinierten vereinfachten Streamingdatenquellenleser:
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader
class MyStreamingDataSource(DataSource):
@classmethod
def name(cls):
return "my_streaming_source"
def schema(self):
return "value STRING"
def simpleStreamReader(self, schema):
return MySimpleStreamReader()
class MySimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"partition-1": {"index": 0}}
def read(self, start):
end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
def records():
yield ("hello",)
return records(), end
def readBetweenOffsets(self, start, end):
def records():
yield ("hello",)
return records()
def commit(self, end):
pass
spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()