Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
È possibile usare il COPY INTO comando SQL per caricare i dati da un percorso di file in una tabella Delta.
COPY INTO è retriable e idempotente: i file nel percorso di origine che sono già stati caricati vengono ignorati nelle esecuzioni successive.
COPY INTO offre queste funzionalità:
- Filtri di file o cartelle facilmente configurabili dall'archiviazione cloud, inclusi volumi S3, ADLS, ABFS, GCS e Unity Catalog.
- Supporto per più formati di file di origine: CSV, JSON, XML, Avro, ORC, Parquet, text e file binari.
- Per impostazione predefinita, il processamento idempotente dei file (una sola volta).
- Inferenza dello schema della tabella di destinazione, mappatura, fusione ed evoluzione.
Nota
Per un'esperienza di inserimento di file più scalabile e affidabile, Databricks consiglia agli utenti SQL di usare tabelle di streaming. Per altre informazioni, vedere Tabelle di streaming.
Avviso
COPY INTO rispetta la configurazione dello spazio di lavoro per i vettori di eliminazione. Se abilitati, i vettori di eliminazione vengono abilitati nella tabella di destinazione quando COPY INTO viene eseguito su un magazzino SQL o su un sistema che esegue Databricks Runtime 14.0 o versione successiva. Dopo aver abilitato i vettori di eliminazione, bloccano le query su una tabella in Databricks Runtime 11.3 LTS e versioni successive. Vedere Vettori di eliminazione in Databricks e Attiva automaticamente i vettori di eliminazione.
Prima di iniziare
Un amministratore account deve seguire la procedura descritta in Configurare l'accesso ai dati per l'inserimento per configurare l'accesso ai dati nell'archiviazione di oggetti cloud prima che gli utenti possano caricare i dati usando COPY INTO.
Caricare dati in una tabella Delta Lake senza schema
In Databricks Runtime 11.3 LTS e versioni successive è possibile creare tabelle Delta segnaposto vuote in modo che lo schema venga dedotto durante un COPY INTO comando impostando mergeSchema su true in COPY_OPTIONS. Nell'esempio seguente viene usato il set di dati Wanderbricks . Sostituire <catalog>, <schema>, e <volume> con un catalogo, uno schema e un volume in cui si disponga delle autorizzazioni CREATE TABLE.
SQL
CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;
COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Python
table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
" COPY_OPTIONS ('mergeSchema' = 'true')",
sep = ""
))
Linguaggio di programmazione Scala
val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
Questa istruzione SQL è idempotente. Ciò significa che è possibile pianificarlo per l'esecuzione ripetutamente e caricherà solo nuovi dati nella tabella Delta.
Nota
La tabella Delta vuota non è utilizzabile all'esterno COPY INTO.
INSERT INTO e MERGE INTO non sono supportati per scrivere dati in tabelle Delta senza schema. Dopo l'inserimento dei dati nella tabella con COPY INTO, la tabella diventa queryabile.
Vedere Creare tabelle di destinazione per COPY INTO.
Impostare lo schema e caricare i dati in una tabella Delta Lake
L'esempio seguente crea una tabella Delta e usa il COPY INTO comando SQL per caricare i dati di esempio dal set di dati Wanderbricks nella tabella. I file di origine sono file JSON archiviati in un volume del catalogo Unity. È possibile eseguire l'esempio Python, R, Scala o codice SQL da un notebook collegato a un cluster Azure Databricks. È anche possibile eseguire il codice SQL da una query associata a un'istanza di SQL Warehouse in Databricks SQL. Sostituire <catalog>, <schema>, e <volume> con un catalogo, uno schema e un volume in cui si dispone delle autorizzazioni CREATE TABLE.
SQL
DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;
CREATE TABLE <catalog>.<schema>.booking_updates_upload (
booking_id BIGINT,
user_id BIGINT,
status STRING,
total_amount DOUBLE
);
COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');
SELECT * FROM <catalog>.<schema>.booking_updates_upload;
Python
table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"booking_id BIGINT, " + \
"user_id BIGINT, " + \
"status STRING, " + \
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)
display(booking_updates_upload_data)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"booking_id BIGINT, ",
"user_id BIGINT, ",
"status STRING, ",
"total_amount DOUBLE)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('multiLine' = 'true')",
sep = ""
))
booking_updates_upload_data = tableToDF(table_name)
display(booking_updates_upload_data)
Linguaggio di programmazione Scala
val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"booking_id BIGINT, " +
"user_id BIGINT, " +
"status STRING, " +
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
val booking_updates_upload_data = spark.table(table_name)
display(booking_updates_upload_data)
Per eseguire la pulizia, eseguire il codice seguente per eliminare la tabella di esempio.
SQL
DROP TABLE <catalog>.<schema>.booking_updates_upload
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Linguaggio di programmazione Scala
spark.sql("DROP TABLE " + table_name)
Pulire i file di metadati
È possibile eseguire VACUUM per pulire i file di metadati non referenziati creati da COPY INTO in Databricks Runtime 15.2 e versioni successive.
Risorse aggiuntive
- Carica i dati usando COPY INTO con volumi del Catalogo Unity o posizioni esterne
-
Modelli di caricamento dei dati comuni tramite
COPY INTO.
- Databricks Runtime 7.x e versioni successive:
COPY INTO