pandas DataFrames に対して入力と出力の両方として実行され、結果を DataFrame として返すPythonネイティブ関数を使用して、現在の DataFrame 内のバッチの反復子をマップします。
構文
mapInPandas(func: "PandasMapIterFunction", schema: Union[StructType, str], barrier: bool = False, profile: Optional[ResourceProfile] = None)
パラメーター
| パラメーター | タイプ | 説明 |
|---|---|---|
func |
関数 |
pandas.DataFrames の反復子を受け取り、pandas.DataFrame の反復子を出力するPythonネイティブ関数。 |
schema |
DataType または str | PySpark の func の戻り値の型。 値には、 pyspark.sql.types.DataType オブジェクトまたは DDL 形式の型文字列のいずれかを指定できます。 |
barrier |
bool、省略可能、既定の False | バリア モードの実行を使用して、ステージ内のすべてのPythonワーカーが同時に起動されるようにします。 |
profile |
ResourceProfile、省略可能 | mapInPandas に使用する省略可能な ResourceProfile。 |
返品
DataFrame
例示
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
def mean_age(iterator):
for pdf in iterator:
yield pdf.groupby("id").mean().reset_index()
df.mapInPandas(mean_age, "id: bigint, age: double").show()
# +---+----+
# | id| age|
# +---+----+
# | 1|21.0|
# | 2|30.0|
# +---+----+
df.mapInPandas(filter_func, df.schema, barrier=True).collect()
# [Row(id=1, age=21)]