Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Uma classe base para escritores de fontes de dados.
Os escritores de fontes de dados são responsáveis por guardar dados numa fonte de dados. Implemente esta classe e retorne uma instância de DataSource.writer() para tornar uma fonte de dados gravável.
Adicionado no Databricks Runtime 14.3 LTS
Sintaxe
from pyspark.sql.datasource import DataSourceWriter
class MyDataSourceWriter(DataSourceWriter):
def write(self, iterator):
...
Methods
| Método | Descrição |
|---|---|
write(iterator) |
Escreve dados na fonte de dados. Chamado uma vez a cada executor. Aceita um iterador de Row objetos e devolve um WriterCommitMessage, ou None se não houver mensagem de commit. Este método é abstrato e deve ser implementado. |
commit(messages) |
Faz commit na tarefa de escrita usando uma lista de mensagens de commit recolhidas de todos os executores. Invocado no driver quando todas as tarefas correm com sucesso. |
abort(messages) |
Aborta o trabalho de escrita usando uma lista de mensagens de commit recolhidas de todos os executores. Invocado no driver quando uma ou mais tarefas falhavam. |
Notes
- O driver recolhe mensagens de commit de todos os executores e passa-as se
commit()todas as tarefas têm sucesso, ou seabort()alguma falha. - Se uma tarefa de escrita falhar, a sua mensagem de commit estará
Nonena lista passada paracommit()ouabort().
Exemplos
Implemente um escritor básico que guarde linhas num ficheiro:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceWriter(DataSourceWriter):
def __init__(self, options):
self.path = options.get("path")
def write(self, iterator):
rows = list(iterator)
with open(self.path, "w") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))
def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")
def abort(self, messages):
print("Write job failed, performing cleanup")