DataSourceWriter

データ ソース ライターの基本クラス。

データ ソース ライターは、データ ソースにデータを保存する役割を担います。 このクラスを実装し、 DataSource.writer() からインスタンスを返して、データ ソースを書き込み可能にします。

Databricks Runtime 14.3 LTS に追加されました

構文

from pyspark.sql.datasource import DataSourceWriter

class MyDataSourceWriter(DataSourceWriter):
    def write(self, iterator):
        ...

メソッド

メソッド 説明
write(iterator) データ ソースにデータを書き込みます。 各 Executor で 1 回呼び出されます。 Row オブジェクトの反復子を受け取り、WriterCommitMessageを返します。コミット メッセージがない場合はNoneします。 このメソッドは抽象メソッドであり、実装する必要があります。
commit(messages) すべての Executor から収集されたコミット メッセージの一覧を使用して、書き込みジョブをコミットします。 すべてのタスクが正常に実行されたときにドライバーで呼び出されます。
abort(messages) すべての Executor から収集されたコミット メッセージの一覧を使用して、書き込みジョブを中止します。 1 つ以上のタスクが失敗したときにドライバーで呼び出されます。

メモ

  • ドライバーは、すべての Executor からコミット メッセージを収集し、すべてのタスクが成功した場合は commit() に渡すか、タスクが失敗した場合に abort() します。
  • 書き込みタスクが失敗した場合、そのコミット メッセージは、Noneまたはcommit()に渡された一覧にabort()されます。

例示

ファイルに行を保存する基本的なライターを実装します。

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceWriter(DataSourceWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "w") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")