Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Interfaccia usata per caricare un dataframe di streaming da sistemi di archiviazione esterni , ad esempio file system e archivi chiave-valore. Usare spark.readStream per accedere a questo.
Sintassi
# Access through SparkSession
spark.readStream
Methods
| metodo | Descrizione |
|---|---|
format(source) |
Specifica il formato dell'origine dati di input. |
schema(schema) |
Specifica lo schema del dataframe di streaming. |
option(key, value) |
Aggiunge un'opzione di input per l'origine dati sottostante. |
options(**options) |
Aggiunge più opzioni di input per l'origine dati sottostante. |
load(path) |
Carica il dataframe di streaming dal percorso specificato e lo restituisce. |
json(path) |
Carica un flusso di file JSON e restituisce un dataframe. |
orc(path) |
Carica un flusso di file ORC e restituisce un dataframe. |
parquet(path) |
Carica un flusso di file Parquet e restituisce un dataframe. |
text(path) |
Carica un flusso di file di testo e restituisce un dataframe. |
csv(path) |
Carica un flusso di file CSV e restituisce un dataframe. |
xml(path) |
Carica un flusso di file XML e restituisce un dataframe. |
table(tableName) |
Carica una tabella Delta di streaming e restituisce un dataframe. |
name(source_name) |
Assegna un nome all'origine di streaming per l'evoluzione del checkpoint. |
changes(tableName) |
Restituisce le modifiche a livello di riga (Change Data Capture) dalla tabella specificata come dataframe di streaming. |
Examples
spark.readStream
# <...streaming.readwriter.DataStreamReader object ...>
Caricare un flusso di frequenza, applicare una trasformazione, scrivere nella console e arrestarla dopo 3 secondi.
import time
df = spark.readStream.format("rate").load()
df = df.selectExpr("value % 3 as v")
q = df.writeStream.format("console").start()
time.sleep(3)
q.stop()