Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Häufig gestellte Fragen zur Verwendung von Kafka mit Azure Databricks.
Warum erhalte ich eine Fehlermeldung, dass eine Kafka-Option nicht unterstützt oder nicht erkannt wird?
Ein häufiger Fehler besteht darin, das Präfix beim Festlegen von kafka. Kafka-nativen Konfigurationsoptionen zu vergessen. Alle direkt an den Kafka-Client übergebenen Optionen müssen mit dem Präfix kafka. versehen werden.
# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")
# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
Für spezifische Optionen des Spark Kafka-Connectors wie subscribe, startingOffsets und maxOffsetsPerTrigger ist das Präfix nicht erforderlich. Sehen Sie Optionen für die vollständige Liste.
Warum erhalte ich eine Fehlermeldung zu schattierten Kafka-Klassen?
Azure Databricks erfordert die Verwendung von schattierten Kafka-Klassen (präfix kafkashaded. oder shadedmskiam.). Wenn Fehler wie RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED angezeigt werden, müssen Sie die schattierten Klassennamen verwenden:
-
org.apache.kafka.*Für Klassen ist daskafkashaded.Präfix erforderlich. Beispiel:kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule -
software.amazon.msk.*Für Klassen ist dasshadedmskiam.Präfix erforderlich. Beispiel:shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule
Warum erhalte ich ein TimeoutException, wenn ich mich mit Kafka verbinde?
Häufige Ursachen sind:
- Netzwerkkonnektivität: Der Computecluster kann die Kafka-Broker nicht erreichen. Überprüfen Sie Firewallregeln, Sicherheitsgruppen und VPC-Konfigurationen.
-
Falsche Bootstrap-Server: Überprüfen Sie, ob der Hostname und der
kafka.bootstrap.serversPort korrekt sind. - DNS-Auflösung: Stellen Sie sicher, dass die Kafka-Broker-Hostnamen aus dem Azure Databricks Netzwerk aufgelöst werden können.
- SSL/TLS-Probleme: Wenn Sie SSL verwenden, überprüfen Sie, ob Zertifikate ordnungsgemäß konfiguriert sind.
Stellen Sie bei Private Link- oder VPC-Peering-Setups sicher, dass die richtigen Netzwerkrouten vorhanden sind.
Sollte ich den Batch- oder Streamingmodus für Kafka verwenden?
Dies hängt von Ihrem Anwendungsfall ab:
-
Streamingmodus (
spark.readStream): Verwenden Sie diesen Modus, wenn Sie eine fortlaufende Datenverarbeitung oder eine Aufnahme mit geringer Latenz benötigen. -
Batchmodus (
spark.read): Wird für einmalige Datenladevorgänge, Rückfüllungen oder Debugging verwendet. Erfordert sowohlstartingOffsetsals auchendingOffsets.
Details zum Konfigurieren von Triggerintervallen für strukturiertes Streaming finden Sie unter Konfigurieren von Triggerintervallen wie AvailableNow" , " ProcessingTimeund "Echtzeitmodus".
Kann ich aus mehreren Kafka-Themen in einem einzigen Stream lesen?
Ja, Sie können Folgendes verwenden:
-
subscribe: Stellen Sie eine durch Trennzeichen getrennte Liste von Themen bereit, z. B.option("subscribe", "topic1,topic2"). . -
subscribePattern: Verwenden Sie ein Java regex-Muster, um Themennamen abzugleichen, z. B..option("subscribePattern", "topic-.*").
Wie verwende ich Kafka mit Lakeflow Spark Declarative Pipelines?
Lakeflow Spark Declarative Pipelines bietet native Unterstützung für Kafka-Quellen. Sie können eine Streamingtabelle definieren, die aus Kafka liest:
Python
import dlt
@dlt.table
def kafka_bronze():
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>'
);
Weitere Informationen zu Streamingquellen in Lakeflow Spark Declarative Pipelines finden Sie unter Laden von Daten in Pipelines .
Wie deerialiere ich die Kafka-Schlüssel- und Wertspalten?
Die key und value-Spalten werden als binär (BINARY Typ) zurückgegeben. Verwenden Sie DataFrame-Vorgänge, um sie basierend auf Ihrem Datenformat zu deserialisieren:
-
Stringdaten: Verwenden Sie
cast("string"), um binär in einen String zu konvertieren. -
JSON-Daten: Verwenden Sie
from_json()nachdem Umwandeln in eine Zeichenfolge. Siehefrom_jsonFunktion. -
Avro-Daten: Wird
from_avro()verwendet, um Avro-codierte Daten zu deserialisieren. Weitere Informationen finden Sie unter Lesen und Schreiben von Avro-Streamingdaten. -
Protokoll-Buffers: Verwenden Sie
from_protobuf()zum Deserialisieren von Protokoll-Daten in Protobuf-Format. Weitere Informationen finden Sie unter Lesen und Schreiben von Protokollpuffern.
Warum erhalte ich einen idempotenten Schreibfehler?
In Databricks Runtime 13.3 LTS und höher ist eine neuere Version der kafka-clients-Bibliothek enthalten, die standardmäßig idempotente Schreibvorgänge ermöglicht. Wenn Ihr Kafka-Cluster Version 2.8.0 oder älter verwendet, mit konfigurierten ACLs, aber IDEMPOTENT_WRITE nicht aktiviert ist, schlägt der Schreibvorgang mit der Meldung org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state fehl.
Sie können diesen Fehler beheben, indem Sie ein Upgrade auf Kafka Version 2.8.0 oder höher durchführen oder beim Konfigurieren des Writers für strukturiertes Streaming .option("kafka.enable.idempotence", "false") festlegen.
Was ist KAFKA_DATA_LOSS_ERROR und wie kann ich es auflösen?
Dieser Fehler tritt auf, wenn die Kafka-Quelle erkennt, dass im Prüfpunkt gespeicherte Offsets nicht mehr in Kafka verfügbar sind, in der Regel weil:
- Der Datenstrom wurde länger als der Kafka-Aufbewahrungszeitraum angehalten.
- Kafka-Themendaten wurden gelöscht oder das Thema neu erstellt.
- Kafka Broker erlebten Datenverlust.
Behebung:
-
Wenn Datenverlust akzeptabel ist: Legen Sie fest
.option("failOnDataLoss", "false"), dass der Datenstrom vom frühesten verfügbaren Offset fortgesetzt werden kann. - Wenn Datenverlust nicht akzeptabel ist: Setzen Sie den Checkpoint zurück und verarbeiten Sie die Offsets erneut, oder stellen Sie die verlorenen Kafka-Daten wieder her.
Weitere Informationen finden Sie im Abschnitt KAFKA_DATA_LOSS-Fehlerzustand.
Wie kann ich die Rate steuern, mit der Daten aus Kafka gelesen werden?
Verwenden Sie die maxOffsetsPerTrigger Option, um die Anzahl der Offsets (ungefähr die Anzahl der Datensätze) zu begrenzen, die pro Mikrobatch verarbeitet werden. Auf diese Weise können große Batches verhindert werden, die die nachgelagerte Verarbeitung überwältigen oder Speicherprobleme verursachen können, wenn Sie einen Backlog aufholen.
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>',
maxOffsetsPerTrigger => '10000'
);
Alternativ können Sie Optionen wie minPartitions oder maxRecordsPerPartition verwenden, um zu steuern, wie viele Spark-Partitionen für jeden Batch erstellt werden sollen.
Wie kann ich überwachen, wie weit mein Stream hinter den neuesten Kafka-Offsets liegt?
Verwenden Sie die im Fortschritt der Streamingabfrage verfügbaren Metriken avgOffsetsBehindLatest, maxOffsetsBehindLatest und minOffsetsBehindLatest. In diesem Bericht wird berichtet, wie viele Offsets hinter dem neuesten verfügbaren Offset für den Datenstrom in allen abonnierten Themenpartitionen liegen. Weitere Informationen finden Sie unter Monitoring Structured Streaming queries on Azure Databricks.
Sie können auch estimatedTotalBytesBehindLatest verwenden, um die Gesamtzahl der Bytes von Daten zu schätzen, die noch nicht verarbeitet wurden.
Warum zeigen meine Kafka Offset-Verzögerungsmetriken persistente Werte ungleich Null nach dem Upgrade auf Databricks Runtime 17.1 an?
In Databricks Runtime 17.1 und höher werden die neuesten Kafka-Offsets nach Abschluss der einzelnen Mikrobatches abgerufen. In Themen, die kontinuierlich Daten empfangen, können Backlogmetriken kleine, persistente Werte ohne Null aufweisen. Dies ist ein erwartetes Verhalten und weist nicht darauf hin, dass der Datenstrom zurückfällt.
In Databricks Runtime 17.0 und darunter werden die neuesten Kafka-Offsets zum Startzeitpunkt von Mikrobatches abgerufen. Backlogmetriken können 0 zurückgeben, wenn Streaming-Abfragen alle Datensätze, die zu Beginn des Mikrobatches verfügbar sind, regelmäßig abrufen.
Wenn Werte hoch sind oder kontinuierlich wachsen, kann der Datenstrom möglicherweise nicht ausreichend mit der Verarbeitung der eingehenden Daten Schritt halten. Weitere Informationen finden Sie unter Monitoring Structured Streaming queries on Azure Databricks.
Warum ist die Initialisierung meines Kafka-Datenstroms langsam?
Kafka Streams benötigen Zeit, um:
- Stellen Sie eine Verbindung mit dem Kafka-Cluster her, und rufen Sie Metadaten ab.
- Entdecken Sie Themenpartitionen.
- Rufen Sie die ursprünglichen Offsets ab.
Bei lokalen oder Remote-Kafka-Clustern kann sich die Netzwerklatenz erheblich auf die Initialisierungszeit auswirken. Wenn Sie ausgelöste/geplante Pipelines mit häufigen Neustarts ausführen, sollten Sie den fortlaufenden Streamingmodus verwenden, um wiederholten Initialisierungsaufwand zu vermeiden.
Warum werden keine weiteren Spark-Executoren hinzugefügt, die meinen Kafka-Durchsatz erhöhen?
Sobald die Kafka-Broker gesättigt wurden, erhöht das Hinzufügen weiterer Spark-Executors die Kosten, ohne den Durchsatz zu erhöhen.
Anzeichen dafür, dass Kafka der Engpass ist:
- Der Durchsatz erreicht eine Plateaubildung, obwohl weitere Kerne hinzugefügt werden.
- Die Cpu- oder Netzwerkauslastung des Kafka-Brokers ist hoch.
- Spark-Aufgaben werden schnell abgeschlossen, warten jedoch auf neue Daten.
Um dies zu beheben, skalieren Sie Ihren Kafka-Cluster, indem Sie Broker hinzufügen oder die Partitionsanzahl erhöhen, um die Auslastung zu verteilen.
Wie kann ich die Kosten- und Berechnungsauslastung für Kafka-Streaming optimieren?
Für Mikrobatch- und AvailableNow-Modi:
- Richtige Größe Ihres Clusters: Überwachen Sie Metriken, und legen Sie eine geeignete feste Clustergröße für Spitzenlast fest.
-
Verwendung von
maxOffsetsPerTrigger: Begrenzen Sie die Stapelgrößen, um die Ressourcennutzung bei Lastspitzen zu kontrollieren. - Vermeiden Sie die automatische Skalierung: Streaming-Jobs werden kontinuierlich ausgeführt, und das Hinzufügen oder Entfernen von Knoten verursacht einen Neuausgleich der Aufgaben.
-
Reduzierung von Datenverzerrung: Schiefe Partitionen führen dazu, dass einige Aufgaben wesentlich mehr Daten verarbeiten als andere, was zu Nachzüglern führt, die den gesamten Batchvorgang abschließen verlangsamen und Ressourcen auf untätige Aufgaben verschwenden. Verwenden Sie die
minPartitionsOption zum Aufteilen großer Kafka-Partitionen in kleinere Spark-Partitionen für eine ausgewogenere Verarbeitung.
Für den Echtzeitmodus ist die rechte Größenanpassung besonders wichtig, da Aufgaben beim Warten auf Daten im Leerlauf bleiben können. Wichtige Überlegungen:
- Legen Sie
maxPartitionsfest, dass jeder Vorgang mehrere Kafka-Partitionen verarbeitet, um den Aufwand zu reduzieren. - Optimieren Sie
spark.sql.shuffle.partitionsfür shuffleintensive Aufträge.
Eine Anleitung zur Größenanpassung von Clustern für den Echtzeitmodus finden Sie unter Berechnen der Größenanpassung .
Warum gibt mein Datenstrom keine Datensätze zurück, obwohl daten im Thema vorhanden sind?
Häufige Ursachen sind:
-
Falsche
startingOffsetsEinstellung: Der Standardwert istlatest, der nur neue Daten liest, die nach dem Start des Datenstroms eintreffen. Legen SiestartingOffsetsaufearliestfest, um vorhandene Daten zu lesen. - Falscher Themenname: Überprüfen Sie, ob Sie das richtige Thema abonnieren.
- Authentifizierungsprobleme: Ihr Datenstrom ist möglicherweise erfolgreich verbunden, verfügt jedoch nicht über Berechtigungen zum Lesen aus dem Topic. Überprüfen Sie Ihre Kafka ACLs.
-
Offset-Verfall: Wenn Ihr Datenstrom lange angehalten wurde und die Offsets im Prüfpunkt abgelaufen sind (von der Kafka-Datenhaltung gelöscht wurden), müssen Sie den Prüfpunkt möglicherweise zurücksetzen oder
failOnDataLossanpassen.