Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Uma tabela de streaming é uma tabela Delta com suporte adicional para streaming ou processamento incremental de dados. Uma tabela de streaming pode ser alvo de um ou mais fluxos numa canalização.
As tabelas de streaming são uma boa opção para a ingestão de dados pelos seguintes motivos:
- Cada linha de entrada é processada apenas uma vez, o que representa a grande maioria das tarefas de ingestão (ou seja, acrescentando ou atualizando linhas numa tabela).
- Eles podem lidar com grandes volumes de dados em modo de acréscimo apenas.
As tabelas de streaming são também uma boa escolha para transformações de streaming de baixa latência, pois conseguem raciocinar sobre linhas e janelas de tempo, lidar com grandes volumes de dados e fornecer processamento de baixa latência.
O diagrama seguinte mostra como os fluxos são lidos a partir de fontes de streaming e escrevem incrementalmente numa tabela de streaming dentro de um pipeline.
Em cada atualização, os fluxos associados a uma tabela de Streaming leem a informação alterada numa fonte de streaming e acrescentam nova informação a essa tabela.
As tabelas de streaming são propriedade de um único pipeline e são atualizadas por ele. Você define explicitamente tabelas de streaming no código-fonte do pipeline. As tabelas definidas por um pipeline não podem ser alteradas ou atualizadas por nenhum outro pipeline. Você pode definir vários fluxos para anexar a uma única tabela de streaming.
O Azure Databricks cria tabelas internas para suportar o processamento de tabelas em streaming. Estas tabelas aparecem em system.information_schema.tables, mas não são visíveis no Explorador de Catálogos ou noutras páginas da interface de utilizador do espaço de trabalho.
Observação
Quando cria uma tabela de streaming fora de um pipeline usando o Databricks SQL, o Azure Databricks cria um pipeline que é usado para atualizar a tabela. Você pode ver o pipeline selecionando Jobs & Pipelines da barra de navegação à esquerda em seu espaço de trabalho. Você pode adicionar a coluna Tipo de pipeline à sua exibição. As tabelas de streaming definidas num pipeline têm um tipo de ETL. As tabelas de streaming criadas no Databricks SQL têm um tipo de MV/ST.
Para obter mais informações sobre fluxos, consulte Carregue e processe dados de forma incremental com fluxos de pipelines declarativos do Lakeflow Spark.
Tabelas de streaming para ingestão
As tabelas de streaming são projetadas para fontes de dados somente para adição e processam dados apenas uma vez. Isto torna-os bem adequados para cargas de trabalho de ingestão, onde os dados chegam continuamente e devem ser capturados de forma fiável sem reprocessar registos existentes. O Azure Databricks suporta a ingestão de dados de armazenamento na cloud e o streaming de barramentos de mensagens.
Ingerir ficheiros a partir de armazenamento na cloud
Podes usar uma tabela de streaming para ingerir novos ficheiros do armazenamento na cloud. Estes exemplos utilizam o Auto Loader para processar progressivamente novos ficheiros à medida que chegam.
Python
from pyspark import pipelines as dp
# Create a streaming table
@dp.table
def customers_bronze():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load("/Volumes/path/to/files")
)
Para criar uma tabela Streaming, a definição do conjunto de dados deve ser um tipo de fluxo. Quando usas a spark.readStream função numa definição de conjunto de dados, ela devolve um conjunto de dados em streaming.
SQL
-- Create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files(
"/volumes/path/to/files",
format => "json"
);
As tabelas de streaming precisam de conjuntos de dados streaming. A STREAM palavra-chave antes de read_files indica à consulta para que trate o conjunto de dados como um fluxo.
Ingestão de mensagens de streaming
Também pode usar tabelas de streaming para ingerir dados dos barramentos de mensagens. O exemplo seguinte demonstra como criar uma tabela de Streaming que lê a partir de um tópico Pub/Sub.
Python
@dp.table
def pubsub_raw():
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
return (
spark.readStream
.format("pubsub")
.option("subscriptionId", "my-subscription")
.option("topicId", "my-topic")
.option("projectId", "my-project")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'my-subscription',
projectId => 'my-project',
topicId => 'my-topic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
A Databricks recomenda o uso de segredos ao fornecer opções de autorização. Consulte Configurar acesso ao Pub/Sub para todas as opções de autenticação.
Para obter mais detalhes sobre como inserir dados em uma tabela de streaming, consulte Carregar dados em pipelines.
O diagrama a seguir ilustra como funcionam as tabelas de streaming com acréscimo único.
Uma linha que já foi anexada a uma tabela de Streaming não será consultada novamente com atualizações posteriores ao pipeline. Se modificar a consulta (por exemplo, de SELECT LOWER (name) para SELECT UPPER (name)), as linhas existentes não serão atualizadas para maiúsculas, mas as novas linhas serão maiúsculas. Podes ativar uma atualização completa para requerer todos os dados anteriores da tabela de origem e atualizar todas as linhas da tabela Streaming.
Tabelas de streaming e streaming de baixa latência
As tabelas de streaming são projetadas para streaming de baixa latência em estado limitado. As tabelas de streaming usam o gerenciamento de pontos de verificação, o que as torna adequadas para streaming de baixa latência. No entanto, eles esperam fluxos que são naturalmente delimitados ou delimitados com uma marca d'água.
Um fluxo naturalmente limitado é produzido por uma fonte de dados de streaming que tem um início e um fim bem definidos. Um exemplo de um fluxo naturalmente limitado é a leitura de dados de um diretório de arquivos onde nenhum novo arquivo está sendo adicionado depois que um lote inicial de arquivos é colocado. O fluxo é considerado limitado porque o número de ficheiros é finito, e o fluxo termina depois de todos os ficheiros terem sido processados.
Você também pode usar uma marca d'água para delimitar um fluxo. Uma marca d'água no Streaming Estruturado é um mecanismo que ajuda a lidar com dados atrasados ao especificar quanto tempo o sistema deve esperar por eventos atrasados antes de considerar a janela de tempo como completa. Um fluxo não limitado sem marca d'água pode fazer com que um pipeline falhe devido à pressão da memória.
Para obter mais informações sobre o processamento de fluxo com estado, consulte Otimizar o processamento com estado utilizando marcas d'água.
Junção de stream-snapshot
As junções stream-snapshot ligam um conjunto de dados de streaming a uma tabela de dimensões que é capturada no início da transmissão. Como a tabela de dimensões é tratada como fixa nesse momento, quaisquer alterações feitas após o início do fluxo não são refletidas na junção. Isto é aceitável quando pequenas discrepâncias não têm importância — por exemplo, quando o número de transações é muitas ordens de grandeza superior ao número de clientes.
O exemplo de código seguinte junta uma tabela de dimensões com duas linhas chamadas customers com um conjunto de dados em constante aumento, transactions. Materializa uma junção entre estes dois conjuntos de dados numa tabela chamada sales_report. Se um processo externo atualiza a tabela de clientes adicionando uma nova linha (customer_id=3, name=Zoya), esta nova linha não estará presente na junção porque a tabela de dimensões estática foi snapshotada quando os fluxos foram iniciados.
from pyspark import pipelines as dp
@dp.temporary_view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
return spark.readStream.table("transactions")
# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dp.temporary_view
def v_customers():
return spark.read.table("customers")
@dp.table
def sales_report():
facts = spark.readStream.table("v_transactions")
dims = spark.read.table("v_customers")
return facts.join(dims, on="customer_id", how="inner")
Limitações da tabela de streaming
As tabelas de streaming têm as seguintes limitações:
-
Evolução limitada: Pode alterar a consulta sem recalcular todo o conjunto de dados. Sem uma atualização completa, uma tabela de Streaming só vê cada linha uma vez, por isso diferentes consultas processarão linhas diferentes. Por exemplo, se adicionares
UPPER()a um campo na consulta, só as linhas processadas após a alteração estarão em maiúsculas. Isto significa que deve estar atento a todas as versões anteriores da consulta que estão a correr no seu conjunto de dados. Para reprocessar linhas existentes que foram processadas antes da alteração, é necessária uma atualização completa. - Gestão de estados: As tabelas de streaming têm baixa latência e requerem fluxos naturalmente delimitados ou delimitados por uma marca de água. Para mais informações, veja Otimizar o processamento com estado usando marcas de água.
- As junções não são recalculadas: As junções em tabelas de streaming não são recalculadas quando as dimensões mudam. Esta característica pode ser boa para cenários "rápidos, mas errados". Se quiseres que a tua visão esteja sempre correta, talvez queiras usar uma vista Materializada. As visualizações materializadas estão sempre corretas porque recalculam automaticamente as junções quando as dimensões mudam. Para obter mais informações, consulte Visões materializadas.