Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
La modalità in tempo reale abilita lo streaming a bassa latenza con una latenza end-to-end inferiore a cinque millisecondi, rendendola ideale per carichi di lavoro operativi come il rilevamento delle frodi e la personalizzazione in tempo reale. Questa esercitazione illustra come configurare la prima query di streaming in tempo reale usando un semplice esempio.
Per informazioni concettuali sulla modalità in tempo reale, quando usarla e sulle funzionalità supportate, vedere Modalità in tempo reale in Structured Streaming. Per i requisiti di configurazione, vedere Configurare la modalità in tempo reale.
Requisiti
Prima di iniziare, assicurarsi di avere le autorizzazioni per creare un cluster di calcolo classico che usa la configurazione specificata in Configurare la modalità in tempo reale. In alternativa, contattare l'amministratore dell'area di lavoro per creare automaticamente un cluster in modalità in tempo reale.
Passaggio 1: Creare un notebook
I notebook offrono un ambiente interattivo per lo sviluppo e il test di query di streaming. Si utilizza questo notebook per scrivere query in tempo reale e visualizzare i risultati che si aggiornano continuamente.
Per creare un notebook:
- Fare clic su Nuovo nella barra laterale, quindi fare clic
Notebook.
- Nel menu a discesa del calcolo, selezionare il cluster in modalità tempo reale.
- Selezionare Python o Scala come lingua predefinita.
Passaggio 2: Eseguire una query in modalità in tempo reale
Copiare e incollare il codice seguente in una cella del notebook ed eseguirlo. In questo esempio viene utilizzata un'origine di frequenza, che genera righe a una velocità specificata e visualizza i risultati in tempo reale.
Annotazioni
La funzione display che utilizza il trigger realTime è disponibile in Databricks Runtime 17.1 e versioni successive.
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())
Dopo aver eseguito il codice, viene visualizzata una tabella che viene aggiornata in tempo reale man mano che vengono generate nuove righe. Nella tabella vengono visualizzate una timestamp colonna e una value colonna che viene incrementata con ogni riga.
Informazioni sul codice
Il codice precedente illustra i componenti essenziali di una query di streaming in tempo reale. Le tabelle seguenti illustrano i parametri chiave e il relativo controllo:
Python
| Parametro | Descrizione |
|---|---|
format("rate") |
Usa l'origine della frequenza, un'origine predefinita che genera righe a una velocità configurabile. Ciò è utile per il test senza dipendenze esterne. |
numPartitions |
Imposta il numero di partizioni per i dati generati. |
rowsPerSecond |
Controlla il numero di righe generate al secondo. |
realTime="5 minutes" |
Abilita la modalità in tempo reale. L'intervallo specifica la frequenza di avanzamento dei checkpoint della query. Intervalli più lunghi significano meno frequenti checkpoint, ma potenzialmente più lunghi tempi di ripristino dopo gli errori. |
outputMode="update" |
La modalità in tempo reale richiede la modalità di aggiornamento dell'output. |
Scala
| Parametro | Descrizione |
|---|---|
format("rate") |
Usa l'origine della frequenza, un'origine predefinita che genera righe a una velocità configurabile. Ciò è utile per il test senza dipendenze esterne. |
numPartitions |
Imposta il numero di partizioni per i dati generati. |
rowsPerSecond |
Controlla il numero di righe generate al secondo. |
Trigger.RealTime() |
Abilita la modalità in tempo reale con l'intervallo di checkpoint predefinito. È anche possibile specificare un intervallo, ad esempio Trigger.RealTime("5 minutes"). |
OutputMode.Update() |
La modalità in tempo reale richiede la modalità di aggiornamento dell'output. |
Passaggio 3: Convalidare i risultati
Quando si esegue la query, la display funzione crea una tabella che viene aggiornata in tempo reale quando l'origine della frequenza genera nuove righe. Ogni riga contiene:
- Timestamp del momento in cui la riga è stata generata dall'origine del tasso.
- Contatore che aumenta in modo monotonico con ogni nuova riga.
La tabella viene aggiornata continuamente con una latenza minima, dimostrando come la modalità in tempo reale elabora i dati non appena diventa disponibile. Questo è il vantaggio principale della modalità in tempo reale: la possibilità di visualizzare e agire sui dati immediatamente anziché attendere l'elaborazione batch.
Risorse aggiuntive
Ora che è stata eseguita la prima query in tempo reale, è possibile esplorare queste risorse per creare applicazioni di streaming in produzione con Kafka, Kinesis e altre origini supportate.