チュートリアル: 異なるパラメーターを使用して複数のフローを作成する

パイプラインには、ほぼ同一の複数のフローが含まれる場合があり、いくつかのパラメーターによってのみ異なります。 これらのフローを明示的に定義することは、エラーが発生しやすく、冗長で、保守が困難です。 Python内部関数を使用したメタプログラミングでは、反復的なフローが動的に生成され、呼び出しごとに異なる一連のパラメーターが提供されます。

概要

Lakeflow Spark 宣言型パイプラインのメタプログラミングでは、内部関数Python使用されます。 これらの関数はパイプライン ランタイムによって遅延評価されるため、ファクトリ関数内 @dp.table デコレーターをラップし、異なる引数でそのファクトリを複数回呼び出すことができます。 各呼び出しでは、コードを複製せずに新しいフローが登録されます。

Lakeflow Spark 宣言パイプラインでfor ループを使用する方法の詳細については、「for ループでのテーブルの作成」を参照してください。

例: 消防署の応答時間

次の例では、組み込みの消防署データセットを使用して、各呼び出しの種類の緊急対応時間が最も速い近隣を検索します。 メタプログラミングを使用しない場合は、呼び出しの種類 (アラーム、構造火災、医療インシデント) ごとにほぼ同じテーブル定義を記述する必要があります。 メタプログラミングでは、1 つのファクトリ関数によってすべての関数が生成されます。

手順 1: 生のインジェスト テーブルを定義する

import functools
from pyspark import pipelines as dp
from pyspark.sql.functions import *

@dp.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dp.expect_or_drop("valid_received", "received IS NOT NULL")
@dp.expect_or_drop("valid_response", "responded IS NOT NULL")
@dp.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
      .select('call_type', 'received', 'responded', 'neighborhood')
  )

手順 2: フロー ファクトリ関数を定義する

generate_tables ファクトリ関数は、フィルター処理された呼び出しテーブルとランク付けされた応答時間テーブルの 2 つのテーブルを呼び出しの種類ごとに登録します。 どちらも、 @dp.tableで装飾された内部関数として作成されます。

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dp.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return spark.sql("""
      SELECT
        unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
        unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
        neighborhood
      FROM raw_fire_department
      WHERE call_type = '{filter}'
    """.format(filter=filter))

  @dp.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time"
  )
  def create_response_table():
    return spark.sql("""
      SELECT
        neighborhood,
        AVG((ts_received - ts_responded)) as response_time
      FROM {call_table}
      GROUP BY 1
      ORDER BY response_time
      LIMIT 10
    """.format(call_table=call_table))

  all_tables.append(response_table)

手順 3: ファクトリを呼び出し、概要テーブルを定義する

呼び出しの種類ごとにファクトリを 1 回呼び出し、結果を結合する概要テーブルを定義して、すべてのカテゴリで最も頻繁に表示される近傍を見つけます。

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dp.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dp.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

このパイプラインを実行した後、次のグラフのような類似したテーブルのセットを作成します。

このチュートリアルで生成されたテーブルのグラフ。

主な概念

  • 内部関数は遅延的に登録されます@dp.table デコレーターは、関数をすぐに実行しません。 この関数はパイプライン ランタイムに登録され、実行が開始される前に完全なデータフロー グラフが解決されます。
  • クロージャはパラメーターをキャプチャします。各内部関数は、ファクトリに渡されたパラメーター (call_tableresponse_tablefilter) を閉じるので、登録された各フローは独自の分離された値セットを使用します。
  • 動的テーブル リスト: all_tables などのリストを使用してプログラムによって生成されたテーブル名を追跡すると、後で (共用体や結合など) 簡単に参照できます。

その他のリソース