Condividi tramite


dropDuplicatesWithinWatermark (elimina duplicati all'interno del watermark)

Restituisce un nuovo dataframe con righe duplicate rimosse, facoltativamente considerando solo determinate colonne, all'interno della filigrana.

Sintassi

dropDuplicatesWithinWatermark(subset: Optional[List[str]] = None)

Parametri

Parametro Tipo Descrizione
subset Elenco di nomi di colonna, facoltativo Elenco di colonne da usare per il confronto duplicato (tutte le colonne predefinite).

Restituzioni

DataFrame: dataframe senza duplicati.

Note

Questa operazione funziona solo con il dataframe di streaming e la filigrana per il dataframe di input deve essere impostata tramite withWatermark.

Per un dataframe di streaming, tutti i dati verranno mantenuto tra i trigger come stato intermedio per eliminare le righe duplicate. Lo stato verrà mantenuto per garantire la semantica, "Gli eventi vengono deduplicati fino a quando la distanza temporale degli eventi meno recenti e più recenti è inferiore alla soglia di ritardo della filigrana". Gli utenti sono invitati a impostare la soglia di ritardo della filigrana più lunga rispetto al numero massimo di differenze di timestamp tra gli eventi duplicati.

Nota: i dati troppo tardi più vecchi della filigrana verranno eliminati.

Supporta Spark Connect.

Examples

from pyspark.sql import Row
from pyspark.sql.functions import timestamp_seconds
df = spark.readStream.format("rate").load().selectExpr(
    "value % 5 AS value", "timestamp")
df.select("value", df.timestamp.alias("time")).withWatermark("time", '10 minutes')
# DataFrame[value: bigint, time: timestamp]

df.dropDuplicatesWithinWatermark()

df.dropDuplicatesWithinWatermark(['value'])