Freigeben über


foreachBatch (DataStreamWriter)

Legt die Ausgabe der Streamingabfrage fest, die mithilfe der bereitgestellten Funktion verarbeitet werden soll. Wird nur im Mikrobatchausführungsmodus unterstützt (d. a. wenn der Trigger nicht fortlaufend ist). In jedem Mikrobatch wird die bereitgestellte Funktion mit den Ausgabezeilen als DataFrame und dem Batchbezeichner aufgerufen. Die Batch-ID kann verwendet werden, um die Ausgabe in externe Systeme zu deduplizieren und transaktional zu schreiben.

Syntax

foreachBatch(func)

Parameter

Parameter Typ Beschreibung
func Aufrufbaren Eine Funktion, die einen DataFrame und eine Batch-ID (int) als Eingabe verwendet.

Rückkehr

DataStreamWriter

Hinweise

Im Spark Connect-Modus hat die bereitgestellte Funktion keinen Zugriff auf variablen, die außerhalb davon definiert sind.

Beispiele

import time
df = spark.readStream.format("rate").load()

def func(batch_df, batch_id):
    batch_df.collect()

q = df.writeStream.foreachBatch(func).start()
time.sleep(3)
q.stop()