foreach (DataStreamWriter)

指定されたライターを使用して処理されるストリーミング クエリの出力を設定します。 処理ロジックは、行を入力として受け取る関数として、または process(row) および省略可能な open(partition_id, epoch_id) メソッドと close(error) メソッドを持つオブジェクトとして指定できます。

構文

foreach(f)

パラメーター

パラメーター タイプ 説明
f 呼び出し可能またはオブジェクト Row を入力として受け取る関数、または process(row) メソッドと省略可能な open メソッドと close メソッドを持つオブジェクト。

返品

DataStreamWriter

メモ

指定されたオブジェクトはシリアル化可能である必要があります。 データを書き込むための初期化 (接続を開くなど) は、構築時ではなく、 open()内で行う必要があります。

例示

import time
df = spark.readStream.format("rate").load()

関数を使用して各行を処理します。

def print_row(row):
    print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()

openprocess、およびcloseメソッドを持つオブジェクトを使用して各行を処理します。

class RowPrinter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        print(row)

    def close(self, error):
        print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()