foreachBatch (DataStreamWriter)

指定された関数を使用して処理されるストリーミング クエリの出力を設定します。 マイクロバッチ実行モードでのみサポートされます (つまり、トリガーが連続していない場合)。 すべてのマイクロバッチでは、指定された関数が、出力行を DataFrame およびバッチ識別子として使用して呼び出されます。 バッチ ID を使用して、出力を重複除去し、トランザクションによって外部システムに書き込むことができます。

構文

foreachBatch(func)

パラメーター

パラメーター タイプ 説明
func 呼び出し DataFrame とバッチ ID (int) を入力として受け取る関数。

返品

DataStreamWriter

メモ

Spark Connect モードでは、指定された関数は、その外部で定義された変数にアクセスできません。

例示

import time
df = spark.readStream.format("rate").load()

def func(batch_df, batch_id):
    batch_df.collect()

q = df.writeStream.foreachBatch(func).start()
time.sleep(3)
q.stop()