Condividi tramite


Esercitazione: Creare più flussi con parametri diversi

Una pipeline può contenere più flussi quasi identici, che differiscono solo per pochi parametri. La definizione di questi flussi in modo esplicito è soggetta a errori, ridondante e difficile da gestire. La metaprogrammazione con Python funzioni interne genera flussi ripetitivi in modo dinamico, con ogni chiamata che fornisce un set diverso di parametri.

Informazioni generali

La metaprogrammazione nelle pipeline dichiarative di Lakeflow Spark utilizza le funzioni interne di Python. Poiché queste funzioni vengono valutate pigramente dal runtime della pipeline, è possibile inserire i decorators all'interno di una funzione factory e chiamare questa factory più volte con argomenti diversi. Ogni chiamata registra un nuovo flusso senza duplicare il codice.

Per informazioni dettagliate sull'uso for di cicli con le pipeline dichiarative di Lakeflow Spark, vedere Creare tabelle in un for ciclo.

Esempio: tempi di risposta del reparto antincendio

Nell'esempio seguente viene usato il set di dati predefinito del reparto antincendio per trovare i quartieri con i tempi di risposta di emergenza più rapidi per ogni tipo di chiamata. Senza metaprogrammazione, è necessario scrivere definizioni di tabella quasi identiche per ogni tipo di chiamata (allarmi, struttura incendio, incidente medico). Con la metaprogrammazione, una singola funzione factory genera tutte queste funzioni.

Passaggio 1: Definire la tabella di ingestione grezza

import functools
from pyspark import pipelines as dp
from pyspark.sql.functions import *

@dp.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dp.expect_or_drop("valid_received", "received IS NOT NULL")
@dp.expect_or_drop("valid_response", "responded IS NOT NULL")
@dp.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
      .select('call_type', 'received', 'responded', 'neighborhood')
  )

Passaggio 2: Definire la funzione del factory di flusso

La generate_tables funzione factory registra due tabelle per ogni tipo di chiamata: una tabella delle chiamate filtrata e una tabella di tempo di risposta classificata. Entrambi vengono creati come funzioni interne decorate con @dp.table.

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dp.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return spark.sql("""
      SELECT
        unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
        unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
        neighborhood
      FROM raw_fire_department
      WHERE call_type = '{filter}'
    """.format(filter=filter))

  @dp.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time"
  )
  def create_response_table():
    return spark.sql("""
      SELECT
        neighborhood,
        AVG((ts_received - ts_responded)) as response_time
      FROM {call_table}
      GROUP BY 1
      ORDER BY response_time
      LIMIT 10
    """.format(call_table=call_table))

  all_tables.append(response_table)

Passaggio 3: Invocare la factory e definire la tabella di riepilogo

Chiamare la factory una volta per ogni tipo di chiamata, quindi definire una tabella di riepilogo che unisce i risultati per trovare i quartieri visualizzati più spesso in tutte le categorie.

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dp.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dp.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

Dopo aver eseguito questa pipeline, si creerà un set di tabelle simili, come il grafico seguente:

Grafico delle tabelle generate da questa esercitazione.

Concetti chiave

  • Le funzioni interne vengono registrate in modo pigro: il decoratore @dp.table non esegue immediatamente la funzione. Registra la funzione con il runtime della pipeline, che risolve il grafico del flusso di dati completo prima dell'inizio dell'esecuzione.
  • Le chiusure acquisiscono i parametri: ogni funzione interna cattura i parametri passati alla factory (, call_table, response_table), in modo che ogni flusso registrato utilizzi il proprio set di valori isolati.
  • Elenchi di tabelle dinamiche: l'uso di un elenco come all_tables per tenere traccia dei nomi di tabella generati a livello di codice rende più semplice farvi riferimento in un secondo momento, ad esempio in un'unione o un join.

Risorse aggiuntive