Inscrever-se no Google Pub/Sub

Use o conector incorporado para subscrever o Google Pub/Sub. Este conector fornece processamento com semântica de exatamente uma vez para dados do assinante.

Nota

Pub/Sub pode publicar registos duplicados, ou registos podem chegar ao assinante fora de ordem. Escreve código para lidar com registos duplicados e fora de ordem.

Configurar um stream Pub/Sub

O exemplo de código seguinte demonstra a sintaxe básica para configurar uma leitura de Streaming Estruturado a partir de Pub/Sub.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

Para obter mais opções de configuração, consulte Configurar opções de leitura de streaming do Pub/Sub.

Configurar o acesso a Pub/Sub

As credenciais que configura devem ter as seguintes funções.

Funções Obrigatório ou opcional Como o papel é utilizado
roles/pubsub.viewer ou roles/viewer Necessário Verifica se existe subscrição e recebe subscrição.
roles/pubsub.subscriber Necessário Recolhe dados de uma subscrição.
roles/pubsub.editor ou roles/editor Opcional Permite a criação de uma subscrição caso não exista e permite o uso de deleteSubscriptionOnStreamStop para eliminar subscrições na terminação do fluxo.

A Databricks recomenda o uso de segredos ao fornecer opções de autorização. As seguintes opções são necessárias para autorizar uma conexão:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Compreenda o esquema Pub/Sub

O esquema do fluxo corresponde aos registos que são obtidos do Pub/Sub, conforme descrito na tabela seguinte.

Campo Tipo
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Configurar opções para leitura de streaming Pub/Sub

A tabela a seguir descreve as opções suportadas para Pub/Sub. Todas as opções são configuradas como parte de uma leitura de Structured Streaming usando a sintaxe .option("<optionName>", "<optionValue>").

Nota

Algumas opções de configuração Pub/Sub usam o conceito de buscas em vez de microlotes. Isso reflete os detalhes internos da implementação, e as opções funcionam de forma semelhante aos corolários em outros conectores de Streaming Estruturado, exceto que os registros são buscados e, em seguida, processados.

Opção Valor predefinido Descrição
numFetchPartitions Defina como metade do número de executores presentes na inicialização do fluxo. O número de tarefas paralelas do Spark que obtêm registos de uma subscrição.
deleteSubscriptionOnStreamStop false Se true, a assinatura passada para o fluxo é excluída quando o trabalho de streaming termina.
maxBytesPerTrigger none Um limite suave para o tamanho do lote a ser processado durante cada microlote acionado.
maxRecordsPerFetch 1000 O número de registros a serem buscados por tarefa antes de processar registros.
maxFetchPeriod 10s A duração de cada tarefa para buscar antes de processar os registos. Aceita uma sequência de duração, por exemplo, 1s durante 1 segundo ou 1m 1 minuto. O Databricks recomenda o uso do valor padrão.

Use processamento incremental em lotes com Pub/Sub

Podes usar Trigger.AvailableNow para consumir registos disponíveis das fontes Pub/Sub como um lote incremental.

O Azure Databricks registra o carimbo de data/hora quando você inicia uma leitura com a Trigger.AvailableNow configuração. Os registos processados pelo lote de processamento incluem todos os dados obtidos anteriormente e quaisquer registos recém-publicados com um timestamp inferior ao timestamp de início do fluxo registado. Para mais informações, vejaAvailableNow: Processamento incremental em lote.

Monitorizar as métricas de streaming do Pub/Sub

As métricas de progresso do Streaming estruturado relatam o número de registros buscados e prontos para processar, o tamanho dos registros buscados e prontos para processar e o número de duplicatas vistas desde o início do fluxo. Segue-se um exemplo destas métricas:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limitações

Pub/Sub não suporta execução especulativa (spark.speculation).