Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Você pode carregar dados de qualquer fonte de dados suportada pelo Apache Spark no Azure Databricks usando pipelines. Pode definir conjuntos de dados — tabelas e vistas — nos Lakeflow Spark Declarative Pipelines em qualquer consulta que devolva um DataFrame Spark, incluindo DataFrames em streaming e Pandas para DataFrames Spark. Para tarefas de ingestão de dados, o Databricks recomenda o uso de tabelas de streaming para a maioria dos casos de uso. As tabelas de streaming são úteis para ingerir dados de armazenamento de objetos na nuvem usando o Auto Loader ou de barramentos de mensagens como o Kafka.
Nem todas as fontes de dados têm suporte SQL para ingestão. No entanto, podes misturar fontes SQL e Python no mesmo pipeline para usar Python onde for necessário. Para obter detalhes sobre como trabalhar com bibliotecas não empacotadas no Lakeflow Spark Declarative Pipelines por padrão, consulte Manage Python dependencies for pipelines. Para obter informações gerais sobre ingestão no Azure Databricks, consulte Conectores padrão no Lakeflow Connect.
Os exemplos seguintes demonstram alguns padrões comuns de carregamento de dados.
Carregar a partir de uma tabela existente
Carregue dados de qualquer tabela existente no Azure Databricks. Pode transformar os dados usando uma consulta ou carregar a tabela para processamento adicional no seu pipeline.
Python
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
Carregar arquivos do armazenamento de objetos na nuvem
O Databricks recomenda o uso do Auto Loader em pipelines para a maioria das tarefas de ingestão de dados do armazenamento de objetos na nuvem ou de arquivos em um volume do Catálogo Unity. O Auto Loader e os pipelines são projetados para carregar dados cada vez maiores de forma incremental e idimpotente à medida que chegam ao armazenamento em nuvem. Consulte O que é Auto Loader? e Carregar dados do armazenamento de objetos.
O exemplo seguinte lê dados do armazenamento na nuvem usando o Auto Loader.
Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
Os exemplos seguintes usam o Auto Loader para criar conjuntos de dados a partir de ficheiros CSV num volume do Unity Catalog.
Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
Observação
- Se utilizar o Auto Loader com notificações de arquivo e realizar uma atualização completa do seu pipeline ou da sua tabela de streaming, deverá limpar manualmente os seus recursos. Você pode usar o CloudFilesResourceManager num notebook para executar a limpeza.
- Para carregar arquivos com o Auto Loader num pipeline com Unity Catalog, você deve usar locais externos. Para saber mais sobre como usar o Unity Catalog com pipelines, consulte Usar o Unity Catalog com pipelines.
Autenticar para armazenamento na nuvem
O Auto Loader utiliza localizações externas do Unity Catalog para autenticar-se em armazenamento na nuvem. Deve configurar uma localização externa para o caminho de armazenamento a partir do qual deseja ler e conceder esse READ FILES privilégio ao utilizador executante.
Para ingerir a partir do Azure Data Lake Storage, configure uma localização externa apoiada por uma credencial de armazenamento que faça referência a um contentor de armazenamento. Para mais informações, consulte Ligar ao armazenamento de objetos na cloud usando o Unity Catalog.
Carregar dados a partir de um barramento de mensagens
Você pode configurar pipelines para incorporar dados de buses de mensagens. A Databricks recomenda o uso de tabelas de streaming com execução contínua e dimensionamento automático aprimorado para fornecer a ingestão mais eficiente para carregamento de baixa latência a partir de barramentos de mensagens. Para mais informações, consulte Otimizar a utilização do cluster dos Lakeflow Spark Declarative Pipelines com Autoscaling.
Por exemplo, o código seguinte configura uma tabela de streaming para ingerir dados de Kafka usando a função read_kafka .
Python
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Para ingerir a partir de outras fontes de barramento de mensagens, consulte:
- Kinesis: read_kinesis
- Pub/Sub tópico: read_pubsub
- Pulsar: read_pulsar
Carregar dados de Hubs de Eventos do Azure
Os Hubs de Eventos do Azure são um serviço de streaming de dados que fornece uma interface compatível com Apache Kafka. Você pode usar o conector Kafka de Streaming Estruturado, incluído no runtime do Lakeflow Spark Declarative Pipelines, para carregar mensagens a partir dos Hubs de Eventos do Azure. Para saber mais sobre como carregar e processar mensagens dos Hubs de Eventos do Azure, consulte Usar Hubs de Eventos do Azure como uma fonte de dados de pipeline.
Carregar dados de sistemas externos
O Lakeflow Spark Declarative Pipelines dá suporte ao carregamento de dados de qualquer fonte de dados suportada pelo Azure Databricks. Consulte Conectar-se a fontes de dados e serviços externos. Você também pode carregar dados externos usando Lakehouse Federation para fontes de dados suportadas. Como a Lakehouse Federation exige Databricks Runtime 13.3 LTS ou superior, para usar a Lakehouse Federation, configure o seu pipeline para usar o canal de pré-visualização.
Algumas fontes de dados não têm suporte equivalente a SQL. Se você não puder usar a Lakehouse Federation com uma dessas fontes de dados, poderá usar Python para ingerir dados da fonte. Você pode adicionar arquivos de origem Python e SQL ao mesmo pipeline. O exemplo seguinte declara uma vista materializada para aceder ao estado atual dos dados numa tabela remota PostgreSQL.
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Carregar conjuntos de dados pequenos ou estáticos a partir do armazenamento de objetos na cloud
Você pode carregar conjuntos de dados pequenos ou estáticos usando a sintaxe de carregamento do Apache Spark. O Lakeflow Spark Declarative Pipelines suporta todos os formatos de arquivo suportados pelo Apache Spark no Azure Databricks. Para obter uma lista completa, consulte Opções de formato de dados.
Os exemplos seguintes demonstram o carregamento de JSON para criar uma tabela.
Python
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
Observação
A função read_files SQL é comum a todos os ambientes SQL no Azure Databricks. É o padrão recomendado para acesso direto a arquivos usando SQL em pipelines. Para obter mais informações, consulte as opções e.
Carregar dados de uma fonte de dados personalizada do Python
As fontes de dados personalizadas do Python permitem carregar dados em formatos personalizados. Podes escrever código para ler e escrever para uma fonte de dados externa específica ou usar o teu código Python existente para ler dados dos teus próprios sistemas internos. Para obter mais detalhes sobre o desenvolvimento de fontes de dados Python, consulte Fontes de dados personalizadas do PySpark.
O exemplo seguinte regista uma fonte de dados personalizada com o nome de formato my_custom_datasource e lê a partir dela tanto em modo em lote como em modo de fluxo contínuo.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
Configurar uma tabela de streaming para ignorar alterações em uma tabela de streaming de origem
Por padrão, as tabelas de streaming exigem fontes somente de adição. Se a sua tabela de streaming de origem exigir atualizações ou eliminações — por exemplo, para o processamento do "direito ao esquecimento" do RGPD — use a skipChangeCommits bandeira para ignorar essas alterações. Esta flag funciona apenas com spark.readStream ao usar a função option() e não pode ser usada quando a tabela de streaming de origem for o alvo da função create_auto_cdc_flow(). Para mais informações, consulte Gerir alterações nas tabelas Delta de origem.
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Acesse, com segurança, credenciais de armazenamento usando segredos num pipeline
Você pode usar o Azure Databricks segredos para armazenar credenciais, como chaves de acesso ou senhas. Para configurar o segredo no seu pipeline, use uma propriedade Spark nas definições de configuração do cluster do pipeline. Consulte Configurar computação clássica para pipelines.
O exemplo seguinte utiliza um segredo para armazenar uma chave de acesso necessária para ler dados de entrada de uma conta de armazenamento do Azure Data Lake Storage usando o Auto Loader. Você pode usar esse mesmo método para configurar qualquer segredo exigido pelo seu pipeline, por exemplo, chaves da AWS para acessar o S3 ou a senha para um metastore do Apache Hive.
Para saber mais sobre como trabalhar com o Armazenamento do Azure Data Lake, veja Conecte-se ao Armazenamento do Azure Data Lake e ao Armazenamento de Blob.
Observação
Você deve adicionar o prefixo spark.hadoop. à chave de configuração spark_conf que define o valor secreto.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
Neste exemplo de código, substitua os seguintes valores.
| Marcador de Posição | Substitua por |
|---|---|
<container-name> |
O nome do contentor da conta de armazenamento Azure. |
<storage-account-name> |
O nome da conta de armazenamento do ADLS. |
<path> |
O caminho para os metadados e dados de saída do pipeline. |
<scope-name> |
O nome do scope secreto do Azure Databricks. |
<secret-name> |
O nome da chave que contém a chave de acesso à conta de armazenamento do Azure. |
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Neste exemplo de código, substitua os seguintes valores.
| Marcador de Posição | Substitua por |
|---|---|
<container-name> |
O nome do contentor da conta de armazenamento Azure que armazena os dados de entrada. |
<storage-account-name> |
O nome da conta de armazenamento do ADLS. |
<path-to-input-dataset> |
O caminho para o conjunto de dados de entrada. |