Tutorial: Crie múltiplos fluxos com parâmetros diferentes

Um oleoduto pode conter múltiplos fluxos que são quase idênticos, diferindo apenas por alguns parâmetros. Definir estes fluxos explicitamente é propenso a erros, redundante e difícil de manter. A metaprogramação com funções internas em Python gera fluxos repetitivos de forma dinâmica, com cada invocação a fornecer um conjunto diferente de parâmetros.

Descrição geral

A metaprogramação no Lakeflow Spark Declarative Pipelines utiliza funções internas em Python. Como estas funções são avaliadas de forma preguiçosa pelo runtime do pipeline, podes envolver @dp.table decoradores dentro de uma função de fábrica e chamar essa fábrica várias vezes com argumentos diferentes. Cada chamada regista um novo fluxo sem duplicar código.

Para obter detalhes sobre como usar for loops com Lakeflow Spark Declarative Pipelines, consulte Criar tabelas num for ciclo.

Exemplo: tempos de resposta dos bombeiros

O exemplo seguinte utiliza o conjunto de dados incorporado do corpo de bombeiros para encontrar os bairros com os tempos de resposta a emergência mais rápidos para cada tipo de chamada. Sem metaprogramação, deve escrever definições de tabela quase idênticas para cada tipo de chamada (Alarmes, Incêndio na Estrutura, Incidente Médico). Com a metaprogramação, uma única função de fábrica gera todos eles.

Passo 1: Defina a tabela bruta de ingestão

import functools
from pyspark import pipelines as dp
from pyspark.sql.functions import *

@dp.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dp.expect_or_drop("valid_received", "received IS NOT NULL")
@dp.expect_or_drop("valid_response", "responded IS NOT NULL")
@dp.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
      .select('call_type', 'received', 'responded', 'neighborhood')
  )

Passo 2: Definir a função de fábrica de fluxo

A generate_tables função de fábrica regista duas tabelas para cada tipo de chamada: uma tabela de chamadas filtrada e uma tabela de tempo de resposta ordenada. Ambos são criados como funções internas decoradas com @dp.table.

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dp.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return spark.sql("""
      SELECT
        unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
        unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
        neighborhood
      FROM raw_fire_department
      WHERE call_type = '{filter}'
    """.format(filter=filter))

  @dp.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time"
  )
  def create_response_table():
    return spark.sql("""
      SELECT
        neighborhood,
        AVG((ts_received - ts_responded)) as response_time
      FROM {call_table}
      GROUP BY 1
      ORDER BY response_time
      LIMIT 10
    """.format(call_table=call_table))

  all_tables.append(response_table)

Passo 3: Invocar a fábrica e definir a tabela resumo

Chame a fábrica uma vez para cada tipo de chamada, depois defina uma tabela resumida que unisse os resultados para encontrar as vizinhanças que aparecem com mais frequência em todas as categorias.

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dp.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dp.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

Depois de executar este pipeline, irá criar um conjunto de tabelas semelhantes, como este gráfico:

Gráfico das tabelas geradas por este tutorial.

Conceitos-chave

  • As funções internas são registadas de forma preguiçosa: O @dp.table decorador não executa a função imediatamente. Regista a função com o tempo de execução do pipeline, que resolve o gráfico completo do fluxo de dados antes do início da execução.
  • Parâmetros de captura de fechos: Cada função interna fecha sobre os parâmetros passados à fábrica (call_table, response_table, filter), pelo que cada fluxo registado usa o seu próprio conjunto isolado de valores.
  • Listas de tabelas dinâmicas: Usar uma lista como all_tables para acompanhar nomes de tabelas gerados programaticamente facilita a sua referência posterior (por exemplo, numa união ou junção).

Recursos adicionais