Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Sie können den COPY INTO SQL-Befehl verwenden, um Daten aus einem Dateispeicherort in eine Delta-Tabelle zu laden.
COPY INTO ist wiederholbar und idempotent – Dateien am Quellspeicherort, die bereits geladen wurden, werden bei späteren Ausführungen übersprungen.
COPY INTO bietet folgende Funktionen:
- Einfache konfigurierbare Datei- oder Ordnerfilter aus dem Cloudspeicher, einschließlich S3, ADLS, ABFS, GCS und Unity Catalog Volumes.
- Unterstützung für mehrere Quelldateiformate: CSV, JSON, XML, Avro, ORC, Parkett, Text und Binärdateien.
- Genau einmal (idempotent) Dateiverarbeitung standardmäßig.
- Zieltabellen-Schemaableitung, Zuordnung, Zusammenführung und Weiterentwicklung.
Hinweis
Für eine skalierbarere und robustere Dateiübernahme empfiehlt Databricks, dass SQL-Benutzer Streamingtabellen verwenden. Weitere Informationen finden Sie unter Streamingtabellen.
Warnung
COPY INTO berücksichtigt die Arbeitsbereichseinstellung für Löschvektoren. Wenn diese Option aktiviert ist, werden Löschvektoren in der Zieltabelle aktiviert, wenn COPY INTO in einem SQL-Warehouse oder einer Compute-Umgebung mit Databricks Runtime 14.0 oder höher ausgeführt wird. Nachdem Löschvektoren aktiviert wurden, blockieren sie Abfragen für eine Tabelle in Databricks Runtime 11.3 LTS und darunter. Siehe Löschvektoren in Databricks und automatisch aktivierte Löschvektoren.
Bevor Sie anfangen
Ein Kontoadministrator muss die Schritte unter Konfigurieren des Datenzugriffs für die Aufnahme ausführen, um den Zugriff auf Daten im Cloudobjektspeicher zu konfigurieren, bevor Benutzer Daten mithilfe COPY INTOladen können.
Laden von Daten in eine schemalose Delta Lake-Tabelle
In Databricks Runtime 11.3 LTS und höher können Sie leere Platzhalter-Delta-Tabellen erstellen, indem Sie die COPY INTO Einstellung in mergeSchema auf true setzen, sodass das Schema während eines COPY_OPTIONS Befehls abgeleitet wird. Im folgenden Beispiel wird der Wanderbricks-Datensatz verwendet. Ersetzen Sie <catalog>, <schema> und <volume> durch einen Katalog, ein Schema und ein Volume, in denen Sie CREATE TABLE Berechtigungen haben.
SQL
CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;
COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Python
table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
" COPY_OPTIONS ('mergeSchema' = 'true')",
sep = ""
))
Scala
val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
Diese SQL-Anweisung ist idempotent. Dies bedeutet, dass Sie die Ausführung so planen können, dass sie wiederholt ausgeführt wird, und sie lädt nur neue Daten in Ihre Delta-Tabelle.
Hinweis
Die leere Delta-Tabelle kann außerhalb COPY INTOnicht verwendet werden.
INSERT INTO und MERGE INTO werden nicht unterstützt, Daten in schemalose Delta-Tabellen zu schreiben. Nachdem Daten mit COPY INTO in die Tabelle eingefügt wurden, wird die Tabelle abfragbar.
Siehe Erstellen von Zieltabellen für COPY INTO.
Festlegen des Schemas und Laden von Daten in eine Delta Lake-Tabelle
Im folgenden Beispiel wird eine Delta-Tabelle erstellt und der COPY INTO SQL-Befehl verwendet, um Beispieldaten aus dem Wanderbricks-Dataset in die Tabelle zu laden. Die Quelldateien sind JSON-Dateien, die in einem Unity-Katalogvolume gespeichert sind. Sie können das Beispiel Python, R, Scala oder SQL-Code aus einem Notizbuch ausführen, das an einen Azure Databricks Cluster angefügt ist. Sie können den SQL-Code auch aus einer Abfrage ausführen, die einem SQL-Warehouse in Databricks SQL zugeordnet ist. Ersetzen Sie <catalog>, <schema> und <volume> durch einen Katalog, ein Schema und ein Volume, in denen Sie CREATE TABLE Berechtigungen haben.
SQL
DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;
CREATE TABLE <catalog>.<schema>.booking_updates_upload (
booking_id BIGINT,
user_id BIGINT,
status STRING,
total_amount DOUBLE
);
COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');
SELECT * FROM <catalog>.<schema>.booking_updates_upload;
Python
table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"booking_id BIGINT, " + \
"user_id BIGINT, " + \
"status STRING, " + \
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)
display(booking_updates_upload_data)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"booking_id BIGINT, ",
"user_id BIGINT, ",
"status STRING, ",
"total_amount DOUBLE)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('multiLine' = 'true')",
sep = ""
))
booking_updates_upload_data = tableToDF(table_name)
display(booking_updates_upload_data)
Scala
val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"booking_id BIGINT, " +
"user_id BIGINT, " +
"status STRING, " +
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
val booking_updates_upload_data = spark.table(table_name)
display(booking_updates_upload_data)
Führen Sie zum Bereinigen den folgenden Code aus, um die Beispieltabelle zu löschen.
SQL
DROP TABLE <catalog>.<schema>.booking_updates_upload
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
Bereinigen von Metadatendateien
Sie können VACUUM ausführen, um nicht referenzierte Metadatendateien zu bereinigen, die von COPY INTO in Databricks Runtime 15.2 und höher erstellt wurden.
Weitere Ressourcen
- Laden von Daten mithilfe von COPY INTO mit Unity-Katalogvolumes oder externen Speicherorten
-
Allgemeine Datenlademuster mit
COPY INTO.
- Databricks Runtime 7.x und höher:
COPY INTO