Condividi tramite


Autenticazione

Il connettore Kafka di Azure Databricks supporta più metodi di autenticazione per la connessione a Kafka. Questo articolo illustra alcuni dei metodi di autenticazione più comuni in Databricks. L'elenco completo dei metodi di autenticazione supportati è disponibile nella documentazione di Kafka.

Connettersi all'Hub eventi Azure con un'entità servizio

Azure Databricks supporta l'autenticazione dei processi Spark con i servizi di Hub eventi. Questa autenticazione viene eseguita tramite OAuth con Microsoft Entra ID.

Diagramma di autenticazione AAD

Connettersi alle credenziali del servizio di Catalogo Unity

Dal rilascio di Databricks Runtime 16.1, Azure Databricks supporta le credenziali del servizio Catalogo Unity per l'autenticazione in Hub eventi di Azure. Databricks consiglia questo approccio, in particolare quando si esegue lo streaming Kafka in cluster condivisi o in un ambiente di calcolo serverless.

Per usare le credenziali del servizio Catalogo Unity per l'autenticazione, seguire questa procedura:

  • Creare una nuova credenziale del servizio Catalogo Unity. Se non si ha familiarità con questo processo, vedere Creare le credenziali del servizio per istruzioni su come crearne uno.
    • Assicurarsi che il connettore di accesso collegato alle credenziali del servizio disponga delle autorizzazioni necessarie per connettersi a Hub eventi di Azure.
  • Specificare il nome delle credenziali del servizio di Catalogo Unity come opzione di sorgente nella configurazione di Kafka. Impostare l'opzione databricks.serviceCredential sul nome delle credenziali del servizio.

L'esempio seguente configura Kafka come origine usando le credenziali del servizio:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
  "subscribe": "<topic>",
  "databricks.serviceCredential": "<service-credential-name>",
  # Optional: set this only if Databricks can't infer the scope for your Kafka service.
  # "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
  "subscribe" -> "<topic>",
  "databricks.serviceCredential" -> "<service-credential-name>",
  // Optional: set this only if Databricks can't infer the scope for your Kafka service.
  // "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-hostname>:9092',
  subscribe => '<topic>',
  serviceCredential => '<service-credential-name>'
);

Nota: quando si usa una credenziale del servizio Catalogo Unity per connettersi a Kafka, le opzioni seguenti non sono più necessarie:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

Connettersi con un ID client e un segreto

Azure Databricks supporta l'autenticazione con ID Microsoft Entra con un ID client e un segreto negli ambienti di calcolo seguenti:

  • Databricks Runtime 12.2 LTS e versioni successive sui sistemi di calcolo configurati con modalità di accesso dedicato (in precedenza modalità di accesso utente singolo).
  • Databricks Runtime 14.3 LTS e versioni successive sulla computazione configurata con la modalità di accesso standard (in precedenza modalità di accesso condiviso).
  • Pipeline dichiarative di Lakeflow Spark configurate senza il catalogo unity.

Azure Databricks non supporta l'autenticazione con Microsoft Entra ID con un certificato in qualsiasi ambiente di elaborazione, nelle Pipeline dichiarative di Lakeflow Spark configurate con Unity Catalog.

Questa autenticazione non funziona sul calcolo con la modalità di accesso standard o sulle pipeline dichiarative di Unity Catalog Lakeflow Spark.

Per eseguire l'autenticazione con Microsoft Entra ID, è necessario avere i valori seguenti:

  • Un ID del tenant. È possibile trovarla nella scheda Servizi Microsoft Entra ID.

  • ID cliente (noto anche come ID dell'applicazione).

  • Un segreto del cliente. Una volta ottenuto questo, è necessario aggiungerlo come segreto all'area di lavoro di Databricks. Per aggiungere questo segreto, vedere Gestione dei segreti.

  • Argomento di EventHubs. È possibile trovare un elenco di argomenti nella sezione Hub degli Eventi sotto la sezione Entità su una pagina specifica dello spazio dei nomi di Event Hubs. Per lavorare con più argomenti, è possibile impostare il ruolo IAM a livello di Event Hubs.

  • Un server EventHubs. È possibile trovarla nella pagina di panoramica del namespace di Event Hubs specifico:

    Namespace di Event Hubs

Inoltre, per usare Entra ID, è necessario indicare a Kafka di usare il meccanismo SASL OAuth (SASL è un protocollo generico e OAuth è un tipo di "meccanismo" SASL):

  • kafka.security.protocol deve essere SASL_SSL
  • kafka.sasl.mechanism deve essere OAUTHBEARER
  • kafka.sasl.login.callback.handler.class dovrebbe essere un nome completamente qualificato della classe Java con un valore di kafkashaded per il gestore di callback di login della nostra classe Kafka shaded. Vedere l'esempio seguente per la classe esatta.

L'esempio seguente configura Kafka per connettersi a Hub eventi di Azure usando l'autenticazione dell'ID Microsoft Entra con un ID client e un segreto:

Python

# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
    "kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
    "subscribe": event_hubs_topic,

    # You should not need to modify these
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
  "kafka.sasl.jaas.config" -> saslConfig,
  "kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
  "subscribe" -> eventHubsTopic,

  // You should not need to modify these
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "OAUTHBEARER",
  "kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

SQL

CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<event-hubs-server>:9093',
  subscribe => '<event-hubs-topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'OAUTHBEARER',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
  `kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
  `kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);

Usare SASL/PLAIN per l'autenticazione

Per connettersi a Kafka usando l'autenticazione SASL/PLAIN (nome utente e password), configurare le opzioni seguenti. Usare il nome della classe ombreggiata PlainLoginModule :

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9093",
  "subscribe": "<topic>",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "PLAIN",
  "kafka.sasl.jaas.config":
    'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
  "subscribe" -> "<topic>",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "PLAIN",
  "kafka.sasl.jaas.config" ->
    """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'PLAIN',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);

Azure Databricks consiglia di archiviare la password come segreto anziché includerla direttamente nel codice. Per altre informazioni, vedere Gestione dei segreti.

Usare SASL/SCRAM per l'autenticazione

Per connettersi a Kafka usando SASL/SCRAM (SCRAM-SHA-256 o SCRAM-SHA-512), configurare le opzioni seguenti. Usare il nome della classe ombreggiata ScramLoginModule :

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9093",
  "subscribe": "<topic>",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "SCRAM-SHA-512",
  "kafka.sasl.jaas.config":
    'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
  "subscribe" -> "<topic>",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "SCRAM-SHA-512",
  "kafka.sasl.jaas.config" ->
    """kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'SCRAM-SHA-512',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);

Annotazioni

Sostituire SCRAM-SHA-512 con SCRAM-SHA-256 se il cluster Kafka è configurato per l'uso di SCRAM-SHA-256.

Azure Databricks consiglia di archiviare la password come segreto anziché includerla direttamente nel codice. Per altre informazioni, vedere Gestione dei segreti.

Usare SSL per connettere Azure Databricks a Kafka

Per abilitare le connessioni SSL/TLS a Kafka, impostare kafka.security.protocol su SSL e fornire le opzioni di configurazione del truststore e del keystore, precedute da kafka.. Per le connessioni SSL che richiedono solo l'autenticazione del server (TLS unidirezionale), è necessario un archivio di fiducia. Per TLS reciproco (mTLS) in cui il broker Kafka autentica anche il client, è necessario un trust store e un key store.

Sono disponibili le opzioni SSL/TLS seguenti. Per l'elenco completo delle proprietà SSL, vedere la documentazione sulla configurazione ssl di Apache Kafka e Crittografia e autenticazione con SSL nella documentazione di Confluent.

Opzione Descrizione
kafka.security.protocol Impostare su SSL per abilitare la crittografia TLS.
kafka.ssl.truststore.location Percorso del file dell'archivio attendibilità contenente certificati CA attendibili.
kafka.ssl.truststore.password Password per il file dell'archivio di fiducia.
kafka.ssl.truststore.type Formato file dell'archivio attendibilità (impostazione predefinita: JKS).
kafka.ssl.keystore.location Percorso del file dell'archivio chiavi contenente il certificato client e la chiave privata (obbligatorio per mTLS).
kafka.ssl.keystore.password Password per il file dell'archivio chiavi.
kafka.ssl.key.password Password per la chiave privata nell'archivio chiavi.
kafka.ssl.endpoint.identification.algorithm Algoritmo di verifica del nome host. Il valore predefinito è https. Configurare una stringa vuota per disabilitarlo.

Se si usa SSL, Databricks consiglia di:

L'esempio seguente usa i percorsi di archiviazione degli oggetti e i segreti di Databricks per abilitare una connessione SSL:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SSL',
  `kafka.ssl.truststore.location` => '<truststore-location>',
  `kafka.ssl.keystore.location` => '<keystore-location>',
  `kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
  `kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);

Connettere Kafka in HDInsight ad Azure Databricks

  1. Creare un cluster Kafka in HDInsight.

    Vedere Connettersi a Kafka in HDInsight tramite una rete virtuale di Azure per le istruzioni.

  2. Configurare i broker Kafka per pubblicizzare l'indirizzo corretto.

    Seguire le istruzioni in Configurare Kafka per la pubblicità IP. Se gestisci Kafka su macchine virtuali di Azure, assicurati che la configurazione advertised.listeners dei broker sia impostata sull'indirizzo IP interno degli host.

  3. Creare un cluster di Azure Databricks.

  4. Collegare il cluster Kafka al cluster Azure Databricks.

    Seguire le istruzioni in Reti peer virtuali.

Usare i nomi delle classi Kafka incapsulate di Databricks

Azure Databricks aggrega versioni proprietarie e ombreggiate delle librerie client Kafka. Tutti i nomi delle classi client Kafka a cui si fa riferimento nelle opzioni di configurazione dell'autenticazione devono usare il prefisso del nome della classe ombreggiata anziché il nome della classe open source standard. Questo vale per qualsiasi classe a cui si fa riferimento in opzioni come kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.classe kafka.sasl.client.callback.handler.class.

L'uso dei nomi di classe non oscurati genera un RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED errore. Per altri dettagli, vedere le domande frequenti .

Gestione degli errori potenziali

  • Impossibile creare un nuovo KafkaAdminClient

    Questo errore kafka interno viene generato se una delle opzioni di autenticazione seguenti non è corretta:

    • ID client (detto anche ID applicazione)
    • ID dell'inquilino
    • Server di Hub eventi

    Per risolvere l'errore, verificare che i valori siano corretti per queste opzioni. Inoltre, è possibile che questo errore venga visualizzato se si modificano le opzioni di configurazione fornite per impostazione predefinita nell'esempio , ad esempio kafka.security.protocol.

  • Nessun record restituito

    Se si sta tentando di visualizzare o elaborare il dataframe ma non si ottengono risultati, nell'interfaccia utente verrà visualizzato quanto segue.

    Nessun messaggio dei risultati

    Questo messaggio indica che l'autenticazione ha avuto esito positivo, ma EventHubs non ha restituito dati. Alcuni possibili motivi (anche se non esaustivi) sono:

    • È stato specificato l'argomento EventHubs errato.
    • L'opzione di configurazione predefinita di Kafka per startingOffsets è latest, e attualmente non si ricevono dati tramite il topic. È possibile impostare startingOffsets su earliest per iniziare a leggere i dati a partire dai primi offset di Kafka.