Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Eine Pipeline kann mehrere Flüsse enthalten, die fast identisch sind und sich nur von einigen Parametern unterscheiden. Das explizite Definieren dieser Flüsse ist fehleranfällig, redundant und schwierig zu verwalten. Metaprogrammierung mit Python inneren Funktionen generiert sich wiederholende Abläufe dynamisch, wobei jeder Aufruf einen anderen Satz von Parametern liefert.
Übersicht
Metaprogrammierung in Lakeflow Spark Declarative Pipelines verwendet Python innere Funktionen. Da diese Funktionen von der Pipelinelaufzeit lazily ausgewertet werden, können Sie Dekoratoren innerhalb einer Factoryfunktion umschließen @dp.table und diese Factory mehrmals mit unterschiedlichen Argumenten aufrufen. Jeder Aufruf registriert einen neuen Fluss, ohne Code zu duplizieren.
Ausführliche Informationen zur Verwendung von for Schleifen mit Lakeflow Spark Declarative Pipelines finden Sie unter Erstellen von Tabellen in einer for Schleife.
Beispiel: Reaktionszeiten der Feuerwehr
Im folgenden Beispiel wird das integrierte Dataset der Feuerwehr verwendet, um die Nachbarschaften mit den schnellsten Notfallreaktionszeiten für jeden Anruftyp zu finden. Ohne Metaprogrammierung müssen Sie nahezu identische Tabellendefinitionen für jeden Anruftyp schreiben (Alarme, Strukturfeuer, medizinischer Vorfall). Bei der Metaprogrammierung generiert eine einzelne Factory-Funktion alle.
Schritt 1: Definieren der Rohaufnahmetabelle
import functools
from pyspark import pipelines as dp
from pyspark.sql.functions import *
@dp.table(
name="raw_fire_department",
comment="raw table for fire department response"
)
@dp.expect_or_drop("valid_received", "received IS NOT NULL")
@dp.expect_or_drop("valid_response", "responded IS NOT NULL")
@dp.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
return (
spark.read.format('csv')
.option('header', 'true')
.option('multiline', 'true')
.load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
.withColumnRenamed('Call Type', 'call_type')
.withColumnRenamed('Received DtTm', 'received')
.withColumnRenamed('Response DtTm', 'responded')
.withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
.select('call_type', 'received', 'responded', 'neighborhood')
)
Schritt 2: Definieren der Flow Factory-Funktion
Die generate_tables Factoryfunktion registriert zwei Tabellen für jeden Aufruftyp: eine gefilterte Anruftabelle und eine bewertete Antwortzeittabelle. Beide werden als innere Funktionen erstellt, die mit @dp.table dekoriert sind.
all_tables = []
def generate_tables(call_table, response_table, filter):
@dp.table(
name=call_table,
comment="top level tables by call type"
)
def create_call_table():
return spark.sql("""
SELECT
unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
neighborhood
FROM raw_fire_department
WHERE call_type = '{filter}'
""".format(filter=filter))
@dp.table(
name=response_table,
comment="top 10 neighborhoods with fastest response time"
)
def create_response_table():
return spark.sql("""
SELECT
neighborhood,
AVG((ts_received - ts_responded)) as response_time
FROM {call_table}
GROUP BY 1
ORDER BY response_time
LIMIT 10
""".format(call_table=call_table))
all_tables.append(response_table)
Schritt 3: Aufrufen der Factory und Definieren der Zusammenfassungstabelle
Rufen Sie die Factory einmal für jeden Anruftyp auf, und definieren Sie dann eine Zusammenfassungstabelle, die die Ergebnisse zusammenstellt, um die Viertel zu finden, die am häufigsten in allen Kategorien angezeigt werden.
generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")
@dp.table(
name="best_neighborhoods",
comment="which neighbor appears in the best response time list the most"
)
def summary():
target_tables = [dp.read(t) for t in all_tables]
unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
return (
unioned.groupBy(col("neighborhood"))
.agg(count("*").alias("score"))
.orderBy(desc("score"))
)
Nachdem Sie diese Pipeline ausgeführt haben, erstellen Sie wie in diesem Diagramm eine Reihe ähnlicher Tabellen:
Wichtige Konzepte
-
Innere Funktionen werden verzögert registriert: Der
@dp.tableDekorateur führt die Funktion nicht sofort aus. Sie registriert die Funktion mit der Pipelinelaufzeit, wodurch das vollständige Dataflowdiagramm aufgelöst wird, bevor die Ausführung beginnt. -
Closures erfassen Parameter: Jede innere Funktion umfasst die an die Factory übergebenen Parameter (
call_table,response_table,filter), sodass jeder registrierte Flow seinen eigenen isolierten Satz von Werten verwendet. -
Dynamische Tabellenlisten: Die Verwendung einer Liste, um programmgesteuert
all_tablesgenerierte Tabellennamen nachzuverfolgen, erleichtert das spätere Referenzieren (z. B. in einer Union oder einem Join).