foreach (DataStreamWriter)

Define a saída da query em streaming para ser processada usando o escritor fornecido. A lógica de processamento pode ser especificada como uma função que recebe uma linha como entrada, ou como um objeto com process(row) e opcional open(partition_id, epoch_id) e close(error) métodos.

Sintaxe

foreach(f)

Parâmetros

Parâmetro Tipo Descrição
f chamável ou objeto Uma função que recebe uma Linha como entrada, ou um objeto com um process(row) método e métodos opcionais openclose.

Devoluções

DataStreamWriter

Notes

O objeto fornecido deve ser serializável. Qualquer inicialização para escrita de dados (por exemplo, abrir uma ligação) deve ser feita dentro open()do tempo, não no momento da construção.

Exemplos

import time
df = spark.readStream.format("rate").load()

Processe cada linha usando uma função:

def print_row(row):
    print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()

Processe cada linha usando um objeto com open, process, e close métodos:

class RowPrinter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        print(row)

    def close(self, error):
        print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()