DataSourceWriter

Uma classe base para escritores de fontes de dados.

Os escritores de fontes de dados são responsáveis por guardar dados numa fonte de dados. Implemente esta classe e retorne uma instância de DataSource.writer() para tornar uma fonte de dados gravável.

Adicionado no Databricks Runtime 14.3 LTS

Sintaxe

from pyspark.sql.datasource import DataSourceWriter

class MyDataSourceWriter(DataSourceWriter):
    def write(self, iterator):
        ...

Methods

Método Descrição
write(iterator) Escreve dados na fonte de dados. Chamado uma vez a cada executor. Aceita um iterador de Row objetos e devolve um WriterCommitMessage, ou None se não houver mensagem de commit. Este método é abstrato e deve ser implementado.
commit(messages) Faz commit na tarefa de escrita usando uma lista de mensagens de commit recolhidas de todos os executores. Invocado no driver quando todas as tarefas correm com sucesso.
abort(messages) Aborta o trabalho de escrita usando uma lista de mensagens de commit recolhidas de todos os executores. Invocado no driver quando uma ou mais tarefas falhavam.

Notes

  • O driver recolhe mensagens de commit de todos os executores e passa-as se commit() todas as tarefas têm sucesso, ou se abort() alguma falha.
  • Se uma tarefa de escrita falhar, a sua mensagem de commit estará None na lista passada para commit() ou abort().

Exemplos

Implemente um escritor básico que guarde linhas num ficheiro:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceWriter(DataSourceWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "w") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")