CREATE STREAMING TABLE

適用対象: チェック マークあり Databricks SQL

ストリーミングまたは増分データ処理を追加でサポートする Delta テーブルであるストリーミング テーブルを作成します。

ストリーミング テーブルは、Lakeflow Spark 宣言パイプラインと Databricks SQL と Unity カタログでのみサポートされます。 サポートされている Databricks Runtime コンピューティングでこのコマンドを実行すると、構文のみが解析されます。 SQL を使用した Lakeflow Spark 宣言パイプライン コードの開発に関するページを参照してください。

構文

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ {flow_clause | AS query} ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    COMMENT table_comment |
    DEFAULT COLLATION UTF8_BINARY |
    TBLPROPERTIES clause |
    schedule |
    WITH { ROW FILTER clause } } [...]

flow_clause
  FLOW { { INSERT BY NAME query } |
  { AUTO CDC auto_cdc_flow_spec } }

schedule
  { SCHEDULE [ REFRESH ] schedule_clause |
    TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ]}

パラメーター

  • REFRESH

    指定した場合、クエリで定義されているソースから利用できる最新のデータでテーブルが更新されます。 クエリが開始される前に到着した新しいデータのみが処理されます。 コマンドの実行中にソースに追加される新しいデータは次の更新まで無視されます。 CREATE OR REFRESH からの更新操作は完全に宣言型です。 更新コマンドで元のテーブル作成ステートメントのすべてのメタデータが指定されていない場合、指定されていないメタデータは削除されます。

  • 存在しない場合

    ストリーミング テーブルが存在しない場合は作成します。 この名前のテーブルが既に存在する場合、CREATE STREAMING TABLE ステートメントは無視されます。

    IF NOT EXISTSOR REFRESH のいずれか 1 つだけを指定できます。

  • table_name

    作成されるテーブルの名前。 この名前には、テンポラル指定やオプション指定を含めないでください。 名前が修飾されていない場合、テーブルは現在のスキーマに作成されます。

  • テーブル仕様

    この省略可能な句で、列、その型、プロパティ、説明、および列制約の一覧を定義します。

    テーブル スキーマで列を定義しない場合、AS query を指定する必要があります。

    • column_identifier

      列に設定する一意の名前。

      • カラムタイプ (column_type)

        列のデータ型を指定します。

      • NOT NULL

        指定した場合、列は NULL 値を受け取りません。

      • コメント column_comment

        列について説明する文字列リテラル。

      • column_constraint

        ストリーミング テーブル内の列に主キーまたは外部キー制約を追加します。 制約は、hive_metastore カタログ内のテーブルではサポートされていません。

      • MASK 句

        列マスク関数を追加して、機密データを匿名化します。 その列の後続のすべてのクエリは、列の元の値の代わりに、その列に対してその関数を評価した結果を受け取ります。 これは、値を編集するかどうかを決定するために呼び出したユーザーの ID やグループ メンバーシップを関数で検査できる、きめ細かいアクセス制御に役立ちます。

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ 違反時 { FAIL UPDATE | ROWを削除 } ]

        テーブルにデータ品質の期待値を追加します。 これらのデータ品質の期待値は、時間の経過と同時に追跡し、ストリーミング テーブルの イベント ログを介してアクセスできます。 テーブルの作成時とテーブル更新時の両方で、FAIL UPDATE 期待値により処理が失敗します。 DROP ROW 期待値が満たされない場合、行全体が削除されます。

        expectation_expr は、以下のものを除く、リテラル、テーブル内の列識別子、および決定論的な組み込みの SQL 関数または演算子で構成される場合があります。

        また expr には、サブクエリを含めることはできません。

      • テーブルの制約

        情報主キーまたは情報外部キーの制約をストリーミング テーブルに追加します。 主な制約は、hive_metastore カタログ内のテーブルに対してはサポートされません。

  • テーブル条項

    必要に応じて、パーティション分割、コメント、ユーザー定義プロパティ、新しいテーブルの更新スケジュールを指定します。 各サブ句は、1 回だけ指定できます。

    • パーティション分割基準

      テーブルをパーティション化するための、任意のテーブル列の一覧。

      メモ

      液体クラスタリングは、クラスタリング用の柔軟で最適化されたソリューションを提供します。 ストリーミング テーブルに CLUSTER BY する代わりに、PARTITIONED BY を使用することを検討してください。

    • CLUSTER BY

      列のサブセットによってクラスター化する省略可能な句。 CLUSTER BY AUTOで自動液体クラスタリングを使用し、Databricks はクエリのパフォーマンスを最適化するためにクラスタリング キーをインテリジェントに選択します。 表に液体クラスタリングを使用するを参照してください。

      液体クラスタリングを PARTITIONED BYと組み合わせることはできません。

    • コメント table_comment

      テーブルを説明するためのSTRINGリテラル。

    • 既定の照合順序UTF8_BINARY

      適用対象:はい Databricks SQL チェックが Databricks Runtime 17.1 以降" とマークされているチェック

      ストリーミング テーブルの既定の照合順序を強制的に UTF8_BINARYします。 テーブルが作成されるスキーマに UTF8_BINARY以外の既定の照合順序がある場合、この句は必須です。 ストリーミング テーブルの既定の照合順序は、 query 内および列型の既定の照合順序として使用されます。

    • TBLPROPERTIES

      必要に応じて、1 つ以上のユーザー定義プロパティを設定します。

      この設定を使用して、このステートメントの実行に使用する Lakeflow Spark 宣言パイプライン ランタイム チャネルを指定します。 pipelines.channel プロパティの値を "PREVIEW" または "CURRENT" に設定します。 既定値は "CURRENT" です。 Lakeflow Spark 宣言パイプライン チャネルの詳細については、「 Lakeflow Spark 宣言パイプライン ランタイム チャネル」を参照してください。

    • schedule

      スケジュールには、 SCHEDULE ステートメントまたは TRIGGER ステートメントのいずれかを指定できます。

      • スケジュール [ REFRESH ] スケジュール文

        • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

          定期的に発生する更新をスケジュールするには、EVERY 構文を使用します。 EVERY 構文が指定されている場合、ストリーミング テーブルまたは具体化されたビューは、指定された値 (HOURHOURSDAYDAYSWEEKWEEKS など) に基づいて、指定した間隔で定期的に更新されます。 次の表に、number に使用できる整数値を示します。

          時間単位 整数値
          HOUR or HOURS 1 <= H <= 72
          DAY or DAYS 1<= D<= 31
          WEEK or WEEKS 1 <= W <= 8

          メモ

          含まれる時間単位の単数形と複数形は、意味的に同等です。

        • CRON cron_string [ AT TIME ZONE timezone_id ]

          quartz cron 値を使用して更新をスケジュールする場合。 有効な time_zone_values が受け入れられます。 AT TIME ZONE LOCAL はサポートされません。

          cron 式では、6 つのスペースで区切られたフィールドが次の順序で使用されます: seconds minutes hours day-of-month month day-of-week?またはday-of-monthday-of-weekを使用して、未指定のままにします。

          たとえば、 SCHEDULE CRON '0 0 0 * * ?' AT TIME ZONE 'UTC' は毎日午前 0 時 (UTC) に更新されます。

          AT TIME ZONE が存在しない場合は、セッション タイム ゾーンが使用されます。 AT TIME ZONE が存在せず、セッション タイム ゾーンも設定されていない場合は、エラーが発生します。 SCHEDULE は意味的に SCHEDULE REFRESH と同等です。

        スケジュールは CREATE コマンドの一部として指定できます。 ALTER STREAMING TABLE を使用するか、CREATE OR REFRESH コマンドを SCHEDULE 句と共に実行して、作成後にストリーミング テーブルのスケジュールを変更します。

      • トリガーオン UPDATE [ 最大ですべてのtrigger_interval ]

        必要に応じて、アップストリーム データ ソースが更新されたときに更新するようにテーブルを設定します (最大 1 分ごとに)。 AT MOST EVERYの値を設定して、更新の間に少なくとも最小限の時間を必要とします。

        アップストリーム データ ソースは、外部またはマネージド Delta テーブル (具体化されたビューやストリーミング テーブルを含む) か、依存関係がサポートされているテーブルの種類に制限されているマネージド ビューである必要があります。

        ファイル イベントを有効にすると、トリガーのパフォーマンスが向上し、トリガーの更新の制限の一部が増える可能性があります。

        trigger_intervalは、少なくとも 1 分の INTERVAL ステートメントです。

        TRIGGER ON UPDATE には次の制限があります

        • TRIGGER ON UPDATEを使用する場合、ストリーミング テーブルあたり 10 個以下のアップストリーム データ ソース。
        • TRIGGER ON UPDATEでは、最大 1,000 個のストリーミング テーブルまたは具体化されたビューを指定できます。
        • AT MOST EVERY句の既定値は 1 分で、1 分未満にすることはできません。
  • WITH ROW FILTER 句

    行フィルター関数をテーブルに追加します。 そのテーブルからのそれ以降のすべてのクエリでは、関数がブール値 TRUE に評価される行のサブセットを受け取ります。 これは、特定の行をフィルター処理するかどうかを決定するために呼び出したユーザーの ID やグループ メンバーシップを関数で検査できる、きめ細かいアクセス制御に役立ちます。

  • フロー

    Important

    この機能は ベータ版です。 Databricks Runtime 17.3 以降が必要です。

    必要に応じて、テーブルを作成する フロー をインラインで定義します。 フローは、テーブルの内容を更新するステートフル クエリです。 FLOWを指定しない場合は、代わりにAS queryを使用できます。 別の REFRESH STREAMING TABLE ステートメントを使用すると、フローを実行できます。 次のいずれかのフローの種類を指定できます。

    • INSERT 名前別

      列名でテーブルにデータを挿入します。 クエリはストリーミング クエリである必要があります。 ストリーミング セマンティクスを使用してソースから読み取る場合は、 STREAM キーワードを使用します。 読み取りで既存のレコードの変更または削除が発生した場合は、エラーがスローされます。 静的ソースまたは追加専用ソースから読み取るのが最も安全です。

      メモ

      FLOW INSERT BY NAME は、 AS queryの使用と同じです。 次の 2 つのステートメントの動作は同じです。

      CREATE OR REFRESH STREAMING TABLE raw_data
      AS SELECT * FROM STREAM read_files('abfss://my_path');
      
      CREATE OR REFRESH STREAMING TABLE raw_data
      FLOW INSERT BY NAME SELECT * FROM STREAM read_files('abfss://my_path');
      
    • AUTO CDC

      ソースからテーブルへの変更データ キャプチャ (CDC) レコードを処理する AUTO CDC フローを定義します。 ソース データに CDC セマンティクスが含まれている場合は、 AUTO CDC を使用します。 詳細については、CREATE STREAMING TABLEを参照してください。FLOW AUTO CDC

  • AS クエリ

    この句により、query からデータがテーブルに入力されます。 このクエリはストリーミング クエリにする必要があります。 そのためには増分的に処理するリレーションに STREAM キーワードを追加します。 querytable_specification を一緒に指定するとき、table_specification に指定されているテーブル スキーマに、query から返される列をすべて含める必要があります。含まれていない場合、エラーが出ます。 table_specification で指定されているが、query から返されない列はクエリ時に null 値を返します。

ストリーミング テーブルと他のテーブルの違い

ストリーミング テーブルはステートフル テーブルであり、増加するデータセットを処理するときに各行を 1 回だけ処理するように設計されています。 ほとんどのデータセットは時間が経過するにつれて増大し続けるため、ストリーミング テーブルは、大半のインジェスト ワークロードに適しています。 ストリーミング テーブルは、データの鮮度と待ち時間の短さが要求されるパイプラインに最適です。 また、非常に大規模な変換を行う用途にも適しています。これは、新しいデータが入ってくるのに応じて増分方式で結果を計算し続けて最新の状態に保つことができ、更新のたびにソース データ全体を再計算する必要がないためです。 ストリーミング テーブルは追加専用のデータ ソースを想定して設計されています。

ストリーミング テーブルは、REFRESH などの追加コマンドを受け取ります。このコマンドは、クエリで提供されるソースで利用できる最新のデータを処理します。 指定されたクエリに対する変更は、以前に処理されたデータではなく、REFRESH を呼び出すことによって新しいデータにのみ反映されます。 既存のデータにも変更を適用するには、REFRESH TABLE <table_name> FULL を実行するために FULL REFRESH を実行する必要があります。 完全更新では、最新の定義を使用して、ソースで使用可能なすべてのデータが再処理されます。 完全更新では既存のデータが切り捨てられるため、データの履歴全体を保持しないソースや、Kafka などの短い保持期間を持つソースに対して完全な更新を呼び出すことはできません。 ソースでデータが使用できなくなった場合、古いデータを回復できないことがあります。

行フィルターと列マスク

行フィルターを使用すると、テーブル スキャンで行がフェッチされるたびにフィルターとして適用される関数を指定できます。 これらのフィルターにより、後続のクエリでフィルター述語が true と評価される行のみが返されるようになります。

列マスクを使用すると、テーブル スキャンが行をフェッチするたびに列の値をマスクできます。 その列に関連するすべての今後のクエリは、列の元の値を置き換えて、列に対して関数を評価した結果を受け取ります。

行フィルターと列マスクの使用方法の詳細については、「 行フィルターと列マスク」を参照してください。

行フィルターと列マスクの管理

ストリーミング テーブルの行フィルターと列マスクは、CREATE OR REFRESH ステートメントを通じて追加、更新、または削除する必要があります。

動作

  • 定義者として更新: CREATE OR REFRESH または REFRESH ステートメントがストリーミング テーブルを更新すると、行フィルター関数は定義者の権限 (テーブル所有者) で実行されます。 つまり、テーブルの更新では、ストリーミング テーブルを作成したユーザーのセキュリティ コンテキストが使用されます。
  • クエリ: ほとんどのフィルターは定義者の権限で実行されますが、ユーザー コンテキスト ( CURRENT_USERIS_MEMBERなど) をチェックする関数は例外です。 これらの関数は呼び出し元として実行されます。 この方法では、現在のユーザーのコンテキストに基づいて、ユーザー固有のデータ セキュリティとアクセス制御が適用されます。

可観測性

DESCRIBE EXTENDEDINFORMATION_SCHEMA、またはカタログ エクスプローラーを使用して、特定のストリーミング テーブルに適用される既存の行フィルターと列マスクを調べます。 この機能により、ユーザーはストリーミング テーブルのデータ アクセスと保護対策を監査および確認できます。

制限事項

  • テーブル所有者だけがストリーミング テーブルを更新して最新のデータを取得できます。
  • ALTER TABLE コマンドはストリーミング テーブルでは許可されません。 テーブルの定義とプロパティは、CREATE OR REFRESH または ALTER STREAMING TABLE ステートメントを使用して変更する必要があります。
  • INSERT INTOMERGE などの DML コマンドを利用してテーブル スキーマを導き出すことはできません。
  • 次のコマンドは、ストリーミング テーブルではサポートされていません。
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Delta Sharing はサポートされていません。
  • テーブルの名前変更や所有者の変更はサポートされていません。
  • PRIMARY KEYFOREIGN KEYなどのテーブル制約は、hive_metastore カタログ内のストリーミング テーブルではサポートされていません。
  • 生成された列、ID 列、既定の列はサポートされていません。

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
  TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
  AS SELECT *
  FROM STREAM source_stream_data;

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM STREAM sales;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')

-- Creates a streaming table using a FLOW to append data from files
> CREATE OR REFRESH STREAMING TABLE raw_data
  FLOW INSERT BY NAME SELECT * FROM STREAM read_files('abfss://my_path');

-- Creates a streaming table using an AUTO CDC flow to apply changes from a change feed
> CREATE OR REFRESH STREAMING TABLE target
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  SEQUENCE BY sequenceNum
  STORED AS SCD TYPE 1;