ストリーミング DataFrame を外部ストレージ システム (ファイル システムやキー値ストアなど) に書き込むためのインターフェイス。
df.writeStreamを使用してこれにアクセスします。
構文
# Access through DataFrame
df.writeStream
メソッド
| メソッド | 説明 |
|---|---|
outputMode(outputMode) |
ストリーミング DataFrame のデータをシンクに書き込む方法を指定します。 オプションは append、complete、update です。 |
format(source) |
出力データ ソースの形式を指定します。 |
option(key, value) |
基になるデータ ソースの出力オプションを追加します。 |
options(**options) |
基になるデータ ソースの複数の出力オプションを追加します。 |
partitionBy(*cols) |
ファイル システム上の指定された列で出力をパーティション分割します。 |
clusterBy(*cols) |
指定された列で出力をクラスター化します。 |
queryName(queryName) |
ストリーミング クエリの名前を指定します。 |
trigger(**kwargs) |
ストリーミング クエリ実行のトリガーを設定します。 |
foreach(f) |
指定された関数またはオブジェクトによって処理されるストリーミング クエリの出力を設定します。 |
foreachBatch(func) |
指定された関数によって処理される各マイクロバッチの出力を設定します。 |
start(path) |
ストリーミング クエリの実行を開始し、 StreamingQuery オブジェクトを返します。 |
table(tableName) |
toTable() の別名。 指定したテーブルにデータを書き込み、 StreamingQuery オブジェクトを返します。 |
toTable(tableName) |
ストリーミング クエリの実行を開始し、指定されたテーブルに継続的に結果を出力します。 |
例示
レート ストリームを読み込み、変換を適用し、コンソールに書き込み、3 秒後に停止します。
import time
df = spark.readStream.format("rate").load()
df = df.selectExpr("value % 3 as v")
q = df.writeStream.format("console").start()
time.sleep(3)
q.stop()