Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Gibt den neuesten Offset zurück, der aufgrund eines Leselimits verfügbar ist.
Der start Offset kann verwendet werden, um zu bestimmen, wie viel neue Daten aufgrund des Grenzwerts gelesen werden sollen. Für den ersten Mikrobatch start wird aus dem Rückgabewert von initialOffset(). Für nachfolgende Mikrobatches geht es von der letzten Mikrobatch fort. Die Quelle kann den gleichen Offset wie der Startoffset zurückgeben, wenn keine Zu verarbeitenden Daten vorhanden sind.
ReadLimit kann von der Quelle verwendet werden, um die zurückgegebene Datenmenge einzuschränken. Implementieren Sie getDefaultReadLimit() dies, um die richtigen ReadLimit bereitzustellen, wenn die Quelle Daten basierend auf Den Quelloptionen einschränken kann.
Das Modul kann weiterhin mit latestOffset() anrufenReadAllAvailable, auch wenn die Quelle einen anderen Lesegrenzwert erzeugt als getDefaultReadLimit(). Die Quelle muss stets die vom Modul bereitgestellte Angabe ReadLimit respektieren.
Hinzugefügt in Databricks Runtime 15.2
Syntax
latestOffset(start: dict, limit: ReadLimit)
Parameter
| Parameter | Typ | Beschreibung |
|---|---|---|
start |
Wörterbuch | Der Anfangsoffset des Mikrobatchs, von dem aus weiter gelesen werden soll. |
limit |
ReadLimit | Der Grenzwert für die Datenmenge, die von diesem Aufruf zurückgegeben werden soll. |
Rückkehr
dict
Ein Diktat oder rekursives Diktat, dessen Schlüssel und Wert grundtyptyp sind, einschließlich Integer, String und Boolean.
Beispiele
from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadMaxRows
def latestOffset(self, start, limit):
# Assume the source has 10 new records between start and latest offset
if isinstance(limit, ReadAllAvailable):
return {"index": start["index"] + 10}
else: # e.g., limit is ReadMaxRows(5)
return {"index": start["index"] + min(10, limit.maxRows)}