Carregar dados em pipelines

Você pode carregar dados de qualquer fonte de dados com suporte do Apache Spark no Azure Databricks usando pipelines. Você pode definir conjuntos de dados — tabelas e exibições — em Pipelines Declarativos do Spark do Lakeflow em relação a qualquer consulta que retorne um DataFrame do Spark, incluindo DataFrames de streaming e Pandas para DataFrames do Spark. Para tarefas de ingestão de dados, o Databricks recomenda usar tabelas de streaming para a maioria dos casos de uso. As tabelas de streaming são úteis para ingerir dados do armazenamento de objetos na nuvem usando o Carregador Automático ou de barramentos de mensagens como o Kafka.

Nem todas as fontes de dados têm suporte sql para ingestão. No entanto, você pode misturar fontes de SQL e Python no mesmo pipeline para usar o Python conforme necessário. Para obter detalhes sobre como trabalhar com bibliotecas não empacotadas no Lakeflow Spark Declarative Pipelines por padrão, consulte Gerenciar dependências do Python para pipelines. Para obter informações gerais sobre a ingestão no Azure Databricks, consulte Conectores Padrão no Lakeflow Connect.

Os exemplos a seguir demonstram alguns padrões comuns de carregamento de dados.

Carregar de uma tabela existente

Carregue dados de qualquer tabela existente no Azure Databricks. Você pode transformar os dados usando uma consulta ou carregar a tabela para processamento adicional em seu pipeline.

Python

@dp.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

Carregar arquivos do armazenamento de objetos da nuvem

O Databricks recomenda o uso do Carregador Automático em pipelines para a maioria das tarefas de ingestão de dados do armazenamento de objetos de nuvem ou de arquivos em um volume do Catálogo do Unity. O Carregador Automático e os pipelines são projetados para carregar dados cada vez maiores de forma incremental e idempotente à medida que chegam ao armazenamento em nuvem. Veja o que é o Carregador Automático? e carregar dados do armazenamento de objetos.

O exemplo a seguir lê dados do armazenamento em nuvem usando o Carregador Automático.

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

Os exemplos a seguir usam o Carregador Automático para criar conjuntos de dados de arquivos CSV em um volume do Catálogo do Unity.

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

Observação

  • Se você usar o Carregador Automático com notificações de arquivo e executar uma atualização completa no seu pipeline ou tabela de streaming, deverá limpar manualmente os recursos. Você pode usar o CloudFilesResourceManager em um notebook para executar a limpeza.
  • Para carregar arquivos com o Carregador Automático em um pipeline habilitado para o Catálogo do Unity, você deve usar localizações externas. Para saber mais sobre como usar o Catálogo do Unity com pipelines, consulte Usar o Catálogo do Unity com pipelines.

Autenticar no armazenamento em nuvem

O Auto Loader utiliza locais externos do Unity Catalog para autenticar no armazenamento em nuvem. Você deve configurar um local externo para o caminho de armazenamento do qual deseja ler e conceder o READ FILES privilégio ao usuário executor.

Para ingerir de Azure Data Lake Storage, configure um local externo apoiado por uma credencial de armazenamento que faça referência a um contêiner de armazenamento. Para obter mais informações, consulte Conectar-se ao armazenamento de objetos de nuvem usando o Catálogo do Unity.

Carregar dados de um barramento de mensagens

Você pode configurar pipelines para ingerir dados de barramentos de mensagens. O Databricks recomenda o uso de tabelas de streaming com execução contínua e dimensionamento automático aprimorado para fornecer a ingestão mais eficiente para o carregamento de baixa latência dos barramentos de mensagens. Para obter mais informações, consulte Otimizar a utilização de cluster de Pipelines Declarativos do Lakeflow Spark com dimensionamento automático.

Por exemplo, o código a seguir configura uma tabela de streaming para ingerir dados do Kafka usando a função read_kafka .

Python

from pyspark import pipelines as dp

@dp.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

Para ingerir de outras fontes de barramento de mensagens, confira:

Carrege dados de Hubs de Eventos do Azure

Os Hubs de Eventos do Azure são um serviço de streaming de dados que fornece uma interface compatível com o Apache Kafka. Você pode usar o Conector Kafka de Streaming Estruturado, incluído no ambiente de execução do Lakeflow Spark Declarative Pipelines, para carregar mensagens do serviço de Hubs de Eventos do Azure. Para saber mais sobre como carregar e processar mensagens dos Hubs de Eventos do Azure, consulte Usar os Hubs de Eventos do Azure como uma fonte de dados de pipeline.

Carregar dados de sistemas externos

O Lakeflow Spark Declarative Pipelines dá suporte ao carregamento de dados de qualquer fonte de dados com suporte do Azure Databricks. Consulte Conectar-se a fontes de dados e serviços externos. Você também pode carregar dados externos usando Lakehouse Federation para fontes de dados com suporte. Como a Federação do Lakehouse requer o Databricks Runtime 13.3 LTS ou superior, para usar a Federação do Lakehouse, configure o pipeline para usar o canal de visualização.

Algumas fontes de dados não têm suporte sql equivalente. Se você não puder usar a Federação Lakehouse com uma dessas fontes de dados, poderá usar o Python para importar dados da origem. Você pode adicionar arquivos de origem Python e SQL ao mesmo pipeline. O exemplo a seguir declara uma exibição materializada para acessar o estado atual dos dados em uma tabela postgreSQL remota.

import dp

@dp.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Carregar conjuntos de dados pequenos ou estáticos do armazenamento de objetos na nuvem

Você pode carregar conjuntos de dados pequenos ou estáticos usando a sintaxe de carregamento do Apache Spark. O Lakeflow Spark Declarative Pipelines dá suporte a todos os formatos de arquivo compatíveis com o Apache Spark no Azure Databricks. Para obter uma lista completa, consulte as opções de formato de dados.

Os exemplos a seguir demonstram o carregamento de JSON para criar uma tabela.

Python

@dp.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

Observação

A função SQL read_files é comum a todos os ambientes SQL no Azure Databricks. É o padrão recomendado para acesso direto a arquivos usando SQL em pipelines. Para obter mais informações, consulte Opções.

Carregar dados de uma fonte de dados personalizada do Python

As fontes de dados personalizadas do Python permitem carregar dados em formatos personalizados. Você pode escrever código para ler e gravar em uma fonte de dados externa específica ou usar seu código de Python existente para ler dados de seus próprios sistemas internos. Para obter mais detalhes sobre como desenvolver fontes de dados do Python, consulte fontes de dados personalizadas do PySpark.

O exemplo a seguir registra uma fonte de dados personalizada com o formato de nome my_custom_datasource e lê dela nos modos batch e streaming.

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
    return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
    return spark.readStream.format("my_custom_datasource").load()

Configurar uma tabela de streaming para ignorar alterações em uma tabela de streaming de origem

Por padrão, as tabelas de streaming exigem fontes somente de acréscimo. Se a tabela de origem do streaming exigir atualizações ou exclusões, por exemplo, para o processamento do RGPD "Direito de Ser Esquecido", use o skipChangeCommits sinalizador para ignorar essas alterações. Esse flag funciona apenas usando a option() função e não pode ser usado quando a tabela de streaming de origem se destina à função create_auto_cdc_flow(). Para obter mais informações, consulte Manipular alterações nas tabelas Delta de origem.

@dp.table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Acessar as credenciais de armazenamento com segurança com os segredos em um pipeline

Você pode usar segredos do Azure Databricks para armazenar credenciais, como chaves de acesso ou senhas. Para configurar o segredo nop seu pipeline, use uma propriedade Spark na configuração do cluster de configurações do pipeline. Consulte Configuração clássica de computação para pipelines.

O exemplo a seguir usa um segredo para armazenar uma chave de acesso necessária para ler dados de entrada de uma conta de armazenamento Azure Data Lake Storage usando o Carregador Automático. Você pode usar esse mesmo método para configurar um segredo exigido pelo pipeline, por exemplo, chaves AWS para acessar o S3 ou a senha para um metastore do Hive do Apache.

Para saber mais sobre como trabalhar com o Azure Data Lake Storage, consulte Connect to Azure Data Lake Storage and Armazenamento de Blobs.

Observação

Você deve adicionar o prefixo spark.hadoop. à chave de configuração spark_conf que define o valor secreto.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

Neste exemplo de código, substitua os valores a seguir.

Placeholder Substituir por
<container-name> O nome do contêiner da conta de armazenamento Azure.
<storage-account-name> O nome da conta de armazenamento do ADLS.
<path> O caminho para dados de saída de pipeline e metadados.
<scope-name> O nome do escopo do segredo do Azure Databricks.
<secret-name> O nome da chave que contém a chave de acesso da conta de armazenamento Azure.
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Neste exemplo de código, substitua os valores a seguir.

Placeholder Substituir por
<container-name> O nome do contêiner da conta de armazenamento Azure que armazena os dados de entrada.
<storage-account-name> O nome da conta de armazenamento do ADLS.
<path-to-input-dataset> O caminho para o conjunto de dados de entrada.