Freigeben über


Lernprogramm: COPY INTO mit Spark SQL

Databricks empfiehlt, den COPY INTO Befehl für das inkrementelle Laden von Daten und massenweises Laden von Daten für Datenquellen zu verwenden, die Tausende von Dateien enthalten.

In diesem Lernprogramm verwenden Sie den Befehl COPY INTO, um JSON-Daten aus einem Unity-Katalogvolume in eine Delta-Tabelle in Ihrem Azure Databricks Arbeitsbereich zu laden. Sie verwenden das Wanderbricks-Beispiel-Dataset als Datenquelle. Weitere fortgeschrittene Ingestion-Anwendungsfälle finden Sie unter Was ist Auto Loader?

Anforderungen

Schritt 1: Konfigurieren Ihrer Umgebung

Der Code in diesem Lernprogramm verwendet ein Unity Catalog-Volume zum Speichern von JSON-Quelldateien. Ersetzen Sie <catalog> durch einen Katalog, bei dem Sie über die Berechtigungen CREATE SCHEMA und CREATE VOLUME verfügen. Wenn Sie den Code nicht ausführen können, wenden Sie sich an den Arbeitsbereichsadministrator.

Erstellen Sie ein Notizbuch , und fügen Sie es an eine Computeressource an. Führen Sie dann den folgenden Code aus, um ein Schema und ein Volume für dieses Lernprogramm einzurichten.

Python

# Set parameters and reset demo environment

catalog = "<catalog>"

username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"

spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")

SQL

-- Reset demo environment

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;

Schritt 2: Schreiben von Beispieldaten in das Volume als JSON

Der COPY INTO Befehl lädt Daten aus dateibasierten Quellen. Lesen Sie aus der Wanderbricks-Beispieltabellebookings und schreiben Sie eine Reihe von Datensätzen als JSON-Dateien in Ihr Volume, wobei Daten simuliert werden, die von einem externen System empfangen werden.

Python

# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")

SQL

Das Schreiben von Dateien in ein Volume erfordert Python. In einem realen Workflow würden diese Daten aus einem externen System stammen.

%python
# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

Schritt 3: Verwenden Sie COPY INTO zum idempotenten Laden von JSON-Daten.

Erstellen Sie eine Ziel-Delta-Tabelle, bevor Sie dies verwenden COPY INTO. Sie müssen nichts anderes als einen Tabellennamen in Ihrer CREATE TABLE Anweisung angeben. Da diese Aktion idempotent ist, lädt Databricks die Daten nur einmal, auch wenn Sie den Code mehrmals ausführen.

Python

# Create target table and load data

spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")

spark.sql(f"""
  COPY INTO {catalog}.{schema}.bookings_target
  FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('mergeSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')
""")

SQL

-- Create target table and load data

CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;

COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

Schritt 4: Anzeigen einer Vorschau des Inhalts Ihrer Tabelle

Stellen Sie sicher, dass die Tabelle 20 Zeilen aus dem ersten Batch von Wanderbricks-Bookings-Daten enthält und dass das Schema ordnungsgemäß aus den JSON-Quelldateien abgeleitet wurde.

Python

# Review loaded data

display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))

SQL

-- Review loaded data

SELECT * FROM <catalog>.copy_into_tutorial.bookings_target

Schritt 5: Laden weiterer Daten und Vorschauergebnisse

Sie können zusätzliche Daten simulieren, die von einem externen System empfangen werden, indem Sie einen weiteren Batch von Datensätzen schreiben und COPY INTO erneut ausführen. Führen Sie den folgenden Code aus, um einen zweiten Datenbatch zu schreiben.

Python

# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")

SQL

Das Schreiben von Dateien in ein Volume erfordert Python. In einem realen Workflow würden diese Daten aus einem externen System stammen.

%python
# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

Führen Sie dann den COPY INTO Befehl aus Schritt 3 erneut aus, und zeigen Sie eine Vorschau der Tabelle an, um die neuen Datensätze zu bestätigen. Nur die neuen Dateien werden geladen.

Python

# Confirm new data was loaded

display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))

SQL

-- Confirm new data was loaded

SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target

Schritt 6: Tutorial bereinigen

Wenn Sie mit diesem Tutorial fertig sind, können Sie die zugehörigen Ressourcen aufräumen, wenn Sie sie nicht mehr behalten möchten. Legen Sie das Schema, die Tabellen und das Volume ab, und entfernen Sie alle Daten.

Python

# Drop schema and all associated objects

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")

SQL

-- Drop schema and all associated objects

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;

Weitere Ressourcen