Condividi tramite


foreach (DataStreamWriter)

Imposta l'output della query di streaming da elaborare usando il writer fornito. La logica di elaborazione può essere specificata come funzione che accetta una riga come input o come oggetto con process(row) e metodi e open(partition_id, epoch_id) facoltativiclose(error).

Sintassi

foreach(f)

Parametri

Parametro Tipo Descrizione
f chiamabile o oggetto Funzione che accetta un oggetto Row come input o un oggetto con un process(row) metodo e metodi e open facoltativiclose.

Restituzioni

DataStreamWriter

Note

L'oggetto fornito deve essere serializzabile. Qualsiasi inizializzazione per la scrittura di dati (ad esempio, l'apertura di una connessione) deve essere eseguita all'interno open()di , non in fase di costruzione.

Examples

import time
df = spark.readStream.format("rate").load()

Elaborare ogni riga usando una funzione:

def print_row(row):
    print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()

Elaborare ogni riga usando un oggetto con openi metodi , processe close :

class RowPrinter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        print(row)

    def close(self, error):
        print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()