Condividi tramite


Considerazioni sulla produzione per Structured Streaming

Questa pagina contiene suggerimenti per la pianificazione dei carichi di lavoro Structured Streaming utilizzando job su Azure Databricks.

Databricks consiglia di configurare sempre quanto segue:

  • Rimuovere il codice non necessario dai notebook che potrebbero restituire risultati, ad esempio display e count.
  • Non eseguire carichi di lavoro Structured Streaming usando il calcolo multiuso. Pianificare sempre i flussi come lavori utilizzando il calcolo job.
  • Pianificazione dei processi nella modalità Continuous. Questo si riferisce alla funzionalità di pianificazione dei processi di Azure Databricks, non al trigger interval di Structured Streaming.
  • Non abilitare la scalabilità automatica per il calcolo nei processi di Structured Streaming.

Alcuni carichi di lavoro traggono vantaggio da quanto segue:

Azure Databricks ha introdotto le Pipeline dichiarative di Lakeflow Spark per ridurre le complessità nella gestione dell'infrastruttura di produzione per i carichi di lavoro Structured Streaming. Databricks consiglia di usare le pipeline dichiarative di Lakeflow Spark per le nuove pipeline di streaming strutturato. Vedere Pipeline dichiarative di Lakeflow Spark.

Nota

La scalabilità automatica del calcolo ha dei limiti quando si riduce la dimensione del cluster per carichi di lavoro di streaming strutturati. Databricks consiglia di usare le Pipeline Dichiarative Spark di Lakeflow con una scalabilità automatica avanzata per i carichi di lavoro di streaming. Vedere Ottimizzare l'utilizzo del cluster delle pipeline dichiarative di Lakeflow Spark con scalabilità automatica.

:::nota Calcolo serverless

Nell'ambiente di calcolo serverless sono supportati solo Trigger.AvailableNow() e Trigger.Once() . Databricks consiglia Trigger.AvailableNow().

Per lo streaming continuo in un ambiente di calcolo serverless, usare la modalità pipeline Attivata oppure Continua in modalità continua.

Vedere Limitazioni dello streaming.

:::

Progettare carichi di lavoro di streaming per aspettarsi un errore

Databricks consiglia di configurare sempre i processi di streaming per il riavvio automatico in caso di errore. Alcune funzionalità, inclusa l'evoluzione dello schema, richiedono che i carichi di lavoro Structured Streaming siano configurati per riprovare automaticamente. Consultare Configurare i processi Structured Streaming per riavviare le query di streaming in caso di errore.

Alcune operazioni, ad esempio foreachBatch, forniscono garanzie di tipo almeno una volta anziché esattamente una volta. Per queste operazioni, assicurarsi che la pipeline di elaborazione sia idempotente. Vedere Usare foreachBatch per scrivere sink di dati arbitrari.

Nota

Quando una query viene riavviata, viene processato il micro-batch pianificato durante esecuzione precedente. Se il processo non è riuscito a causa di un errore di memoria insufficiente o se è stato annullato manualmente un processo a causa di un micro batch sovradimensionato, potrebbe essere necessario aumentare le prestazioni del calcolo per elaborare correttamente il micro batch.

Se si modificano le configurazioni tra le esecuzioni, queste configurazioni si applicano al primo nuovo batch pianificato. Consulta Ripristinare dopo le modifiche in una query di Structured Streaming.

Quando viene ritentata un'attività?

È possibile pianificare più attività come parte di un processo di Azure Databricks. Quando si configura un processo usando il trigger continuo, non è possibile impostare dipendenze tra le attività.

È possibile scegliere di pianificare più flussi in un singolo processo usando uno degli approcci seguenti:

  • Attività multiple: Definire un processo con attività multiple che eseguono carichi di lavoro in streaming usando il trigger continuo.
  • Query multiple: definire query di streaming multiple nel codice sorgente per una singola attività.

È anche possibile combinare queste strategie. Nella tabella seguente vengono confrontati questi approcci.

Strategia Attività multiple Più richieste
Come viene condiviso il calcolo? Databricks consiglia di distribuire processi di calcolo con dimensioni appropriate per ogni attività di streaming. Facoltativamente, è possibile condividere il calcolo tra le attività. Tutte le query condividono lo stesso calcolo. Facoltativamente, è possibile assegnare query ai pool di scheduler.
Come vengono gestiti i tentativi? Tutte le attività devono fallire prima che il lavoro venga ripetuto. L'attività si ripete se qualsiasi query fallisce.

Configurare i processi di Structured Streaming per riavviare le query di streaming in caso di errore

Databricks consiglia di configurare tutti i carichi di lavoro di streaming usando il trigger continuo. Vedere Eseguire processi in modo continuo.

Il trigger continuo ha il comportamento seguente per impostazione predefinita:

  • Impedisce più di un'esecuzione simultanea dell'attività.
  • Avvia una nuova esecuzione quando un'esecuzione precedente ha esito negativo.
  • Usa il backoff esponenziale per le ripetizioni.

Databricks consiglia di usare sempre il calcolo processi anziché il calcolo multiuso per la pianificazione dei flussi di lavoro. In caso di errore del processo e di successivo tentativo, vengono distribuite nuove risorse di calcolo.

Nota

Databricks consiglia di non usare streamingQuery.awaitTermination() o spark.streams.awaitAnyTermination(). Vedere Quando usare awaitTermination().

Quando usare awaitTermination()

streamingQuery.awaitTermination() e spark.streams.awaitAnyTermination() bloccano il thread corrente fino a quando una query di streaming non termina. L'uso di queste funzioni dipende dall'ambiente di esecuzione.

Per i processi di Databricks, non usare streamingQuery.awaitTermination() o spark.streams.awaitAnyTermination(). Queste funzioni non sono necessarie perché il servizio Processi impedisce automaticamente il completamento di un'esecuzione quando una query di streaming è attiva. Entrambe le funzioni bloccano il completamento delle celle del notebook e impediscono al servizio Lavori di monitorare la query di streaming, interrompendo così le metriche di arretrato e le notifiche dei lavori.

Usare awaitTermination() nei casi seguenti:

caso d'uso Comportamento
Notebook interattivi nel calcolo a tutti gli scopi awaitTermination() mantiene la cella in esecuzione, consente di osservare lo stato della query e garantisce che gli errori siano presenti nell'output del notebook.
Ambienti di sviluppo e locali Quando si esegue un programma Spark in locale, il processo viene chiuso al termine del thread principale. Chiamare awaitTermination() per mantenere attivo il programma fino al termine o al fallimento della query di streaming.
Propagazione degli errori nel driver Senza awaitTermination(), un errore in una query di streaming in un contesto non di lavoro potrebbe non propagarsi al thread chiamante. La query può avere esito negativo in modo invisibile all'utente, rendendo più difficile rilevare e diagnosticare gli errori. La chiamata awaitTermination() genera nuovamente l'eccezione di query sul driver.

Utilizzare pool di pianificazione per query di streaming multiple

È possibile configurare pool di schedulazione per assegnare capacità di calcolo alle query durante l'esecuzione di più query di streaming dallo stesso codice sorgente.

Per impostazione predefinita, tutte le query avviate in un notebook vengono eseguite nello stesso pool di pianificazione condiviso. I job Apache Spark generati dai trigger di tutte le query di streaming in un notebook sono eseguiti uno dopo l'altro secondo l'ordine FIFO (First In, First Out). Ciò può causare ritardi non necessari nelle query, perché non condividono in modo efficiente le risorse del cluster.

I pool del pianificatore consentono di dichiarare quali query di Structured Streaming condividono le risorse di calcolo.

Nell'esempio seguente viene assegnato a query1 un pool dedicato, mentre query2 e query3 condividono un pool di pianificazione.

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

Nota

La configurazione della proprietà locale deve trovarsi nella stessa cella del notebook in cui si avvia la query di streaming.

Per ulteriori informazioni sui pool del fair scheduler di Apache, vedere la documentazione del fair scheduler di Apache.