Condividi tramite


DataSourceWriter

Classe di base per i writer dell'origine dati.

I writer dell'origine dati sono responsabili del salvataggio dei dati in un'origine dati. Implementare questa classe e restituire un'istanza da DataSource.writer() per rendere scrivibile un'origine dati.

Aggiunta in Databricks Runtime 14.3 LTS

Sintassi

from pyspark.sql.datasource import DataSourceWriter

class MyDataSourceWriter(DataSourceWriter):
    def write(self, iterator):
        ...

Methods

metodo Descrizione
write(iterator) Scrive i dati nell'origine dati. Chiamato una volta su ogni executor. Accetta un iteratore di Row oggetti e restituisce un WriterCommitMessageoggetto o None se non è presente alcun messaggio di commit. Questo metodo è astratto e deve essere implementato.
commit(messages) Esegue il commit del processo di scrittura usando un elenco di messaggi di commit raccolti da tutti gli executor. Richiamato sul driver quando tutte le attività vengono eseguite correttamente.
abort(messages) Interrompe il processo di scrittura usando un elenco di messaggi di commit raccolti da tutti gli executor. Richiamato sul driver quando una o più attività non sono riuscite.

Note

  • Il driver raccoglie i messaggi di commit da tutti gli executor e li passa a commit() se tutte le attività hanno esito positivo o a abort() se un'attività ha esito negativo.
  • Se un'attività di scrittura non riesce, il messaggio di commit sarà None incluso nell'elenco passato a commit() o abort().

Examples

Implementare un writer di base che salva le righe in un file:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceWriter(DataSourceWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "w") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")