自動ローダーでは、追加の設定を行わなくても、クラウド ストレージに到着した新しいデータ ファイルが段階的かつ効率的に処理されます。
自動ローダーのしくみはどのようなものですか?
自動ローダーは、新しいデータ ファイルがクラウド ストレージに到着すると、それらを段階的かつ効率的に処理します。
cloudFiles と呼ばれる構造化ストリーミング ソースを提供します。 クラウド ファイル ストレージ上に入力ディレクトリ パスを指定すると、cloudFiles ソースでは、新しいファイルが到着したときにそれらが自動的に処理されます。また、そのディレクトリ内の既存のファイルも処理できます。 自動ローダーでは、Lakeflow Spark 宣言パイプラインのPythonと SQL の両方がサポートされています。
自動ローダーを使用して、移行する数十億個のファイルを処理したり、テーブルをバックフィルしたりできます。 自動ローダーは、1 時間あたり数百万個のファイルの、ほぼリアルタイムのインジェストをサポートするようにスケーリングされます。
サポートされている自動ローダー ソース
自動ローダーは、次のソースからデータ ファイルを読み込むことができます。
Amazon S3 (
s3://)Azure Data Lake Storage (ADLS、
abfss://)Google Cloud Storage (GCS、
gs://)Azure Blob Storage (
wasbs://)Note
レガシ Windows Azure Storage BLOB ドライバー (WASB) は非推奨になりました。 ABFS には WASB よりも多くの利点があります。 AzureのABFSに関するドキュメントを参照してください。 レガシ WASB ドライバーの使用に関するドキュメントについては、「CONNECT to Azure Blob Storage with WASB (legacy)を参照してください。
Databricks ファイル システム (DBFS、
dbfs:/)
自動ローダーは、JSON、CSV、XML、PARQUET、AVRO、ORC、TEXT、および BINARYFILE のファイル形式を取り込めます。
自動ローダーでは、インジェストの進行状況はどのように追跡されますか?
ファイルが検出されると、そのメタデータは、自動ローダー パイプラインのチェックポイントの場所にあるスケーラブルなキー/値ストア (RocksDB) に保持されます。 このキー/値ストアにより、データは厳密に 1 回だけ処理されます。
エラーが発生した場合、自動ローダーではチェックポイントの場所に格納されている情報に基づいて中断された場所から再開でき、Delta Lake にデータを書き込むときに厳密に 1 回だけという保証を提供し続けられます。 フォールト トレランスや厳密に 1 回だけのセマンティクスを実現するために、状態を自分で維持したり管理したりする必要はありません。
Lakeflow Spark デクレラティブ パイプラインと自動ローダーを使用した増分インジェスト
Databricks では、増分データ インジェストのために、Lakeflow Spark 宣言パイプラインの自動ローダーを推奨します。 Lakeflow Spark 宣言型パイプラインによってパイプラインのこれらの設定が自動的に管理されるため、スキーマまたはチェックポイントの場所を指定する必要はありません。 推奨 される構成については、実稼働ワークロード用の自動ローダーの 構成を参照してください。
また、Databricks では、Apache Spark Structured Streaming を使用してクラウド オブジェクト ストレージからデータを取り込むときはいつでも自動ローダーが推奨されます。 API は、Pythonおよび Scala で使用できます。
Databricks 自動ローダーを使ってみる
Lakeflow Spark 宣言パイプラインで自動ローダーを使用した増分データ インジェストの構成を開始するには、次の記事を参照してください。
- チュートリアル: Lakeflow Spark 宣言パイプラインを使用して ETL パイプラインを構築する
- Azure Data Lake Storage からのインクレメンタル インジェストを設定します
例: 一般的な自動ローダーのパターン
一般的な自動ローダー パターンの例については、「一般的なデータ読み込みパターン」を参照してください。
自動ローダーのオプションを構成する
データの量、種類、速度に基づいて自動ローダーを調整できます。
- 自動ローダーでスキーマの推論と進化を構成する: 新しい列や型の変更の処理を含め、自動ローダーが時間の経過と同時にデータのスキーマを推論および進化する方法を構成します。
- 自動ローダーを使用した自動タイプ拡大
- 運用ワークロード用に自動ローダーを構成する: チェックポイント処理、エラー処理、ファイル保持管理など、運用環境での信頼性とパフォーマンスのために自動ローダーを最適化します。
- ソース データの保持: インジェスト後にファイルを自動的にアーカイブまたは削除して、ストレージ コストを削減し、ファイルの検出を高速化します。
自動ローダー オプションの完全な一覧については、「 自動ローダー オプション」を参照してください。 予期しないパフォーマンスが発生した場合は、FAQ を参照してください。
自動ローダー ファイル検出モードを構成する
自動ローダーでは、2 つの ファイル検出モードがサポートされています。 See:
順不同のデータを処理する
自動ローダーでは、ディレクトリ 一覧モードとファイル通知モードのどちらを使用しているかに関係なく、ファイルの検出または処理の順序は保証されません。 次の戦略を使用して、順不同のファイル到着を処理するパイプラインを設計します。
Lakeflow Spark の宣言型パイプライン AUTO CDC
自動ローダーとAUTO CDCを使用してLakeflow Sparkの宣言型パイプラインを使用する場合は、削除されたレコードが順序外のファイル到着を処理できるよう、削除データの保持期間(tombstone retention)を適切に構成してください。 ターゲット ストリーミング テーブルの pipelines.cdc.tombstoneGCThresholdInSeconds テーブル プロパティを、イベントの到着とパイプラインの実行の間に予想される最大遅延を超える値に設定します。 既定のリテンション期間は 2 日間です。 詳細については、 create_auto_cdc_flowを参照してください。
Lakeflow Spark 宣言パイプラインを使用しない構造化ストリーミング
(Lakeflow Spark 宣言型パイプラインを使用せずに) 自動ローダーで Apache Spark 構造化ストリーミングを直接使用する場合は、次のパターンを考慮して順序が整わないデータを処理します。
- ハード削除よりも論理的な削除を優先する: 行を削除するのではなく、
deletedフラグとタイムスタンプを追跡して、到着が遅れた削除が以前のレコードと競合しないようにします。 - 更新プログラムを適用する前にタイムスタンプを比較する: アップサートするときは、受信レコードの更新タイムスタンプをターゲット行の現在のタイムスタンプと比較して、古いデータで上書きされないようにします。
ファイルでの構造化ストリーミングの直接使用より、自動ローダーが優位な点
Apache Spark では、spark.readStream.format(fileFormat).load(directory) を使用してファイルを増分方式で読み取りできます。 自動ローダーには、ファイル ソースに対して次のベネフィットがあります。
- スケーラビリティ: 自動ローダーでは、数十億のファイルを効率的に検出できます。 コンピューティング リソースの無駄を回避するために、バックフィルを非同期的に実行できます。
- パフォーマンス: 自動ローダーを使用してファイルを検出するコストは、ファイルが格納されている可能性のあるディレクトリの数ではなく、取り込まれているファイルの数に応じて増減します。 ディレクトリー・リスト・モードでの自動ローダー・ストリームの構成を参照してください。
- スキーマの推論と展開のサポート: 自動ローダーでは、スキーマ ドリフトを検出し、スキーマの変更が発生した場合に通知して、通知されなかった場合は無視または失われていたはずのデータを保護できます。 自動ローダー スキーマ推論のしくみを参照してください。
- コスト: 自動ローダーでは、ネイティブ クラウド API を使用して、ストレージに存在するファイルの一覧を取得します。 さらに、自動ローダーのファイル通知モードは、ディレクトリの一覧を完全に回避することで、クラウド コストをさらに削減するのに役立ちます。 自動ローダーを使用すると、ストレージにファイル通知サービスを自動的に設定して、ファイル検出のコストを大幅に削減できます。