Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Legt die Ausgabe der Streamingabfrage fest, die mit dem bereitgestellten Writer verarbeitet werden soll. Die Verarbeitungslogik kann als Funktion angegeben werden, die eine Zeile als Eingabe oder als Objekt mit process(row) und optional open(partition_id, epoch_id) und close(error) methoden verwendet.
Syntax
foreach(f)
Parameter
| Parameter | Typ | Beschreibung |
|---|---|---|
f |
Aufrufbar oder Objekt | Eine Funktion, die eine Zeile als Eingabe oder ein Objekt mit einer process(row) Methode und optionalen open Methoden close verwendet. |
Rückkehr
DataStreamWriter
Hinweise
Das bereitgestellte Objekt muss serialisierbar sein. Jede Initialisierung zum Schreiben von Daten (z. B. öffnen einer Verbindung) sollte innerhalb open()und nicht zur Bauzeit erfolgen.
Beispiele
import time
df = spark.readStream.format("rate").load()
Verarbeiten sie jede Zeile mithilfe einer Funktion:
def print_row(row):
print(row)
q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
Verarbeiten Sie jede Zeile mithilfe eines Objekts mit open, processund close Methoden:
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
print(row)
def close(self, error):
print("Closed with error: %s" % str(error))
q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()