Freigeben über


withWatermark

Definiert ein Ereigniszeitwasserzeichen für diesen DataFrame. Ein Wasserzeichen verfolgt einen Zeitpunkt, vor dem wir davon ausgehen, dass keine verspäteten Daten ankommen werden.

Syntax

withWatermark(eventTime: str, delayThreshold: str)

Parameter

Parameter Typ Beschreibung
eventTime str der Name der Spalte, die die Ereigniszeit der Zeile enthält.
delayThreshold str die minimale Verzögerung, bis daten spät eintreffen, relativ zum neuesten Datensatz, der in Form eines Intervalls verarbeitet wurde (z. B. "1 Minute" oder "5 Stunden").

Rückkehr

DataFrame: Wasserzeichen-DataFrame.

Hinweise

Dies ist nur für strukturiertes Streaming ein Feature.

Spark verwendet dieses Wasserzeichen für mehrere Zwecke:

  • Um zu wissen, wann eine bestimmte Zeitfensteraggregation abgeschlossen werden kann und somit ausgegeben werden kann, wenn Ausgabemodi verwendet werden, die keine Updates zulassen.
  • Um den Zustand zu minimieren, den wir für laufende Aggregationen beibehalten müssen.

Das aktuelle Wasserzeichen wird berechnet, indem MAX(eventTime) alle Partitionen in der Abfrage angezeigt werden, abzüglich eines vom Benutzer angegebenen delayThresholdBenutzers. Aufgrund der Kosten für die Koordination dieses Werts über Partitionen hinweg ist das verwendete Wasserzeichen nur garantiert mindestens delayThreshold hinter der tatsächlichen Ereigniszeit.

Beispiele

from pyspark.sql import Row
from pyspark.sql.functions import timestamp_seconds
df = spark.readStream.format("rate").load().selectExpr(
    "value % 5 AS value", "timestamp")
df.select("value", df.timestamp.alias("time")).withWatermark("time", '10 minutes')
# DataFrame[value: bigint, time: timestamp]