Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Importante
La scalabilità automatica di Lakebase è la versione più recente di Lakebase, con calcolo autoscalabile, scalabilità fino a zero, ramificazione e ripristino immediato. Per le aree supportate, vedere Disponibilità dell'area. Se sei un utente provisioning di Lakebase, vedere Lakebase provisioning.
Le tabelle sincronizzate consentono di gestire i dati lakehouse tramite Lakebase Postgres. Le tabelle del catalogo Unity vengono sincronizzate in Postgres in modo che le applicazioni possano eseguire query sui dati lakehouse direttamente con bassa latenza. Questo processo è comunemente noto come ETL inverso. Il lakehouse è ottimizzato per l'analisi e l'arricchimento, mentre Lakebase è progettato per carichi di lavoro operativi che richiedono query in stile ricerca veloce e coerenza transazionale.
Che cosa sono le tabelle sincronizzate?
Le tabelle sincronizzate consentono di gestire dati di livello di analisi da Unity Catalog tramite Lakebase Postgres, rendendoli disponibili per le applicazioni che necessitano di query a bassa latenza (sub-10 ms) e transazioni ACID complete. Consentono di colmare il divario tra l'archiviazione analitica e i sistemi operativi mantenendo i dati pronti per la gestione in applicazioni in tempo reale.
Fonti supportate
Le tabelle sincronizzate supportano i tipi di origine del catalogo Unity seguenti:
- Tabelle Delta gestite ed esterne
- Tabelle Di Iceberg gestite ed esterne
- Viste e viste materializzate
Come funziona
Le tabelle sincronizzate di Databricks creano una copia gestita dei dati di Unity Catalog in Lakebase. Quando si crea una tabella sincronizzata, si ottiene:
- Tabella sincronizzata in Unity Catalog che fa riferimento alla pipeline di sincronizzazione
- Una tabella Postgres in Lakebase (di sola lettura, su cui è possibile eseguire query dalle applicazioni)
Ad esempio, è possibile sincronizzare tabelle gold, funzionalità ingegneriate o output ml da analytics.gold.user_profiles in una nuova tabella analytics.gold.user_profiles_syncedsincronizzata. In Postgres il nome dello schema del catalogo Unity diventa il nome dello schema Postgres, quindi viene visualizzato come "gold"."user_profiles_synced":
SELECT * FROM "gold"."user_profiles_synced" WHERE "user_id" = 12345;
Le applicazioni si connettono con i driver Postgres standard ed eseguono query sui dati sincronizzati insieme al proprio stato operativo.
Avviso
Anche se è possibile modificare una tabella sincronizzata direttamente in Postgres, Azure Databricks consiglia rigorosamente di eseguire solo query di lettura per proteggere l'integrità dei dati con l'origine. Per le operazioni supportate sulle tabelle sincronizzate, vedere Operazioni supportate.
Le pipeline di sincronizzazione utilizzano le Lakeflow Spark Declarative Pipelines gestite per aggiornare continuamente sia la tabella sincronizzata di Unity Catalog che la tabella Postgres con le modifiche dalla tabella di origine. Ogni sincronizzazione può usare fino a 16 connessioni al database Lakebase.
Lakebase Postgres supporta fino a 1.000 connessioni simultanee con garanzie transazionali, in modo che le applicazioni possano leggere dati arricchiti gestendo al tempo stesso inserimenti, aggiornamenti ed eliminazioni nello stesso database.
Modalità di sincronizzazione
Scegliere la modalità di sincronizzazione corretta in base alle esigenze dell'applicazione:
| Modalità | Descrizione | Quando utilizzare | Prestazioni |
|---|---|---|---|
| Snapshot | Copia monouso di tutti i dati | Le modifiche >cambiano il 10% delle righe per ciclo o la sorgente non supporta CDF (viste, tabelle Iceberg) | 10 volte più efficiente se si modificano >10% di dati di origine |
| Attivato | Aggiornamenti pianificati eseguiti su richiesta o a intervalli | Le righe di origine cambiano in base a una frequenza nota. Gli inserimenti, gli aggiornamenti e le eliminazioni vengono propagati ogni volta che si aggiorna. | Buon rapporto costo/ritardo. Costoso se si eseguono <intervalli di 5 minuti |
| Continuo | Streaming in tempo reale con secondi di latenza | Le modifiche devono essere visualizzate in Lakebase quasi in tempo reale | Ritardo più basso, costo più alto. Intervalli minimi di 15 secondi |
Le modalità attivate e continue richiedono l'abilitazione del feed di dati delle modifiche (CDF) nella tabella di origine. Se CDF non è abilitato, verrà visualizzato un avviso nell'interfaccia utente con il comando esatto ALTER TABLE da eseguire. Per altre informazioni sul feed di dati delle modifiche, vedere Usare il feed di dati delle modifiche Delta Lake in Databricks.
Casi d'uso di esempio
È possibile usare tabelle sincronizzate per i casi d'uso di gestione dei dati, ad esempio:
- Motori di personalizzazione che servono profili utente aggiornati alle app di Databricks
- Applicazioni che gestiscono stime del modello o valori di funzionalità calcolati nella lakehouse
- Dashboard rivolti ai clienti che servono gli indicatori KPI in tempo reale
- Servizi di rilevamento delle frodi che forniscono punteggi di rischio per un intervento immediato
- Strumenti di supporto che forniscono i record dei clienti arricchiti dai dati del lakehouse
Creare una tabella sincronizzata
Prerequisiti
È necessario:
- Un'area di lavoro di Databricks con Lakebase abilitata.
- Un progetto Lakebase (vedere Creare un progetto).
- Tabella del catalogo Unity da sincronizzare.
- Autorizzazioni per creare tabelle sincronizzate. È necessario USE_SCHEMA e CREATE_TABLE in qualsiasi schema usato.
Per le modalità attivate o continue , è necessario abilitare il feed di dati di modifica nella tabella di origine:
ALTER TABLE your_catalog.your_schema.your_table
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Per la pianificazione della capacità e la compatibilità dei tipi di dati, vedere Tipi di dati e compatibilità e pianificazione della capacità.
INTERFACCIA UTENTE
Passare a Catalogo nella barra laterale dell'area di lavoro e selezionare la tabella del catalogo Unity da sincronizzare.
Fare clic su Crea>tabella sincronizzata nella vista dettagli della tabella.
Nella finestra di dialogo Crea tabella sincronizzata :
Gli elenchi di catalogo e schema includono solo schemi del catalogo Unity in cui l'utente corrente ha USE_SCHEMA e CREATE_TABLE privilegi. Se non viene visualizzato uno schema previsto, confermare le autorizzazioni con l'amministratore del catalogo.
- Nome tabella: immettere un nome per la tabella sincronizzata(viene creato nello stesso catalogo e schema della tabella di origine). In questo modo viene creata sia una tabella sincronizzata del catalogo Unity che una tabella Postgres su cui è possibile eseguire query.
- Tipo di database: scegliere Lakebase Serverless (scalabilità automatica).
- Modalità di sincronizzazione: scegliere Snapshot, Attivato o Continuo in base alle proprie esigenze (vedere le modalità di sincronizzazione precedenti).
- Configurare le selezioni di progetto, ramo e database.
- Verificare che la chiave primaria sia corretta (in genere rilevata automaticamente).
Se hai scelto la modalità Triggered o Continua e non hai ancora abilitato il Feed dati delle modifiche, verrà visualizzato un avviso con il comando esatto da eseguire. Per domande sulla compatibilità dei tipi di dati, vedere Tipi di dati e compatibilità.
Fare clic su Crea per creare la tabella sincronizzata.
Monitorare la tabella sincronizzata in Catalog. La scheda Panoramica mostra lo stato di sincronizzazione, la configurazione, lo stato della pipeline e il timestamp dell'ultima sincronizzazione. Usare Sincronizzazione ora per l'aggiornamento manuale.
PYTHON SDK
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.postgres import (
SyncedTable,
SyncedTableSyncedTableSpec,
SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy,
)
w = WorkspaceClient()
synced_table = w.postgres.create_synced_table(
synced_table=SyncedTable(spec=SyncedTableSyncedTableSpec(
source_table_full_name="main.sales.orders",
project="projects/my-project",
branch="projects/my-project/branches/production",
primary_key_columns=["order_id"],
scheduling_policy=SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy.TRIGGERED,
postgres_database="mydb",
create_database_objects_if_missing=True,
)),
synced_table_id="my-catalog.sales.orders",
).wait()
print(f"Synced table created: {synced_table.name}")
synced_table_id usa il formato catalog.schema.table e diventa sia il nome della tabella sincronizzata del catalogo Unity che il nome della tabella Postgres.
Java SDK
import com.databricks.sdk.WorkspaceClient;
import com.databricks.sdk.service.postgres.*;
import java.util.List;
WorkspaceClient w = new WorkspaceClient();
SyncedTable syncedTable = w.postgres().createSyncedTable(
new CreateSyncedTableRequest()
.setSyncedTableId("my-catalog.sales.orders")
.setSyncedTable(new SyncedTable()
.setSpec(new SyncedTableSyncedTableSpec()
.setSourceTableFullName("main.sales.orders")
.setProject("projects/my-project")
.setBranch("projects/my-project/branches/production")
.setPrimaryKeyColumns(List.of("order_id"))
.setSchedulingPolicy(SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy.TRIGGERED)
.setPostgresDatabase("mydb")
.setCreateDatabaseObjectsIfMissing(true))))
.waitForCompletion();
System.out.println("Synced table created: " + syncedTable.getName());
curva
curl -X POST "https://your-workspace.cloud.databricks.com/api/2.0/postgres/synced_tables?synced_table_id=my-catalog.sales.orders" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
-H "Content-Type: application/json" \
-d '{
"spec": {
"source_table_full_name": "main.sales.orders",
"project": "projects/my-project",
"branch": "projects/my-project/branches/production",
"primary_key_columns": ["order_id"],
"scheduling_policy": "TRIGGERED",
"postgres_database": "mydb",
"create_database_objects_if_missing": true
}
}'
Viene restituita un'operazione a esecuzione prolungata. Eseguire il polling del campo restituito name fino a done: true. Vedere Operazioni a esecuzione prolungata. Per la configurazione dell'autenticazione, vedere Autenticazione.
Pianificare o attivare le sincronizzazioni successive
Alla creazione, lo snapshot iniziale viene eseguito automaticamente. Per le modalità snapshot e attivate , le sincronizzazioni successive devono essere attivate in modo esplicito. La modalità continua si gestisce automaticamente.
Attività di sincronizzazione della tabella del database nella pipeline
L'attività della pipeline di sincronizzazione delle tabelle del database nei Processi Lakeflow esegue la pipeline di una tabella sincronizzata come passaggio nel flusso di lavoro. Configurare l'attività con un trigger di aggiornamento tabella o una pianificazione.
Trigger per gli aggiornamenti delle tabelle di origine
Genera il compito quando viene aggiornata la tabella del Catalogo Unity di origine. Con la modalità attivata , vengono applicate in modo incrementale solo le nuove modifiche, offrendo aggiornamenti quasi in tempo reale senza il costo sempre attivo della modalità continua.
- Nella barra laterale fare clic su Flussi di lavoro.
- Fare clic su Crea processo o aprire un processo esistente.
- Nella scheda Attività fare clic su + Aggiungi un altro tipo di attività.
- In Inserimento e trasformazione selezionare Pipeline di sincronizzazione tabelle di database.
- Nel campo Pipeline selezionare la pipeline associata alla tabella sincronizzata.
- In Pianificazioni e trigger fare clic su Aggiungi trigger.
- Selezionare Aggiornamento tabella come tipo di trigger.
- In Tabelle selezionare la tabella del catalogo Unity di origine da monitorare.
- Fare clic su Salva.
Generare attività in base a una pianificazione
Esegue la sincronizzazione a cadenza fissa. Ideale per la modalità snapshot , in cui un aggiornamento completo notturno o settimanale è in genere il modello più efficiente.
- Seguire i passaggi da 1 a 5 precedenti per aggiungere un'attività della pipeline di sincronizzazione tabelle di database a un processo.
- In Pianificazioni e trigger fare clic su Aggiungi trigger.
- Scegli Pianificato come tipo di trigger.
- Impostare la pianificazione cron e il fuso orario, quindi fare clic su Salva.
Attivare un'esecuzione di sincronizzazione a livello di codice
Attivare un'esecuzione di sincronizzazione a livello di codice, ad esempio alla fine di un notebook o di una pipeline upstream:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Get the pipeline ID from the synced table
table = w.postgres.get_synced_table("synced_tables/my-catalog.sales.orders")
pipeline_id = table.status.pipeline_id
# Trigger a sync run
w.pipelines.start_update(pipeline_id=pipeline_id)
Controllare lo stato di sincronizzazione
Per controllare lo stato corrente e l'ora dell'ultima sincronizzazione di una tabella sincronizzata:
INTERFACCIA UTENTE
In Catalogo passare alla tabella sincronizzata e selezionare la scheda Panoramica . Mostra lo stato di sincronizzazione corrente, lo stato della pipeline e il timestamp dell'ultima sincronizzazione.
PYTHON SDK
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
table = w.postgres.get_synced_table("synced_tables/my-catalog.sales.orders")
print(f"State: {table.status.detailed_state}")
print(f"Last sync: {table.status.last_sync_time}")
print(f"Message: {table.status.message}")
Java SDK
import com.databricks.sdk.WorkspaceClient;
import com.databricks.sdk.service.postgres.SyncedTable;
WorkspaceClient w = new WorkspaceClient();
SyncedTable table = w.postgres().getSyncedTable("synced_tables/my-catalog.sales.orders");
System.out.println("State: " + table.getStatus().getDetailedState());
System.out.println("Last sync: " + table.getStatus().getLastSyncTime());
System.out.println("Message: " + table.getStatus().getMessage());
curva
curl "https://your-workspace.cloud.databricks.com/api/2.0/postgres/synced_tables/my-catalog.sales.orders" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}"
Tipi di dati e compatibilità
I tipi di dati di Unity Catalog vengono mappati ai tipi Postgres durante la creazione di tabelle sincronizzate. I tipi complessi (ARRAY, MAP, STRUCT) vengono archiviati come JSONB in Postgres.
| Tipo di colonna di origine | Tipo di colonna Postgres |
|---|---|
| BIGINT | BIGINT |
| BINARY | BYTEA |
| BOOLEAN | BOOLEAN |
| DATTERO | DATTERO |
| DECIMAL(p,s) | NUMERICO |
| DOPPIO | PRECISIONE DOPPIA |
| FLOAT | REALE |
| INT | INTEGER |
| INTERVAL | INTERVAL |
| SMALLINT | SMALLINT |
| filo | TESTO |
| TIMESTAMP | TIMESTAMP CON FUSO ORARIO |
| TIMESTAMP_NTZ | TIMESTAMP SENZA FUSO ORARIO |
| TINYINT | SMALLINT |
| ARRAY<tipoElemento> | JSONB |
| MAP<keyType,valueType> | JSONB |
| STRUCT<NomeCampo:TipoCampo[, ...]> | JSONB |
Annotazioni
I tipi GEOGRAPHY, GEOMETRY, VARIANT e OBJECT non sono supportati.
Gestire caratteri non validi
Alcuni caratteri come byte Null (0x00) sono consentiti nelle colonne STRING, ARRAY, MAP o STRUCT del catalogo Unity, ma non supportate nelle colonne POSTGRES TEXT o JSONB. Ciò può causare errori di sincronizzazione con errori come:
ERROR: invalid byte sequence for encoding "UTF8": 0x00
ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text
Soluzioni:
Purificare i campi stringa: rimuovere i caratteri non supportati prima della sincronizzazione. Per i byte Null nelle colonne STRING:
SELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_tableConverti in BINARY: per le colonne STRING in cui è necessario conservare i byte non elaborati, convertire in tipo BINARY.
Pianificazione della capacità
Quando si pianifica l'implementazione delle tabelle sincronizzate, prendere in considerazione i requisiti delle risorse seguenti:
- Utilizzo connessione: ogni tabella sincronizzata usa fino a 16 connessioni al database Lakebase, che vengono conteggiate per il limite di connessione dell'istanza.
- Limiti di dimensioni: il limite totale delle dimensioni dei dati logici in tutte le tabelle sincronizzate è di 8 TB. Le singole tabelle non hanno limiti, ma Databricks consiglia di non superare 1 TB per le tabelle che richiedono aggiornamenti.
-
Requisiti di denominazione: i nomi di database, schema e tabella possono contenere solo caratteri alfanumerici e caratteri di sottolineatura (
[A-Za-z0-9_]+). - Evoluzione dello schema: sono supportate solo le modifiche dello schema additive (ad esempio l'aggiunta di colonne) per le modalità attivate e continue.
- Frequenza di aggiornamento: Lakebase Autoscaling supporta nella pipeline di sincronizzazione scritture continue e con trigger a circa 150 righe al secondo per unità di capacità (CU) e scritture snapshot fino a 2.000 righe al secondo per CU.
Operazioni consentite nelle tabelle sincronizzate in Postgres
Azure Databricks consiglia di eseguire solo le operazioni seguenti in Postgres per le tabelle sincronizzate per evitare sovrascrizioni accidentali o incoerenze dei dati:
- Query di sola lettura
- Creazione di indici
- Eliminazione della tabella (per liberare spazio dopo la rimozione della tabella sincronizzata dal catalogo unity)
Anche se è possibile modificare le tabelle sincronizzate in Postgres in altri modi, interferisce con la pipeline di sincronizzazione.
Eliminare una tabella sincronizzata
Per eliminare una tabella sincronizzata, è necessario completare entrambi i passaggi seguenti.
Passaggio 1: Rimuovere dal catalogo Unity
Ciò arresta gli aggiornamenti dei dati, ma lascia la tabella Postgres sul posto.
INTERFACCIA UTENTE
In Catalogo individuare la tabella sincronizzata, fare clic selezionare Elimina.
PYTHON SDK
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
w.postgres.delete_synced_table("synced_tables/my-catalog.sales.orders").wait()
Java SDK
import com.databricks.sdk.WorkspaceClient;
WorkspaceClient w = new WorkspaceClient();
w.postgres().deleteSyncedTable("synced_tables/my-catalog.sales.orders").waitForCompletion();
curva
curl -X DELETE "https://your-workspace.cloud.databricks.com/api/2.0/postgres/synced_tables/my-catalog.sales.orders" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}"
Passaggio 2: Rimuovere da Postgres
Connettersi al database Lakebase ed eliminare la tabella per liberare spazio:
DROP TABLE your_database.your_schema.your_table;
È possibile usare l'editor SQL o gli strumenti esterni per connettersi a Postgres.
Ulteriori informazioni
| Attività | Descrizione |
|---|---|
| Creare un progetto | Configurare un progetto Lakebase |
| Connettersi al database | Informazioni sulle opzioni di connessione per Lakebase |
| Registrare il database nel catalogo unity | Rendere visibili i dati di Lakebase in Unity Catalog per una governance unificata e le query tra più origini. |
| Integrazione del catalogo Unity | Informazioni sulla governance e sulle autorizzazioni |
Altre opzioni
Per la sincronizzazione dei dati in sistemi non Databricks, vedere Soluzioni ETL inverse di Partner Connect , ad esempio Census o Hightouch.