データ ソース ライターの基本クラス。
データ ソース ライターは、データ ソースにデータを保存する役割を担います。 このクラスを実装し、 DataSource.writer() からインスタンスを返して、データ ソースを書き込み可能にします。
Databricks Runtime 14.3 LTS に追加されました
構文
from pyspark.sql.datasource import DataSourceWriter
class MyDataSourceWriter(DataSourceWriter):
def write(self, iterator):
...
メソッド
| メソッド | 説明 |
|---|---|
write(iterator) |
データ ソースにデータを書き込みます。 各 Executor で 1 回呼び出されます。
Row オブジェクトの反復子を受け取り、WriterCommitMessageを返します。コミット メッセージがない場合はNoneします。 このメソッドは抽象メソッドであり、実装する必要があります。 |
commit(messages) |
すべての Executor から収集されたコミット メッセージの一覧を使用して、書き込みジョブをコミットします。 すべてのタスクが正常に実行されたときにドライバーで呼び出されます。 |
abort(messages) |
すべての Executor から収集されたコミット メッセージの一覧を使用して、書き込みジョブを中止します。 1 つ以上のタスクが失敗したときにドライバーで呼び出されます。 |
メモ
- ドライバーは、すべての Executor からコミット メッセージを収集し、すべてのタスクが成功した場合は
commit()に渡すか、タスクが失敗した場合にabort()します。 - 書き込みタスクが失敗した場合、そのコミット メッセージは、
Noneまたはcommit()に渡された一覧にabort()されます。
例示
ファイルに行を保存する基本的なライターを実装します。
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceWriter(DataSourceWriter):
def __init__(self, options):
self.path = options.get("path")
def write(self, iterator):
rows = list(iterator)
with open(self.path, "w") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))
def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")
def abort(self, messages):
print("Write job failed, performing cleanup")