指定されたライターを使用して処理されるストリーミング クエリの出力を設定します。 処理ロジックは、行を入力として受け取る関数として、または process(row) および省略可能な open(partition_id, epoch_id) メソッドと close(error) メソッドを持つオブジェクトとして指定できます。
構文
foreach(f)
パラメーター
| パラメーター | タイプ | 説明 |
|---|---|---|
f |
呼び出し可能またはオブジェクト | Row を入力として受け取る関数、または process(row) メソッドと省略可能な open メソッドと close メソッドを持つオブジェクト。 |
返品
DataStreamWriter
メモ
指定されたオブジェクトはシリアル化可能である必要があります。 データを書き込むための初期化 (接続を開くなど) は、構築時ではなく、 open()内で行う必要があります。
例示
import time
df = spark.readStream.format("rate").load()
関数を使用して各行を処理します。
def print_row(row):
print(row)
q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
open、process、およびcloseメソッドを持つオブジェクトを使用して各行を処理します。
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
print(row)
def close(self, error):
print("Closed with error: %s" % str(error))
q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()