Lernprogramm: Erstellen mehrerer Flüsse mit unterschiedlichen Parametern

Eine Pipeline kann mehrere Flüsse enthalten, die fast identisch sind und sich nur von einigen Parametern unterscheiden. Das explizite Definieren dieser Flüsse ist fehleranfällig, redundant und schwierig zu verwalten. Metaprogrammierung mit Python inneren Funktionen generiert sich wiederholende Abläufe dynamisch, wobei jeder Aufruf einen anderen Satz von Parametern liefert.

Übersicht

Metaprogrammierung in Lakeflow Spark Declarative Pipelines verwendet Python innere Funktionen. Da diese Funktionen von der Pipelinelaufzeit lazily ausgewertet werden, können Sie Dekoratoren innerhalb einer Factoryfunktion umschließen @dp.table und diese Factory mehrmals mit unterschiedlichen Argumenten aufrufen. Jeder Aufruf registriert einen neuen Fluss, ohne Code zu duplizieren.

Ausführliche Informationen zur Verwendung von for Schleifen mit Lakeflow Spark Declarative Pipelines finden Sie unter Erstellen von Tabellen in einer for Schleife.

Beispiel: Reaktionszeiten der Feuerwehr

Im folgenden Beispiel wird das integrierte Dataset der Feuerwehr verwendet, um die Nachbarschaften mit den schnellsten Notfallreaktionszeiten für jeden Anruftyp zu finden. Ohne Metaprogrammierung müssen Sie nahezu identische Tabellendefinitionen für jeden Anruftyp schreiben (Alarme, Strukturfeuer, medizinischer Vorfall). Bei der Metaprogrammierung generiert eine einzelne Factory-Funktion alle.

Schritt 1: Definieren der Rohaufnahmetabelle

import functools
from pyspark import pipelines as dp
from pyspark.sql.functions import *

@dp.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dp.expect_or_drop("valid_received", "received IS NOT NULL")
@dp.expect_or_drop("valid_response", "responded IS NOT NULL")
@dp.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
      .select('call_type', 'received', 'responded', 'neighborhood')
  )

Schritt 2: Definieren der Flow Factory-Funktion

Die generate_tables Factoryfunktion registriert zwei Tabellen für jeden Aufruftyp: eine gefilterte Anruftabelle und eine bewertete Antwortzeittabelle. Beide werden als innere Funktionen erstellt, die mit @dp.table dekoriert sind.

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dp.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return spark.sql("""
      SELECT
        unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
        unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
        neighborhood
      FROM raw_fire_department
      WHERE call_type = '{filter}'
    """.format(filter=filter))

  @dp.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time"
  )
  def create_response_table():
    return spark.sql("""
      SELECT
        neighborhood,
        AVG((ts_received - ts_responded)) as response_time
      FROM {call_table}
      GROUP BY 1
      ORDER BY response_time
      LIMIT 10
    """.format(call_table=call_table))

  all_tables.append(response_table)

Schritt 3: Aufrufen der Factory und Definieren der Zusammenfassungstabelle

Rufen Sie die Factory einmal für jeden Anruftyp auf, und definieren Sie dann eine Zusammenfassungstabelle, die die Ergebnisse zusammenstellt, um die Viertel zu finden, die am häufigsten in allen Kategorien angezeigt werden.

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dp.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dp.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

Nachdem Sie diese Pipeline ausgeführt haben, erstellen Sie wie in diesem Diagramm eine Reihe ähnlicher Tabellen:

Diagramm der tabellen, die von diesem Lernprogramm generiert wurden.

Wichtige Konzepte

  • Innere Funktionen werden verzögert registriert: Der @dp.table Dekorateur führt die Funktion nicht sofort aus. Sie registriert die Funktion mit der Pipelinelaufzeit, wodurch das vollständige Dataflowdiagramm aufgelöst wird, bevor die Ausführung beginnt.
  • Closures erfassen Parameter: Jede innere Funktion umfasst die an die Factory übergebenen Parameter (call_table, response_table, filter), sodass jeder registrierte Flow seinen eigenen isolierten Satz von Werten verwendet.
  • Dynamische Tabellenlisten: Die Verwendung einer Liste, um programmgesteuert all_tables generierte Tabellennamen nachzuverfolgen, erleichtert das spätere Referenzieren (z. B. in einer Union oder einem Join).

Weitere Ressourcen