構造化ストリーミングのトリガー間隔を構成する

このページでは、Azure Databricksで構造化ストリーミングのトリガー間隔を構成する方法について説明します。

Apache Spark Structured Streaming は、データを段階的に処理します。 トリガー間隔は、構造化ストリーミングが新しいデータをチェックする頻度を制御します。 ほぼリアルタイムの処理、スケジュールされたデータベースの更新、1 日または 1 週間のすべての新しいデータのバッチ処理のトリガー間隔を構成できます。

自動ローダーとは、構造化ストリーミングを使用してデータを読み込むため、トリガーのしくみを理解することで、目的の頻度でデータを取り込みながらコストを制御する最大の柔軟性を提供します。

重要

Azure Databricksでは、ユース ケースの待機時間とコストのバランスを取るトリガー モードを設定することをお勧めします。 そうしないと、クラウド プロバイダーから予期しないストレージ コストが発生する可能性があります。 詳細については、 クラウド ストレージ コストの制御 に関するページを参照してください。

トリガー モードの概要

次の表は、構造化ストリーミングで使用できるトリガー モードをまとめたものです。

トリガー モード 構文の例 (Python) 最適な対象者
未指定 (既定値) N/A 待機時間が 3 ~ 5 秒の汎用ストリーミング。 0 ミリ秒間隔の processingTime トリガーと同じです。 ストリーム処理は、新しいデータが到着する限り継続的に実行されます。
処理時間 .trigger(processingTime='10 seconds') コストとパフォーマンスのバランスを取る。 システムでデータのチェックが頻繁に行われるのを防ぐことで、オーバーヘッドを軽減します。
今すぐ利用可能 .trigger(availableNow=True) スケジュールされた増分バッチ処理。 ストリーミング ジョブがトリガーされた時点で使用可能な量のデータを処理します。
リアルタイム モード .trigger(realTime='5 minutes') 不正行為の検出やリアルタイムのパーソナル化など、2 秒未満の処理を必要とする超低待機時間の運用ワークロード。 パブリック プレビュー。 '5 分' はマイクロバッチの長さを示します。 クエリのコンパイルなどのバッチごとのオーバーヘッドを最小限に抑えるには、5 分を使用します。
継続的 .trigger(continuous='1 second') サポートされていません。 これは、Spark OSS に含まれる試験的な機能です。 代わりにリアルタイム モードを使用してください。

:::note サーバーレス コンピューティング

サーバーレス コンピューティングでは、 Trigger.AvailableNow()Trigger.Once() のみがサポートされます。 Databricks では Trigger.AvailableNow() を推奨しています。

サーバーレス コンピューティングでの継続的ストリーミングには、継続的パイプライン モードを使用します。トリガーされたパイプライン モードではなく、連続モードで行ってください。

ストリーミングの制限事項を参照してください。

:::

processingTime: 時間ベースのトリガー間隔

構造化ストリーミングは、時間ベースのトリガー間隔を "固定間隔マイクロバッチ" と言います。 processingTime キーワードを使用して、.trigger(processingTime='10 seconds') のように、期間を文字列として指定します。

この間隔の構成によって、新しいデータが到着したかどうかを確認するためにシステムがチェックを実行する頻度が決まります。 待機時間の要件とデータがソースに到達する速度のバランスを取るために処理時間を構成します。

AvailableNow: 増分バッチ処理

重要

Databricks Runtime 11.3 LTS 以降では、 Trigger.Once は非推奨です。 すべての増分バッチ処理ワークロードに対して Trigger.AvailableNow を使用します。

AvailableNow トリガー オプションでは、使用可能なすべてのレコードが増分バッチとして使用され、maxBytesPerTriggerなどのオプションを使用してバッチ サイズを構成できます。 サイズ設定オプションは、データ ソースによって異なります。

サポートされるデータ ソース

Azure Databricksでは、多くの構造化ストリーミング ソースからの増分バッチ処理に Trigger.AvailableNow を使用できます。 次の表に、各データ ソースに必要な Databricks Runtime の最小サポート バージョンを示します。

ソース Databricks の最低ランタイム バージョン
ファイル ソース (JSON、Parquet など) 9.1 LTS
Delta Lake 10.4 LTS
自動ローダー 10.4 LTS
Apache Kafka 10.4 LTS
キネシス 13.1

realTime: 超低待機時間の運用ワークロード

構造化ストリーミングのリアルタイム モードでは、末尾が 1 秒未満でエンドツーエンドの待機時間が実現され、一般的な場合は約 300 ミリ秒です。 リアルタイム モードを効果的に構成して使用する方法の詳細については、「 構造化ストリーミングのリアルタイム モード」を参照してください。

Apache Spark には、 継続的処理と呼ばれるトリガー間隔が追加されています。 このモードは、Spark 2.3 以降、試験段階として分類されています。 Azure Databricksでは、このモードはサポートも推奨もされません。 待機時間の短いユース ケースでは、代わりにリアルタイム モードを使用してください。

このページの連続処理モードは、 Lakeflow Spark 宣言パイプラインでの連続処理とは無関係です。

クラウド ストレージ コストを制御する

既定では、トリガー モードを設定しない場合、Structured Streaming によってトリガー モードが processingTime に設定され、間隔が 0に設定され、数ミリ秒ごとに新しいデータがチェックされます。 これにより、1 日あたり大量のクラウド ストレージ API 呼び出しが生成され、クラウド プロバイダーから予期しない料金が発生する可能性があります。

Databricks では、待機時間とコストの要件に適したトリガー モードを構成することをお勧めします。 時間ベースのトリガー間隔の構成については、 processingTime を参照してください。

実行間のトリガー間隔を変更する

同じチェックポイントを使用しながら、実行間のトリガー間隔を変更できます。

間隔を変更するときの動作

マイクロバッチの処理中に構造化ストリーミング ジョブが停止した場合、そのマイクロバッチは、新しいトリガー間隔が適用される前に完了する必要があります。 その結果、トリガー間隔を変更した後、以前に指定した設定でマイクロバッチ処理が行われる場合があります。 次に、移行時の予期される動作について説明します。

  • 時間ベースの間隔から AvailableNowへの切り替え: マイクロバッチは、使用可能なすべてのレコードを増分バッチとして処理する前に処理される可能性があります。

  • AvailableNowから時間ベースの間隔への切り替え: 最後のAvailableNow ジョブがトリガーされたときに使用可能なすべてのレコードに対して処理が続行される場合があります。 これは正しい動作です。

クエリエラーからの復旧

増分バッチに関連付けられているクエリエラーから復旧しようとしている場合、バッチを完了する必要があるため、トリガー間隔を変更してもこの問題は解決しません。 問題の解決を試みるために、バッチの処理に使用されるコンピューティング容量をスケールアップします。 まれに、新しいチェックポイントを使用してストリームを再起動することが必要になる場合があります。