Condividi tramite


Eseguire la migrazione dal calcolo classico al calcolo serverless

Eseguire la migrazione dei carichi di lavoro dal calcolo classico al calcolo serverless. Il calcolo serverless gestisce automaticamente il provisioning, il ridimensionamento, gli aggiornamenti di runtime e l'ottimizzazione.

La maggior parte dei carichi di lavoro classici può eseguire la migrazione con modifiche minime o senza modifiche al codice. Questa pagina è incentrata su questi carichi di lavoro. Alcune funzionalità, ad esempio df.cache, non sono ancora supportate in serverless, ma non richiedono modifiche al codice una volta disponibili. Alcuni carichi di lavoro che dipendono da notebook R o Scala richiedono risorse di calcolo classiche e non potranno eseguire la migrazione a serverless. Per un elenco completo delle limitazioni correnti, vedere Limitazioni di calcolo serverless.

Passaggi per la migrazione

Per eseguire la migrazione dei carichi di lavoro dal calcolo classico al calcolo serverless, seguire questa procedura:

  1. Verificare i prerequisiti: verificare che l'area di lavoro, la rete e l'accesso alle risorse di archiviazione cloud soddisfino i requisiti. Vedere Prima di iniziare.
  2. Aggiornare il codice: apportare le modifiche necessarie al codice e alla configurazione. Vedere Aggiornare il codice.
  3. Testare i carichi di lavoro: Convalidare la compatibilità e la correttezza prima della transizione. Vedere Testare i carichi di lavoro.
  4. Scegliere una modalità di prestazioni: selezionare la modalità prestazioni più adatta ai requisiti del carico di lavoro. Vedere Scegliere una modalità di prestazioni.
  5. Eseguire la migrazione in fasi: implementare in modo incrementale serverless, a partire da carichi di lavoro nuovi e a basso rischio. Vedere Eseguire la migrazione in fasi.
  6. Monitorare i costi: Monitorare il consumo di DBU serverless e configurare avvisi. Vedi Monitorare i costi.

Prima di iniziare

Prima di iniziare la migrazione, potrebbe essere necessario aggiornare alcune configurazioni legacy nell'area di lavoro.

Prerequisito Action dettagli
L'area di lavoro è abilitata per il Catalogo di Unity Eseguire la migrazione da Metastore Hive, se necessario Upgrade un'area di lavoro Azure Databricks in Unity Catalog
Rete configurata Sostituire il peering VPC con NCC (Network Connectivity Centers), Private Link o regole del firewall. Rete di interconnessione della piattaforma di calcolo serverless
Accesso alle risorse di archiviazione cloud Sostituire i modelli legacy di accesso ai dati con le posizioni esterne di Unity Catalog. Connettersi all'archiviazione di oggetti cloud usando il catalogo Unity

Verificare che l'area di lavoro sia in un'area supportata.

Aggiornare il codice

Le sezioni seguenti elencano le modifiche al codice e alla configurazione necessarie per rendere i carichi di lavoro compatibili con serverless.

L'accesso ai dati

I modelli di accesso ai dati legacy non sono supportati in serverless. Aggiornare il codice per usare invece Unity Catalog.

Modello classico Sostituzione serverless dettagli
Percorsi DBFS (dbfs:/...) Volumi del catalogo Unity Che cosa sono i volumi di Unity Catalog?
Tabelle metastore Hive Tabelle del catalogo Unity (o federazione HMS) Upgrade un'area di lavoro Azure Databricks in Unity Catalog
Credenziali dell'account di archiviazione Posizioni esterne del catalogo Unity Connettersi all'archiviazione di oggetti cloud usando il catalogo Unity
JAR JDBC personalizzati Lakehouse Federation Che cos'è la federazione di interrogazioni?

Avvertimento

L'accesso a DBFS è limitato in serverless. Aggiorna tutti i percorsi dbfs:/ ai volumi di Unity Catalog prima della migrazione. Per altre informazioni, vedere Eseguire la migrazione dei file archiviati in DBFS.

Esempio: Sostituire i percorsi DBFS e i riferimenti al metastore Hive
# Classic
df = spark.read.csv("dbfs:/mnt/datalake/data.csv", header=True)
df.write.parquet("dbfs:/mnt/output/results")
df = spark.table("my_database.my_table")

# Serverless
df = spark.read.csv("/Volumes/main/sales/raw_data/data.csv", header=True)
df.write.parquet("/Volumes/main/analytics/output/results")
df = spark.table("main.my_database.my_table")  # three-level namespace

API e codice

Alcune API e modelli di codice non sono supportate in serverless. Fare riferimento a questa tabella per verificare se è necessario aggiornare il codice.

Modello classico Sostituzione serverless dettagli
RDD API (sc.parallelize, rdd.map) API del dataframe Confronta Spark Connect a Spark Classic
df.cache(), df.persist() Rimuovere le chiamate di memorizzazione nella cache Limitazioni di calcolo serverless
spark.sparkContext, sqlContext Usare spark direttamente (SparkSession) Confronta Spark Connect a Spark Classic
Variabili Hive (${var}) SQL DECLARE VARIABLE o stringhe f di Python DECLARE VARIABLE
Configurazioni Spark non supportate Rimuovere le configurazioni non supportate. Serverless ottimizza automaticamente la maggior parte delle impostazioni. Configurare le proprietà di Spark per notebook e processi serverless
Esempio: Sostituire le operazioni RDD con i DataFrame
from pyspark.sql import functions as F

# sc.parallelize + rdd.map
# Classic:  rdd = sc.parallelize([1, 2, 3]); rdd.map(lambda x: x * 2).collect()
df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
result = df.select((F.col("value") * 2).alias("value")).collect()

# rdd.flatMap
# Classic:  sc.parallelize(["hello world"]).flatMap(lambda l: l.split(" ")).collect()
df = spark.createDataFrame([("hello world",)], ["line"])
words = df.select(F.explode(F.split("line", " ")).alias("word")).collect()

# rdd.groupByKey
# Classic:  rdd.groupByKey().mapValues(list).collect()
df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3)], ["key", "value"])
grouped = df.groupBy("key").agg(F.collect_list("value").alias("values")).collect()

# rdd.mapPartitions → applyInPandas
import pandas as pd
def process_group(pdf: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({"total": [pdf["id"].sum()]})
result = (spark.range(100).repartition(4)
    .groupBy(F.spark_partition_id())
    .applyInPandas(process_group, schema="total long").collect())

# sc.textFile → spark.read.text
df = spark.read.text("/Volumes/catalog/schema/volume/file.txt")
Esempio: Sostituire SparkContext e memorizzare nella cache
from pyspark.sql.functions import broadcast

# sc.broadcast → broadcast join
result = main_df.join(broadcast(lookup_df), "key")

# sc.accumulator → DataFrame aggregation
total = df.agg(F.sum("amount")).collect()[0][0]

# sqlContext.sql → spark.sql
result = spark.sql("SELECT * FROM main.db.table")

# df.cache() → remove caching calls
# Materialize expensive intermediate results to Delta as a workaround:
df = spark.read.parquet(path)
result = df.filter("status = 'active'")
expensive_df.write.format("delta").mode("overwrite").saveAsTable("main.scratch.temp")
result = spark.table("main.scratch.temp")

Librerie e ambienti

È possibile gestire librerie e ambienti a livello di area di lavoro usando ambienti di base e a livello di notebook usando l'ambiente serverless del notebook.

Modello classico Sostituzione serverless dettagli
Gli script di inizializzazione Ambienti serverless Configurare l'ambiente serverless
Librerie con ambito a livello di cluster Librerie con ambito nel notebook o di ambiente Configurare l'ambiente serverless
Librerie Maven/JAR Supporto delle attività JAR per i processi; PyPI per notebook Attività JAR per i job
Contenitori Docker Ambienti serverless per esigenze di libreria Configurare l'ambiente serverless

Aggiungere pacchetti Python in requirements.txt per ambienti riproducibili. Specificare versioni dei pacchetti Python.

Trasmissione in diretta

I carichi di lavoro di streaming sono supportati in serverless, ma alcuni trigger non sono supportati. Aggiornare il codice per usare i trigger supportati.

Trigger Spark Supportato Note
Trigger.AvailableNow() Raccomandato
Trigger.Once() Questa operazione è deprecata. Utilizzare invece Trigger.AvailableNow().
Trigger.ProcessingTime(interval) No Restituisce INFINITE_STREAMING_TRIGGER_NOT_SUPPORTED.
Trigger.Continuous(interval) No Usare invece la modalità continua Di Lakeflow Spark Declarative Pipelines
Impostazione predefinita (senza configurazione .trigger()) No Omettendo .trigger(), il valore predefinito diventa ProcessingTime("0 seconds"), che non è supportato nei serverless. .trigger(availableNow=True) Impostare sempre esplicitamente.

Per lo streaming continuo, eseguire la migrazione alle pipeline dichiarative di Spark in modalità continua oppure utilizzare i job a pianificazione continua con AvailableNow. Per le origini di grandi dimensioni, impostare maxFilesPerTrigger o maxBytesPerTrigger per prevenire errori di esaurimento della memoria.

Esempio: Correzione dei trigger di streaming
# Classic (not supported on serverless — default trigger is ProcessingTime)
query = df.writeStream.format("delta").outputMode("append").start()

# Serverless (explicit AvailableNow trigger)
query = (df.writeStream.format("delta").outputMode("append")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .start(output_path))
query.awaitTermination()

# With OOM prevention for large sources
query = (spark.readStream.format("delta")
    .option("maxFilesPerTrigger", 100)
    .option("maxBytesPerTrigger", "10g")
    .load(input_path)
    .writeStream.format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .start(output_path))

Testare i carichi di lavoro

  1. Test di compatibilità rapido: eseguire il carico di lavoro nel calcolo classico con la modalità di accesso Standard e Databricks Runtime 14.3 o versione successiva. Se l'esecuzione ha esito positivo, il carico di lavoro può eseguire la migrazione a serverless senza modifiche al codice.
  2. Confronto A/B (consigliato per la produzione): eseguire lo stesso carico di lavoro in versione classica (controllo) e serverless (esperimento). Controllare le tabelle di output "diff" e verificarne la correttezza. Eseguire l'iterazione fino a quando gli output corrispondono.
  3. Configurazioni temporanee: è possibile impostare temporaneamente configurazioni Spark supportate durante i test. Rimuoverli una volta che sono stabili.

Scegliere una modalità di prestazioni

I processi serverless e le pipeline supportano due modalità di prestazioni: standard e ottimizzati per le prestazioni. La modalità di prestazioni scelta dipende dai requisiti del carico di lavoro.

Modalità Disponibilità Nuova impresa Ideale per
Standard Attività, pipeline dichiarative di Lakeflow Spark 4-6 minuti Batch sensibile ai costi
Prestazioni ottimizzate Notebook, lavori, pipeline dichiarative di Lakeflow Spark Secondi Interattivo, sensibile alla latenza

Eseguire la migrazione in fasi

  1. Nuovi carichi di lavoro: avviare tutti i nuovi notebook e processi in serverless.
  2. Carichi di lavoro a basso rischio: eseguire la migrazione di carichi di lavoro PySpark/SQL già in modalità di accesso standard e Databricks Runtime 14.3 o versione successiva.
  3. Carichi di lavoro complessi: eseguire la migrazione dei carichi di lavoro che necessitano di modifiche al codice (riscrittura RDD, aggiornamenti DBFS, correzioni dei trigger).
  4. Carichi di lavoro rimanenti: esaminare periodicamente man mano che le funzionalità si espandono.

Monitorare i costi

La fatturazione serverless si basa sul consumo DBU e non sul tempo di attività del cluster. Convalidare le aspettative sui costi con carichi di lavoro rappresentativi prima di eseguire la migrazione su larga scala. Per gli strumenti e le strategie per monitorare i costi serverless, vedere Monitorare il costo del calcolo serverless.

Risorse aggiuntive

Per altre informazioni, vedere anche i post di blog seguenti: