Freigeben über


Authentifizierung

Der Azure Databricks Kafka-Connector unterstützt mehrere Authentifizierungsmethoden für die Verbindung mit Kafka. In diesem Artikel werden einige der am häufigsten verwendeten Authentifizierungsmethoden für Databricks behandelt. Die vollständige Liste der unterstützten Authentifizierungsmethoden finden Sie in der Kafka-Dokumentation.

Verbindung zu Azure Event Hubs mit einem Dienstprinzipal herstellen

Azure Databricks unterstützt die Authentifizierung von Spark-Aufträgen mit Event Hubs-Diensten. Diese Authentifizierung erfolgt über OAuth mit Microsoft Entra ID.

AAD-Authentifizierungsdiagramm

Verbinden mit Unity Catalog-Dienstanmeldeinformationen

Seit der Veröffentlichung von Databricks Runtime 16.1 unterstützt Azure Databricks Unity Catalog-Dienstanmeldeinformationen für die Authentifizierung für Azure Event Hubs. Databricks empfiehlt diesen Ansatz, insbesondere beim Ausführen des Kafka-Streamings auf gemeinsam genutzten Clustern oder serverlosen Compute.

Führen Sie die folgenden Schritte aus, um anmeldeinformationen für den Unity-Katalogdienst für die Authentifizierung zu verwenden:

  • Erstellen Sie ein neues Unity-Katalogdienst-Anmeldeinformation. Wenn Sie mit diesem Prozess nicht vertraut sind, lesen Sie Erstellen von Dienstanmeldeinformationen für Anweisungen zur Erstellung.
    • Stellen Sie sicher, dass der an Ihre Dienstanmeldeinformationen angefügte Zugriffsconnector über die erforderlichen Berechtigungen zum Herstellen einer Verbindung mit Azure Event Hubs verfügt.
  • Geben Sie den Namen Ihrer Unity Catalog Service-Anmeldeinformationen als Quelloption in Ihrer Kafka-Konfiguration an. Legen Sie die Option databricks.serviceCredential auf den Namen Ihrer Dienstanmeldeinformationen fest.

Im folgenden Beispiel wird Kafka als Quelle unter Verwendung von Service-Anmeldeinformationen konfiguriert.

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
  "subscribe": "<topic>",
  "databricks.serviceCredential": "<service-credential-name>",
  # Optional: set this only if Databricks can't infer the scope for your Kafka service.
  # "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
  "subscribe" -> "<topic>",
  "databricks.serviceCredential" -> "<service-credential-name>",
  // Optional: set this only if Databricks can't infer the scope for your Kafka service.
  // "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-hostname>:9092',
  subscribe => '<topic>',
  serviceCredential => '<service-credential-name>'
);

Hinweis: Wenn Sie eine Unity Catalog-Dienstanmeldeinformationen zum Herstellen einer Verbindung mit Kafka verwenden, sind die folgenden Optionen nicht mehr erforderlich:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

Verbinden mit einer Client-ID und einem Secret

Azure Databricks unterstützt die Microsoft Entra ID Authentifizierung mit einer Client-ID und einem geheimen Schlüssel in den folgenden Computeumgebungen:

  • Databricks Runtime 12.2 LTS und höher auf Computeressourcen, die mit dediziertem Zugriffsmodus (ehemals Einzelbenutzerzugriffsmodus) konfiguriert sind.
  • Databricks Runtime 14.3 LTS und höher auf der Recheneinheit, die im Standardzugriffsmodus konfiguriert ist (früher gemeinsam genutzter Zugriffsmodus).
  • Lakeflow Spark Declarative Pipelines ohne Unity-Katalog konfiguriert.

Azure Databricks unterstützt keine Microsoft Entra ID-Authentifizierung mit einem Zertifikat in einer Berechnungsumgebung oder in Lakeflow Spark Deklarative Pipelines, die mit Unity Catalog konfiguriert sind.

Diese Authentifizierung funktioniert nicht bei der Berechnung mit Standardzugriffsmodus oder in Unity Catalog Lakeflow Spark Declarative Pipelines.

Um die Authentifizierung mit Microsoft Entra ID durchzuführen, müssen Sie die folgenden Werte haben:

  • Eine Mandanten-ID. Dies finden Sie auf der Registerkarte Microsoft Entra ID Services.

  • Eine Client-ID (auch als Anwendungs-ID bezeichnet).

  • Einen geheimen Clientschlüssel. Sobald Sie diesen haben, sollten Sie ihn als Geheimnis zu Ihrem Databricks-Arbeitsbereich hinzufügen. Informationen zum Hinzufügen dieses Geheimnisses finden Sie unter Verwaltung von Geheimnissen.

  • Ein EventHubs-Thema. Eine Liste der Themen finden Sie im Abschnitt Event Hubs unter dem Abschnitt Entitäten auf einer bestimmten Seite des Event Hubs-Namespace. Um mit mehreren Themen zu arbeiten, können Sie die IAM-Rolle auf Event Hubs-Ebene festlegen.

  • Einen EventHubs-Server. Diesen finden Sie auf der Übersichtsseite Ihres bestimmten Event Hubs-Namespace:

    Event Hubs-Namespace

Darüber hinaus müssen wir Kafka zur Verwendung von Entra ID anweisen, den OAuth SASL-Mechanismus zu verwenden (SASL ist ein generisches Protokoll, und OAuth ist eine Art von SASL"-Mechanismus):

  • kafka.security.protocol sollte SASL_SSL lauten.
  • kafka.sasl.mechanism sollte OAUTHBEARER lauten.
  • kafka.sasl.login.callback.handler.class sollte ein vollqualifizierter Name der Java Klasse mit dem Wert kafkashaded für den Anmelderückrufhandler unserer schattierten Kafka-Klasse sein. Das folgende Beispiel zeigt die genaue Klasse.

Im folgenden Beispiel wird Kafka so konfiguriert, dass eine Verbindung mit Azure Event Hubs mithilfe Microsoft Entra ID Authentifizierung mit einer Client-ID und einem geheimen Schlüssel hergestellt wird:

Python

# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
    "kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
    "subscribe": event_hubs_topic,

    # You should not need to modify these
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
  "kafka.sasl.jaas.config" -> saslConfig,
  "kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
  "subscribe" -> eventHubsTopic,

  // You should not need to modify these
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "OAUTHBEARER",
  "kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

SQL

CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<event-hubs-server>:9093',
  subscribe => '<event-hubs-topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'OAUTHBEARER',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
  `kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
  `kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);

Verwenden von SASL/PLAIN zur Authentifizierung

Um eine Verbindung mit Kafka mithilfe der SASL/PLAIN-Authentifizierung (Benutzername und Kennwort) herzustellen, konfigurieren Sie die folgenden Optionen. Verwenden Sie den Namen der schattierten PlainLoginModule Klasse:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9093",
  "subscribe": "<topic>",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "PLAIN",
  "kafka.sasl.jaas.config":
    'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
  "subscribe" -> "<topic>",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "PLAIN",
  "kafka.sasl.jaas.config" ->
    """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'PLAIN',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);

Azure Databricks empfiehlt, Ihr Kennwort nicht direkt in Ihren Code einzugeben, sondern als geheim zu speichern. Weitere Informationen finden Sie unter "Geheime Verwaltung".

Verwenden von SASL/SCRAM zur Authentifizierung

Um eine Verbindung mit Kafka mithilfe von SASL/SCRAM (SCRAM-SHA-256 oder SCRAM-SHA-512) herzustellen, konfigurieren Sie die folgenden Optionen. Verwenden Sie den Namen der schattierten ScramLoginModule Klasse:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9093",
  "subscribe": "<topic>",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "SCRAM-SHA-512",
  "kafka.sasl.jaas.config":
    'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
  "subscribe" -> "<topic>",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "SCRAM-SHA-512",
  "kafka.sasl.jaas.config" ->
    """kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'SCRAM-SHA-512',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);

Hinweis

Ersetzen Sie SCRAM-SHA-512 durch SCRAM-SHA-256, wenn Ihr Kafka-Cluster für die Verwendung von SCRAM-SHA-256 konfiguriert ist.

Azure Databricks empfiehlt, Ihr Kennwort nicht direkt in Ihren Code einzugeben, sondern als geheim zu speichern. Weitere Informationen finden Sie unter "Geheime Verwaltung".

Verwenden Sie SSL, um Azure Databricks mit Kafka zu verbinden.

Um SSL/TLS-Verbindungen zu Kafka zu ermöglichen, setzen Sie kafka.security.protocol auf SSL und geben Sie die Konfigurationsoptionen für den Vertrauensspeicher und den Schlüsselspeicher mit dem Präfix kafka. an. Für SSL-Verbindungen, die nur serverbasierte Authentifizierung (unidirektionales TLS) erfordern, benötigen Sie einen Vertrauensspeicher. Bei gegenseitigem TLS (mTLS), bei dem der Kafka-Broker auch den Client authentifiziert, benötigen Sie sowohl einen Vertrauensspeicher als auch einen Schlüsselspeicher.

Die folgenden SSL/TLS-Optionen sind verfügbar. Die vollständige Liste der SSL-Eigenschaften finden Sie in der Dokumentation zur Apache Kafka SSL-Konfiguration und Verschlüsselung und Authentifizierung mit SSL in der Confluent-Dokumentation.

Auswahl Beschreibung
kafka.security.protocol Legen Sie diesen Wert fest, SSL um die TLS-Verschlüsselung zu aktivieren.
kafka.ssl.truststore.location Pfad zur Truststore-Datei, die vertrauenswürdige CA-Zertifikate enthält.
kafka.ssl.truststore.password Kennwort für die Vertrauensspeicherdatei.
kafka.ssl.truststore.type Dateiformat des Vertrauensspeichers (Standard: JKS).
kafka.ssl.keystore.location Pfad zur Schlüsselspeicherdatei, die das Clientzertifikat und den privaten Schlüssel enthält (erforderlich für mTLS).
kafka.ssl.keystore.password Kennwort für die Schlüsselspeicherdatei.
kafka.ssl.key.password Kennwort für den privaten Schlüssel im Schlüsselspeicher.
kafka.ssl.endpoint.identification.algorithm Hostnamenüberprüfungsalgorithmus. Wird standardmäßig auf https festgelegt. Legen Sie diese auf eine leere Zeichenfolge fest, wenn Sie sie deaktivieren wollen.

Wenn Sie SSL verwenden, empfiehlt Databricks Folgendes:

  • Speichern Sie Ihre Zertifikate in einem Unity-Katalogvolume. Benutzer, die Zugriff auf Leseberechtigungen des Volumens haben, können Ihre Kafka-Zertifikate verwenden. Weitere Informationen finden Sie unter Was sind Unity Catalog-Volumes?.
  • Speichern Sie Ihre Zertifikat-Kennwörter als geheime Schlüssel in einem geheimen Bereich. Weitere Informationen finden Sie unter Verwalten geheimer Bereiche.

Im folgenden Beispiel werden Objektspeicherorte und Databricks-Geheimnisse verwendet, um eine SSL-Verbindung zu aktivieren:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SSL',
  `kafka.ssl.truststore.location` => '<truststore-location>',
  `kafka.ssl.keystore.location` => '<keystore-location>',
  `kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
  `kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);

Kafka in HDInsight mit Azure Databricks verbinden

  1. Erstellen Sie einen Kafka-Cluster in HDInsight.

    Anweisungen finden Sie unter Connect to Kafka on HDInsight through an Azure Virtual Network.

  2. Konfigurieren Sie die Kafka-Broker so, dass sie die richtige Adresse ankündigen.

    Befolgen Sie die Anweisungen unter Konfigurieren von Kafka zum Ankündigen der IP-Adresse. Wenn Sie Kafka selbst auf Azure Virtual Machines verwalten, stellen Sie sicher, dass die konfiguration advertised.listeners der Broker auf die interne IP der Hosts festgelegt ist.

  3. Erstellen Sie einen Azure Databricks Cluster.

  4. Verknüpfen Sie das Kafka-Cluster mit dem Azure Databricks-Cluster.

    Befolgen Sie die Anweisungen unter Einrichten eines Peerings von virtuellen Netzwerken.

Verwenden Sie die von Databricks bereitgestellten Kafka-Klassennamen

Azure Databricks bündeln proprietäre, schattierte Versionen der Kafka-Clientbibliotheken. Alle Kafka-Clientklassennamen, auf die Sie in Authentifizierungskonfigurationsoptionen verweisen, müssen anstelle des Standardmäßigen Open-Source-Klassennamens das Präfix der schattierten Klasse verwenden. Dies gilt für alle Klassen, auf die in Optionen wie kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.class und kafka.sasl.client.callback.handler.class verwiesen wird.

Die Verwendung nicht abgeschatter Klassennamen führt zu einem RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED Fehler. Weitere Informationen finden Sie in den häufig gestellten Fragen .

Behandeln potenzieller Fehler

  • Fehler beim Erstellen eines neuen KafkaAdminClient

    Dieser interne Kafka-Fehler wird ausgelöst, wenn eine der folgenden Authentifizierungsoptionen falsch ist:

    • Client-ID (auch als Anwendungs-ID bezeichnet)
    • Mieter-ID
    • Event Hubs-Server

    Um den Fehler zu beheben, überprüfen Sie, ob die Werte für diese Optionen korrekt sind. Darüber hinaus wird dieser Fehler möglicherweise angezeigt, wenn Sie die im Beispiel standardmäßig bereitgestellten Konfigurationsoptionen (wie zum Beispiel kafka.security.protocol) ändern.

  • Keine Datensätze zurückgegeben

    Wenn Sie versuchen, Ihren DataFrame anzuzeigen oder zu verarbeiten, aber keine Ergebnisse erhalten, sehen Sie folgendes in der Benutzeroberfläche.

    Keine Ergebnismeldung

    Diese Meldung bedeutet, dass die Authentifizierung erfolgreich war, aber EventHubs hat keine Daten zurückgegeben. Einige mögliche (aber keineswegs vollständige) Gründe sind:

    • Sie haben das falsche EventHubs-Thema angegeben.
    • Die Standard-Kafka-Konfigurationsoption für „startingOffsets“ ist „latest“, und Sie erhalten derzeit noch keine Daten über das Thema. Sie können „startingOffsets“ auf „earliest“ setzen, um mit dem Lesen von Daten ab den frühesten Offsets von Kafka zu beginnen.