Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Databricks empfiehlt, den COPY INTO Befehl für das inkrementelle Laden von Daten und massenweises Laden von Daten für Datenquellen zu verwenden, die Tausende von Dateien enthalten.
In diesem Lernprogramm verwenden Sie den Befehl COPY INTO, um JSON-Daten aus einem Unity-Katalogvolume in eine Delta-Tabelle in Ihrem Azure Databricks Arbeitsbereich zu laden. Sie verwenden das Wanderbricks-Beispiel-Dataset als Datenquelle. Weitere fortgeschrittene Ingestion-Anwendungsfälle finden Sie unter Was ist Auto Loader?
Anforderungen
- Zugriff auf eine Computeressource. Siehe Compute.
- Ein Arbeitsbereich, der mit Unity-Katalog aktiviert ist und über Berechtigungen zum Erstellen von Schemata und Volumes in einem Katalog verfügt. Siehe Verbinden mit Cloudobjektspeicher mithilfe des Unity-Katalogs.
Schritt 1: Konfigurieren Ihrer Umgebung
Der Code in diesem Lernprogramm verwendet ein Unity Catalog-Volume zum Speichern von JSON-Quelldateien. Ersetzen Sie <catalog> durch einen Katalog, bei dem Sie über die Berechtigungen CREATE SCHEMA und CREATE VOLUME verfügen. Wenn Sie den Code nicht ausführen können, wenden Sie sich an den Arbeitsbereichsadministrator.
Erstellen Sie ein Notizbuch , und fügen Sie es an eine Computeressource an. Führen Sie dann den folgenden Code aus, um ein Schema und ein Volume für dieses Lernprogramm einzurichten.
Python
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
SQL
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
Schritt 2: Schreiben von Beispieldaten in das Volume als JSON
Der COPY INTO Befehl lädt Daten aus dateibasierten Quellen. Lesen Sie aus der Wanderbricks-Beispieltabellebookings und schreiben Sie eine Reihe von Datensätzen als JSON-Dateien in Ihr Volume, wobei Daten simuliert werden, die von einem externen System empfangen werden.
Python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
SQL
Das Schreiben von Dateien in ein Volume erfordert Python. In einem realen Workflow würden diese Daten aus einem externen System stammen.
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Schritt 3: Verwenden Sie COPY INTO zum idempotenten Laden von JSON-Daten.
Erstellen Sie eine Ziel-Delta-Tabelle, bevor Sie dies verwenden COPY INTO. Sie müssen nichts anderes als einen Tabellennamen in Ihrer CREATE TABLE Anweisung angeben. Da diese Aktion idempotent ist, lädt Databricks die Daten nur einmal, auch wenn Sie den Code mehrmals ausführen.
Python
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
SQL
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
Schritt 4: Anzeigen einer Vorschau des Inhalts Ihrer Tabelle
Stellen Sie sicher, dass die Tabelle 20 Zeilen aus dem ersten Batch von Wanderbricks-Bookings-Daten enthält und dass das Schema ordnungsgemäß aus den JSON-Quelldateien abgeleitet wurde.
Python
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
SQL
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
Schritt 5: Laden weiterer Daten und Vorschauergebnisse
Sie können zusätzliche Daten simulieren, die von einem externen System empfangen werden, indem Sie einen weiteren Batch von Datensätzen schreiben und COPY INTO erneut ausführen. Führen Sie den folgenden Code aus, um einen zweiten Datenbatch zu schreiben.
Python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
SQL
Das Schreiben von Dateien in ein Volume erfordert Python. In einem realen Workflow würden diese Daten aus einem externen System stammen.
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Führen Sie dann den COPY INTO Befehl aus Schritt 3 erneut aus, und zeigen Sie eine Vorschau der Tabelle an, um die neuen Datensätze zu bestätigen. Nur die neuen Dateien werden geladen.
Python
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
SQL
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
Schritt 6: Tutorial bereinigen
Wenn Sie mit diesem Tutorial fertig sind, können Sie die zugehörigen Ressourcen aufräumen, wenn Sie sie nicht mehr behalten möchten. Legen Sie das Schema, die Tabellen und das Volume ab, und entfernen Sie alle Daten.
Python
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
SQL
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;