Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Use o conector incorporado para subscrever o Google Pub/Sub. Este conector fornece processamento com semântica de exatamente uma vez para dados do assinante.
Nota
Pub/Sub pode publicar registos duplicados, ou registos podem chegar ao assinante fora de ordem. Escreve código para lidar com registos duplicados e fora de ordem.
Configurar um stream Pub/Sub
O exemplo de código seguinte demonstra a sintaxe básica para configurar uma leitura de Streaming Estruturado a partir de Pub/Sub.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
Para obter mais opções de configuração, consulte Configurar opções de leitura de streaming do Pub/Sub.
Configurar o acesso a Pub/Sub
As credenciais que configura devem ter as seguintes funções.
| Funções | Obrigatório ou opcional | Como o papel é utilizado |
|---|---|---|
roles/pubsub.viewer ou roles/viewer |
Necessário | Verifica se existe subscrição e recebe subscrição. |
roles/pubsub.subscriber |
Necessário | Recolhe dados de uma subscrição. |
roles/pubsub.editor ou roles/editor |
Opcional | Permite a criação de uma subscrição caso não exista e permite o uso de deleteSubscriptionOnStreamStop para eliminar subscrições na terminação do fluxo. |
A Databricks recomenda o uso de segredos ao fornecer opções de autorização. As seguintes opções são necessárias para autorizar uma conexão:
clientEmailclientIdprivateKeyprivateKeyId
Compreenda o esquema Pub/Sub
O esquema do fluxo corresponde aos registos que são obtidos do Pub/Sub, conforme descrito na tabela seguinte.
| Campo | Tipo |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configurar opções para leitura de streaming Pub/Sub
A tabela a seguir descreve as opções suportadas para Pub/Sub. Todas as opções são configuradas como parte de uma leitura de Structured Streaming usando a sintaxe .option("<optionName>", "<optionValue>").
Nota
Algumas opções de configuração Pub/Sub usam o conceito de buscas em vez de microlotes. Isso reflete os detalhes internos da implementação, e as opções funcionam de forma semelhante aos corolários em outros conectores de Streaming Estruturado, exceto que os registros são buscados e, em seguida, processados.
| Opção | Valor predefinido | Descrição |
|---|---|---|
numFetchPartitions |
Defina como metade do número de executores presentes na inicialização do fluxo. | O número de tarefas paralelas do Spark que obtêm registos de uma subscrição. |
deleteSubscriptionOnStreamStop |
false |
Se true, a assinatura passada para o fluxo é excluída quando o trabalho de streaming termina. |
maxBytesPerTrigger |
none |
Um limite suave para o tamanho do lote a ser processado durante cada microlote acionado. |
maxRecordsPerFetch |
1000 |
O número de registros a serem buscados por tarefa antes de processar registros. |
maxFetchPeriod |
10s |
A duração de cada tarefa para buscar antes de processar os registos. Aceita uma sequência de duração, por exemplo, 1s durante 1 segundo ou 1m 1 minuto. O Databricks recomenda o uso do valor padrão. |
Use processamento incremental em lotes com Pub/Sub
Podes usar Trigger.AvailableNow para consumir registos disponíveis das fontes Pub/Sub como um lote incremental.
O Azure Databricks registra o carimbo de data/hora quando você inicia uma leitura com a Trigger.AvailableNow configuração. Os registos processados pelo lote de processamento incluem todos os dados obtidos anteriormente e quaisquer registos recém-publicados com um timestamp inferior ao timestamp de início do fluxo registado. Para mais informações, vejaAvailableNow: Processamento incremental em lote.
Monitorizar as métricas de streaming do Pub/Sub
As métricas de progresso do Streaming estruturado relatam o número de registros buscados e prontos para processar, o tamanho dos registros buscados e prontos para processar e o número de duplicatas vistas desde o início do fluxo. Segue-se um exemplo destas métricas:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limitações
Pub/Sub não suporta execução especulativa (spark.speculation).