register (UDFRegistration)

Registriert eine Python-Funktion (einschließlich Lambda-Funktionen) oder eine benutzerdefinierte Funktion als SQL-Funktion.

Syntax

register(name, f, returnType=None)

Parameter

Parameter Typ Beschreibung
name str Name der benutzerdefinierten Funktion in SQL-Anweisungen.
f funktion, , udfoder pandas_udf Eine Python-Funktion oder eine benutzerdefinierte Funktion. Die benutzerdefinierte Funktion kann entweder zeilenweise oder vektorisiert werden.
returnType Datentyp oder str, optional Der Rückgabetyp der registrierten benutzerdefinierten Funktion. Dabei kann es sich um ein DataType Objekt oder eine DDL-formatierte Typzeichenfolge handeln. Nur gültig, wenn f eine einfache Python-Funktion ist, nicht wenn f bereits eine benutzerdefinierte Funktion ist.

Rückkehr

Funktion

Hinweise

Um eine nicht deterministische Python-Funktion zu registrieren, erstellen Sie zuerst eine nichtdeterministische benutzerdefinierte Funktion für die Python-Funktion, und registrieren Sie sie dann als SQL-Funktion.

Beispiele

# Register a lambda as a SQL function (return type defaults to string).
strlen = spark.udf.register("stringLengthString", lambda x: len(x))
spark.sql("SELECT stringLengthString('test')").collect()
# [Row(stringLengthString(test)='4')]

spark.sql("SELECT 'foo' AS text").select(strlen("text")).collect()
# [Row(stringLengthString(text)='3')]

# Register with an explicit return type.
from pyspark.sql.types import IntegerType
spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
spark.sql("SELECT stringLengthInt('test')").collect()
# [Row(stringLengthInt(test)=4)]

# Register an existing UDF.
from pyspark.sql.functions import udf
slen = udf(lambda s: len(s), IntegerType())
spark.udf.register("slen", slen)
spark.sql("SELECT slen('test')").collect()
# [Row(slen(test)=4)]

# Register a nondeterministic UDF.
import random
random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
spark.udf.register("random_udf", random_udf)

# Register a pandas UDF.
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(id) FROM range(3)").collect()
# [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]

# Register a grouped aggregate pandas UDF.
@pandas_udf("integer")
def sum_udf(v: pd.Series) -> int:
    return v.sum()

spark.udf.register("sum_udf", sum_udf)
spark.sql(
    "SELECT sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
).sort("sum_udf(v1)").collect()
# [Row(sum_udf(v1)=1), Row(sum_udf(v1)=5)]