Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Databricks consiglia di usare il comando COPY INTO per il caricamento incrementale e bulk dei dati per le origini dati che contengono migliaia di file.
In questa esercitazione si usa il comando COPY INTO per caricare dati JSON da un volume del catalogo Unity in una tabella Delta nell'area di lavoro Azure Databricks. Usare il set di dati di esempio Wanderbricks come origine dati. Per casi d'uso di inserimento più avanzati, vedere Che cos'è il caricatore automatico?.
Requisiti
- Accesso a una risorsa di calcolo. Vedere Calcolo.
- Un'area di lavoro abilitata per il catalogo unity con autorizzazioni per creare schemi e volumi in un catalogo. Vedete Connettersi all'archiviazione di oggetti cloud usando Unity Catalog.
Passaggio 1: Configurare l'ambiente
Il codice in questa esercitazione usa un volume di Catalogo Unity per archiviare i file di origine JSON. Sostituire <catalog> con un catalogo in cui si dispone delle autorizzazioni CREATE SCHEMA e CREATE VOLUME. Se non è possibile eseguire il codice, contattare l'amministratore dell'area di lavoro.
Creare un notebook e collegarlo a una risorsa di calcolo. Eseguire quindi il codice seguente per configurare uno schema e un volume per questa esercitazione.
Python
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
SQL
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
Passaggio 2: Scrivere dati di esempio nel volume come JSON
Il COPY INTO comando carica i dati da origini basate su file. Leggere dalla tabella di esempio Wanderbricksbookings e scrivere un batch di record come file JSON nel volume, simulando i dati provenienti da un sistema esterno.
Python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
SQL
La scrittura di file in un volume richiede Python. In un flusso di lavoro reale, questi dati arrivano da un sistema esterno.
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Passaggio 3: Utilizzare COPY INTO per caricare i dati JSON in modo idempotente
Creare una tabella Delta di destinazione prima di usare COPY INTO. Non è necessario specificare elementi diversi da un nome di tabella nell'istruzione CREATE TABLE . Poiché questa azione è idempotente, Databricks carica i dati una sola volta, anche se si esegue il codice più volte.
Python
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
SQL
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
Passaggio 4: Visualizzare in anteprima il contenuto della tabella
Verificare che la tabella contenga 20 righe dal primo batch di dati delle prenotazioni wanderbricks e che lo schema sia stato dedotto correttamente dai file di origine JSON.
Python
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
SQL
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
Passaggio 5: Caricare più dati e visualizzare in anteprima i risultati
È possibile simulare dati aggiuntivi in arrivo da un sistema esterno scrivendo un altro batch di record ed eseguendo COPY INTO di nuovo. Eseguire il codice seguente per scrivere un secondo batch di dati.
Python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
SQL
La scrittura di file in un volume richiede Python. In un flusso di lavoro reale, questi dati arrivano da un sistema esterno.
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Eseguire quindi di nuovo il COPY INTO comando dal passaggio 3 e visualizzare in anteprima la tabella per confermare i nuovi record. Vengono caricati solo i nuovi file.
Python
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
SQL
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
Passaggio 6: Fase di completamento del tutorial
Al termine di questa esercitazione, è possibile pulire le risorse associate se non si vogliono più mantenerle. Eliminare lo schema, le tabelle e il volume e rimuovere tutti i dati.
Python
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
SQL
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;