パイプラインにデータを読み込む

パイプラインを使用して、Azure Databricks 上の Apache Spark でサポートされている任意のデータ ソースからデータを読み込むことができます。 Spark DataFrame のストリーミング DataFrames や Pandas for Spark DataFrames など、Spark DataFrame を返すクエリに対して、Lakeflow Spark 宣言パイプラインでデータセット (テーブルとビュー) を定義できます。 データ インジェスト タスクの場合、Databricks では、ほとんどのユース ケースでストリーミング テーブルを使用することをお勧めします。 ストリーミング テーブルは、自動ローダーを使用してクラウド オブジェクト ストレージから、または Kafka などのメッセージ バスからデータを取り込む場合に便利です。

すべてのデータ ソースで、インジェストに対して SQL がサポートされているわけではありません。 ただし、SQL ソースとPython ソースを同じパイプラインに混在させ、必要に応じてPythonを使用できます。 既定で Lakeflow Spark 宣言パイプラインにパッケージ化されていないライブラリの操作の詳細については、 パイプラインの Python 依存関係の管理に関するページを参照してください。 Azure Databricks でのインジェストの一般的な情報については、 Lakeflow Connect の Standard コネクタに関するページを参照してください。

次の例では、一般的なデータ読み込みパターンを示します。

既存のテーブルからの読み込み

Azure Databricks 内の既存のテーブルからデータを読み込みます。 クエリを使用してデータを変換したり、パイプラインでさらに処理するためにテーブルを読み込んだりすることができます。

Python

@dp.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

クラウド オブジェクト ストレージからファイルを読み込む

Databricks では、クラウド オブジェクト ストレージまたは Unity カタログ ボリューム内のファイルからのほとんどのデータ インジェスト タスクに対して、パイプラインで自動ローダーを使用することをお勧めします。 自動ローダーとパイプラインは、クラウドストレージに到着した増加し続けるデータを、段階的かつべき等に読み込むように設計されています。 「自動ローダーとは」を参照し、オブジェクト ストレージからデータを読み込みます

次の例では、自動ローダーを使用してクラウド ストレージからデータを読み取ります。

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

次の例では、自動ローダーを使用して、Unity カタログ ボリューム内の CSV ファイルからデータセットを作成します。

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

  • ファイル通知で自動ローダーを使用し、パイプラインまたはストリーミング テーブルの完全な更新を実行する場合は、リソースを手動でクリーンアップする必要があります。 ノートブックで CloudFilesResourceManager を使用してクリーンアップを実行できます。
  • Unity Catalog が有効になったパイプラインで自動ローダーを使用してファイルを読み込むには、外部の場所を使用する必要があります。 パイプラインで Unity カタログを使用する方法の詳細については、「パイプライン で Unity カタログを使用する」を参照してください。

クラウド ストレージに対する認証

自動ローダーは、Unity カタログの外部の場所を使用して、クラウド ストレージに対する認証を行います。 読み取るストレージ パスの外部の場所を構成し、実行中のユーザーに READ FILES 特権を付与する必要があります。

Azure Data Lake Storageから取り込むには、ストレージ コンテナーを参照するストレージ資格情報に基づく外部の場所を構成します。 詳細については、「 Unity カタログを使用してクラウド オブジェクト ストレージに接続する」を参照してください。

メッセージ バスからデータを読み込む

メッセージ バスからデータを取り込むパイプラインを構成できます。 Databricks では、メッセージ バスからの待機時間の短い読み込みに最も効率的なインジェストを提供するために、継続的な実行と拡張された自動スケーリングでストリーミング テーブルを使用することをお勧めします。 詳細については、「 自動スケーリングを使用して Lakeflow Spark 宣言パイプラインのクラスター使用率を最適化する」を参照してください。

たとえば、次のコードは、 read_kafka 関数を使用して Kafka からデータを取り込むためのストリーミング テーブルを構成します。

Python

from pyspark import pipelines as dp

@dp.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

他のメッセージ バス ソースから取り込むには、次を参照してください。

Azure Event Hubs からデータを読み込む

Azure Event Hubs は、Apache Kafka 互換インターフェイスを提供するデータ ストリーミング サービスです。 Lakeflow Spark 宣言パイプライン ランタイムに含まれる Structured Streaming Kafka コネクタを使用して、Azure Event Hubs からメッセージを読み込むことができます。 Azure Event Hubs からのメッセージの読み込みと処理の詳細については、「 Azure Event Hubs をパイプライン データ ソースとして使用する」を参照してください。

外部システムからデータを読み込む

Lakeflow Spark 宣言パイプラインでは、Azure Databricks でサポートされている任意のデータ ソースからのデータの読み込みがサポートされています。 データ ソースと外部サービスへの接続を参照してください。 サポートされているデータ ソースに対して Lakehouse フェデレーションを使用して外部データを読み込むこともできます。 Lakehouse Federation では Databricks Runtime 13.3 LTS 以上が必要であるため、Lakehouse Federation を使用するには、 プレビュー チャネルを使用するようにパイプラインを構成します。

一部のデータ ソースには、同等の SQL サポートがありません。 これらのデータ ソースのいずれかで Lakehouse Federation を使用できない場合は、Python を使用してソースからデータを取り込むことができます。 Python と SQL のソース ファイルを同じパイプラインに追加できます。 次の例では、リモート PostgreSQL テーブル内のデータの現在の状態にアクセスするための具体化されたビューを宣言します。

import dp

@dp.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

クラウド オブジェクト ストレージから小さいデータセットまたは静的データセットを読み込む

Apache Spark の読み込み構文を使用して、小規模または静的なデータセットを読み込むことができます。 Lakeflow Spark 宣言型パイプラインでは、Azure Databricks 上の Apache Spark でサポートされているすべてのファイル形式がサポートされています。 完全な一覧については、「 データ形式のオプション」を参照してください。

次の例では、JSON を読み込んでテーブルを作成する方法を示します。

Python

@dp.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

read_files SQL 関数は、Azure Databricks 上のすべての SQL 環境に共通です。 パイプラインで SQL を使用してファイルに直接アクセスする場合に推奨されるパターンです。 詳細については、「オプションの」を参照してください。

Python カスタム データ ソースからデータを読み込む

Python カスタム データ ソースを使用すると、カスタム形式でデータを読み込むことができます。 特定の外部データ ソースの読み取りと書き込みを行うコードを記述することも、既存のPython コードを使用して独自の内部システムからデータを読み取ることもできます。 Python データ ソースの開発の詳細については、「 PySpark カスタム データ ソース」を参照してください。

次の例では、カスタム データ ソースを my_custom_datasource 形式名で登録し、バッチ モードとストリーミング モードの両方で読み取ります。

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
    return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
    return spark.readStream.format("my_custom_datasource").load()

ソース ストリーミング テーブルの変更を無視するようにストリーミング テーブルを構成する

既定では、ストリーミング テーブルには追加専用のソースが必要です。 ソース ストリーミング テーブルで更新や削除が必要な場合 (たとえば、GDPR の "忘れられる権利" 処理の場合) は、 skipChangeCommits フラグを使用してそれらの変更を無視します。 このフラグは、spark.readStream関数を使用するoption()でのみ機能し、ソース ストリーミング テーブルが create_auto_cdc_flow() 関数のターゲットである場合は使用できません。 詳細については、「 ソース差分テーブルへの変更の処理」を参照してください。

@dp.table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

パイプライン内のシークレットを使用してストレージ資格情報に安全にアクセスする

Azure Databricks シークレットを使用して アクセス キーやパスワードなどの資格情報を格納できます。 パイプラインでシークレットを構成するには、パイプライン設定クラスター構成で Spark プロパティを使用します。 パイプラインのクラシック コンピューティングの構成に関するページを参照してください。

次の例では、シークレットを使用して、自動ローダーを使用してAzure Data Lake Storageストレージ アカウントから入力データを読み取るために必要なアクセス キーを格納します。 この同じ方法を使用して、パイプラインに必要なシークレット (たとえば、S3 にアクセスするための AWS キー、Apache Hive メタストアへのパスワードなど) を構成できます。

Azure Data Lake Storage の操作の詳細については、Azure Data Lake Storage と Blob Storage への接続に関するページを参照してください。

シークレット値を設定するspark.hadoop.構成キーに、spark_conf プレフィックスを追加する必要があります。

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

このコード サンプルでは、次の値を置き換えます。

Placeholder に置き換える
<container-name> Azure ストレージ アカウント コンテナーの名前。
<storage-account-name> ADLS ストレージ アカウント名。
<path> パイプライン出力データとメタデータのパス。
<scope-name> Azure Databricks シークレット スコープ名。
<secret-name> Azure ストレージ アカウントのアクセス キーを含むキーの名前。
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

このコード サンプルでは、次の値を置き換えます。

Placeholder に置き換える
<container-name> 入力データを格納するAzure ストレージ アカウント コンテナーの名前。
<storage-account-name> ADLS ストレージ アカウント名。
<path-to-input-dataset> 入力データセットへのパス。