このページでは、サポートされている環境、言語、ソース、シンク、演算子など、Structured Streaming のリアルタイム モードのリファレンス情報を提供します。 既知の制限事項については、「 リアルタイム モードの制限事項」を参照してください。
サポートされている言語
リアルタイム モードでは、Scala、Java、およびPythonがサポートされます。
コンピューティングの種類
リアルタイム モードでは、次のコンピューティングの種類がサポートされます。
| コンピューティングの種類 | サポートされている |
|---|---|
| 専用 (以前: シングル ユーザー) | ✓ |
| Standard (以前: 共有) | ✓ (Pythonのみ) |
| Lakeflow Spark 宣言型パイプライン クラシック | サポートしていません |
| Lakeflow Spark 宣言パイプライン サーバーレス | サポートしていません |
| Serverless | サポートしていません |
UDF を使用した待機時間の影響を受けやすいワークロードの場合、Databricks では専用アクセス モードを使用することをお勧めします。 テーブル関数を参照してください。
実行モード
リアルタイム モードでは、更新モードのみがサポートされます。
| 実行モード | サポートされている |
|---|---|
| 更新モード | ✓ |
| 追加モード | サポートしていません |
| 完全モード | サポートしていません |
ソースとシンク
リアルタイム モードでは、次のソースとシンクがサポートされます。
| ソースまたはシンク | ソースとして | シンクとして |
|---|---|---|
| Apache Kafka | ✓ | ✓ |
| Event Hubs (Kafka コネクタを使用) | ✓ | ✓ |
| Kinesis | ✓ (EFO モードのみ) | サポートしていません |
| AWS MSK | ✓ | サポートしていません |
| Delta | サポートしていません | サポートしていません |
| Google Pub/Sub (グーグルパブサブ) | サポートしていません | サポートしていません |
| Apache Pulsar | サポートしていません | サポートしていません |
任意のシンク ( forEachWriterを使用) |
適用なし | ✓ |
オペレーター
リアルタイム モードでは、ほとんどの構造化ストリーミング演算子がサポートされています。
ステートレス操作
| Operator | サポートされている |
|---|---|
| [選択] | ✓ |
| プロジェクション | ✓ |
UDFs
| Operator | サポートされている |
|---|---|
| Scalaユーザー定義関数 (UDF) | ✓ (いくつかの制限あり) |
| PYTHON UDF | ✓ (いくつかの制限あり) |
集約
| Operator | サポートされている |
|---|---|
| sum | ✓ |
| 数える | ✓ |
| max | ✓ |
| min | ✓ |
| avg | ✓ |
| 集計関数 | ✓ |
ウィンドウ処理
| Operator | サポートされている |
|---|---|
| タンブリング | ✓ |
| スライディング | ✓ |
| セッション | サポートしていません |
重複除去 (Deduplication)
| Operator | サポートされている |
|---|---|
| dropDuplicates | ✓ (状態は無制限です) |
| ウォーターマーク内の重複を削除する | サポートしていません |
テーブルへのストリーム結合
| Operator | サポートされている |
|---|---|
| ブロードキャスト テーブル結合 (テーブルは小さくする必要があります) | ✓ |
| Stream to stream join | サポートしていません |
| (フラット)MapGroupsWithState | サポートしていません |
| transformWithState | ✓ (いくつかの違いあり) |
| UNION | ✓ (いくつかの制限あり) |
| forEach | ✓ |
| forEachBatch | サポートしていません |
| mapPartitions | サポートされていません (制限を参照) |
特別な考慮事項
一部の演算子と機能は、リアルタイム モードで使用する場合、特定の考慮事項や違いがあります。
transformWithState リアルタイム モード
カスタムステートフル アプリケーションを構築するために、Databricks は Apache Spark Structured Streaming の API である transformWithState をサポートします。 API とコード スニペットの詳細については、「 カスタム ステートフル アプリケーションの構築 」を参照してください。
ただし、リアルタイム モードでの API の動作と、マイクロバッチ アーキテクチャを利用する従来のストリーミング クエリには、いくつかの違いがあります。
- リアルタイム モードでは、各行に対して
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)メソッドが呼び出されます。-
inputRows反復子は 1 つの値を返します。 マイクロバッチ モードでは、キーごとに 1 回呼び出され、inputRows反復子はマイクロ バッチ内のキーのすべての値を返します。 - コードの記述時にこの違いを考慮する
-
- イベント時間タイマーは、リアルタイム モードではサポートされていません。
-
transformWithStateInPandasはリアルタイム モードではサポートされていません。 代わりに、pandas DataFrames ではなくtransformWithStateオブジェクトを使用する行ベースのRowAPI を使用します。 - リアルタイム モードでは、データ到着に応じてタイマーの起動が遅れます。
- タイマーが 10:00:00 にスケジュールされていても、データが到着しない場合、タイマーはすぐに起動しません。
- データが 10:00:10 に到着すると、タイマーは 10 秒の遅延で起動します。
- データが到着せず、実行時間の長いバッチが終了している場合は、バッチが終了する前にタイマーが起動します。
Note
Databricks Runtime 18.1 以下では、スループットが低く、1 秒あたり 5 レコード未満のPythonに対して transformWithState とリアルタイム モードを使用すると、待機時間が最大で数百ミリ秒に増加する可能性があります。 Databricks では、解決するために Databricks Runtime 18.2 以降にアップグレードすることをお勧めします。
Python UDF をリアルタイム モードで使用する
Databricks では、Pythonユーザー定義関数 (UDF) の大部分がリアルタイム モードでサポートされています。
ステートレス
| UDF の種類 | サポートされている |
|---|---|
| スカラー UDF のPython (ユーザー定義スカラー関数 - Python) | ✓ |
| 矢印スカラー UDF | ✓ |
| Pandas スカラー UDF (pandas ユーザー定義関数) | ✓ |
Arrow 関数 (mapInArrow) |
✓ |
| Pandas 関数 (マップ) | ✓ |
ステートフル グループ化 (UDAF)
| UDF の種類 | サポートされている |
|---|---|
transformWithState (インターフェイス Row のみ) |
✓ |
transformWithStateInPandas |
サポートされていません。 代わりに、pandas DataFrames ではなくtransformWithState オブジェクトを使用する行ベースのRow API を使用します。 詳細については、サポートされていないtransformWithStateInPandasを参照してください。 |
applyInPandasWithState |
サポートしていません |
ステートフルでないグループ化 (UDAF)
| UDF の種類 | サポートされている |
|---|---|
apply |
サポートしていません |
applyInArrow |
サポートしていません |
applyInPandas |
サポートしていません |
テーブル関数
| UDF の種類 | サポートされている |
|---|---|
| UDTF (Python ユーザー定義テーブル関数 (UDF)) | サポートしていません |
| UC UDF | サポートしていません |
Python UDF をリアルタイム モードで使用する場合は、いくつかの点を考慮する必要があります。
- 待機時間を最小限に抑えるには、方向バッチ サイズ (
spark.sql.execution.arrow.maxRecordsPerBatch) を 1 に構成します。- トレードオフ: この構成は、スループットを犠牲にして待機時間を最適化します。 ほとんどのワークロードでは、この設定をお勧めします。
- バッチ サイズを増やすのは、入力ボリュームに対応するためにより高いスループットが必要な場合に限り、待機時間の増加の可能性を受け入れます。
- Pandas UDF と関数は、矢印バッチ サイズが 1 の場合、うまく機能しません。
- pandas UDF または関数を使用する場合は、矢印バッチ サイズを高い値 (100 以上など) に設定します。
- これは、待機時間が長いことを意味します。 Databricks では、可能であれば、方向 UDF または関数を使用することをお勧めします。
-
transformWithStateInPandasはリアルタイム モードではサポートされていません。 代わりに、pandas DataFrames ではなくtransformWithStateオブジェクトを使用する行ベースのRowAPI を使用します。transformWithStateInPandasはサポートされていません および Real-time モードの例行ベースの API を使用したPython例を参照してください。 - UDF を使用した待機時間の影響を受けやすいワークロードの場合、Databricks では専用アクセス モードを使用することをお勧めします。 標準アクセス モードでは、セキュリティ分離のオーバーヘッドによって UDF のパフォーマンスが低下する可能性があります。