Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Interfaccia usata per scrivere un dataframe di streaming in sistemi di archiviazione esterni, ad esempio file system e archivi chiave-valore. Usare df.writeStream per accedere a questo.
Sintassi
# Access through DataFrame
df.writeStream
Methods
| metodo | Descrizione |
|---|---|
outputMode(outputMode) |
Specifica la modalità di scrittura dei dati di un dataframe di streaming nel sink. Le opzioni sono append, completee update. |
format(source) |
Specifica il formato dell'origine dati di output. |
option(key, value) |
Aggiunge un'opzione di output per l'origine dati sottostante. |
options(**options) |
Aggiunge più opzioni di output per l'origine dati sottostante. |
partitionBy(*cols) |
Partiziona l'output in base alle colonne specificate nel file system. |
clusterBy(*cols) |
Raggruppa l'output in base alle colonne specificate. |
queryName(queryName) |
Specifica il nome della query di streaming. |
trigger(**kwargs) |
Imposta il trigger per l'esecuzione della query di streaming. |
foreach(f) |
Imposta l'output della query di streaming da elaborare dalla funzione o dall'oggetto specificato. |
foreachBatch(func) |
Imposta l'output di ogni microbatch da elaborare dalla funzione specificata. |
start(path) |
Avvia l'esecuzione della query di streaming e restituisce un StreamingQuery oggetto . |
table(tableName) |
Alias per toTable(). Scrive i dati nella tabella specificata e restituisce un StreamingQuery oggetto . |
toTable(tableName) |
Avvia l'esecuzione della query di streaming, che restituisce continuamente i risultati nella tabella specificata. |
Examples
Caricare un flusso di frequenza, applicare una trasformazione, scrivere nella console e arrestarla dopo 3 secondi.
import time
df = spark.readStream.format("rate").load()
df = df.selectExpr("value % 3 as v")
q = df.writeStream.format("console").start()
time.sleep(3)
q.stop()