Freigeben über


Migrieren von SparkR zu Sparklyr

SparkR wurde als Teil von Apache Spark entwickelt, und sein Design ist den Benutzern von Scala und Python vertraut, aber potenziell weniger intuitiv für R-Praktiker. Darüber hinaus ist SparkR in Spark 4.0 veraltet.

Im Gegensatz dazu konzentriert sich Sparklyr auf die Bereitstellung einer R-freundlicheren Erfahrung. Sie nutzt die dplyr-Syntax, die Benutzern des tidyverse mit Mustern wie select(), filter(), und mutate() für DataFrame-Vorgänge vertraut ist.

sparklyr ist das empfohlene R-Paket für die Arbeit mit Apache Spark. Auf dieser Seite werden Unterschiede zwischen SparkR und Sparklyr über Sparklyr-APIs erläutert und Informationen zur Codemigration bereitgestellt.

Einrichten der Umgebung

Installation

Wenn Sie sich im Azure Databricks-Arbeitsbereich befinden, ist keine Installation erforderlich. Sparklyr laden mit library(sparklyr). Informationen zum lokalen Installieren von Sparklyr außerhalb von Azure Databricks finden Sie unter "Erste Schritte".

Herstellen einer Verbindung mit Spark

Stellen Sie eine Verbindung mit Sparklyr im Databricks-Arbeitsbereich oder lokal mithilfe von Databricks Connect her:

Arbeitsbereich:

library(sparklyr)
sc <- spark_connect(method = "databricks")

Databricks Connect:

sc <- spark_connect(method = "databricks_connect")

Weitere Details und ein erweitertes Lernprogramm zu Databricks Connect mit Sparklyr finden Sie unter "Erste Schritte".

Lesen und Schreiben von Daten

Sparklyr verfügt über eine Familie von spark_read_*()- und spark_write_*()-Funktionen zum Laden und Speichern von Daten, im Gegensatz zu SparkRs generischen read.df()- und write.df()-Funktionen. Es gibt auch eindeutige Funktionen zum Erstellen temporärer Spark DataFrames oder Spark SQL-Ansichten aus R-Datenframes im Arbeitsspeicher.

Aufgabe SparkR Sparklyr
Kopieren von Daten in Spark createDataFrame() copy_to()
Temporäre Ansicht erstellen createOrReplaceTempView() Verwenden Sie invoke() direkt mit der Methode.
Schreiben von Daten in eine Tabelle saveAsTable() spark_write_table()
Schreiben von Daten in ein angegebenes Format write.df() spark_write_<format>()
Lesen von Daten aus einer Tabelle tableToDF() tbl() oder spark_read_table()
Lesen von Daten aus einem angegebenen Format read.df() spark_read_<format>()

Daten werden geladen

So konvertieren Sie einen R-Datenframe in einen Spark DataFrame, oder um eine temporäre Ansicht aus einem DataFrame zu erstellen, um SQL darauf anzuwenden:

SparkR

mtcars_df <- createDataFrame(mtcars)

Sparklyr

mtcars_tbl <- copy_to(
  sc,
  df = mtcars,
  name = "mtcars_tmp",
  overwrite = TRUE,
  memory = FALSE
)

copy_to() erstellt eine temporäre Ansicht mit dem angegebenen Namen. Sie können einen Namen verwenden, um auf Daten zu verweisen, wenn Sie SQL direkt verwenden (z. B sdf_sql(). ). Zudem speichert copy_to() Daten zwischen, indem der memory-Parameter auf TRUE gesetzt wird.

Erstellen von Ansichten

Die folgenden Codebeispiele zeigen, wie temporäre Ansichten erstellt werden:

SparkR

createOrReplaceTempView(mtcars_df, "mtcars_tmp_view")

Sparklyr

spark_dataframe(mtcars_tbl) |>
  invoke("createOrReplaceTempView", "mtcars_tmp_view")

Schreiben von Daten

Die folgenden Codebeispiele zeigen, wie Daten geschrieben werden:

SparkR

# Save a DataFrame to Unity Catalog
saveAsTable(
  mtcars_df,
  tableName = "<catalog>.<schema>.<table>",
  mode = "overwrite"
)

# Save a DataFrame to local filesystem using Delta format
write.df(
  mtcars_df,
  path = "file:/<path/to/save/delta/mtcars>",
  source = "delta",
  mode = "overwrite"
)

Sparklyr

# Save tbl_spark to Unity Catalog
spark_write_table(
  mtcars_tbl,
  name = "<catalog>.<schema>.<table>",
  mode = "overwrite"
)

# Save tbl_spark to local filesystem using Delta format
spark_write_delta(
  mtcars_tbl,
  path = "file:/<path/to/save/delta/mtcars>",
  mode = "overwrite"
)

# Use DBI
library(DBI)
dbWriteTable(
  sc,
  value = mtcars_tbl,
  name = "<catalog>.<schema>.<table>",
  overwrite = TRUE
)

Lesen von Daten

Die folgenden Codebeispiele zeigen, wie Daten gelesen werden:

SparkR

# Load a Unity Catalog table as a DataFrame
tableToDF("<catalog>.<schema>.<table>")

# Load csv file into a DataFrame
read.df(
  path = "file:/<path/to/read/csv/data.csv>",
  source = "csv",
  header = TRUE,
  inferSchema = TRUE
)

# Load Delta from local filesystem as a DataFrame
read.df(
  path = "file:/<path/to/read/delta/mtcars>",
  source = "delta"
)

# Load data from a table using SQL - Databricks recommendeds using `tableToDF`
sql("SELECT * FROM <catalog>.<schema>.<table>")

Sparklyr

# Load table from Unity Catalog with dplyr
tbl(sc, "<catalog>.<schema>.<table>")

# or using `in_catalog`
tbl(sc, in_catalog("<catalog>", "<schema>", "<table>"))

# Load csv from local filesystem as tbl_spark
spark_read_csv(
  sc,
  name = "mtcars_csv",
  path = "file:/<path/to/csv/mtcars>",
  header = TRUE,
  infer_schema = TRUE
)

# Load delta from local filesystem as tbl_spark
spark_read_delta(
  sc,
  name = "mtcars_delta",
  path = "file:/tmp/test/sparklyr1"
)

# Load data using SQL
sdf_sql(sc, "SELECT * FROM <catalog>.<schema>.<table>")

Verarbeiten von Daten

Auswählen und Filtern

SparkR

# Select specific columns
select(mtcars_df, "mpg", "cyl", "hp")

# Filter rows where mpg > 20
filter(mtcars_df, mtcars_df$mpg > 20)

Sparklyr

# Select specific columns
mtcars_tbl |>
  select(mpg, cyl, hp)

# Filter rows where mpg > 20
mtcars_tbl |>
  filter(mpg > 20)

Hinzufügen von Spalten

SparkR

# Add a new column 'power_to_weight' (hp divided by wt)
withColumn(mtcars_df, "power_to_weight", mtcars_df$hp / mtcars_df$wt)

Sparklyr

# Add a new column 'power_to_weight' (hp divided by wt)
mtcars_tbl |>
  mutate(power_to_weight = hp / wt)

Gruppierung und Aggregation

SparkR

# Calculate average mpg and hp by number of cylinders
mtcars_df |>
  groupBy("cyl") |>
  summarize(
    avg_mpg = avg(mtcars_df$mpg),
    avg_hp = avg(mtcars_df$hp)
  )

Sparklyr

# Calculate average mpg and hp by number of cylinders
mtcars_tbl |>
  group_by(cyl) |>
  summarize(
    avg_mpg = mean(mpg),
    avg_hp = mean(hp)
  )

Verknüpfungen

Angenommen, wir haben ein weiteres Dataset mit Zylinderbeschriftungen, die wir mit Mtcars verbinden möchten.

SparkR

# Create another DataFrame with cylinder labels
cylinders <- data.frame(
  cyl = c(4, 6, 8),
  cyl_label = c("Four", "Six", "Eight")
)
cylinders_df <- createDataFrame(cylinders)

# Join mtcars_df with cylinders_df
join(
  x = mtcars_df,
  y = cylinders_df,
  mtcars_df$cyl == cylinders_df$cyl,
  joinType = "inner"
)

Sparklyr

# Create another SparkDataFrame with cylinder labels
cylinders <- data.frame(
  cyl = c(4, 6, 8),
  cyl_label = c("Four", "Six", "Eight")
)
cylinders_tbl <- copy_to(sc, cylinders, "cylinders", overwrite = TRUE)

# join mtcars_df with cylinders_tbl
mtcars_tbl |>
  inner_join(cylinders_tbl, by = join_by(cyl))

Benutzerdefinierte Funktionen (UDFs)

So erstellen Sie eine benutzerdefinierte Funktion für die Kategorisierung:

# Define the custom function
categorize_hp <- function(df)
  df$hp_category <- ifelse(df$hp > 150, "High", "Low") # a real-world example would use case_when() with mutate()
  df

SparkR

SparkR erfordert die explizite Definition des Ausgabeschemas, bevor eine Funktion angewendet wird:

# Define the schema for the output DataFrame
schema <- structType(
  structField("mpg", "double"),
  structField("cyl", "double"),
  structField("disp", "double"),
  structField("hp", "double"),
  structField("drat", "double"),
  structField("wt", "double"),
  structField("qsec", "double"),
  structField("vs", "double"),
  structField("am", "double"),
  structField("gear", "double"),
  structField("carb", "double"),
  structField("hp_category", "string")
)

# Apply the function across partitions
dapply(
  mtcars_df,
  func = categorize_hp,
  schema = schema
)

# Apply the same function to each group of a DataFrame. Note that the schema is still required.
gapply(
  mtcars_df,
  cols = "hp",
  func = categorize_hp,
  schema = schema
)

Sparklyr

# Load Arrow to avoid cryptic errors
library(arrow)

# Apply the function over data.
# By default this applies to each partition.
mtcars_tbl |>
  spark_apply(f = categorize_hp)

# Apply the function over data
# Use `group_by` to apply data over groups
mtcars_tbl |>
  spark_apply(
    f = summary,
    group_by = "hp" # This isn't changing the resulting output as the functions behavior is applied to rows independently.
  )

spark.lapply() vs spark_apply()

In SparkR spark.lapply() wird anstelle von DataFrames auf R-Listen ausgeführt. Es gibt keine direkte Entsprechung in sparklyr, aber Sie können ein ähnliches Verhalten erzielen, indem Sie mit einem DataFrame arbeiten, der eindeutige Bezeichner enthält, und indem Sie nach diesen IDs gruppieren. In einigen Fällen können zeilenweise Vorgänge auch vergleichbare Funktionen bereitstellen. Weitere Informationen spark_apply()finden Sie unter Verteilen von R-Berechnungen.

SparkR

# Define a list of integers
numbers <- list(1, 2, 3, 4, 5)

# Define a function to apply
square <- function(x)
  x * x

# Apply the function over list using Spark
spark.lapply(numbers, square)

Sparklyr

# Create a DataFrame of given length
sdf <- sdf_len(sc, 5, repartition = 1)

# Apply function to each partition of the DataFrame
# spark_apply() defaults to processing data based on number of partitions.
# In this case it will return a single row due to repartition = 1.
spark_apply(sdf, f = nrow)

# Apply function to each row (option 1)
# To force behaviour like spark.lapply() you can create a DataFrame with N rows and force grouping with group_by set to a unique row identifier. In this case it's the id column automatically generated by sdf_len()). This will return N rows.
spark_apply(sdf, f = nrow, group_by = "id")

# Apply function to each row (option 2)
# This requires writing a function that operates across rows of a data.frame, in some occasions this may be faster relative to option 1. Specifying group_by in optional for this example. This example does not require rowwise(), but is just to illustrate one method to force computations to be for every row.
row_func <- function(df)
  df |>
    dplyr::rowwise() |>
    dplyr::mutate(x = id * 2)

spark_apply(sdf, f = row_func)

Maschinelles Lernen

Vollständige SparkR- und Sparklyr-Beispiele für maschinelles Lernen finden Sie im Spark ML-Leitfaden und sparklyr-Referenz.

Hinweis

Wenn Sie Spark MLlib nicht verwenden, empfiehlt Databricks die Verwendung von UDFs zum Trainieren mit der Bibliothek Ihrer Wahl (z. B xgboost. ).

Lineare Regression

SparkR

# Select features
training_df <- select(mtcars_df, "mpg", "hp", "wt")

# Fit the model using Generalized Linear Model (GLM)
linear_model <- spark.glm(training_df, mpg ~ hp + wt, family = "gaussian")

# View model summary
summary(linear_model)

Sparklyr

# Select features
training_tbl <- mtcars_tbl |>
  select(mpg, hp, wt)

# Fit the model using Generalized Linear Model (GLM)
linear_model <- training_tbl |>
  ml_linear_regression(response = "mpg", features = c("hp", "wt"))

# View model summary
summary(linear_model)

K-mittel-Clustering

SparkR

# Apply KMeans clustering with 3 clusters using mpg and hp as features
kmeans_model <- spark.kmeans(mtcars_df, mpg ~ hp, k = 3)

# Get cluster predictions
predict(kmeans_model, mtcars_df)

Sparklyr

# Use mpg and hp as features
features_tbl <- mtcars_tbl |>
  select(mpg, hp)

# Assemble features into a vector column
features_vector_tbl <- features_tbl |>
  ft_vector_assembler(
    input_cols = c("mpg", "hp"),
    output_col = "features"
  )

# Apply K-Means clustering
kmeans_model <- features_vector_tbl |>
  ml_kmeans(features_col = "features", k = 3)

# Get cluster predictions
ml_predict(kmeans_model, features_vector_tbl)

Leistung und Optimierung

Sammeln

Sowohl SparkR als auch sparklyr verwenden collect() zum Konvertieren von Spark DataFrames in R-Datenrahmen. Sammeln Sie nur kleine Datenmengen in R-Datenframes zurück, oder der Spark-Treiber läuft Gefahr, nicht genügend Speicher zu haben.

Um Speicherfehler zu vermeiden, verfügt SparkR über integrierte Optimierungen in Databricks Runtime, die das Sammeln von Daten oder das Ausführen von benutzerdefinierten Funktionen unterstützen.

Um eine optimale Leistung mit Sparklyr zum Sammeln von Daten und UDFs in Databricks-Runtime-Versionen unter 14.3 LTS sicherzustellen, laden Sie das arrow Paket:

library(arrow)

In-Memory-Partitionierung

SparkR

# Repartition the SparkDataFrame based on 'cyl' column
repartition(mtcars_df, col = mtcars_df$cyl)

# Repartition the SparkDataFrame to number of partitions
repartition(mtcars_df, numPartitions = 10)

# Coalesce the DataFrame to number of partitions
coalesce(mtcars_df, numPartitions = 1)

# Get number of partitions
getNumPartitions(mtcars_df)

Sparklyr

# Repartition the tbl_spark based on 'cyl' column
sdf_repartition(mtcars_tbl, partition_by = "cyl")

# Repartition the tbl_spark to number of partitions
sdf_repartition(mtcars_tbl, partitions = 10)

# Coalesce the tbl_spark to number of partitions
sdf_coalesce(mtcars_tbl, partitions = 1)

# Get number of partitions
sdf_num_partitions(mtcars_tbl)

Zwischenspeicherung

SparkR

# Cache the DataFrame in memory
cache(mtcars_df)

Sparklyr

# Cache the tbl_spark in memory
tbl_cache(sc, name = "mtcars_tmp")