Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Uma classe base para leitores simplificados de fontes de dados em streaming.
Comparado com DataSourceStreamReader, SimpleDataSourceStreamReader não requer planear partições de dados. O read() método permite ler dados e planear o deslocamento mais recente ao mesmo tempo.
Como SimpleDataSourceStreamReader lê registos no driver Spark para determinar o deslocamento final de cada lote sem particionamento, é adequado apenas para casos de uso leve onde a taxa de entrada e o tamanho do lote são pequenos. Use DataSourceStreamReader quando o débito de leitura for elevado e não pode ser gerido por um único processo.
Adicionado no Databricks Runtime 15.3
Sintaxe
from pyspark.sql.datasource import SimpleDataSourceStreamReader
class MyStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"offset": 0}
def read(self, start):
...
def readBetweenOffsets(self, start, end):
...
Methods
| Método | Descrição |
|---|---|
initialOffset() |
Devolve o deslocamento inicial da fonte de dados em streaming. Uma nova consulta de streaming começa a ler a partir deste deslocamento. |
read(start) |
Lê todos os dados disponíveis do offset inicial e devolve uma tupla de um iterador de registos e o deslocamento final para a próxima tentativa de leitura. |
readBetweenOffsets(start, end) |
Lê todos os dados disponíveis entre os deslocamentos específicos de início e fim. Invocado durante a recuperação de falhas para reler um lote deterministicamente. |
commit(end) |
Informa a fonte que o Spark completou o processamento de todos os dados para deslocamentos menores ou iguais a end. |
Exemplos
Defina um leitor de fonte de dados simplificado personalizado:
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader
class MyStreamingDataSource(DataSource):
@classmethod
def name(cls):
return "my_streaming_source"
def schema(self):
return "value STRING"
def simpleStreamReader(self, schema):
return MySimpleStreamReader()
class MySimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"partition-1": {"index": 0}}
def read(self, start):
end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
def records():
yield ("hello",)
return records(), end
def readBetweenOffsets(self, start, end):
def records():
yield ("hello",)
return records()
def commit(self, end):
pass
spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()