Condividi tramite


withWatermark

Definisce una filigrana dell'ora dell'evento per questo dataframe. Una filigrana tiene traccia di un punto nel tempo prima del quale si presuppone che non arrivino più dati in ritardo.

Sintassi

withWatermark(eventTime: str, delayThreshold: str)

Parametri

Parametro Tipo Descrizione
eventTime str nome della colonna contenente l'ora dell'evento della riga.
delayThreshold str ritardo minimo di attesa per l'arrivo dei dati in ritardo, rispetto al record più recente elaborato sotto forma di intervallo (ad esempio "1 minuto" o "5 ore").

Restituzioni

DataFrame: dataframe con filigrana.

Note

Questa è una funzionalità solo per Structured Streaming.

Spark userà questa filigrana per diversi scopi:

  • Per sapere quando è possibile finalizzare una determinata aggregazione dell'intervallo di tempo e quindi può essere generata quando si usano le modalità di output che non consentono gli aggiornamenti.
  • Per ridurre al minimo la quantità di stato che è necessario mantenere per le aggregazioni in corso.

La filigrana corrente viene calcolata esaminando l'oggetto MAX(eventTime) visualizzato in tutte le partizioni nella query meno un utente specificato delayThreshold. A causa del costo di coordinamento di questo valore tra le partizioni, la filigrana effettiva usata è garantita solo per essere almeno delayThreshold dietro l'ora effettiva dell'evento.

Examples

from pyspark.sql import Row
from pyspark.sql.functions import timestamp_seconds
df = spark.readStream.format("rate").load().selectExpr(
    "value % 5 AS value", "timestamp")
df.select("value", df.timestamp.alias("time")).withWatermark("time", '10 minutes')
# DataFrame[value: bigint, time: timestamp]