Freigeben über


mapInPandas

Ordnet einen Iterator von Batches im aktuellen DataFrame mithilfe einer Python nativen Funktion zu, die für Pandas DataFrames sowohl als Eingabe als auch als Ausgabe ausgeführt wird, und gibt das Ergebnis als DataFrame zurück.

Syntax

mapInPandas(func: "PandasMapIterFunction", schema: Union[StructType, str], barrier: bool = False, profile: Optional[ResourceProfile] = None)

Parameter

Parameter Typ Beschreibung
func Funktion eine Python systemeigene Funktion, die einen Iterator von pandas.DataFrames verwendet, und gibt einen Iterator von pandas.DataFrames aus.
schema Datentyp oder str der Rückgabetyp des func PySpark. Der Wert kann ein pyspark.sql.types.DataType Objekt oder eine DDL-formatierte Typzeichenfolge sein.
barrier bool, optional, default False Verwenden Sie die Ausführung des Barrierenmodus, um sicherzustellen, dass alle Python Worker in der Phase gleichzeitig gestartet werden.
profile ResourceProfile, optional Das optionale ResourceProfile,das für mapInPandas verwendet werden soll.

Rückkehr

DataFrame

Beispiele

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

def mean_age(iterator):
    for pdf in iterator:
        yield pdf.groupby("id").mean().reset_index()

df.mapInPandas(mean_age, "id: bigint, age: double").show()
# +---+----+
# | id| age|
# +---+----+
# |  1|21.0|
# |  2|30.0|
# +---+----+

df.mapInPandas(filter_func, df.schema, barrier=True).collect()
# [Row(id=1, age=21)]