Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Interface usada para carregar um DataFrame em streaming a partir de sistemas de armazenamento externos (por exemplo, sistemas de ficheiros e armazenamentos-chave-valor). Uso spark.readStream para aceder a isto.
Sintaxe
# Access through SparkSession
spark.readStream
Methods
| Método | Descrição |
|---|---|
format(source) |
Especifica o formato da fonte de dados de entrada. |
schema(schema) |
Especifica o esquema do DataFrame em streaming. |
option(key, value) |
Adiciona uma opção de entrada para a fonte de dados subjacente. |
options(**options) |
Adiciona múltiplas opções de entrada para a fonte de dados subjacente. |
load(path) |
Carrega o DataFrame em streaming a partir do caminho dado e devolve-o. |
json(path) |
Carrega um fluxo de ficheiros JSON e devolve um DataFrame. |
orc(path) |
Carrega um fluxo de ficheiros ORC e devolve um DataFrame. |
parquet(path) |
Carrega um fluxo de ficheiros Parquet e retorna um DataFrame. |
text(path) |
Carrega um fluxo de ficheiro de texto e devolve um DataFrame. |
csv(path) |
Carrega um fluxo de ficheiros CSV e retorna um DataFrame. |
xml(path) |
Carrega um fluxo de ficheiros XML e devolve um DataFrame. |
table(tableName) |
Carrega uma tabela Delta em streaming e devolve um DataFrame. |
name(source_name) |
Atribui um nome à fonte de streaming para a evolução dos pontos de controlo. |
changes(tableName) |
Devolve alterações ao nível da linha (Captura de Dados de Alteração) da tabela especificada como DataFrame em streaming. |
Exemplos
spark.readStream
# <...streaming.readwriter.DataStreamReader object ...>
Carrega um fluxo de velocidade, aplica uma transformação, escreve na consola e pára após 3 segundos.
import time
df = spark.readStream.format("rate").load()
df = df.selectExpr("value % 3 as v")
q = df.writeStream.format("console").start()
time.sleep(3)
q.stop()