クラシック コンピューティングからサーバーレス コンピューティングにワークロードを移行します。 サーバーレス コンピューティングは、プロビジョニング、スケーリング、ランタイムアップグレード、最適化を自動的に処理します。
ほとんどのクラシック ワークロードは、コードの変更を最小限に抑えるか、まったく変更なしで移行できます。 このページでは、これらのワークロードに焦点を当てます。
df.cacheなどの一部の機能は、サーバーレスではまだサポートされていませんが、一度使用可能になるとコードを変更する必要はありません。 R または Scala ノートブックに依存する特定のワークロードでは、クラシック コンピューティングが必要であり、サーバーレスに移行することはできません。 現在の制限事項の完全な一覧については、「 サーバーレス コンピューティングの制限事項」を参照してください。
移行の手順
クラシック コンピューティングからサーバーレス コンピューティングにワークロードを移行するには、次の手順に従います。
- 前提条件を確認する: ワークスペース、ネットワーク、クラウド ストレージへのアクセスが要件を満たしていることを確認します。 開始する前にを参照してください。
- コードの更新: 必要なコードと構成の変更を行います。 「 コードを更新する」を参照してください。
- ワークロードをテストする: 移行する前に互換性と正確性を確認します。 ワークロードのテストを参照してください。
- パフォーマンス モードの選択: ワークロードの要件に最も適したパフォーマンス モードを選択します。 パフォーマンス モードの選択を参照してください。
- 段階的な移行: 新しい低リスクのワークロードから始めて、サーバーレスを段階的にロールアウトします。 段階的な移行を参照してください。
- コストの監視: サーバーレス DBU の使用量を追跡し、アラートを設定します。 コストの監視を参照してください。
始める前の準備
移行を開始する前に、ワークスペース内のいくつかのレガシ構成を更新することが必要になる場合があります。
| 前提条件 | アクション | 詳細情報 |
|---|---|---|
| Unity カタログでワークスペースが有効になっている | 必要に応じて Hive Metastore から移行する | Azure Databricks ワークスペースを Unity カタログにアップグレードします |
| 構成されたネットワーク | VPC ピアリングを NCC、Private Link、またはファイアウォール規則に置き換える | サーバーレス コンピューティング プレーン ネットワーク |
| クラウド ストレージ へのアクセス | 従来のデータ アクセス パターンを Unity カタログの外部の場所に置き換える | Unity カタログを使用してクラウド オブジェクト ストレージに接続する |
ワークスペースが サポートされているリージョンであることを確認します。
コードを更新する
次のセクションでは、ワークロードをサーバーレスと互換性させるために必要なコードと構成の変更を示します。
データアクセス
従来のデータ アクセス パターンは、サーバーレスではサポートされていません。 代わりに Unity カタログを使用するようにコードを更新します。
| クラシック パターン | サーバーレス置換 | 詳細情報 |
|---|---|---|
DBFS パス (dbfs:/...) |
Unity のカタログボリューム | Unity Catalog ボリュームとは? |
| Hive メタストア テーブル | Unity カタログ テーブル (または HMS フェデレーション) | Azure Databricks ワークスペースを Unity カタログにアップグレードします |
| ストレージ アカウントの資格情報 | Unity カタログの外部の場所 | Unity カタログを使用してクラウド オブジェクト ストレージに接続する |
| カスタム JDBC JAR | レイクハウスフェデレーション | クエリフェデレーションとは |
Warnung
DBFS アクセスはサーバーレスで制限されます。 移行する前に、すべての dbfs:/ パスを Unity カタログ ボリュームに更新します。 詳細については、「 DBFS に格納されているファイルを移行する」を参照してください。
例: DBFS パスと Hive メタストア参照を置き換える
# Classic
df = spark.read.csv("dbfs:/mnt/datalake/data.csv", header=True)
df.write.parquet("dbfs:/mnt/output/results")
df = spark.table("my_database.my_table")
# Serverless
df = spark.read.csv("/Volumes/main/sales/raw_data/data.csv", header=True)
df.write.parquet("/Volumes/main/analytics/output/results")
df = spark.table("main.my_database.my_table") # three-level namespace
API とコード
特定の API とコード パターンは、サーバーレスではサポートされていません。 コードを更新する必要があるかどうかを確認するには、この表を参照してください。
| クラシック パターン | サーバーレス置換 | 詳細情報 |
|---|---|---|
RDD API (sc.parallelize、 rdd.map) |
DataFrame API | Spark Connect と Spark クラシックの比較 |
df.cache()、df.persist() |
キャッシュ呼び出しを削除する | サーバーレス コンピューティングの制限事項 |
spark.sparkContext、sqlContext |
spark (SparkSession) を直接使用する |
Spark Connect と Spark クラシックの比較 |
Hive 変数 (${var}) |
SQL DECLARE VARIABLE または Pythonのf文字列 |
DECLARE VARIABLE |
| サポートされていない Spark 構成 | サポートされていない構成を削除します。 サーバーレスでは、ほとんどの設定が自動チューニングされます。 | サーバーレス ノートブックとジョブの Spark プロパティを構成する |
例: RDD 操作を DataFrames に置き換える
from pyspark.sql import functions as F
# sc.parallelize + rdd.map
# Classic: rdd = sc.parallelize([1, 2, 3]); rdd.map(lambda x: x * 2).collect()
df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
result = df.select((F.col("value") * 2).alias("value")).collect()
# rdd.flatMap
# Classic: sc.parallelize(["hello world"]).flatMap(lambda l: l.split(" ")).collect()
df = spark.createDataFrame([("hello world",)], ["line"])
words = df.select(F.explode(F.split("line", " ")).alias("word")).collect()
# rdd.groupByKey
# Classic: rdd.groupByKey().mapValues(list).collect()
df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3)], ["key", "value"])
grouped = df.groupBy("key").agg(F.collect_list("value").alias("values")).collect()
# rdd.mapPartitions → applyInPandas
import pandas as pd
def process_group(pdf: pd.DataFrame) -> pd.DataFrame:
return pd.DataFrame({"total": [pdf["id"].sum()]})
result = (spark.range(100).repartition(4)
.groupBy(F.spark_partition_id())
.applyInPandas(process_group, schema="total long").collect())
# sc.textFile → spark.read.text
df = spark.read.text("/Volumes/catalog/schema/volume/file.txt")
例: SparkContext とキャッシュを置き換える
from pyspark.sql.functions import broadcast
# sc.broadcast → broadcast join
result = main_df.join(broadcast(lookup_df), "key")
# sc.accumulator → DataFrame aggregation
total = df.agg(F.sum("amount")).collect()[0][0]
# sqlContext.sql → spark.sql
result = spark.sql("SELECT * FROM main.db.table")
# df.cache() → remove caching calls
# Materialize expensive intermediate results to Delta as a workaround:
df = spark.read.parquet(path)
result = df.filter("status = 'active'")
expensive_df.write.format("delta").mode("overwrite").saveAsTable("main.scratch.temp")
result = spark.table("main.scratch.temp")
ライブラリと環境
ライブラリと環境は、 基本 環境を使用してワークスペース レベルで管理し、ノートブックの サーバーレス環境を使用してノートブック レベルで管理できます。
| クラシック パターン | サーバーレス置換 | 詳細情報 |
|---|---|---|
| 初期化スクリプト | サーバーレス環境 | サーバーレス環境を構成する |
| クラスター-スコープ ライブラリ | ノートブック スコープまたは環境ライブラリ | サーバーレス環境を構成する |
| Maven/JAR ライブラリ | ジョブ向けの JAR タスクサポート、ノートブック用 PyPI | ジョブの JAR タスク |
| Docker コンテナー | ライブラリのニーズに対応するサーバーレス環境 | サーバーレス環境を構成する |
Python パッケージを requirements.txt にピン留めして、再現可能な環境に対応します。
Specify Python パッケージのバージョンを参照してください。
ストリーミング
ストリーミング ワークロードはサーバーレスでサポートされていますが、特定のトリガーはサポートされていません。 サポートされているトリガーを使用するようにコードを更新します。
| スパーク トリガー | サポートされている | メモ |
|---|---|---|
Trigger.AvailableNow() |
はい | 推奨 |
Trigger.Once() |
はい | 非推奨です。
Trigger.AvailableNow() を代わりに使用します。 |
Trigger.ProcessingTime(interval) |
いいえ |
INFINITE_STREAMING_TRIGGER_NOT_SUPPORTED を返します。 |
Trigger.Continuous(interval) |
いいえ | 代わりに Lakeflow Spark 宣言パイプラインの連続モードを使用する |
既定値 ( .trigger()を設定しない) |
いいえ |
.trigger()省略すると、既定で ProcessingTime("0 seconds") になります。サーバーレスではサポートされていません。
.trigger(availableNow=True)は常に明示的に設定してください。 |
継続的ストリーミングの場合は、継続的モードで Spark 宣言パイプラインに移行するか、でAvailableNowを使用します。 大きなソースの場合は、メモリ不足エラーを防ぐために maxFilesPerTrigger または maxBytesPerTrigger を設定します。
例: ストリーミング トリガーを修正する
# Classic (not supported on serverless — default trigger is ProcessingTime)
query = df.writeStream.format("delta").outputMode("append").start()
# Serverless (explicit AvailableNow trigger)
query = (df.writeStream.format("delta").outputMode("append")
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.start(output_path))
query.awaitTermination()
# With OOM prevention for large sources
query = (spark.readStream.format("delta")
.option("maxFilesPerTrigger", 100)
.option("maxBytesPerTrigger", "10g")
.load(input_path)
.writeStream.format("delta")
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.start(output_path))
ワークロードをテストする
- クイック互換性テスト: Standard アクセス モードと Databricks Runtime 14.3 以降を使用して、クラシック コンピューティングでワークロードを実行します。 実行が成功した場合、ワークロードはコードを変更することなくサーバーレスに移行できます。
- A/B 比較 (運用環境に推奨): クラシック (制御) とサーバーレス (実験) で同じワークロードを実行します。 出力テーブルを差分し、正確性を確認します。 出力が一致するまで反復処理します。
- 一時的な構成: テスト中に、サポートされている Spark 構成を一時的に設定できます。 安定したら削除します。
パフォーマンス モードを選択する
サーバーレス ジョブとパイプラインでは、標準とパフォーマンス最適化の 2 つのパフォーマンス モードがサポートされます。 選択するパフォーマンス モードは、ワークロードの要件によって異なります。
| モード | 在庫状況 | Startup | 最適な用途 |
|---|---|---|---|
| Standard | ジョブとLakeflow Sparkの宣言型パイプライン | 4 ~ 6 分 | コストセンシティブなバッチ |
| パフォーマンス最適化 | ノートブック、ジョブ、Lakeflow Spark 宣言パイプライン | 秒数 | 対話型、待機時間の影響を受けやすい |
段階的に移行する
- 新しいワークロード: サーバーレスですべての新しいノートブックとジョブを開始します。
- リスクの低いワークロード: 標準アクセス モードと Databricks Runtime 14.3 以降で既に PySpark/SQL ワークロードを移行します。
- 複雑なワークロード: コードの変更が必要なワークロード (RDD の書き換え、DBFS の更新、トリガーの修正) を移行します。
- 残りのワークロード: 機能の拡張に合わせて定期的に確認します。
コストを監視する
サーバーレス課金は、クラスターのアップタイムではなく、DBU の使用量に基づいています。 大規模に移行する前に、代表的なワークロードでコストの期待を検証します。 サーバーレス コストを監視するためのツールと戦略については、「 サーバーレス コンピューティングのコストの監視」を参照してください。
その他のリソース
- サーバーレス コンピューティングのベスト プラクティス: サーバーレス ワークロードの最適化に関するヒント
- サーバーレス コンピューティングの制限事項: 現在の制限事項とサポートされていない機能の完全な一覧
- サーバーレス環境の構成: ライブラリと依存関係を管理する
- サポートされている Spark 構成: サーバーレスで使用可能な Spark 構成
- Spark Connect と従来の Spark: サーバーレス アーキテクチャの動作の違い
- サーバーレス ネットワーク セキュリティ: NCC、Private Link、ファイアウォールの構成
- サーバーレス コンピューティングのリリース ノート: 出荷時に新しい機能を追跡する
- Unity カタログ アップグレード ガイド: Hive Metastore から Unity カタログへの移行
詳細については、次のブログ記事を参照することもできます。
- サーバーレス コンピューティングとは何ですか?: サーバーレス機能と顧客の成果の概要
- データ エンジニアリングの進化: サーバーレス コンピューティングがノートブックと Lakeflow ジョブを変換する方法: サーバーレスが Lakeflow ジョブとパイプラインにどの程度力を持っているか