Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Domande frequenti sull'uso di Kafka con Azure Databricks.
Perché viene visualizzato un errore che indica che un'opzione Kafka non è supportata o non è riconosciuta?
Un errore comune è dimenticare il kafka. prefisso quando si impostano le opzioni di configurazione native kafka. Tutte le opzioni passate direttamente al client Kafka devono essere precedute da kafka.:
# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")
# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
Le opzioni specifiche del connettore Spark Kafka (ad esempio subscribe, startingOffsets, maxOffsetsPerTrigger) non richiedono il prefisso . Vedere Opzioni per l'elenco completo.
Perché viene visualizzato un errore relativo alle classi Kafka ombreggiate?
Azure Databricks richiede l'uso di classi Kafka ombreggiate (precedute da kafkashaded. o shadedmskiam.). Se vengono visualizzati errori come RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED, è necessario usare i nomi delle classi ombreggiate:
-
org.apache.kafka.*le classi richiedono ilkafkashaded.prefisso . Ad esempio:kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule -
software.amazon.msk.*le classi richiedono ilshadedmskiam.prefisso . Ad esempio:shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule
Perché ricevo un TimeoutException quando mi collego a Kafka?
Cause comuni includono:
- Connettività di rete: il cluster di calcolo non può raggiungere i broker Kafka. Controllare le regole del firewall, i gruppi di sicurezza e le configurazioni VPC.
-
Server bootstrap errati: verificare che il nome host e la
kafka.bootstrap.serversporta siano corretti. - risoluzione DNS: assicurarsi che i nomi host del broker Kafka possano essere risolti dalla rete Azure Databricks.
- Problemi di SSL/TLS: se si usa SSL, verificare che i certificati siano configurati correttamente.
Per le configurazioni di peering collegamento privato o VPC, assicurarsi che siano presenti le route di rete corrette.
È consigliabile usare la modalità batch o streaming per Kafka?
Dipende dal caso d'uso:
-
Modalità di streaming (
spark.readStream): usare quando è necessaria l'elaborazione continua dei dati o l'inserimento a bassa latenza. -
Modalità batch (
spark.read): usare per caricamenti di dati monouso, backfill o debug. Richiede siastartingOffsetscheendingOffsets.
Per informazioni dettagliate sulla configurazione degli intervalli di trigger, come , AvailableNow e ProcessingTime, vedere Configurare gli intervalli di trigger di streaming strutturato.
È possibile leggere da più argomenti Kafka in un singolo flusso?
Sì, è possibile usare:
-
subscribe: specificare un elenco delimitato da virgole di argomenti, ad esempio.option("subscribe", "topic1,topic2"). -
subscribePattern: usare un modello regex Java per trovare le corrispondenze con i nomi degli argomenti, ad esempio.option("subscribePattern", "topic-.*").
Come si usa Kafka con le pipeline dichiarative di Lakeflow Spark?
Le pipeline dichiarative di Lakeflow Spark offrono supporto nativo per le origini Kafka. È possibile definire una tabella di streaming che legge da Kafka:
Python
import dlt
@dlt.table
def kafka_bronze():
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>'
);
Per ulteriori dettagli sulle origini di streaming nelle Pipeline dichiarative di Lakeflow Spark, vedere Caricare dati nelle pipeline.
Come si deserializzano le colonne chiave e valore di Kafka?
Le key colonne e value vengono restituite come binario (BINARY tipo). Usare le operazioni dataframe per deserializzarle in base al formato dei dati:
-
Dati stringa: usare
cast("string")per convertire il file binario in stringa. -
Dati JSON: usare
from_json()dopo la conversione a stringa. Vedi la funzionefrom_json. -
Dati Avro: usare
from_avro()per deserializzare i dati con codifica Avro. Vedere Leggere e scrivere dati Avro in streaming. -
Buffer di protocollo: usare
from_protobuf()per deserializzare i dati protobuf. Vedere Buffer del protocollo di lettura e scrittura.
Perché sto ricevendo un errore di scrittura idempotente?
Databricks Runtime 13.3 LTS e versioni successive include una versione più recente della libreria kafka-clients che abilita le scritture idempotenti per impostazione predefinita. Se il cluster Kafka usa la versione 2.8.0 o successiva con ACL configurati ma senza IDEMPOTENT_WRITE abilitato, la scrittura non riesce con: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.
Risolvere questo errore eseguendo l'aggiornamento a Kafka versione 2.8.0 o successiva oppure impostando .option("kafka.enable.idempotence", "false") durante la configurazione del writer Structured Streaming.
Che cos'è KAFKA_DATA_LOSS_ERROR e come risolverlo?
Questo errore si verifica quando l'origine Kafka rileva che gli offset archiviati nel checkpoint non sono più disponibili in Kafka, in genere perché:
- Il flusso è stato sospeso per un periodo più lungo del periodo di conservazione di Kafka.
- I dati dell'argomento Kafka sono stati eliminati o l'argomento è stato ricreato.
- Il broker Kafka ha riscontrato una perdita di dati.
Per risolvere:
-
Se la perdita di dati è accettabile: impostare
.option("failOnDataLoss", "false")per consentire al flusso di continuare dalla prima differenza disponibile. -
Se la perdita di dati non è accettabile: reimpostare il checkpoint e rielaborare dagli
earliestoffset oppure ripristinare i dati mancanti di Kafka.
Per ulteriori informazioni, vedere la condizione di errore KAFKA_DATA_LOSS.
Come è possibile controllare la frequenza con cui i dati vengono letti da Kafka?
Usare l'opzione maxOffsetsPerTrigger per limitare il numero di offset (approssimativamente il numero di record) elaborati per micro batch. Ciò consente di evitare batch di grandi dimensioni che potrebbero sovraccaricare l'elaborazione downstream o causare problemi di memoria durante il recupero in un backlog.
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>',
maxOffsetsPerTrigger => '10000'
);
In alternativa, usare opzioni come minPartitions o maxRecordsPerPartition per controllare il numero di partizioni Spark create per ogni batch.
Come posso monitorare quanto il mio stream è in ritardo rispetto agli offset Kafka più recenti?
Usare le avgOffsetsBehindLatestmetriche , maxOffsetsBehindLateste minOffsetsBehindLatest disponibili nell'avanzamento della query di streaming. Questi report segnalano il numero di offset dietro l'offset disponibile più recente del flusso in tutte le partizioni degli argomenti sottoscritte. Vedere Monitoraggio delle query structured streaming su Azure Databricks.
È anche possibile usare estimatedTotalBytesBehindLatest per stimare i byte totali dei dati che non sono ancora stati elaborati.
Perché l'inizializzazione dello stream Kafka è lenta?
I flussi di Kafka richiedono tempo per:
- Connettersi al cluster Kafka e recuperare i metadati.
- Individuare le partizioni degli argomenti.
- Recuperare gli offset iniziali.
Per i cluster Kafka locali o remoti, la latenza di rete può influire significativamente sul tempo di inizializzazione. Se si eseguono pipeline attivate/pianificate con riavvii frequenti, è consigliabile usare la modalità di streaming continua per evitare un sovraccarico di inizializzazione ripetuto.
Perché l'aggiunta di più executor Spark non aumenta il throughput di Kafka?
Quando i broker Kafka diventano saturi, l'aggiunta di altri executor Spark aumenta i costi senza aumentare la velocità effettiva.
Segni che Kafka è il collo di bottiglia:
- La velocità effettiva si stabilizza nonostante l'aggiunta di più core.
- L'utilizzo della CPU o della rete del broker Kafka è elevato.
- Le attività Spark vengono completate rapidamente, ma attendono nuovi dati.
Per risolvere questo problema, ridimensionare il cluster Kafka aggiungendo broker o aumentando i conteggi delle partizioni per distribuire il carico.
Come è possibile ottimizzare l'utilizzo dei costi e delle risorse di calcolo per lo streaming Kafka?
Per le modalità micro batch e AvailableNow:
- Dimensioni corrette del cluster: monitorare le metriche e impostare una dimensione fissa del cluster appropriata per il picco di carico.
-
Usare
maxOffsetsPerTrigger: limitare le dimensioni dei batch per controllare l'utilizzo delle risorse durante i picchi di carico. - Evitare la scalabilità automatica: i processi di streaming vengono eseguiti in modo continuo e l'aggiunta o la rimozione di nodi causa un sovraccarico di ribilanciamento delle attività.
-
Ridurre la frammentazione dei dati: Le partizioni frammentate causano alcuni compiti di elaborare significativamente più dati rispetto ad altri, portando a ritardi che rallentano il completamento complessivo del batch e sprecano risorse di calcolo per compiti inattivi. Usare l'opzione
minPartitionsper suddividere le partizioni Kafka di grandi dimensioni in partizioni Spark più piccole per un'elaborazione più bilanciata.
Per la modalità in tempo reale, il ridimensionamento corretto è particolarmente importante perché le attività possono rimanere inattive durante l'attesa dei dati. Considerazioni chiave:
- Impostare
maxPartitionsin modo che ogni attività gestisca più partizioni Kafka per ridurre il sovraccarico. - Ottimizzare
spark.sql.shuffle.partitionsper processi con frequenti operazioni di rimescolamento.
Per indicazioni sul ridimensionamento dei cluster per la modalità in tempo reale, vedere Dimensionamento delle risorse di calcolo.
Perché il flusso non restituisce alcun record anche se i dati sono presenti nell'argomento?
Cause comuni includono:
-
Impostazione errata
startingOffsets: il valore predefinito èlatest, che legge solo i nuovi dati in arrivo dopo l'avvio del flusso. ImpostarestartingOffsetssuearliestper leggere i dati esistenti. - Nome dell'argomento errato: assicurarsi di sottoscrivere l'argomento corretto.
- Problemi di autenticazione: il flusso potrebbe essere connesso correttamente, ma non dispone delle autorizzazioni per la lettura dall'argomento. Controlla gli ACL Kafka.
-
Scadenza offset: se il flusso è stato arrestato per molto tempo e gli offset nel checkpoint sono scaduti (eliminati dalla conservazione Kafka), potrebbe essere necessario reimpostare il checkpoint o regolare
failOnDataLoss.