Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Classe di base per i writer dell'origine dati.
I writer dell'origine dati sono responsabili del salvataggio dei dati in un'origine dati. Implementare questa classe e restituire un'istanza da DataSource.writer() per rendere scrivibile un'origine dati.
Aggiunta in Databricks Runtime 14.3 LTS
Sintassi
from pyspark.sql.datasource import DataSourceWriter
class MyDataSourceWriter(DataSourceWriter):
def write(self, iterator):
...
Methods
| metodo | Descrizione |
|---|---|
write(iterator) |
Scrive i dati nell'origine dati. Chiamato una volta su ogni executor. Accetta un iteratore di Row oggetti e restituisce un WriterCommitMessageoggetto o None se non è presente alcun messaggio di commit. Questo metodo è astratto e deve essere implementato. |
commit(messages) |
Esegue il commit del processo di scrittura usando un elenco di messaggi di commit raccolti da tutti gli executor. Richiamato sul driver quando tutte le attività vengono eseguite correttamente. |
abort(messages) |
Interrompe il processo di scrittura usando un elenco di messaggi di commit raccolti da tutti gli executor. Richiamato sul driver quando una o più attività non sono riuscite. |
Note
- Il driver raccoglie i messaggi di commit da tutti gli executor e li passa a
commit()se tutte le attività hanno esito positivo o aabort()se un'attività ha esito negativo. - Se un'attività di scrittura non riesce, il messaggio di commit sarà
Noneincluso nell'elenco passato acommit()oabort().
Examples
Implementare un writer di base che salva le righe in un file:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceWriter(DataSourceWriter):
def __init__(self, options):
self.path = options.get("path")
def write(self, iterator):
rows = list(iterator)
with open(self.path, "w") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))
def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")
def abort(self, messages):
print("Write job failed, performing cleanup")