Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Raccolta distribuita di dati raggruppati in colonne denominate.
Un dataframe equivale a una tabella relazionale in Spark SQL e può essere creato usando varie funzioni in SparkSession.
Importante
Un dataframe non deve essere creato direttamente usando il costruttore .
Supporta Spark Connect
Proprietà
| Proprietà | Descrizione |
|---|---|
sparkSession |
Restituisce SparkSession che ha creato questo dataframe. |
rdd |
Restituisce il contenuto come RDD della riga (solo modalità classica). |
na |
Restituisce un oggetto DataFrameNaFunctions per la gestione dei valori mancanti. |
stat |
Restituisce un oggetto DataFrameStatFunctions per le funzioni statistiche. |
write |
Interfaccia per salvare il contenuto del dataframe non in streaming in un archivio esterno. |
writeStream |
Interfaccia per salvare il contenuto del dataframe di streaming in un archivio esterno. |
schema |
Restituisce lo schema di questo dataframe come StructType. |
dtypes |
Restituisce tutti i nomi di colonna e i relativi tipi di dati come elenco. |
columns |
Recupera i nomi di tutte le colonne nel dataframe come elenco. |
storageLevel |
Ottenere il livello di archiviazione corrente del dataframe. |
isStreaming |
Restituisce True se questo dataframe contiene una o più origini che restituiscono continuamente i dati non appena arrivano. |
executionInfo |
Restituisce un oggetto ExecutionInfo dopo l'esecuzione della query. |
plot |
Restituisce un PySparkPlotAccessor per le funzioni di tracciato. |
Methods
Visualizzazione e ispezione dei dati
| metodo | Descrizione |
|---|---|
toJSON(use_unicode) |
Converte un dataframe in rdd di stringa o dataframe. |
printSchema(level) |
Stampa lo schema nel formato albero. |
explain(extended, mode) |
Stampa i piani (logici e fisici) nella console a scopo di debug. |
show(n, truncate, vertical) |
Stampa le prime n righe del dataframe nella console. |
collect() |
Restituisce tutti i record nel dataframe come elenco di righe. |
toLocalIterator(prefetchPartitions) |
Restituisce un iteratore che contiene tutte le righe in questo dataframe. |
take(num) |
Restituisce le prime righe num come elenco di Righe. |
tail(num) |
Restituisce le ultime righe num come elenco di Righe. |
head(n) |
Restituisce le prime n righe. |
first() |
Restituisce la prima riga come riga. |
count() |
Restituisce il numero di righe in questo dataframe. |
isEmpty() |
Controlla se il dataframe è vuoto e restituisce un valore booleano. |
describe(*cols) |
Calcola le statistiche di base per le colonne numeriche e stringa. |
summary(*statistics) |
Calcola le statistiche specificate per le colonne numeriche e stringa. |
Visualizzazioni temporanee
| metodo | Descrizione |
|---|---|
createTempView(name) |
Crea una visualizzazione temporanea locale con questo dataframe. |
createOrReplaceTempView(name) |
Crea o sostituisce una visualizzazione temporanea locale con questo dataframe. |
createGlobalTempView(name) |
Crea una visualizzazione temporanea globale con questo dataframe. |
createOrReplaceGlobalTempView(name) |
Crea o sostituisce una visualizzazione temporanea globale usando il nome specificato. |
Selezione e proiezione
| metodo | Descrizione |
|---|---|
select(*cols) |
Proietta un set di espressioni e restituisce un nuovo dataframe. |
selectExpr(*expr) |
Proietta un set di espressioni SQL e restituisce un nuovo dataframe. |
filter(condition) |
Filtra le righe usando la condizione specificata. |
where(condition) |
Alias per il filtro. |
drop(*cols) |
Restituisce un nuovo dataframe senza colonne specificate. |
toDF(*cols) |
Restituisce un nuovo dataframe con nuovi nomi di colonna specificati. |
withColumn(colName, col) |
Restituisce un nuovo dataframe aggiungendo una colonna o sostituendo la colonna esistente con lo stesso nome. |
withColumns(*colsMap) |
Restituisce un nuovo dataframe aggiungendo più colonne o sostituendo le colonne esistenti con gli stessi nomi. |
withColumnRenamed(existing, new) |
Restituisce un nuovo dataframe rinominando una colonna esistente. |
withColumnsRenamed(colsMap) |
Restituisce un nuovo dataframe rinominando più colonne. |
withMetadata(columnName, metadata) |
Restituisce un nuovo dataframe aggiornando una colonna esistente con metadati. |
metadataColumn(colName) |
Seleziona una colonna di metadati in base al nome della colonna logica e la restituisce come colonna. |
colRegex(colName) |
Seleziona la colonna in base al nome della colonna specificata come espressione regolare e la restituisce come Colonna. |
Ordinamento e ordinamento
| metodo | Descrizione |
|---|---|
sort(*cols, **kwargs) |
Restituisce un nuovo dataframe ordinato in base alle colonne specificate. |
orderBy(*cols, **kwargs) |
Alias per l'ordinamento. |
sortWithinPartitions(*cols, **kwargs) |
Restituisce un nuovo dataframe con ogni partizione ordinata in base alle colonne specificate. |
Aggregazione e raggruppamento
| metodo | Descrizione |
|---|---|
groupBy(*cols) |
Raggruppa il dataframe in base alle colonne specificate in modo che l'aggregazione possa essere eseguita su di esse. |
rollup(*cols) |
Creare un rollup multidimensionale per il dataframe corrente usando le colonne specificate. |
cube(*cols) |
Creare un cubo multidimensionale per il dataframe corrente usando le colonne specificate. |
groupingSets(groupingSets, *cols) |
Creare un'aggregazione multidimensionale per il dataframe corrente usando i set di raggruppamento specificati. |
agg(*exprs) |
Aggregazione sull'intero dataframe senza gruppi (abbreviato per df.groupBy().agg()). |
observe(observation, *exprs) |
Definire le metriche (denominate) da osservare nel dataframe. |
Joins
| metodo | Descrizione |
|---|---|
join(other, on, how) |
Crea un join con un altro dataframe usando l'espressione di join specificata. |
crossJoin(other) |
Restituisce il prodotto cartesiano con un altro dataframe. |
lateralJoin(other, on, how) |
Join laterali con un altro dataframe, usando l'espressione di join specificata. |
Impostare le operazioni
| metodo | Descrizione |
|---|---|
union(other) |
Restituisce un nuovo dataframe contenente l'unione di righe in questo e un altro dataframe. |
unionByName(other, allowMissingColumns) |
Restituisce un nuovo dataframe contenente l'unione di righe in questo oggetto e un altro dataframe. |
intersect(other) |
Restituisce un nuovo dataframe contenente righe solo in questo dataframe e in un altro dataframe. |
intersectAll(other) |
Restituisce un nuovo dataframe contenente righe sia in questo dataframe che in un altro dataframe mantenendo i duplicati. |
subtract(other) |
Restituisce un nuovo dataframe contenente righe in questo dataframe, ma non in un altro dataframe. |
exceptAll(other) |
Restituisce un nuovo dataframe contenente righe in questo dataframe ma non in un altro dataframe mantenendo i duplicati. |
Deduplicazione
| metodo | Descrizione |
|---|---|
distinct() |
Restituisce un nuovo dataframe contenente le righe distinte in questo dataframe. |
dropDuplicates(subset) |
Restituisce un nuovo dataframe con righe duplicate rimosse, facoltativamente considerando solo determinate colonne. |
dropDuplicatesWithinWatermark(subset) |
Restituisce un nuovo dataframe con righe duplicate rimosse, facoltativamente considerando solo determinate colonne, all'interno della filigrana. |
Campionamento e suddivisione
| metodo | Descrizione |
|---|---|
sample(withReplacement, fraction, seed) |
Restituisce un subset campionato di questo dataframe. |
sampleBy(col, fractions, seed) |
Restituisce un campione stratificato senza sostituzione in base alla frazione specificata in ogni strato. |
randomSplit(weights, seed) |
Suddivide in modo casuale questo dataframe con i pesi forniti. |
Partitioning
| metodo | Descrizione |
|---|---|
coalesce(numPartitions) |
Restituisce un nuovo dataframe con esattamente partizioni numPartitions. |
repartition(numPartitions, *cols) |
Restituisce un nuovo dataframe partizionato dalle espressioni di partizionamento indicate. |
repartitionByRange(numPartitions, *cols) |
Restituisce un nuovo dataframe partizionato dalle espressioni di partizionamento indicate. |
repartitionById(numPartitions, partitionIdCol) |
Restituisce un nuovo dataframe partizionato dall'espressione ID di partizione specificata. |
Rimodellamento
| metodo | Descrizione |
|---|---|
unpivot(ids, values, variableColumnName, valueColumnName) |
Annullare ilpivot di un dataframe da un formato wide a un formato lungo. |
melt(ids, values, variableColumnName, valueColumnName) |
Alias per unpivot. |
transpose(indexColumn) |
Trasponi un dataframe in modo che i valori nella colonna di indice specificata diventino le nuove colonne. |
Gestione dei dati mancanti
| metodo | Descrizione |
|---|---|
dropna(how, thresh, subset) |
Restituisce un nuovo dataframe che omette righe con valori Null o NaN. |
fillna(value, subset) |
Restituisce un nuovo dataframe che i valori Null vengono riempiti con un nuovo valore. |
replace(to_replace, value, subset) |
Restituisce un nuovo dataframe sostituendo un valore con un altro valore. |
Funzioni statistiche
| metodo | Descrizione |
|---|---|
approxQuantile(col, probabilities, relativeError) |
Calcola i quantili approssimativi delle colonne numeriche di un dataframe. |
corr(col1, col2, method) |
Calcola la correlazione di due colonne di un dataframe come valore doppio. |
cov(col1, col2) |
Calcolare la covarianza di esempio per le colonne specificate, specificate dai relativi nomi. |
crosstab(col1, col2) |
Calcola una tabella di frequenza a coppie delle colonne specificate. |
freqItems(cols, support) |
Ricerca di elementi frequenti per le colonne, possibilmente con falsi positivi. |
Operazioni dello schema
| metodo | Descrizione |
|---|---|
to(schema) |
Restituisce un nuovo dataframe in cui ogni riga viene riconciliata in modo che corrisponda allo schema specificato. |
alias(alias) |
Restituisce un nuovo dataframe con un set di alias. |
Iterazione
| metodo | Descrizione |
|---|---|
foreach(f) |
Applica la funzione f a tutte le righe di questo dataframe. |
foreachPartition(f) |
Applica la funzione f a ogni partizione di questo dataframe. |
Memorizzazione nella cache e persistenza
| metodo | Descrizione |
|---|---|
cache() |
Rende persistente il dataframe con il livello di archiviazione predefinito (MEMORY_AND_DISK_DESER). |
persist(storageLevel) |
Imposta il livello di archiviazione per rendere persistente il contenuto del dataframe tra le operazioni. |
unpersist(blocking) |
Contrassegna il dataframe come non persistente e rimuove tutti i blocchi per esso dalla memoria e dal disco. |
Checkpoint
| metodo | Descrizione |
|---|---|
checkpoint(eager) |
Restituisce una versione con checkpoint di questo dataframe. |
localCheckpoint(eager, storageLevel) |
Restituisce una versione con checkpoint locale di questo dataframe. |
Operazioni di streaming
| metodo | Descrizione |
|---|---|
withWatermark(eventTime, delayThreshold) |
Definisce una filigrana dell'ora dell'evento per questo dataframe. |
Hint di ottimizzazione
| metodo | Descrizione |
|---|---|
hint(name, *parameters) |
Specifica un suggerimento sul dataframe corrente. |
Limiti e offset
| metodo | Descrizione |
|---|---|
limit(num) |
Limita il conteggio dei risultati al numero specificato. |
offset(num) |
Restituisce un nuovo dataframe ignorando le prime n righe. |
Trasformazioni avanzate
| metodo | Descrizione |
|---|---|
transform(func, *args, **kwargs) |
Restituisce un nuovo dataframe. Sintassi concisa per concatenare trasformazioni personalizzate. |
Metodi di conversione
| metodo | Descrizione |
|---|---|
toPandas() |
Restituisce il contenuto di questo dataframe come Pandas pandas. DataFrame. |
toArrow() |
Restituisce il contenuto di questo dataframe come pyArrow pyarrow. Tavolo. |
pandas_api(index_col) |
Converte il dataframe esistente in un dataframe pandas-on-Spark. |
mapInPandas(func, schema, barrier, profile) |
Esegue il mapping di un iteratore di batch nel dataframe corrente usando una funzione nativa Python. |
mapInArrow(func, schema, barrier, profile) |
Esegue il mapping di un iteratore di batch nel dataframe corrente usando una funzione nativa Python eseguita su pyarrow. RecordBatch. |
Scrittura dei dati
| metodo | Descrizione |
|---|---|
writeTo(table) |
Creare un generatore di configurazione di scrittura per le origini v2. |
mergeInto(table, condition) |
Unisce un set di aggiornamenti, inserimenti ed eliminazioni in base a una tabella di origine in una tabella di destinazione. |
Confronto tra dataframe
| metodo | Descrizione |
|---|---|
sameSemantics(other) |
Restituisce True quando i piani di query logici all'interno di entrambi i dataframe sono uguali. |
semanticHash() |
Restituisce un codice hash del piano di query logico su questo dataframe. |
Metadati e informazioni sui file
| metodo | Descrizione |
|---|---|
inputFiles() |
Restituisce uno snapshot ottimale dei file che compongono questo dataframe. |
Funzionalità avanzate di SQL
| metodo | Descrizione |
|---|---|
isLocal() |
Restituisce True se i metodi collect e take possono essere eseguiti localmente. |
asTable() |
Converte il dataframe in un oggetto TableArg, che può essere utilizzato come argomento di tabella in un oggetto TVF. |
scalar() |
Restituisce un oggetto Column per una sottoquery SCALAR contenente esattamente una riga e una colonna. |
exists() |
Restituisce un oggetto Column per una sottoquery EXISTS. |
Examples
Operazioni di base sul dataframe
# Create a DataFrame
people = spark.createDataFrame([
{"deptId": 1, "age": 40, "name": "Alice", "gender": "M", "salary": 50},
{"deptId": 1, "age": 50, "name": "Bob", "gender": "M", "salary": 100},
{"deptId": 2, "age": 60, "name": "Sue", "gender": "F", "salary": 150},
{"deptId": 3, "age": 20, "name": "Tom", "gender": "M", "salary": 200}
])
# Select columns
people.select("name", "age").show()
# Filter rows
people.filter(people.age > 30).show()
# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()
Aggregazione e raggruppamento
# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()
# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
F.avg("salary").alias("avg_salary"),
F.max("age").alias("max_age")
).show()
Joins
# Create another DataFrame
department = spark.createDataFrame([
{"id": 1, "name": "PySpark"},
{"id": 2, "name": "ML"},
{"id": 3, "name": "Spark SQL"}
])
# Join DataFrames
people.join(department, people.deptId == department.id).show()
Trasformazioni complesse
# Chained operations
result = people.filter(people.age > 30) \\
.join(department, people.deptId == department.id) \\
.groupBy(department.name, "gender") \\
.agg({"salary": "avg", "age": "max"}) \\
.sort("max(age)")
result.show()