Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questo articolo discute l'uso di foreachBatch con Structured Streaming per scrivere l'output di una query di streaming a fonti di dati prive di un sink di streaming esistente.
Il modello streamingDF.writeStream.foreachBatch(...) di codice consente di applicare funzioni batch ai dati di output di ogni micro batch della query di streaming. Le funzioni usate con foreachBatch accettano due parametri:
- DataFrame con i dati di output di un micro batch.
- ID univoco del micro batch.
È necessario usare foreachBatch per le operazioni di merge Delta Lake in Structured Streaming. Consulta Upsert dalle query di streaming utilizzando foreachBatch.
Applicare operazioni aggiuntive sul dataframe
Molte operazioni di dataframe e set di dati non sono supportate nei dataframe di streaming perché Spark non supporta la generazione di piani incrementali in questi casi. Usando foreachBatch() è possibile applicare alcune di queste operazioni a ogni output micro-batch. Ad esempio, è possibile usare foreachBatch() e l'operazione SQL MERGE INTO per scrivere l'output delle aggregazioni di streaming in una tabella Delta in modalità di aggiornamento. Per altri dettagli, vedere MERGE INTO.
Importante
-
foreachBatch()fornisce solo garanzie di scrittura almeno una volta. Tuttavia, è possibile usare ilbatchIdfornito alla funzione come modo per deduplicare l'output e ottenere una garanzia di esecuzione unica. In entrambi i casi, è necessario ragionare da soli sulla semantica end-to-end. -
foreachBatch()non funziona con la modalità di elaborazione continua perché si basa fondamentalmente sull'esecuzione micro batch di una query di streaming. Se si scrivono dati in modalità continua, usareforeach()invece . - Quando si usa
foreachBatchcon un operatore con stato, è importante utilizzare completamente ogni batch prima del completamento dell'elaborazione. Vedere Utilizzo completo di ogni dataframe batch
È possibile richiamare un dataframe vuoto con foreachBatch() e il codice utente deve essere resiliente per consentire un funzionamento corretto. Di seguito è riportato un esempio:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Modifiche del comportamento per foreachBatch in Databricks Runtime 14.0
In Databricks Runtime 14.0 e versioni successive nel calcolo configurato con la modalità di accesso standard si applicano le modifiche di comportamento seguenti:
- I comandi di
print()scrivono l'output nei log del driver. - Non è possibile accedere al modulo secondario
dbutils.widgetsall'interno della funzione. - Tutti i file, i moduli o gli oggetti a cui si fa riferimento nella funzione devono essere serializzabili e disponibili in Spark.
Riutilizzare le origini dati batch esistenti
Usando foreachBatch(), è possibile usare scrittori di dati batch esistenti per sink di dati che potrebbero non supportare Structured Streaming. Ecco alcuni esempi:
Molte altre origini dati batch possono essere usate da foreachBatch(). Vedere Connettersi a origini dati e servizi esterni.
Scrivere in più posizioni
Se è necessario scrivere l'output di una query di streaming in più posizioni, Databricks consiglia di usare più scrittori di Structured Streaming per migliorare la parallelizzazione e la velocità effettiva.
L'uso di foreachBatch per scrivere su più sink serializza l'esecuzione delle scritture di streaming, il che può aumentare la latenza per ogni micro-batch.
Se si utilizza foreachBatch per scrivere su più tabelle Delta, consultare Scritture idempotenti delle tabelle in foreachBatch.
Consumare completamente ogni batch DataFrame
Quando si usano operatori con stato , ad esempio usando dropDuplicatesWithinWatermark, ogni iterazione batch deve utilizzare l'intero dataframe o riavviare la query. Se non si utilizza l'intero dataframe, la query di streaming avrà esito negativo con il batch successivo.
Questo può verificarsi in diversi casi. Gli esempi seguenti illustrano come correggere le query che non utilizzano correttamente un dataframe.
Uso intenzionale di un subset del batch
Se si è preoccupati solo di un subset del batch, è possibile avere codice come il seguente.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
In questo caso, gestisce batch_df.show(2) solo i primi due elementi del batch, che è previsto, ma se sono presenti più elementi, devono essere utilizzati. Il codice seguente usa il dataframe completo.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
In questo caso, la do_nothing funzione ignora automaticamente il resto del dataframe.
Gestione di un errore in un batch
Per la gestione degli errori in foreachBatch, Databricks consiglia di consentire alla query di streaming di fallire rapidamente e di basarsi invece sul livello di orchestrazione, come Lakeflow Jobs o Apache Airflow, per gestire la logica di ripetizione dei tentativi. Si tratta di un processo molto più sicuro rispetto alla creazione di cicli di ripetizione complessi nel codice, in cui può verificarsi una perdita di dati.
Ecco le linee guida in base alla destinazione di scrittura:
| Destinazione | Examples | Linee guida |
|---|---|---|
| Operazioni dei DataFrame | Tabelle Delta Lake | È necessario usare txnAppId e txnVersion opzioni di scrittura, vincolando txnVersion a batchId, per garantire l'idempotenza e proteggere la correttezza dei dati nei tentativi. Non intercettare e riprovare le eccezioni localmente. Databricks consiglia invece di consentire la propagazione degli errori in modo che le metriche Spark rimangano accurate, i dati non vengano duplicati e che l'agente di orchestrazione possa ripetere correttamente il batch completo. |
| Codice personalizzato e destinazioni esterne |
.collect(), database OLTP, code di messaggistica, API |
Implementare la propria idempotenza. È necessario presupporre che qualsiasi operazione possa e verrà ritentata tra batch. Se batchId rimane invariato, il risultato dell'operazione deve rimanere invariato. È possibile ritentare errori puramente temporanei, ad esempio brevi timeout di connessione, ma prestare estrema attenzione per evitare scritture parziali o duplicate se il nuovo tentativo ha esito negativo. L'approccio più sicuro consiste nel consentire la propagazione degli errori e consentire all'agente di orchestrazione di ripetere l'intero batch. |
Di seguito sono riportati alcuni esempi di tipi di eccezione e suggerimenti per la gestione in foreachBatch:
| Tipo di eccezione | Examples | Azione consigliata |
|---|---|---|
| Errori temporanei del collettore |
SQLTransientConnectionException, HTTP 429, timeouts |
Catch: riprovare o inviare a una coda di messaggi non recapitabili |
| Violazioni di vincoli di chiave o duplicati quando il sink è idempotente | SQLIntegrityConstraintViolationException |
Catch: log e sopprimi |
| Errori riprovabili personalizzati | Eccezioni socket di cui è stato eseguito il wrapping, errori di database ritentabili | Catch: incrementa le metriche e consente la continuazione controllata |
| Errori di logica o schema |
NullPointerException, AttributeError, , mancata corrispondenza dello schema |
Propaga: consentire a Spark di non eseguire la query |
| Errori sink non riprovabili o bug della logica non rilevati |
ValueError, PermissionError |
Propagare: lasciare che Spark mandi in errore la query |
| Errori critici |
OutOfMemoryError, stato danneggiato, violazioni dell'integrità dei dati |
Propaga: consentire a Spark di fallire l'esecuzione della query |
Esempi di codice: gestione delle eccezioni
Gli esempi seguenti generano intenzionalmente un errore in foreach per mostrare approcci diversi per gestire l'errore:
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Il codice precedente gestisce e elimina automaticamente l'errore e potrebbe non utilizzare il resto del batch. Esistono due opzioni per gestire questa situazione.
Puoi riportare l'errore, che così viene passato al livello di orchestrazione per tentare nuovamente il batch. Questo può risolvere l'errore, se si tratta di un problema temporaneo o generarlo per il team operativo per tentare di risolvere manualmente. A tale scopo, modificare il partial_func codice in modo che sia simile al seguente:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
In secondo luogo, se si vuole intercettare l'eccezione e ignorare il resto del batch, è possibile modificare il codice in modo da usare la do_nothing funzione per ignorare automaticamente il resto del batch.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()