Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Una pipeline può contenere più flussi quasi identici, che differiscono solo per pochi parametri. La definizione di questi flussi in modo esplicito è soggetta a errori, ridondante e difficile da gestire. La metaprogrammazione con Python funzioni interne genera flussi ripetitivi in modo dinamico, con ogni chiamata che fornisce un set diverso di parametri.
Informazioni generali
La metaprogrammazione nelle pipeline dichiarative di Lakeflow Spark utilizza le funzioni interne di Python. Poiché queste funzioni vengono valutate pigramente dal runtime della pipeline, è possibile inserire i decorators all'interno di una funzione factory e chiamare questa factory più volte con argomenti diversi. Ogni chiamata registra un nuovo flusso senza duplicare il codice.
Per informazioni dettagliate sull'uso for di cicli con le pipeline dichiarative di Lakeflow Spark, vedere Creare tabelle in un for ciclo.
Esempio: tempi di risposta del reparto antincendio
Nell'esempio seguente viene usato il set di dati predefinito del reparto antincendio per trovare i quartieri con i tempi di risposta di emergenza più rapidi per ogni tipo di chiamata. Senza metaprogrammazione, è necessario scrivere definizioni di tabella quasi identiche per ogni tipo di chiamata (allarmi, struttura incendio, incidente medico). Con la metaprogrammazione, una singola funzione factory genera tutte queste funzioni.
Passaggio 1: Definire la tabella di ingestione grezza
import functools
from pyspark import pipelines as dp
from pyspark.sql.functions import *
@dp.table(
name="raw_fire_department",
comment="raw table for fire department response"
)
@dp.expect_or_drop("valid_received", "received IS NOT NULL")
@dp.expect_or_drop("valid_response", "responded IS NOT NULL")
@dp.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
return (
spark.read.format('csv')
.option('header', 'true')
.option('multiline', 'true')
.load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
.withColumnRenamed('Call Type', 'call_type')
.withColumnRenamed('Received DtTm', 'received')
.withColumnRenamed('Response DtTm', 'responded')
.withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
.select('call_type', 'received', 'responded', 'neighborhood')
)
Passaggio 2: Definire la funzione del factory di flusso
La generate_tables funzione factory registra due tabelle per ogni tipo di chiamata: una tabella delle chiamate filtrata e una tabella di tempo di risposta classificata. Entrambi vengono creati come funzioni interne decorate con @dp.table.
all_tables = []
def generate_tables(call_table, response_table, filter):
@dp.table(
name=call_table,
comment="top level tables by call type"
)
def create_call_table():
return spark.sql("""
SELECT
unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
neighborhood
FROM raw_fire_department
WHERE call_type = '{filter}'
""".format(filter=filter))
@dp.table(
name=response_table,
comment="top 10 neighborhoods with fastest response time"
)
def create_response_table():
return spark.sql("""
SELECT
neighborhood,
AVG((ts_received - ts_responded)) as response_time
FROM {call_table}
GROUP BY 1
ORDER BY response_time
LIMIT 10
""".format(call_table=call_table))
all_tables.append(response_table)
Passaggio 3: Invocare la factory e definire la tabella di riepilogo
Chiamare la factory una volta per ogni tipo di chiamata, quindi definire una tabella di riepilogo che unisce i risultati per trovare i quartieri visualizzati più spesso in tutte le categorie.
generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")
@dp.table(
name="best_neighborhoods",
comment="which neighbor appears in the best response time list the most"
)
def summary():
target_tables = [dp.read(t) for t in all_tables]
unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
return (
unioned.groupBy(col("neighborhood"))
.agg(count("*").alias("score"))
.orderBy(desc("score"))
)
Dopo aver eseguito questa pipeline, si creerà un set di tabelle simili, come il grafico seguente:
Concetti chiave
-
Le funzioni interne vengono registrate in modo pigro: il decoratore
@dp.tablenon esegue immediatamente la funzione. Registra la funzione con il runtime della pipeline, che risolve il grafico del flusso di dati completo prima dell'inizio dell'esecuzione. - Le chiusure acquisiscono i parametri: ogni funzione interna cattura i parametri passati alla factory (,
call_table,response_table), in modo che ogni flusso registrato utilizzi il proprio set di valori isolati. -
Elenchi di tabelle dinamiche: l'uso di un elenco come
all_tablesper tenere traccia dei nomi di tabella generati a livello di codice rende più semplice farvi riferimento in un secondo momento, ad esempio in un'unione o un join.