Configurar o modo em tempo real

Esta página descreve os pré-requisitos e a configuração necessários para executar consultas de modo em tempo real no Streaming Estruturado. Para obter um tutorial passo a passo, consulte Tutorial: Executar uma carga de trabalho de streaming em tempo real. Para obter informações conceituais sobre o modo em tempo real, consulte o modo em tempo real no Streaming Estruturado.

Pré-requisitos

Para usar o modo em tempo real, você deve configurar sua computação para atender aos seguintes requisitos:

  • Use a computação clássica. Há suporte para modos de acesso dedicados e padrão. O modo de acesso padrão tem suporte apenas para Python. Não há suporte para pipelines declarativos do Lakeflow Spark e clusters sem servidor.
  • Utilize o Databricks Runtime 16.4 LTS ou versões posteriores.
  • Desativar o dimensionamento automático.
  • Desative o Photon.
  • Defina spark.databricks.streaming.realTimeMode.enabled como true.
  • Desative instâncias spot para evitar interrupções.

Para cargas de trabalho sensíveis à latência com UDFs, o Databricks recomenda que você use o modo de acesso dedicado. Consulte funções de tabela.

Para obter instruções sobre como criar e configurar a computação clássica, consulte a referência de configuração de computação.

Configuração de consulta

Para executar uma consulta no modo em tempo real, você deve habilitar o gatilho em tempo real. Os gatilhos em tempo real têm suporte apenas no modo de atualização.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Dimensionamento de computação

Você pode executar um trabalho em tempo real por recurso de computação se a computação tiver slots de tarefa suficientes.

Para ser executado no modo de baixa latência, o número total de slots de tarefa disponíveis deve ser maior ou igual ao número de tarefas em todos os estágios de consulta.

Exemplos de cálculo de slots

Tipo de pipeline Configuração Slots necessários
Sem estado de estágio único (origem kafka + coletor) maxPartitions = 8 8 espaços
Estado com dois estágios (fonte Kafka + embaralhamento) maxPartitions = 8, partições de embaralhamento = 20 28 slots (8 + 20)
Três estágios (origem kafka + shuffle + repartition) maxPartitions = 8, duas fases de shuffle de 20 cada um 48 slots (8 + 20 + 20)

Se você não definir maxPartitions, use o número de partições no tópico Kafka.

Recursos adicionais