Freigeben über


Inkrementelles Laden von Daten aus Azure SQL Managed Instance in Azure Storage mithilfe der Änderungsdatenerfassung (CDC)

Gilt für: Azure Data Factory Azure Synapse Analytics

Tipp

Data Factory in Microsoft Fabric ist die nächste Generation von Azure Data Factory mit einer einfacheren Architektur, integrierter KI und neuen Features. Wenn Sie mit der Datenintegration noch nicht vertraut sind, beginnen Sie mit Fabric Data Factory. Vorhandene ADF-Workloads können auf Fabric aktualisiert werden, um auf neue Funktionen in der Datenwissenschaft, Echtzeitanalysen und Berichterstellung zuzugreifen.

In diesem Tutorial wird eine Azure Data Factory-Instanz mit einer Pipeline erstellt, die Deltadaten basierend auf Informationen von Change Data Capture (CDC) aus der Azure SQL Managed Instance-Quelldatenbank in Azure Blob Storage lädt.

In diesem Tutorial führen Sie die folgenden Schritte aus:

  • Vorbereiten des Quelldatenspeichers
  • Erstellen einer Data Factory.
  • Erstellen Sie verknüpfte Dienste.
  • Erstellen von Quell- und Senkendatasets.
  • Erstellen, Debuggen und Ausführen der Pipeline, um eine Überprüfung auf geänderte Daten durchzuführen
  • Ändern von Daten in der Quelltabelle
  • Abschließen, Ausführen und Überwachen der gesamten Pipeline für inkrementelles Kopieren

Übersicht

Die Änderungsdatenerfassungstechnologie, die von Datenspeichern wie Azure SQL Managed Instances (MI) und SQL Server unterstützt wird, können verwendet werden, um geänderte Daten zu identifizieren. In diesem Lernprogramm wird beschrieben, wie Sie Azure Data Factory mit sql Change Data Capture-Technologie verwenden, um Delta-Daten aus Azure SQL Managed Instance in Azure Blob Storage inkrementell zu laden. Konkretere Informationen zur SQL Change Data Capture-Technologie finden Sie unter Change data capture in SQL Server.

Kompletter Workflow

Hier finden Sie die typischen Schritte des End-to-End-Workflows für das inkrementelle Laden von Daten mithilfe der Change Data Capture-Technologie.

Hinweis

Sowohl Azure SQL MI als auch SQL Server unterstützen die Change Data Capture-Technologie. In diesem Lernprogramm wird Azure SQL Managed Instance als Quelldatenspeicher verwendet. Sie können auch einen lokalen SQL-Server verwenden.

Hochrangige Lösung

In diesem Tutorial wird eine Pipeline für folgende Vorgänge erstellt:

  1. Erstellen einer Lookup-Aktivität, um die Anzahl geänderter Datensätze in der CDC-Tabelle der SQL-Datenbank zu ermitteln und an eine Aktivität vom Typ „If-Bedingung“ zu übergeben
  2. Erstellen einer If-Bedingung, um zu überprüfen, ob geänderte Datensätze vorhanden sind, und die Kopieraktivität aufzurufen, wenn dies der Fall ist
  3. Erstellen Sie eine copy activity, um die eingefügten/aktualisierten/gelöschten Daten von der CDC-Tabelle in den Azure Blob Storage zu kopieren.

Wenn Sie nicht über ein Azure-Abonnement verfügen, erstellen Sie ein free Konto, bevor Sie beginnen.

Voraussetzungen

  • Azure SQL Managed Instance. Sie verwenden die Datenbank als den Quell-Datenspeicher. Wenn Sie keine Azure SQL Managed Instance haben, lesen Sie den Artikel Create an Azure SQL-Datenbank verwaltete Instanz, um Schritte zur Erstellung einer solchen Instanz zu erfahren.
  • Azure Storage-Konto. Sie verwenden den Blob Storage als den Senken-Datenspeicher. Wenn Sie nicht über ein Azure-Speicherkonto verfügen, lesen Sie den Artikel Create a storage account, um Schritte zum Erstellen eines solchen zu finden. Erstellen Sie einen Container mit dem Namen raw.

Erstellen einer Datenquellentabelle in Azure SQL-Datenbank

  1. Starten Sie SQL Server Management Studio, und stellen Sie eine Verbindung mit Ihrem Azure SQL verwalteten Instanzenserver her.

  2. Klicken Sie im Server-Explorer mit der rechten Maustaste auf Ihre Datenbank, und wählen Sie Neue Abfrage.

  3. Führen Sie den folgenden SQL-Befehl für Ihre Azure SQL Managed Instances-Datenbank aus, um eine Tabelle mit dem Namen customers als Datenquellenspeicher zu erstellen.

    create table customers 
    (
    customer_id int, 
    first_name varchar(50), 
    last_name varchar(50), 
    email varchar(100), 
    city varchar(50), CONSTRAINT "PK_Customers" PRIMARY KEY CLUSTERED ("customer_id") 
     );
    
  4. Führen Sie die folgende SQL-Abfrage aus, um den Mechanismus Change Data Capture für Ihre Datenbank und die Quelltabelle (customers) zu aktivieren:

    Hinweis

    • Ersetzen Sie <den Namen Ihres Quellschemas> durch das Schema Ihrer Azure SQL MI, das die Kundentabelle enthält.
    • Change Data Capture ist nicht an den Transaktionen beteiligt, mit denen die überwachte Tabelle geändert wird. Stattdessen werden Einfüge-, Aktualisierungs- und Löschvorgänge in das Transaktionsprotokoll geschrieben. Damit die Datenmenge, die in Änderungstabellen abgelegt wird, nicht auf eine unüberschaubare Größe anwächst, müssen die Daten regelmäßig und systematisch gekürzt werden. Weitere Informationen finden Sie unter Aktivieren von Change Data Capture für eine Datenbank.
    EXEC sys.sp_cdc_enable_db 
    
    EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'customers', 
    @role_name = NULL,
    @supports_net_changes = 1
    
  5. Führen Sie den folgenden Befehl aus, um Daten in die Tabelle „customers“ einzufügen:

     insert into customers 
         (customer_id, first_name, last_name, email, city) 
     values 
         (1, 'Chevy', 'Leward', 'cleward0@mapy.cz', 'Reading'),
         (2, 'Sayre', 'Ateggart', 'sateggart1@nih.gov', 'Portsmouth'),
        (3, 'Nathalia', 'Seckom', 'nseckom2@blogger.com', 'Portsmouth');
    

    Hinweis

    Änderungen, die vor der Change Data Capture-Aktivierung an der Tabelle vorgenommen wurden, werden nicht erfasst.

Erstellen einer Data Factory

Wenn Sie noch nicht über eine Data Factory verfügen, mit der Sie arbeiten können, führen Sie die im Artikel Schnellstart: Erstellen einer Data Factory im Azure-Portal aufgeführten Schritte aus, um eine Data Factory zu erstellen.

Erstellen von verknüpften Diensten

Um Ihre Datenspeicher und Compute Services mit der Data Factory zu verknüpfen, können Sie verknüpfte Dienste in einer Data Factory erstellen. In diesem Abschnitt erstellen Sie verknüpfte Dienste mit Ihrem Azure Storage Konto und Azure SQL MI.

Erstellen Sie einen verknüpften Azure-Storage-Dienst.

In diesem Schritt verknüpfen Sie Ihr Azure Storage Konto mit der Datenfactory.

  1. Klicken Sie auf Verbindungen und dann auf + Neu.

    Schaltfläche für eine neue Verbindung

  2. Wählen Sie im Fenster Neuer verknüpfter DienstAzure Blob Storage aus, und klicken Sie auf Continue.

    Select Azure Blob Storage

  3. Führen Sie im Fenster New Linked Service (Neuer verknüpfter Dienst) die folgenden Schritte aus:

    1. Geben Sie AzureStorageLinkedService unter Name ein.
    2. Wählen Sie Ihr Azure-Speicherkonto für Storage-Kontoname aus.
    3. Klicken Sie auf Speichern.

    Azure Storage Kontoeinstellungen

Erstellen eines mit der Azure SQL Managed Instance-Datenbank verknüpften Diensts

In diesem Schritt verknüpfen Sie Ihre Azure SQL MI-Datenbank mit der Datenfactory.

Hinweis

Bei Verwendung von SQL Managed Instance finden Sie hier Informationen zum Zugriff über einen öffentlichen oder über einen privaten Endpunkt. Bei Verwendung eines privaten Endpunkts muss diese Pipeline unter Verwendung einer selbstgehosteten Integration Runtime ausgeführt werden. Das Gleiche gilt für diejenigen, die SQL Server lokal ausführen, in einer virtuellen Maschine oder in VNet-Szenarien.

  1. Klicken Sie auf Verbindungen und dann auf + Neu.

  2. Wählen Sie im Fenster Neuer verknüpfter DienstAzure SQL-Datenbank verwaltete Instanz aus, und klicken Sie auf Continue.

  3. Führen Sie im Fenster New Linked Service (Neuer verknüpfter Dienst) die folgenden Schritte aus:

    1. Geben Sie im Feld Name den Namen AzureSqlMI1 ein.
    2. Wählen Sie im Feld Servername Ihre SQL Server-Instanz aus.
    3. Wählen Sie im Feld Datenbankname Ihre SQL-Datenbank aus.
    4. Geben Sie im Feld Benutzername den Namen des Benutzers ein.
    5. Geben Sie im Feld Kennwort das Kennwort für den Benutzer ein.
    6. Klicken Sie auf Verbindung testen, um die Verbindung zu testen.
    7. Klicken Sie auf Speichern, um den verknüpften Dienst zu speichern.

    Azure SQL MI-Datenbank-Linked-Service-Einstellungen

Erstellen von Datasets

In diesem Schritt werden Datasets erstellt, um die Datenquelle und das Datenziel darzustellen.

Erstellen eines Datasets zum Darstellen von Quelldaten

In diesem Schritt erstellen Sie ein Dataset, das für die Quelldaten steht.

  1. Klicken Sie in der Strukturansicht auf + (Pluszeichen) und dann auf Dataset.

    Menü „Neues Dataset“

  2. Wählen Sie Azure SQL-Datenbank verwaltete Instanz aus, und klicken Sie auf Continue.

    Source-Datasettyp - Azure SQL-Datenbank

  3. Legen Sie auf der Registerkarte Eigenschaften festlegen den Datasetnamen und die Verbindungsinformationen fest:

    1. Wählen Sie unter Verknüpfter Dienst die Option AzureSqlMI1 aus.
    2. Wählen Sie [dbo].[dbo_customers_CT] für Tabellenname aus. Hinweis: Diese Tabelle wurde automatisch erstellt, als CDC für die Tabelle „customers“ aktiviert wurde. Geänderte Daten werden nie direkt aus dieser Tabelle abgefragt, sondern stattdessen mithilfe der CDC-Funktionen extrahiert.

    Quellverbindung

Erstellen Sie ein Dataset zum Darstellen von Daten, die in den Senkendatenspeicher kopiert werden.

In diesem Schritt erstellen Sie ein Dataset, das für die Daten steht, die aus dem Quelldatenspeicher kopiert werden. Sie haben den Data Lake-Container in Ihrer Azure Blob Storage als Teil der Voraussetzungen erstellt. Erstellen Sie den Container, wenn er noch nicht vorhanden ist (oder) geben Sie den Namen eines bereits vorhandenen ein. In diesem Tutorial wird der Name der Ausgabedatei dynamisch unter Verwendung der (später konfigurierten) Auslösungszeit generiert.

  1. Klicken Sie in der Strukturansicht auf + (Pluszeichen) und dann auf Dataset.

    Menü „Neues Dataset“

  2. Wählen Sie Azure Blob Storage aus, und klicken Sie auf Continue.

    Sink-Datensatztyp - Azure Blob Storage

  3. Wählen Sie DelimitedText aus, und klicken Sie auf Weiter.

    Zieldatensatzformat: DelimitedText

  4. Legen Sie auf der Registerkarte Eigenschaften festlegen den Datasetnamen und die Verbindungsinformationen fest:

    1. Wählen Sie unter Verknüpfter Dienst die Option AzureStorageLinkedService.
    2. Geben Sie die Zeichenfolge raw für den Container-Teil des Dateipfads ein.
    3. Aktivieren Sie das Kontrollkästchen First row as header (Erste Zeile als Header).
    4. Klicken Sie auf OK.

    Sink-Dataset – Verbindung

Erstellen einer Pipeline zum Kopieren der geänderten Daten

In diesem Schritt wird eine Pipeline erstellt, die zunächst mithilfe einer Lookup-Aktivität überprüft, wie viele geänderte Datensätze in der Änderungstabelle vorhanden sind. Eine WENN-Bedingungsaktivität überprüft, ob die Anzahl der geänderten Datensätze größer als Null ist, und führt eine Copy-Aktivität aus aus, um die eingefügten/aktualisierten/gelöschten Daten aus Azure SQL-Datenbank in Azure Blob Storage zu kopieren. Abschließend wird ein Trigger für ein rollierendes Fenster konfiguriert, und die Start- und Endzeiten werden als Parameter für den Start und das Ende des Fensters an die Aktivitäten übergeben.

  1. Wechseln Sie auf der Data Factory-Benutzeroberfläche zur Registerkarte Bearbeiten. Klicken Sie im Bereich auf der linken Seite auf + (Pluszeichen) und dann auf Pipeline.

    Menü für neue Pipeline

  2. Eine neue Registerkarte zum Konfigurieren der Pipeline wird angezeigt. Außerdem wird die Pipeline in der Strukturansicht angezeigt. Ändern Sie im Eigenschaftenfenster den Namen der Pipeline in IncrementalCopyPipeline.

    Pipelinename

  3. Erweitern Sie in der Toolbox Aktivitäten die Option Allgemein, und ziehen Sie die Lookup-Aktivität auf die Oberfläche des Pipeline-Designers. Legen Sie den Namen der Aktivität auf GetChangeCount fest. Durch diese Aktivität wird die Anzahl von Datensätzen in der Änderungstabelle für ein bestimmtes Zeitfenster abgerufen.

    Lookup-Aktivität – Name

  4. Wechseln Sie im Fenster Eigenschaften zu Einstellungen:

    1. Geben Sie im Feld Source Dataset (Quelldataset) den SQL Managed Instance Dataset-Namen an.

    2. Wählen Sie die Option „Abfrage“ aus, und geben Sie Folgendes in das Abfragefeld ein:

    DECLARE  @from_lsn binary(10), @to_lsn binary(10);  
    SET @from_lsn =sys.fn_cdc_get_min_lsn('dbo_customers');  
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal',  GETDATE());
    SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
    
    1. Aktivieren Sie das Kontrollkästchen First row only (Nur erste Zeile).

    Lookup-Aktivität – Einstellungen

  5. Klicken Sie auf die Schaltfläche Datenvorschau, um sich zu vergewissern, dass durch die Lookup-Aktivität eine gültige Ausgabe abgerufen wird.

    Lookup-Aktivität: Vorschau

  6. Erweitern Sie in der Toolbox Aktivitäten& die Option Iteration Conditionals (Iteration und Bedingungen), und ziehen Sie die Aktivität If-Bedingung auf die Oberfläche des Pipeline-Designers. Legen Sie den Namen der Aktivität auf HasChangedRows fest.

    Aktivität „If-Bedingung“: Name

  7. Wechseln Sie im Fenster Eigenschaften zu Aktivitäten:

    1. Geben Sie in das Feld Ausdruck Folgendes ein:
    @greater(int(activity('GetChangeCount').output.firstRow.changecount),0)
    
    1. Klicken Sie auf das Stiftsymbol, um die True-Bedingung zu bearbeiten.

    Aktivität „If-Bedingung“: Einstellungen

    1. Erweitern Sie in der Toolbox Aktivitäten die Option Allgemein, und ziehen Sie eine Aktivität vom Typ Warten auf die Oberfläche des Pipeline-Designers. Hierbei handelt es sich um eine temporäre Aktivität zum Debuggen der If-Bedingung; sie wird später in diesem Tutorial noch geändert.

    If-Bedingung „True“: Warten

    1. Klicken Sie auf der Breadcrumb-Leiste auf „IncrementalCopyPipeline“, um zur Hauptpipeline zurückzukehren.
  8. Führen Sie die Pipeline im Modus Debuggen aus, um sich zu vergewissern, dass die Ausführung erfolgreich ist.

    Pipeline: Debuggen

  9. Kehren Sie als Nächstes zum Schritt für die True-Bedingung zurück, und löschen Sie die Aktivität Warten. Erweitern Sie in der Toolbox Aktivitäten die Option Move & transform (Verschieben und transformieren), und ziehen Sie eine Aktivität vom Typ Copy auf die Oberfläche des Pipeline-Designers. Legen Sie den Namen der Aktivität auf IncrementalCopyActivity fest.

    Copy-Aktivität – Name

  10. Wechseln Sie im Eigenschaftenfenster zur Registerkarte Quelle, und führen Sie die folgenden Schritte aus:

  11. Geben Sie den Namen des SQL MI-Datasets für das Feld Source Dataset an.

  12. Wählen Sie unter Abfrage verwenden die Option Abfrage.

  13. Geben Sie unter Abfrage Folgendes ein:

    DECLARE @from_lsn binary(10), @to_lsn binary(10); 
    SET @from_lsn =sys.fn_cdc_get_min_lsn('dbo_customers'); 
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
    SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
    

Copy-Aktivität – Quelleinstellungen

  1. Klicken Sie auf die Vorschauoption, um sich zu vergewissern, dass die geänderten Zeilen durch die Abfrage korrekt zurückgegeben werden.

    Screenshot der Vorschau zum Überprüfen der Abfrage

  2. Wechseln Sie zur Registerkarte Sink, und geben Sie das Azure Storage Dataset für das Feld Sink Dataset an.

    Screenshot zeigt die Registerkarte „Sink“.

  3. Kehren Sie zur Canvas der Hauptpipeline zurück, und verbinden Sie nacheinander die Aktivität Lookup mit der Aktivität If-Bedingung. Ziehen Sie die grüne Schaltfläche der Aktivität Lookup zur Aktivität If-Bedingung.

    Verbinden von Lookup- und Copy-Aktivitäten

  4. Klicken Sie in der Symbolleiste auf Überprüfen. Vergewissern Sie sich, dass keine Validierungsfehler vorliegen. Schließen Sie das Fenster Pipeline Validation Report (Pipelineüberprüfungsbericht), indem Sie auf >> klicken.

    Schaltfläche „Überprüfen“

  5. Klicken Sie auf „Debuggen“, um die Pipeline zu testen und sich zu vergewissern, dass am Speicherort eine Datei generiert wird.

    Debuggen der inkrementellen Pipeline: 2

  6. Veröffentlichen Sie Entitäten (verknüpfte Dienste, Datasets und Pipelines) für den Data Factory-Dienst, indem Sie auf die Schaltfläche Alle veröffentlichen klicken. Warten Sie, bis die Meldung Veröffentlichung erfolgreich angezeigt wird.

    Schaltfläche

Konfigurieren Sie den Trigger für ein tumblerndes Fenster und die Parameter für das CDC-Fenster

In diesem Schritt wird ein Trigger für ein rollierendes Fenster erstellt, um den Auftrag nach einem kurz getakteten Zeitplan auszuführen. Du wirst die Systemvariablen „WindowStart“ und „WindowEnd“ des Triggers für das kippende Fenster verwenden und sie als Parameter an deine Pipeline übergeben, um sie in der CDC-Abfrage zu verwenden.

  1. Navigieren Sie zur Registerkarte Parameter der Pipeline IncrementalCopyPipeline, und fügen Sie der Pipeline mithilfe der Schaltfläche + Neu zwei Parameter (triggerStartTime und triggerEndTime) hinzu, die die Start- und Endzeit des rollierenden Fensters darstellen. Fügen Sie zu Debuggingzwecken Standardwerte im Format JJJJ-MM-TT HH24:MI:SS.FFF hinzu. Achten Sie jedoch darauf, dass die Startzeit des Triggers (triggerStartTime) nicht vor der CDC-Aktivierung für die Tabelle liegt, da sonst ein Fehler auftritt.

    Menü „Jetzt auslösen“

  2. Klicken Sie auf die Registerkarte mit den Einstellungen der Aktivität Lookup, und konfigurieren Sie die Abfrage für die Verwendung des Start- und Endparameters. Kopieren Sie Folgendes in die Abfrage:

    @concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); 
    SET @begin_time = ''',pipeline().parameters.triggerStartTime,''';
    SET @end_time = ''',pipeline().parameters.triggerEndTime,''';
    SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time);
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time);
    SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
    
  3. Navigieren Sie zur Aktivität Copy (im Fall „True“ der Aktivität If-Bedingung), und klicken Sie auf die Registerkarte Quelle. Kopieren Sie Folgendes in die Abfrage:

    @concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); 
    SET @begin_time = ''',pipeline().parameters.triggerStartTime,''';
    SET @end_time = ''',pipeline().parameters.triggerEndTime,''';
    SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time);
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time);
    SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
    
  4. Klicken Sie auf die Registerkarte Senke der Copy-Aktivität und dann auf Öffnen, um die Datensatzeigenschaften zu bearbeiten. Klicken Sie auf die Registerkarte Parameter, und fügen Sie einen neuen Parameter namens triggerStart hinzu.

    Screenshot des Hinzufügens eines neuen Parameters zur Registerkarte „Parameter“

  5. Konfigurieren Sie als Nächstes die Eigenschaften des Datensatzes, um die Daten in einem Unterverzeichnis von customers/incremental mit datumsbasierten Partitionen zu speichern.

    1. Klicken Sie in den Dataseteigenschaften auf die Registerkarte Verbindung, und fügen Sie dynamische Inhalte für die Abschnitte Verzeichnis und Datei hinzu.

    2. Geben Sie im Abschnitt Verzeichnis den folgenden Ausdruck ein, indem Sie unter dem Textfeld auf den Link für dynamischen Inhalt klicken:

      @concat('customers/incremental/',formatDateTime(dataset().triggerStart,'yyyy/MM/dd'))
      
    3. Geben Sie im Abschnitt Datei den folgenden Ausdruck ein. Dadurch werden Dateinamen auf der Grundlage des Startdatums und der Startzeit des Triggers erstellt und mit dem Suffix „.csv“ versehen:

      @concat(formatDateTime(dataset().triggerStart,'yyyyMMddHHmmssfff'),'.csv')
      

      Konfiguration des Senkendatasets: 3

    4. Klicken Sie auf die Registerkarte IncrementalCopyPipeline, um zu den Einstellungen für die Senke in der Aktivität Copy zurückzukehren.

    5. Erweitern Sie die Dataseteigenschaften, und geben Sie im triggerStart-Parameterwert dynamischen Inhalt mit dem folgenden Ausdruck ein:

      @pipeline().parameters.triggerStartTime
      

    Konfiguration des Senkendatasets: 4

  6. Klicken Sie auf „Debuggen“, um die Pipeline zu testen und sich zu vergewissern, dass die Ordnerstruktur und die Ausgabedatei erwartungsgemäß generiert werden. Laden Sie die Datei herunter, und öffnen Sie sie, um den Inhalt zu überprüfen.

    Debuggen des inkrementellen Kopierens: 3

  7. Vergewissern Sie sich, dass die Parameter in die Abfrage eingefügt werden, indem Sie die Eingabeparameter der Pipelineausführung überprüfen.

    Debuggen des inkrementellen Kopierens: 4

  8. Veröffentlichen Sie Entitäten (verknüpfte Dienste, Datasets und Pipelines) für den Data Factory-Dienst, indem Sie auf die Schaltfläche Alle veröffentlichen klicken. Warten Sie, bis die Meldung Veröffentlichung erfolgreich angezeigt wird.

  9. Konfigurieren Sie abschließend einen Trigger für ein rollierendes Fenster, um die Pipeline in regelmäßigen Abständen auszuführen, und legen Sie Parameter für Start- und Endzeit fest.

    1. Klicken Sie auf die Schaltfläche Trigger hinzufügen, und wählen Sie Neu/Bearbeiten aus.

    Hinzufügen eines neuen Triggers

    1. Geben Sie einen Namen für den Trigger und eine Startzeit an, die der Endzeit des obigen Debugfensters entspricht.

    Trigger für ein rollierendes Fenster

    1. Geben Sie im nächsten Bildschirm die folgenden Werte für den Start- bzw. Endparameter an:

      @formatDateTime(trigger().outputs.windowStartTime,'yyyy-MM-dd HH:mm:ss.fff')
      @formatDateTime(trigger().outputs.windowEndTime,'yyyy-MM-dd HH:mm:ss.fff')
      

      Trigger für ein rollierendes Fenster: 2

Hinweis

Der Trigger wird erst ausgeführt wird, nachdem er veröffentlicht wurde. Das erwartete Verhalten des rollierenden Fensters besteht außerdem in der Ausführung aller historischen Intervalle seit dem Startdatum bis zum aktuellen Zeitpunkt. Weitere Informationen zu Triggern für ein rollierendes Fenster finden Sie hier.

  1. Mit SQL Server Management Studio nehmen Sie einige zusätzliche Änderungen an der Kundentabelle vor, indem Sie die folgende SQL-Datei ausführen:

    insert into customers (customer_id, first_name, last_name, email, city) values (4, 'Farlie', 'Hadigate', 'fhadigate3@zdnet.com', 'Reading');
    insert into customers (customer_id, first_name, last_name, email, city) values (5, 'Anet', 'MacColm', 'amaccolm4@yellowbook.com', 'Portsmouth');
    insert into customers (customer_id, first_name, last_name, email, city) values (6, 'Elonore', 'Bearham', 'ebearham5@ebay.co.uk', 'Portsmouth');
    update customers set first_name='Elon' where customer_id=6;
    delete from customers where customer_id=5;
    
  2. Klicken Sie auf die Schaltfläche Alle veröffentlichen. Warten Sie, bis die Meldung Veröffentlichung erfolgreich angezeigt wird.

  3. Nach ein paar Minuten wird die Pipeline ausgelöst, und eine neue Datei wird in Azure Storage geladen worden sein.

Überwachen Sie die inkrementelle Kopierpipeline

  1. Klicken Sie links auf die Registerkarte Überwachen. Die Pipelineausführung wird in der Liste mit ihrem Status angezeigt. Klicken Sie zum Aktualisieren der Liste auf Aktualisieren. Bewegen Sie den Mauszeiger in die Nähe des Pipelinennamens, um auf die Aktion „Erneut ausführen“ und den Verbrauchsbericht zuzugreifen.

    Pipelineausführungen

  2. Klicken Sie auf den Namen der Pipeline, um die mit dem Pipeline-Durchlauf verknüpften Aktivitätsdurchläufe anzuzeigen. Wurden geänderte Daten erkannt, stehen drei Aktivitäten zur Verfügung (unter anderem die Copy-Aktivität). Andernfalls enthält die Liste nur zwei Einträge. Klicken Sie im oberen Bereich auf den Link Alle Pipelines, um zur Ansicht der Pipelineausführungen zurückzukehren.

    Aktivitätsausführungen

Überprüfen der Ergebnisse

Die zweite Datei ist im Ordner customers/incremental/YYYY/MM/DD des Containers raw enthalten.

Ausgabedatei des inkrementellen Kopiervorgangs

Im folgenden Tutorial erfahren Sie mehr über das Kopieren von neuen und geänderten Dateien nur auf Grundlage ihres LastModifiedDate-Werts: