DataStreamReader

外部ストレージ システム (ファイル システムやキー値ストアなど) からストリーミング DataFrame を読み込むためのインターフェイス。 spark.readStreamを使用してこれにアクセスします。

構文

# Access through SparkSession
spark.readStream

メソッド

メソッド 説明
format(source) 入力データ ソースの形式を指定します。
schema(schema) ストリーミング データフレームのスキーマを指定します。
option(key, value) 基になるデータ ソースの入力オプションを追加します。
options(**options) 基になるデータ ソースの複数の入力オプションを追加します。
load(path) 指定されたパスからストリーミング DataFrame を読み込んで返します。
json(path) JSON ファイル ストリームを読み込み、DataFrame を返します。
orc(path) ORC ファイル ストリームを読み込み、DataFrame を返します。
parquet(path) Parquet ファイル ストリームを読み込み、DataFrame を返します。
text(path) テキスト ファイル ストリームを読み込み、DataFrame を返します。
csv(path) CSV ファイル ストリームを読み込み、DataFrame を返します。
xml(path) XML ファイル ストリームを読み込み、DataFrame を返します。
table(tableName) ストリーミング Delta テーブルを読み込み、DataFrame を返します。
name(source_name) チェックポイントの進化のためにストリーミング ソースに名前を割り当てます。
changes(tableName) 指定したテーブルの行レベルの変更 (変更データ キャプチャ) をストリーミング DataFrame として返します。

例示

spark.readStream
# <...streaming.readwriter.DataStreamReader object ...>

レート ストリームを読み込み、変換を適用し、コンソールに書き込み、3 秒後に停止します。

import time
df = spark.readStream.format("rate").load()
df = df.selectExpr("value % 3 as v")
q = df.writeStream.format("console").start()
time.sleep(3)
q.stop()