Condividi tramite


Caricare dati in modo incrementale da Istanza gestita di SQL di Azure a Archiviazione di Azure usando Change Data Capture (CDC)

APPLICABILE A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Data Factory in Microsoft Fabric è la nuova generazione di Azure Data Factory, con un'architettura più semplice, un'intelligenza artificiale predefinita e nuove funzionalità. Se non si ha familiarità con l'integrazione dei dati, iniziare con Fabric Data Factory. I carichi di lavoro di Azure Data Factory esistenti possono eseguire l'aggiornamento a Fabric per accedere a nuove funzionalità tra data science, analisi in tempo reale e creazione di report.

In questa esercitazione viene creata una data factory di Azure con una pipeline che carica dati differenziali basati su change data capture (CDC) informazioni nel database Istanza gestita di SQL di Azure di origine in un archivio BLOB Azure.

In questa esercitazione vengono completati i passaggi seguenti:

  • Preparare l'archivio dati di origine.
  • Creare una fabbrica di dati.
  • Creare servizi collegati.
  • Creare set di dati di origine e destinazione.
  • Creare, eseguire il debug ed eseguire la pipeline per verificare la presenza di dati modificati
  • Modificare i dati nella tabella di origine
  • Completare, eseguire e monitorare la pipeline di copia incrementale completa

Panoramica

La tecnologia Change Data Capture supportata dagli archivi dati, ad esempio Azure SQL istanze gestite (MI) e SQL Server possono essere usate per identificare i dati modificati. Questa esercitazione descrive come usare Azure Data Factory con la tecnologia SQL Change Data Capture per caricare in modo incrementale i dati differenziali da Istanza gestita di SQL di Azure in Archiviazione BLOB di Azure. Per informazioni più concrete sulla tecnologia SQL Change Data Capture, vedere Change Data Capture in SQL Server.

Flusso di lavoro completo

Ecco alcuni passaggi del flusso di lavoro end-to-end tipico per caricare dati in modo incrementale usando la tecnologia Change Data Capture.

Nota

Sia Azure SQL MI che SQL Server supportano la tecnologia Change Data Capture. Questa esercitazione usa Istanza gestita di SQL di Azure come archivio dati di origine. È anche possibile usare un SQL Server locale.

Soluzione di alto livello

In questa esercitazione verrà creata una pipeline che esegue le operazioni seguenti:

  1. Creare un'attività di ricerca per contare il numero di record modificati nella tabella CDC del database SQL e passarlo a un'attività condizionale If.
  2. Creare una Condizione if per verificare se sono presenti record modificati e, in tal caso, richiamare l'attività di copia.
  3. Creare un'attività di copia per trasferire i dati inseriti, aggiornati o eliminati dalla tabella CDC ad Archiviazione BLOB di Azure.

Se non si ha una sottoscrizione Azure, creare un account free prima di iniziare.

Prerequisiti

  • Istanza gestita di SQL di Azure. Usare il database come archivio dati di origine. Se non si dispone di un Istanza gestita di SQL di Azure, vedere l'articolo Creare un database SQL di Azure Istanza gestita per la procedura per crearne una.
  • Archiviazione di Azure account. Usare l'archivio BLOB come archivio dati sink. Se non si ha un account di archiviazione Azure, vedere l'articolo Creare un account di archiviazione per la procedura per crearne uno. Creare un contenitore denominato raw.

Creare una tabella di origine dati in database SQL di Azure

  1. Avvia SQL Server Management Studio e connetti al server di istanza gestita di Azure SQL.

  2. In Esplora server fare clic con il pulsante destro del mouse sul database e scegliere Nuova query.

  3. Eseguire il comando SQL seguente sul database di istanze gestite di Azure SQL per creare una tabella denominata customers come archivio origine dati.

    create table customers 
    (
    customer_id int, 
    first_name varchar(50), 
    last_name varchar(50), 
    email varchar(100), 
    city varchar(50), CONSTRAINT "PK_Customers" PRIMARY KEY CLUSTERED ("customer_id") 
     );
    
  4. Abilitare il meccanismo Change Data Capture nel database e nella tabella di origine (customers) eseguendo la query SQL seguente:

    Nota

    • Sostituire <il nome del tuo schema di origine> con lo schema dell'istanza gestita di Azure SQL che contiene la tabella clienti.
    • Change Data Capture non svolge alcuna operazione come parte delle transazioni che modificano la tabella monitorata. Al contrario, le operazioni di inserimento, aggiornamento ed eliminazione vengono scritte nel log delle transazioni. Se non vengono eliminati in modo periodico e sistematico, i dati inseriti nelle tabelle di modifica cresceranno in modo incontrollabile. Per altre informazioni, vedere Abilitare Change Data Capture per un database
    EXEC sys.sp_cdc_enable_db 
    
    EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'customers', 
    @role_name = NULL,
    @supports_net_changes = 1
    
  5. Per inserire i dati nella tabella customers. eseguire questo comando:

     insert into customers 
         (customer_id, first_name, last_name, email, city) 
     values 
         (1, 'Chevy', 'Leward', 'cleward0@mapy.cz', 'Reading'),
         (2, 'Sayre', 'Ateggart', 'sateggart1@nih.gov', 'Portsmouth'),
        (3, 'Nathalia', 'Seckom', 'nseckom2@blogger.com', 'Portsmouth');
    

    Nota

    Non vengono acquisite modifiche cronologiche apportate alla tabella prima dell'abilitazione di Change Data Capture.

Creare una data factory

Seguire la procedura descritta nell'articolo Quickstart: Creare una data factory usando il portale di Azure per creare una data factory se non ne hai già una con cui lavorare.

Creare servizi collegati

Si creano servizi collegati in una data factory per collegare gli archivi dati e i servizi di calcolo alla data factory. In questa sezione vengono creati servizi collegati all'account Archiviazione di Azure e Azure SQL MI.

Creare servizio collegato ad Archiviazione di Azure.

In questo passaggio si collega l'account Archiviazione di Azure alla data factory.

  1. Fare clic su Connessioni e quindi su + Nuovo.

    Pulsante per una nuova connessione

  2. Nella finestra Nuovo servizio collegato selezionare Archiviazione BLOB di Azure e fare clic su Continue.

    Select Archiviazione BLOB di Azure

  3. Nella finestra New Linked Service (Nuovo servizio collegato) seguire questa procedura:

    1. Immettere AzureStorageLinkedService per Nome.
    2. Selezionare l'account Archiviazione di Azure per Nome account di archiviazione.
    3. Fare clic su Salva.

    Impostazioni account di Archiviazione di Azure

Creare Azure SQL servizio collegato database MI.

In questo passaggio, si collega il database Azure SQL di istanza gestita alla data factory.

Nota

Per coloro che usano SQL MI, vedere qui per informazioni sull'accesso tramite endpoint pubblico o privato. Se si usa un endpoint privato, è necessario eseguire questa pipeline usando un runtime di integrazione self-hosted. Lo stesso vale per coloro che eseguono SQL Server locale, in scenari di macchina virtuale o di rete virtuale.

  1. Fare clic su Connessioni e quindi su + Nuovo.

  2. Nella finestra Nuovo servizio collegato selezionare database SQL di Azure Istanza gestita e fare clic su Continue.

  3. Nella finestra New Linked Service (Nuovo servizio collegato) seguire questa procedura:

    1. Immettere AzureSqlMI1 nel campo Nome.
    2. Selezionare il server SQL in uso nel campo Nome server.
    3. Selezionare il database SQL in uso nel campo Nome database.
    4. Immettere il nome dell'utente nel campo Nome utente.
    5. Immettere la password dell'utente nel campo Password.
    6. Fare clic su Test connessione per testare la connessione.
    7. Fare clic su Salva per salvare il servizio collegato.

    Impostazioni del servizio collegato di Azure SQL MI Database

Creare i set di dati

In questo passaggio verranno creati i set di dati per rappresentare l'origine dati e la destinazione dati.

Creare un set di dati per rappresentare i dati di origine

In questo passaggio viene creato un set di dati per rappresentare i dati di origine.

  1. Nella visualizzazione albero fare clic su + (segno più) e quindi su Set di dati.

    Menu per nuovo set di dati

  2. Selezionare database SQL di Azure Istanza gestita e fare clic su Continue.

    tipo di origine del set di dati - database SQL di Azure

  3. Nella scheda Imposta proprietà impostare il nome del set di dati e le informazioni di connessione:

    1. Selezionare AzureSqlMI1 per Servizio collegato.
    2. Selezionare [dbo].[dbo_customers_CT] per Nome tabella. Nota: questa tabella è stata creata automaticamente quando CDC è stato abilitato sulla tabella dei clienti. I dati modificati non vengono mai sottoposti a query direttamente da questa tabella, ma vengono invece estratti tramite le funzioni di Change Data Capture.

    Connessione di origine

Creare un dataset per rappresentare i dati copiati nel deposito dati di destinazione.

In questo passaggio viene creato un set di dati per rappresentare i dati copiati dall'archivio dati di origine. Il contenitore data lake è stato creato nel Archiviazione BLOB di Azure come parte dei prerequisiti. Creare il contenitore se non esiste oppure impostare il nome di un contenitore esistente. In questa esercitazione il nome del file di output viene generato in modo dinamico usando l'ora di attivazione, che verrà configurata in seguito.

  1. Nella visualizzazione albero fare clic su + (segno più) e quindi su Set di dati.

    Menu per nuovo set di dati

  2. Selezionare Archiviazione BLOB di Azure e fare clic su Continue.

    tipo di set di dati Sink - Archiviazione BLOB di Azure

  3. Selezionare DelimitedText e fare clic su Continua.

    Formato del set di dati di destinazione: DelimitedText

  4. Nella scheda Imposta proprietà impostare il nome del set di dati e le informazioni di connessione:

    1. Selezionare AzureStorageLinkedService per Servizio collegato.
    2. Immettere raw per la parte container di filePath.
    3. Abilitare Prima riga come intestazione
    4. Fare clic su OK.

    Set di dati sink: connessione

Creare una pipeline per copiare i dati modificati

In questo passaggio verrà creata una pipeline che controlla prima di tutto il numero di record modificati presenti nella tabella delle modifiche tramite un'attività di ricerca. Un'attività condizione IF controlla se il numero di record modificati è maggiore di zero ed esegue un'attività copy per copiare i dati inseriti/aggiornati/eliminati da database SQL di Azure a Archiviazione BLOB di Azure. Infine, viene configurato un trigger di finestra a scorrimento e i tempi di inizio e di fine verranno passati alle attività come parametri della finestra di inizio e di fine.

  1. Nell'interfaccia utente di Data Factory passare alla scheda Modifica . Fare clic su + (più) nel riquadro sinistro e fare clic su Pipeline.

    Nuovo menu pipeline

  2. Verrà visualizzata una nuova scheda per la configurazione della pipeline. Anche la pipeline è visibile nella vista ad albero. Nella finestra Proprietà modificare il nome della pipeline in IncrementalCopyPipeline.

    Nome della pipeline

  3. Espandi Generale nel toolbox Attività e trascina l'attività Lookup nell'area di progettazione della pipeline. Impostare il nome dell'attività su GetChangeCount. Questa attività ottiene il numero di record nella tabella delle modifiche per un intervallo di tempo specificato.

    Attività di ricerca: nome

  4. Passare a Impostazioni nella finestra Proprietà:

    1. Specificare il nome del set di dati dell'istanza gestita di SQL per il campo Set di dati di origine.

    2. Selezionare l'opzione Query e immettere quanto segue nella casella query:

    DECLARE  @from_lsn binary(10), @to_lsn binary(10);  
    SET @from_lsn =sys.fn_cdc_get_min_lsn('dbo_customers');  
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal',  GETDATE());
    SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
    
    1. Abilitare Solo prima riga

    Attività di ricerca: impostazioni

  5. Fare clic sul pulsante Anteprima dati per assicurarsi che l'attività di ricerca restituisca un output valido

    Attività di ricerca: anteprima

  6. Nella casella degli strumenti Attività espandere Iterazione e condizionali e trascinare l'attività Condizione if nell'area di progettazione della pipeline. Impostare il nome dell'attività su HasChangedRows.

    Attività con condizione If - nome

  7. Passare ad Attività nella finestra Proprietà:

    1. Immettere l'Espressione seguente
    @greater(int(activity('GetChangeCount').output.firstRow.changecount),0)
    
    1. Fare clic sull'icona a forma di matita per modificare la condizione True.

    Attività Condizione If: impostazioni

    1. Espandi Generale nella casella degli strumenti Attività e trascina un'attività Attesa nell'area di progettazione della pipeline. Si tratta di un'attività temporanea per eseguire il debug della condizione if e verrà modificata più avanti nell'esercitazione.

    Se la condizione è vera - attendi

    1. Fare clic sulla barra di navigazione IncrementalCopyPipeline per tornare alla pipeline principale.
  8. Eseguire la pipeline in modalità Debug per verificare che la pipeline venga eseguita correttamente.

    Pipeline: debug

  9. Tornare quindi al passaggio relativo alla condizione True ed eliminare l'attività Attendi. Nella casella degli strumenti Attività, espandere Sposta e trasforma e trascinare un'operazione di Copia nell'area di progettazione della pipeline. Impostare il nome dell'attività su IncrementalCopyActivity.

    Attività di copia: nome

  10. Passare alla scheda Origine nella finestra Proprietà e seguire questa procedura:

  11. Specificare il nome del set di dati dell'istanza gestita di SQL per il campo Set di dati di origine.

  12. Selezionare Query per Usa query.

  13. In Query immettere quanto segue.

    DECLARE @from_lsn binary(10), @to_lsn binary(10); 
    SET @from_lsn =sys.fn_cdc_get_min_lsn('dbo_customers'); 
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
    SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
    

Attività di copia: impostazioni origine

  1. Fare clic su Anteprima per verificare che la query restituisca correttamente le righe modificate.

    Screenshot che mostra l'anteprima per verificare la query.

  2. Passare alla scheda Sink e specificare il set di dati Archiviazione di Azure per il campo Sink Dataset.

    Screenshot che mostra la scheda Sink.

  3. Fare clic per tornare all'area di disegno della pipeline principale e connettere l'attività Ricerca all'attività Condizione if una alla volta. Trascinare il pulsante verde associato all'attività Ricerca sull'attività Condizione If.

    Connettere le attività di ricerca e di copia

  4. Fare clic su Convalida sulla barra degli strumenti. Verificare che non siano presenti errori di convalida. Chiudere la finestra del report di convalida della pipeline facendo clic su >>.

    Pulsante Convalida

  5. Fare clic su Debug per testare la pipeline e verificare che nella posizione di archiviazione sia stato generato un file.

    Debug pipeline incrementale: 2

  6. Per pubblicare le entità (servizi collegati, set di dati e pipeline) nel servizio Data Factory, fare clic sul pulsante Pubblica tutti. Attendere fino alla visualizzazione del messaggio Pubblicazione riuscita.

    Pulsante di Pubblicazione

Configurare il trigger della finestra a scorrimento e i parametri della finestra di acquisizione dati (CDC)

In questo passaggio viene creato un trigger di finestra scorrevole per eseguire il processo in base a una pianificazione frequente. Verranno usate le variabili di sistema WindowStart e WindowEnd del trigger di finestra a cascata che saranno passate come parametri alla pipeline da usare nella query di Change Data Capture.

  1. Passare alla scheda Parametri della pipeline IncrementalCopyPipeline e usare il pulsante + Nuovo per aggiungere alla pipeline due parametri (triggerStartTime e triggerEndTime), che rappresenteranno l'ora di inizio e di fine della finestra a cascata. Ai fini del debug, aggiungere i valori predefiniti nel formato AAAA-MM-GG HH24:MI:SS.FFF e assicurarsi che triggerStartTime non sia antecedente al momento in cui il CDC è stato abilitato sulla tabella, altrimenti verrà generato un errore.

    Menu Trigger Now (Attiva adesso)

  2. Fare clic sulla scheda Impostazioni dell'attività Ricerca e configurare la query per l'utilizzo dei parametri relativi all'ora iniziale e all'ora finale. Copiare quanto segue nella query:

    @concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); 
    SET @begin_time = ''',pipeline().parameters.triggerStartTime,''';
    SET @end_time = ''',pipeline().parameters.triggerEndTime,''';
    SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time);
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time);
    SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
    
  3. Passare all'attività Copy nel caso True dell'attività If Condition e fare clic sulla scheda Origine. Inserire quanto segue nella query:

    @concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); 
    SET @begin_time = ''',pipeline().parameters.triggerStartTime,''';
    SET @end_time = ''',pipeline().parameters.triggerEndTime,''';
    SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time);
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time);
    SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
    
  4. Fare clic sulla scheda Sink dell'attività Copia e fare clic su Apri per modificare le proprietà del set di dati. Fare clic sulla scheda Parametri e aggiungere un nuovo parametro denominato triggerStart

    Screenshot che mostra l'aggiunta di un nuovo parametro alla scheda Parametri.

  5. Configurare quindi le proprietà del set di dati per archiviare i dati in una sottodirectory customers/incremental con le partizioni basate su data.

    1. Fare clic sulla scheda Connessione delle proprietà del set di dati e aggiungere contenuto dinamico per le sezioni Directory e File.

    2. Immettere l'espressione seguente nella sezione Directory facendo clic sul collegamento al contenuto dinamico nella casella di testo:

      @concat('customers/incremental/',formatDateTime(dataset().triggerStart,'yyyy/MM/dd'))
      
    3. Immettere l'espressione seguente nella sezione File. In questo modo i nomi dei file verranno creati in base alla data e all'ora di inizio del trigger, a cui verrà aggiunto un suffisso con l'estensione csv:

      @concat(formatDateTime(dataset().triggerStart,'yyyyMMddHHmmssfff'),'.csv')
      

      Configurazione del dataset di uscita 3

    4. Tornare alle impostazioni del Sink nell'attività Copia facendo clic sulla scheda IncrementalCopyPipeline.

    5. Espandere le proprietà del set di dati e immettere il contenuto dinamico nel valore del parametro triggerStart con l'espressione seguente:

      @pipeline().parameters.triggerStartTime
      

    Configurazione Sink Dataset-4

  6. Fare clic su Debug per testare la pipeline e verificare che la struttura di cartelle e il file di output vengano generati come previsto. Scaricare e aprire il file per verificarne il contenuto.

    Copia Incrementale Debug-3

  7. Verificare che i parametri vengano inseriti nella query esaminando i parametri di input dell'esecuzione della pipeline.

    Copia Incrementale Debug-4

  8. Per pubblicare le entità (servizi collegati, set di dati e pipeline) nel servizio Data Factory, fare clic sul pulsante Pubblica tutti. Attendere fino alla visualizzazione del messaggio Pubblicazione riuscita.

  9. Infine, configurare un trigger di finestra a cascata per eseguire la pipeline a intervalli regolari e impostare i parametri dell'ora di inizio e di fine.

    1. Fare clic sul pulsante Aggiungi trigger e selezionare Nuova/Modifica

    Aggiungere un nuovo trigger

    1. Immettere un nome di trigger e specificare un'ora di inizio, che è uguale all'ora di fine della finestra di debug precedente.

    Trigger di finestra a cascata

    1. Nella schermata successiva specificare rispettivamente i valori seguenti per i parametri di inizio e di fine.

      @formatDateTime(trigger().outputs.windowStartTime,'yyyy-MM-dd HH:mm:ss.fff')
      @formatDateTime(trigger().outputs.windowEndTime,'yyyy-MM-dd HH:mm:ss.fff')
      

      Trigger di finestra a cascata: 2

Nota

Il trigger verrà eseguito solo dopo la pubblicazione. Inoltre, il comportamento previsto della finestra a scorrimento prevede l'esecuzione di tutti gli intervalli cronologici dalla data di inizio fino ad ora. Ulteriori dettagli sui trigger di finestra scorrevole sono disponibili qui.

  1. Usando SQL Server Management Studio apportare alcune modifiche aggiuntive alla tabella del cliente eseguendo il codice SQL seguente:

    insert into customers (customer_id, first_name, last_name, email, city) values (4, 'Farlie', 'Hadigate', 'fhadigate3@zdnet.com', 'Reading');
    insert into customers (customer_id, first_name, last_name, email, city) values (5, 'Anet', 'MacColm', 'amaccolm4@yellowbook.com', 'Portsmouth');
    insert into customers (customer_id, first_name, last_name, email, city) values (6, 'Elonore', 'Bearham', 'ebearham5@ebay.co.uk', 'Portsmouth');
    update customers set first_name='Elon' where customer_id=6;
    delete from customers where customer_id=5;
    
  2. Fare clic sul pulsante Pubblica tutti. Attendere fino alla visualizzazione del messaggio Pubblicazione riuscita.

  3. Dopo alcuni minuti la pipeline verrà attivata e verrà caricato un nuovo file in Archiviazione di Azure

Monitorare la pipeline di copia incrementale

  1. Fare clic sulla scheda Monitoraggio a sinistra. L'esecuzione della pipeline verrà visualizzata nell'elenco con il relativo stato. Per aggiornare l'elenco, fare clic su Aggiorna. Passare il puntatore del mouse accanto al nome della pipeline per accedere all'azione Riesegui e al report sull'utilizzo.

    Esecuzioni della pipeline

  2. Per visualizzare le esecuzioni attività associate a questa esecuzione della pipeline, fare clic sul nome della pipeline. Se sono stati rilevati dati modificati, saranno presenti tre attività, inclusa l'attività di copia, in caso contrario saranno presenti solo due voci nell'elenco. Per tornare alla visualizzazione delle esecuzioni di pipeline, fare clic sul collegamento Tutte le pipeline in alto.

    Esecuzioni di attività

Esaminare i risultati

Il secondo file viene visualizzato nella cartella customers/incremental/YYYY/MM/DD del contenitore raw.

File di output derivante da copia incrementale

Passare all'esercitazione successiva per copiare file nuovi o modificati solo in base a "LastModifiedDate":