外部ストレージ システム (ファイル システムやキー値ストアなど) からストリーミング DataFrame を読み込むためのインターフェイス。
spark.readStreamを使用してこれにアクセスします。
構文
# Access through SparkSession
spark.readStream
メソッド
| メソッド | 説明 |
|---|---|
format(source) |
入力データ ソースの形式を指定します。 |
schema(schema) |
ストリーミング データフレームのスキーマを指定します。 |
option(key, value) |
基になるデータ ソースの入力オプションを追加します。 |
options(**options) |
基になるデータ ソースの複数の入力オプションを追加します。 |
load(path) |
指定されたパスからストリーミング DataFrame を読み込んで返します。 |
json(path) |
JSON ファイル ストリームを読み込み、DataFrame を返します。 |
orc(path) |
ORC ファイル ストリームを読み込み、DataFrame を返します。 |
parquet(path) |
Parquet ファイル ストリームを読み込み、DataFrame を返します。 |
text(path) |
テキスト ファイル ストリームを読み込み、DataFrame を返します。 |
csv(path) |
CSV ファイル ストリームを読み込み、DataFrame を返します。 |
xml(path) |
XML ファイル ストリームを読み込み、DataFrame を返します。 |
table(tableName) |
ストリーミング Delta テーブルを読み込み、DataFrame を返します。 |
name(source_name) |
チェックポイントの進化のためにストリーミング ソースに名前を割り当てます。 |
changes(tableName) |
指定したテーブルの行レベルの変更 (変更データ キャプチャ) をストリーミング DataFrame として返します。 |
例示
spark.readStream
# <...streaming.readwriter.DataStreamReader object ...>
レート ストリームを読み込み、変換を適用し、コンソールに書き込み、3 秒後に停止します。
import time
df = spark.readStream.format("rate").load()
df = df.selectExpr("value % 3 as v")
q = df.writeStream.format("console").start()
time.sleep(3)
q.stop()