簡略化されたストリーミング データ ソース リーダーの基本クラス。
DataSourceStreamReaderと比較して、SimpleDataSourceStreamReaderはデータ パーティションの計画を必要としません。
read()メソッドを使用すると、データを読み取り、最新のオフセットを同時に計画できます。
SimpleDataSourceStreamReaderは Spark ドライバーのレコードを読み取り、パーティション分割を行わずに各バッチの終了オフセットを決定するため、入力速度とバッチ サイズが小さい軽量ユース ケースにのみ適しています。 読み取りスループットが高く、1 つのプロセスで処理できない場合は、 DataSourceStreamReader を使用します。
Databricks Runtime 15.3 で追加されました
構文
from pyspark.sql.datasource import SimpleDataSourceStreamReader
class MyStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"offset": 0}
def read(self, start):
...
def readBetweenOffsets(self, start, end):
...
メソッド
| メソッド | 説明 |
|---|---|
initialOffset() |
ストリーミング データ ソースの初期オフセットを返します。 新しいストリーミング クエリは、このオフセットからの読み取りを開始します。 |
read(start) |
開始オフセットから使用可能なすべてのデータを読み取り、次の読み取り試行のレコードの反復子と終了オフセットのタプルを返します。 |
readBetweenOffsets(start, end) |
特定の開始オフセットと終了オフセットの間で使用可能なすべてのデータを読み取ります。 バッチを確定的に再読み取りするために、障害復旧中に呼び出されます。 |
commit(end) |
end以下のオフセットのすべてのデータの処理が Spark によって完了したことをソースに通知します。 |
例示
カスタムの簡略化されたストリーミング データ ソース リーダーを定義します。
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader
class MyStreamingDataSource(DataSource):
@classmethod
def name(cls):
return "my_streaming_source"
def schema(self):
return "value STRING"
def simpleStreamReader(self, schema):
return MySimpleStreamReader()
class MySimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"partition-1": {"index": 0}}
def read(self, start):
end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
def records():
yield ("hello",)
return records(), end
def readBetweenOffsets(self, start, end):
def records():
yield ("hello",)
return records()
def commit(self, end):
pass
spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()