Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Classe di base per i lettori semplificati dell'origine dati di streaming.
Rispetto a DataSourceStreamReader, SimpleDataSourceStreamReader non richiede la pianificazione delle partizioni di dati. Il read() metodo consente la lettura dei dati e la pianificazione dell'offset più recente contemporaneamente.
Poiché SimpleDataSourceStreamReader legge i record nel driver Spark per determinare l'offset finale di ogni batch senza partizionamento, è adatto solo per i casi d'uso leggeri in cui la velocità di input e le dimensioni del batch sono ridotte. Usare DataSourceStreamReader quando la velocità effettiva di lettura è elevata e non può essere gestita da un singolo processo.
Aggiunta in Databricks Runtime 15.3
Sintassi
from pyspark.sql.datasource import SimpleDataSourceStreamReader
class MyStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"offset": 0}
def read(self, start):
...
def readBetweenOffsets(self, start, end):
...
Methods
| metodo | Descrizione |
|---|---|
initialOffset() |
Restituisce l'offset iniziale dell'origine dati di streaming. Una nuova query di streaming inizia a leggere da questo offset. |
read(start) |
Legge tutti i dati disponibili dall'offset iniziale e restituisce una tupla di un iteratore di record e l'offset finale per il tentativo di lettura successivo. |
readBetweenOffsets(start, end) |
Legge tutti i dati disponibili tra offset di inizio e fine specifici. Richiamato durante il ripristino degli errori per rileggere un batch in modo deterministico. |
commit(end) |
Informa l'origine che Spark ha completato l'elaborazione di tutti i dati per gli offset minori o uguali a end. |
Examples
Definire un lettore di origine dati di streaming semplificato personalizzato:
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader
class MyStreamingDataSource(DataSource):
@classmethod
def name(cls):
return "my_streaming_source"
def schema(self):
return "value STRING"
def simpleStreamReader(self, schema):
return MySimpleStreamReader()
class MySimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"partition-1": {"index": 0}}
def read(self, start):
end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
def records():
yield ("hello",)
return records(), end
def readBetweenOffsets(self, start, end):
def records():
yield ("hello",)
return records()
def commit(self, end):
pass
spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()