Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Define a saída da query em streaming para ser processada usando o escritor fornecido. A lógica de processamento pode ser especificada como uma função que recebe uma linha como entrada, ou como um objeto com process(row) e opcional open(partition_id, epoch_id) e close(error) métodos.
Sintaxe
foreach(f)
Parâmetros
| Parâmetro | Tipo | Descrição |
|---|---|---|
f |
chamável ou objeto | Uma função que recebe uma Linha como entrada, ou um objeto com um process(row) método e métodos opcionais openclose. |
Devoluções
DataStreamWriter
Notes
O objeto fornecido deve ser serializável. Qualquer inicialização para escrita de dados (por exemplo, abrir uma ligação) deve ser feita dentro open()do tempo, não no momento da construção.
Exemplos
import time
df = spark.readStream.format("rate").load()
Processe cada linha usando uma função:
def print_row(row):
print(row)
q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
Processe cada linha usando um objeto com open, process, e close métodos:
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
print(row)
def close(self, error):
print("Closed with error: %s" % str(error))
q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()