DataSourceWriter

Uma classe base para gravadores de fonte de dados.

Os gravadores de fonte de dados são responsáveis por salvar dados em uma fonte de dados. Implemente essa classe e retorne uma instância DataSource.writer() para tornar uma fonte de dados gravável.

Adicionado no Databricks Runtime 14.3 LTS

Sintaxe

from pyspark.sql.datasource import DataSourceWriter

class MyDataSourceWriter(DataSourceWriter):
    def write(self, iterator):
        ...

Methods

Método Descrição
write(iterator) Grava dados na fonte de dados. Chamado uma vez em cada executor. Aceita um iterador de Row objetos e retorna um WriterCommitMessage, ou None se não há nenhuma mensagem de confirmação. Esse método é abstrato e deve ser implementado.
commit(messages) Confirma o trabalho de gravação usando uma lista de mensagens de confirmação coletadas de todos os executores. Invocado no driver quando todas as tarefas são executadas com êxito.
abort(messages) Anula o trabalho de gravação usando uma lista de mensagens de confirmação coletadas de todos os executores. Invocado no driver quando uma ou mais tarefas falharam.

Observações

  • O driver coleta mensagens de confirmação de todos os executores e as passa para commit() se todas as tarefas tiverem êxito ou se abort() alguma tarefa falhar.
  • Se uma tarefa de gravação falhar, sua mensagem de confirmação estará None na lista passada para commit() ou abort().

Exemplos

Implemente um gravador básico que salva linhas em um arquivo:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceWriter(DataSourceWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "w") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")