Freigeben über


DataSourceWriter

Eine Basisklasse für Datenquellenautoren.

Datenquellenautoren sind für das Speichern von Daten in einer Datenquelle verantwortlich. Implementieren Sie diese Klasse, und geben Sie eine Instanz zurück DataSource.writer() , um eine Datenquelle schreibbar zu machen.

Hinzugefügt in Databricks Runtime 14.3 LTS

Syntax

from pyspark.sql.datasource import DataSourceWriter

class MyDataSourceWriter(DataSourceWriter):
    def write(self, iterator):
        ...

Methodik

Methode Beschreibung
write(iterator) Schreibt Daten in die Datenquelle. Wird einmal für jeden Executor aufgerufen. Akzeptiert einen Iterator von Row Objekten und gibt eine WriterCommitMessageOder None , wenn keine Commit-Nachricht vorhanden ist. Diese Methode ist abstrakt und muss implementiert werden.
commit(messages) Führt einen Commit für den Schreibauftrag mithilfe einer Liste von Commitnachrichten durch, die von allen Executoren gesammelt wurden. Wird auf dem Treiber aufgerufen, wenn alle Aufgaben erfolgreich ausgeführt werden.
abort(messages) Bricht den Schreibauftrag mithilfe einer Liste von Commit-Nachrichten ab, die von allen Executoren gesammelt wurden. Wird auf dem Treiber aufgerufen, wenn mindestens eine Aufgabe fehlgeschlagen ist.

Hinweise

  • Der Treiber sammelt Commit-Nachrichten von allen Executoren und übergibt sie an commit() , wenn alle Aufgaben erfolgreich sind oder abort() wenn eine Aufgabe fehlschlägt.
  • Wenn eine Schreibaufgabe fehlschlägt, befindet None sich die Commit-Nachricht in der Liste, die an commit() oder abort().

Beispiele

Implementieren Sie einen einfachen Writer, der Zeilen in einer Datei speichert:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceWriter(DataSourceWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "w") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")