Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Esta página contém recomendações para agendar cargas de trabalho de Structured Streaming usando jobs no Azure Databricks.
O Databricks recomenda que configure sempre o seguinte:
- Remova o código desnecessário dos blocos de anotações que retornariam resultados, como
displayecount. - Não execute cargas de trabalho de Streaming Estruturado usando computação multiuso. Sempre agende fluxos como trabalhos usando a computação de tarefas.
- Agendar trabalhos usando o modo
Continuous. Isto refere-se à funcionalidade de agendamento de tarefas do Azure Databricks, não ao intervalo de disparo do Structured Streaming. - Não ative o autoscaling para computação em trabalhos de Structured Streaming.
Algumas cargas de trabalho se beneficiam do seguinte:
- Configurar o armazenamento de estado do RocksDB no Azure Databricks
- Ponto de verificação de estado assíncrono para consultas com estado
- O que é o acompanhamento assíncrono do progresso?
A Azure Databricks introduziu os Pipelines Declarativos Lakeflow Spark para reduzir as complexidades da gestão da infraestrutura de produção para cargas de trabalho de Streaming Estruturado. A Databricks recomenda a utilização de Lakeflow Spark Declarative Pipelines para novos Structured Streaming pipelines. Veja Lakeflow Spark Declarative Pipelines.
Nota
O dimensionamento automático de computação tem limitações ao reduzir o tamanho do cluster para cargas de trabalho de Streaming Estruturado. A Databricks recomenda o uso do Lakeflow Spark Declarative Pipelines com dimensionamento automático aprimorado para cargas de trabalho de streaming. Consulte Otimize a utilização do cluster de pipelines declarativos do Lakeflow Spark com dimensionamento automático.
:::note Computação sem servidor
Em computação serverless, apenas Trigger.AvailableNow() e Trigger.Once() são suportados. Databricks recomenda Trigger.AvailableNow().
Para streaming contínuo em computação serverless, use o modo Triggered ou o modo pipeline contínuo.
Consulte Limitações de streaming.
:::
Projete cargas de trabalho de streaming para esperar falhas
O Databricks recomenda sempre configurar trabalhos de streaming para reiniciar automaticamente em caso de falha. Algumas capacidades, incluindo a evolução de esquemas, exigem que as cargas de trabalho de Streaming Estruturado sejam configuradas para tentar automaticamente novamente. Consulte Configurar trabalhos de streaming estruturado para reiniciar consultas de streaming quando ocorrer uma falha.
Algumas operações, como foreachBatch, fornecem garantias de pelo menos uma vez em vez de exatamente uma vez. Para estas operações, certifique-se de que o seu pipeline de processamento é idempotente. Veja Utilizar foreachBatch para escrever em sinks de dados arbitrários.
Nota
Quando uma consulta é reiniciada, o microlote planeado durante a execução anterior é processado. Se o seu trabalho falhou devido a um erro de falta de memória ou se você cancelou manualmente um trabalho devido a um microlote superdimensionado, talvez seja necessário aumentar a computação para processar com êxito o microlote.
Se você alterar as configurações entre execuções, essas configurações se aplicarão ao primeiro novo lote planejado. Consulte Recuperar após alterações numa consulta de Streaming Estruturado.
Quando é que uma tarefa recomeça?
Pode agendar múltiplas tarefas como parte de um trabalho no Azure Databricks. Quando você configura um trabalho usando o gatilho contínuo, não pode definir dependências entre tarefas.
Você pode optar por agendar vários fluxos em um único trabalho usando uma das seguintes abordagens:
- Várias tarefas: definir um trabalho com várias tarefas que executam trabalhos de streaming usando o gatilho contínuo.
- Várias consultas: defina várias consultas de streaming no código-fonte para uma única tarefa.
Você também pode combinar essas estratégias. A tabela a seguir compara essas abordagens.
| Estratégia | Múltiplas tarefas | Várias consultas |
|---|---|---|
| Como a computação é compartilhada? | O Databricks recomenda a implementação de recursos computacionais de tamanho adequado para cada tarefa de streaming. Opcionalmente, você pode compartilhar computação entre tarefas. | Todas as consultas compartilham o mesmo cálculo. Pode, opcionalmente, atribuir consultas a pools de agendadores. |
| Como são tratadas as novas tentativas? | Todas as tarefas devem falhar antes que o trabalho seja tentado novamente. | A tarefa é retomada se alguma consulta falhar. |
Configurar trabalhos de Streaming Estruturado para reiniciar consultas de streaming em caso de falha
O Databricks recomenda configurar todas as cargas de trabalho de streaming usando o gatilho contínuo. Consulte Executar tarefas continuamente.
O gatilho contínuo apresenta o seguinte comportamento por defeito:
- Impede mais de uma execução simultânea do trabalho.
- Inicia uma nova execução quando uma execução anterior falha.
- Usa backoff exponencial para novas tentativas.
Databricks recomenda sempre o uso de computação específica para tarefas em vez de computação geral ao agendar fluxos de trabalho. Em caso de falha e repetição do trabalho, novos recursos de computação são implantados.
Nota
O Databricks recomenda que não use streamingQuery.awaitTermination() nem spark.streams.awaitAnyTermination(). Veja Quando usar awaitTermination().
Quando usar awaitTermination()
streamingQuery.awaitTermination() e spark.streams.awaitAnyTermination() bloqueiam a thread atual até que uma consulta de streaming termine. Se deve usar estas funções depende do seu ambiente de execução.
Para as tarefas do Databricks, não use streamingQuery.awaitTermination() nem spark.streams.awaitAnyTermination(). Estas funções não são necessárias porque o serviço Jobs impede automaticamente que uma execução seja concluída quando uma consulta de streaming está ativa. Ambas as funções bloqueiam a conclusão das células do caderno e impedem o serviço Jobs de rastrear a consulta em streaming, o que perturba métricas de backlog e notificações de trabalho.
Utilize o awaitTermination() nos seguintes casos:
| Caso de utilização | Comportamento |
|---|---|
| Cadernos interativos em computação de uso geral |
awaitTermination() mantém a célula a funcionar, permite-lhe observar o estado da consulta e garante que as falhas apareçam na saída do notebook. |
| Ambientes locais e de desenvolvimento | Ao executar um programa Spark localmente, o processo encerra quando o thread principal termina. Chame awaitTermination() para manter o programa ativo até que a consulta de streaming termine ou falhe. |
| Propagação de falhas para o controlador | Sem awaitTermination(), uma falha de consulta de streaming num contexto que não seja trabalho pode não se propagar para o thread que chama. A consulta pode falhar silenciosamente, tornando as falhas mais difíceis de detetar e diagnosticar. Chamar awaitTermination() relança a exceção de consulta no driver. |
Use conjuntos de agendamento para várias consultas de streaming
Pode configurar pools de escalonadores para atribuir capacidade de processamento a consultas ao executar múltiplas consultas de streaming a partir do mesmo código-fonte.
Por padrão, todas as consultas iniciadas num notebook são executadas no mesmo pool de agendamento justo. Os trabalhos do Apache Spark gerados por gatilhos de todas as consultas de streaming em um bloco de notas são executados um após o outro na ordem "primeiro a entrar, primeiro a sair" (FIFO). Isso pode causar atrasos desnecessários nas consultas, porque elas não estão compartilhando eficientemente os recursos do cluster.
Os pools do Agendador permitem que você declare quais consultas de Streaming Estruturado compartilham recursos de computação.
O exemplo seguinte atribui query1 a um pool dedicado, enquanto query2 e query3 partilham um pool de agendadores.
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
Nota
A configuração da propriedade local deve estar na mesma célula do bloco de anotações onde se inicia a consulta de streaming.
Para mais informações sobre pools de agendadores de feiras Apache, consulte documentação sobre agendadores de feiras Apache.