Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Imposta l'output della query di streaming da elaborare usando il writer fornito. La logica di elaborazione può essere specificata come funzione che accetta una riga come input o come oggetto con process(row) e metodi e open(partition_id, epoch_id) facoltativiclose(error).
Sintassi
foreach(f)
Parametri
| Parametro | Tipo | Descrizione |
|---|---|---|
f |
chiamabile o oggetto | Funzione che accetta un oggetto Row come input o un oggetto con un process(row) metodo e metodi e open facoltativiclose. |
Restituzioni
DataStreamWriter
Note
L'oggetto fornito deve essere serializzabile. Qualsiasi inizializzazione per la scrittura di dati (ad esempio, l'apertura di una connessione) deve essere eseguita all'interno open()di , non in fase di costruzione.
Examples
import time
df = spark.readStream.format("rate").load()
Elaborare ogni riga usando una funzione:
def print_row(row):
print(row)
q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
Elaborare ogni riga usando un oggetto con openi metodi , processe close :
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
print(row)
def close(self, error):
print("Closed with error: %s" % str(error))
q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()