Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Gilt für:
Azure Data Factory
Azure Synapse Analytics
Tipp
Data Factory in Microsoft Fabric ist die nächste Generation von Azure Data Factory mit einer einfacheren Architektur, integrierter KI und neuen Features. Wenn Sie mit der Datenintegration noch nicht vertraut sind, beginnen Sie mit Fabric Data Factory. Vorhandene ADF-Workloads können auf Fabric aktualisiert werden, um auf neue Funktionen in der Datenwissenschaft, Echtzeitanalysen und Berichterstellung zuzugreifen.
In diesem Artikel wird eine Vorlage zum inkrementellen Laden neuer oder aktualisierter Zeilen aus einer Datenbanktabelle in Azure erläutert, bei der eine externe Steuertabelle verwendet wird, die einen hohen Grenzwert speichert.
Diese Vorlage setzt voraus, dass das Schema der Quelldatenbank eine Zeitstempelspalte oder einen Inkrementierungsschlüssel enthält, um die neuen oder aktualisierten Zeilen identifizieren zu können.
Hinweis
Wenn Sie eine Zeitstempelspalte in der Quelldatenbank haben, um neue oder aktualisierte Zeilen zu identifizieren, aber keine externe Kontrolltabelle für die Deltakopie erstellen möchten, können Sie stattdessen das Tool Azure Data Factory Copy Data verwenden, um eine Pipeline zu erstellen. Das Tool verwendet einen von einem Trigger geplanten Zeitpunkt als Variable, um neue Zeilen aus der Quelldatenbank zu lesen.
Informationen zu dieser Lösungsvorlage
Diese Vorlage ruft zuerst den alten Grenzwert ab und vergleicht ihn mit dem aktuellen Grenzwert. Nachdem die beiden Wasserzeichenwerte verglichen wurden, werden nur die Änderungen aus der Quelldatenbank kopiert. Zuletzt wird der neue hohe Grenzwert in einer externen Steuertabelle gespeichert, damit er beim nächsten Laden von Deltadaten verfügbar ist.
Die Vorlage enthält vier Aktivitäten:
- Eine Lookup-Funktion ruft den alten High-Watermark-Wert ab, der in einer externen Steuertabelle gespeichert ist.
- Eine weitere Lookup-Aktivität ruft den aktuellen High-Watermark-Wert aus der Quelldatenbank ab.
- Die Kopieraktivität kopiert nur Änderungen aus der Quelldatenbank in den Zielspeicher. Die Abfrage zum Ermitteln der Änderungen in der Quelldatenbank ähnelt „SELECT * FROM Data_Source_Table WHERE TIMESTAMP_Column > ‚last high-watermark‘ und TIMESTAMP_Column <= ‚current high-watermark‘“.
- SqlServerStoredProcedure schreibt den aktuellen High-Watermark-Wert in eine externe Steuertabelle für den nächsten Deltakopiervorgang.
Die Vorlage definiert die folgenden Parameter:
- Data_Source_Table_Name entspricht der Tabelle aus der Quelldatenbank, aus der Sie Daten laden möchten.
- Data_Source_WaterMarkColumn ist der Name der Spalte in der Quelltabelle, der verwendet wird, um neue oder aktualisierte Zeilen zu identifizieren. Der Typ dieser Spalte ist üblicherweise datetime, int oder ein ähnlicher Typ.
- Data_Destination_Container entspricht dem Stammpfad des Orts, an den die Daten in Ihrem Zielspeicher kopiert werden.
- Data_Destination_Directory ist der Verzeichnispfad unter dem Stammverzeichnis des Zielspeichers, an den die Daten kopiert werden.
- Data_Destination_Table_Name ist der Ort, an den die Daten im Zielspeicher kopiert werden (gilt, wenn "Azure Synapse Analytics" als Datenziel ausgewählt wird).
- Data_Destination_Folder_Path ist der Ort, an den die Daten im Zielspeicher kopiert werden (zutreffend, wenn "Dateisystem" oder "Azure Data Lake Storage Gen1" als Datenziel ausgewählt ist).
- Control_Table_Table_Name ist die externe Steuertabelle, in der der High-Watermark-Wert gespeichert wird.
- Control_Table_Column_Name ist die Spalte in der externen Steuertabelle, in der der hohe Grenzwert gespeichert wird.
So verwenden Sie diese Lösungsvorlage
Untersuchen Sie die Quelltabelle, die Sie laden möchten, und definieren Sie die Spalte für den hohen Grenzwert, auf deren Grundlage die neuen oder aktualisierten Zeilen identifiziert werden können. Der Typ dieser Spalte kann datetime, int oder ein ähnlicher Typ sein. Der Wert dieser Spalte wird erhöht, wenn neue Zeilen hinzugefügt werden. In der folgenden Beispielquelltabelle (data_source_table) können Sie die Spalte LastModifytime als Spalte für den hohen Grenzwert verwenden.
PersonID Name LastModifytime 1 aaaa 2017-09-01 00:56:00.000 2 bbbb 2017-09-02 05:23:00.000 3 cccc 2017-09-03 02:36:00.000 4 dddd 2017-09-04 03:21:00.000 5 eeee 2017-09-05 08:06:00.000 6 fffffff 2017-09-06 02:23:00.000 7 gggg 2017-09-07 09:01:00.000 8 hhhh 2017-09-08 09:01:00.000 9 iiiiiiiii 2017-09-09 09:01:00.000Erstellen Sie eine Steuertabelle in SQL Server oder Azure SQL-Datenbank, um den High-Watermark-Wert für das Laden von Deltadaten zu speichern. Im folgenden Beispiel ist der Name der Kontrolltabelle watermarktable. In dieser Tabelle ist WatermarkValue die Spalte, die den High-Watermark-Wert speichert, und ihr Typ ist datetime.
create table watermarktable ( WatermarkValue datetime, ); INSERT INTO watermarktable VALUES ('1/1/2010 12:00:00 AM')Erstellen Sie eine gespeicherte Prozedur in derselben SQL Server oder Azure SQL-Datenbank Instanz, die Sie zum Erstellen der Steuerelementtabelle verwendet haben. Die gespeicherte Prozedur dient dazu, den neuen hohen Grenzwert in der externen Steuertabelle zu speichern, damit er beim nächsten Laden von Deltadaten zur Verfügung steht.
CREATE PROCEDURE update_watermark @LastModifiedtime datetime AS BEGIN UPDATE watermarktable SET [WatermarkValue] = @LastModifiedtime ENDWechseln Sie zur Vorlage Delta copy from Database (Deltakopie aus einer Datenbank). Stellen Sie eine neue Verbindung mit der Quelldatenbank her, aus der Sie Daten kopieren möchten.
Stellen Sie eine neue Verbindung mit dem Zieldatenspeicher her, in den Sie die Daten kopieren möchten.
Stellen Sie eine neue Verbindung mit der externen Steuertabelle und der gespeicherten Prozedur her, die Sie in Schritt 2 und Schritt 3 erstellt haben.
Klicken Sie auf Diese Vorlage verwenden.
Sie sehen die verfügbare Pipeline, wie im folgenden Beispiel gezeigt:
Klicken Sie auf Gespeicherte Prozedur. Wählen Sie [dbo].[update_watermark] für Name der gespeicherten Prozedur aus. Klicken Sie auf Import parameter (Importparameter), und wählen Sie Dynamischen Inhalt hinzufügen aus.
Schreiben Sie den Inhalt {activity('LookupCurrentWaterMark').output.firstRow.NewWatermarkValue}, und klicken Sie dann auf Fertig stellen.
Klicken Sie auf Debuggen, geben Sie die Parameter ein, und klicken Sie dann auf Fertig stellen.
Die angezeigten Ergebnisse entsprechen etwa folgendem Beispiel:
Sie können neue Zeilen in Ihrer Quelltabelle erstellen. Hier sehen Sie ein SQL-Beispiel für die Erstellung neuer Zeilen:
INSERT INTO data_source_table VALUES (10, 'newdata','9/10/2017 2:23:00 AM') INSERT INTO data_source_table VALUES (11, 'newdata','9/11/2017 9:01:00 AM')Klicken Sie auf Debuggen, geben Sie die Parameter ein, und klicken Sie dann auf Fertig stellen, um die Pipeline erneut auszuführen.
Sie werden feststellen, dass nur neue Zeilen in das Ziel kopiert wurden.
(Optional:) Wenn Sie Azure Synapse Analytics als Datenziel auswählen, müssen Sie auch eine Verbindung mit Azure Blob-Speicher für Staging bereitstellen, die von Azure Synapse Analytics Polybase erforderlich ist. Die Vorlage generiert einen Containerpfad für Sie. Überprüfen Sie nach Ausführung der Pipeline, ob der Container im Blobspeicher erstellt wurde.