foreach (DataStreamWriter)

Legt die Ausgabe der Streamingabfrage fest, die mit dem bereitgestellten Writer verarbeitet werden soll. Die Verarbeitungslogik kann als Funktion angegeben werden, die eine Zeile als Eingabe oder als Objekt mit process(row) und optional open(partition_id, epoch_id) und close(error) methoden verwendet.

Syntax

foreach(f)

Parameter

Parameter Typ Beschreibung
f Aufrufbar oder Objekt Eine Funktion, die eine Zeile als Eingabe oder ein Objekt mit einer process(row) Methode und optionalen open Methoden close verwendet.

Rückkehr

DataStreamWriter

Hinweise

Das bereitgestellte Objekt muss serialisierbar sein. Jede Initialisierung zum Schreiben von Daten (z. B. öffnen einer Verbindung) sollte innerhalb open()und nicht zur Bauzeit erfolgen.

Beispiele

import time
df = spark.readStream.format("rate").load()

Verarbeiten sie jede Zeile mithilfe einer Funktion:

def print_row(row):
    print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()

Verarbeiten Sie jede Zeile mithilfe eines Objekts mit open, processund close Methoden:

class RowPrinter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        print(row)

    def close(self, error):
        print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()