Condividi tramite


Architettura fan-in e fan-out nelle pipeline dichiarative di Lakeflow Spark

Fan-in e fan-out sono modelli comuni nell'ingegneria dei dati moderna per creare pipeline scalabili e affidabili. Questa pagina descrive entrambi i modelli e illustra come implementarli in Pipeline dichiarative di Lakeflow Spark.

Che cosa sono fan-in e fan-out?

Fan-in è un pattern architetturale in cui i dati provenienti da più origini vengono acquisiti ed elaborati all'interno di una singola pipeline.

Architettura fan-in, che mostra più set di dati di origine che si uniscono in un singolo set di dati di output.

Le origini possono includere:

  • Flussi di eventi in tempo reale (ad esempio, Kafka e Kinesis)
  • Archiviazione cloud (ad esempio, S3, ADLS e Google Cloud Storage)
  • Database relazionali (ad esempio, PostgreSQL, MySQL e Snowflake)
  • Dispositivi IoT (ad esempio sensori, log e API)

Consolidando flussi di dati diversi in un singolo livello di elaborazione, l'integrazione consente la trasformazione, la deduplicazione e l'arricchimento coerente dei dati prima che questi vengano trasferiti a valle.

Fan-out segue un approccio uno-a-molti, instradando un singolo flusso di dati elaborato verso più destinazioni.

Architettura fan-out, che mostra un singolo insieme di dati di origine trasformato o scritto in più insiemi di dati di output.

Le destinazioni possono includere:

  • Tabelle delta per l'archiviazione strutturata
  • Sistemi di avviso in tempo reale per il rilevamento anomalie
  • Modelli di Machine Learning per l'analisi predittiva
  • Data warehouse per la creazione di report e l'analisi
  • Code di messaggi per la comunicazione asincrona e l'elaborazione disaccoppiata

Questo modello garantisce che ogni sistema downstream riceva i dati nel formato richiesto, consentendo alle organizzazioni di integrare i dati di streaming in varie applicazioni aziendali.

In pratica, le pipeline spesso combinano entrambi i modelli. Per esempio:

  • Un'azienda raccoglie i dati sulle attività degli utenti da più applicazioni, siti Web e dispositivi mobili (fan-in).
  • I dati elaborati vengono archiviati in Delta Lake per l'analisi cronologica, mentre gli avvisi in tempo reale si attivano per attività insolite (fan-out).

Implementare il fan-in con i flussi di aggiunta

Le pipeline fan-in uniscono più flussi di dati in una destinazione unificata. In genere, ciò richiede query di unione complesse e checkpoint manuali. I flussi di accodamento semplificano questa operazione consentendo a vari flussi di dati di alimentare direttamente una singola tabella di flusso continuo senza unioni esplicite o logica complessa. Ogni origine viene gestita in modo indipendente, consentendo l'inserimento e gli aggiornamenti incrementali dei dati.

Ad esempio, usare i flussi di aggiunta per consolidare più argomenti Kafka o flussi di dati regionali in una tabella di destinazione unificata.

Python

from pyspark import pipelines as dp

dp.create_streaming_table("all_topics")

# Kafka stream from topic1
@dp.append_flow(target="all_topics")
def topic1():
    return spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", "host1:port1,...") \
        .option("subscribe", "topic1") \
        .load()

# Kafka stream from topic2
@dp.append_flow(target="all_topics")
def topic2():
    return spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", "host1:port1,...") \
        .option("subscribe", "topic2") \
        .load()

SQL

CREATE OR REFRESH STREAMING TABLE all_topics;

CREATE FLOW
  topic1
AS INSERT INTO
  all_topics BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  all_topics BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Implementare il fan-out

Le pipeline di fanout distribuiscono i dati da un'origine a più output. Lakeflow Spark Declarative Pipelines supporta tre approcci a seconda del caso d'uso.

Usare i cicli for per la logica generalizzata

Se la logica ETL è identica in più destinazioni, usare Python per i cicli per generare in modo dinamico più tabelle tramite cicli con parametri. In questo modo si evita la scrittura di codice ripetitivo e si semplifica il ridimensionamento della pipeline tramite la configurazione.

Importante

Ogni flusso o tabella generato elabora l'intero set di dati di origine in modo indipendente. Per le origini con velocità effettiva condivisa o limiti di capacità di lettura, ad esempio Kafka, ciò può influire significativamente sulle prestazioni. Valutare attentamente l'approccio per tali origini prima di usarlo.

regions = ["US", "EU", "APAC"]

for region in regions:
    @dp.materialized_view(name=f"orders_{region.lower()}_filtered")
    def filtered_orders(region_filter=region):
        return spark.read.table("combined_orders").filter(f"region = '{region_filter}'")

Usare flussi indipendenti per la logica specifica della destinazione

Quando le trasformazioni ETL variano in modo significativo per ogni destinazione, implementare flussi di dati indipendenti. Questo approccio ha un controllo preciso e prestazioni ottimizzate su misura per ogni caso d'uso.

from pyspark import pipelines as dp

# Grouped output
@dp.materialized_view(name="orders_sink")
def region_orders():
    df = spark.read.table("combined_orders").groupBy("region").count()
    # Add additional logic here
    return df

# BI materialized view
@dp.materialized_view(name="orders_bi_materialized")
def orders_bi():
    return spark.read.table("combined_orders").select("order_id", "amount", "region")

# ML feature table
@dp.materialized_view(name="orders_ml_features")
def orders_ml():
    return (
        spark.read.table("combined_orders")
        .withColumn("high_value_order", col("amount") > 1000)
        .select("order_id", "high_value_order", "region")
    )

Usare ForEachBatch per il routing personalizzato

Importante

foreach_batch_sink è disponibile in anteprima pubblica attraverso il canale Lakeflow Spark Declarative Pipelines PREVIEW. Vedere channel in Configurazioni della pipeline.

Il foreach_batch_sink applica la logica personalizzata a ogni micro-batch, consentendo trasformazioni complesse, fusione o instradamento verso più destinazioni, includendo quelle senza supporto di streaming predefinito, come destinazioni JDBC.

Importante

Ogni batch esegue più operazioni di scrittura in modo indipendente. I fallimenti di un'operazione non eseguono automaticamente il rollback delle scritture precedentemente eseguite con successo. Ciò può causare dati parziali o incoerenti tra i target, in particolare durante l'elaborazione di fonti condivise come Kafka. Progettare le pipeline con un'attenta gestione degli errori e test accurati. Vedere Usare ForEachBatch per scrivere in sink di dati arbitrari nelle pipeline.

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(batch_df, batch_id):
    # Write to Delta table
    batch_df.write.format("delta").mode("append").saveAsTable("my_catalog.my_schema.my_delta_table")

    # Write to JSON files
    batch_df.write.format("json").mode("append").save("/Volumes/path/to/json_target")

@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/data/incoming/events")
    )

Modelli ForEachBatch comuni

foreach_batch_sink supporta più modelli. Alcuni modelli comuni includono:

  • Flusso singolo al sink multi-destinazione: un singolo append_flow legge da un'origine di streaming e instrada i dati a un foreach_batch_sink. Il sink gestisce la scrittura verso più destinazioni, come Delta, JSON e sistemi esterni. Questo è ideale per semplici casi d'uso multi-output con logica di trasformazione condivisa.

  • Più flussi in un sink unificato: più append_flow origini, ad esempio directory, formati, argomenti Kafka o API esterne, uniscono in un unico foreach_batch_sink. In questo modo viene centralizzata la logica di trasformazione comune, la gestione dell'output e la gestione degli errori. Poiché è necessario mantenere un solo checkpoint, questo approccio riduce significativamente la complessità del coordinamento. È particolarmente utile quando si gestiscono code di messaggi come quelle di Kafka o quando si interfacciano API esterne.

  • Un flusso a un sink (molte coppie indipendenti): Ogni append_flow ha un foreach_batch_sink dedicato, stabilendo relazioni chiare e isolate tra singole origini e le loro destinazioni. Questa soluzione è ideale per le pipeline con molti flussi indipendenti che richiedono logica di elaborazione univoca, risoluzione dei problemi semplificata e gestione degli errori isolati.

In pratica, questi approcci spesso si integrano tra loro. Ad esempio, usa i cicli per generare dinamicamente più flussi di accodamento (append) per scenari fan-in su larga scala e poi distribuisci i risultati utilizzando cicli o foreach_batch_sink per fan-out.

Procedure consigliate

  • I flussi di accodamento richiedono che gli schemi di origine siano allineati alla tabella di streaming di destinazione per evitare errori di elaborazione. Usare le aspettative dello schema di Pipeline dichiarative di Lakeflow Spark per rilevare e gestire in modo proattivo le eccezioni, garantendo la coerenza dello schema in tutta la pipeline.
  • Mantenere la logica del ciclo ben definita e semplice.
  • Assegnare un nome chiaro a ogni flusso e tabella per mantenere la leggibilità.
  • Monitorare l'utilizzo delle risorse per ridimensionare in modo efficiente ed evitare colli di bottiglia nelle prestazioni.
  • Quando si scrive nelle code di messaggi, utilizzare un foreach_batch_sink con un unico append_flow che consolida tutti i flussi di input. Ciò semplifica la gestione dello stato downstream e del checkpoint.

Limitazioni

  • L'interfaccia utente di derivazione delle pipeline dichiarative di Lakeflow Spark potrebbe non mostrare metriche e metadati a livello di flusso per le nuove origini del flusso di accodamento.
  • Espandere anziché ridurre l'elenco di valori utilizzati in un ciclo for. Se un set di dati definito in precedenza viene omesso nelle esecuzioni successive della pipeline, viene eliminato automaticamente dallo schema di destinazione, causando una perdita di dati imprevista.