Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Gilt für:
Azure Data Factory
Azure Synapse Analytics
Tipp
Data Factory in Microsoft Fabric ist die nächste Generation von Azure Data Factory mit einer einfacheren Architektur, integrierter KI und neuen Features. Wenn Sie mit der Datenintegration noch nicht vertraut sind, beginnen Sie mit Fabric Data Factory. Vorhandene ADF-Workloads können auf Fabric aktualisiert werden, um auf neue Funktionen in der Datenwissenschaft, Echtzeitanalysen und Berichterstellung zuzugreifen.
In diesem Tutorial verwenden Sie Azure Data Factory, um eine Pipeline zu erstellen, die Deltadaten aus einer Tabelle in Azure SQL-Datenbank in den Azure Blob-Speicher lädt.
In diesem Tutorial führen Sie die folgenden Schritte aus:
- Bereiten Sie den Datenspeicher zum Speichern des Wasserzeichenswertes vor.
- Erstellen einer Data Factory.
- Erstellen Sie verknüpfte Dienste.
- Erstellen Sie ein Quell-, Senken- und Grenzwertdataset.
- Erstellen einer Pipeline.
- Ausführen der Pipeline.
- Überwachen der Pipelineausführung.
Übersicht
Allgemeines Lösungsdiagramm:
Hier sind die wesentlichen Schritte beim Erstellen dieser Lösung aufgeführt:
Select the watermark column (Wählen Sie die Wasserzeichen-Spalte aus) Wählen Sie eine Spalte im Quelldatenspeicher aus, die verwendet werden kann, um die neuen oder aktualisierten Datensätze für jede Ausführung in Segmente aufzuteilen. Normalerweise steigen die Daten in dieser ausgewählten Spalte (z.B. Last_modify_time oder ID), wenn Zeilen erstellt oder aktualisiert werden. Der maximale Wert in dieser Spalte wird als Grenzwert verwendet.
Bereiten Sie einen Datenspeicher vor, um den Wasserzeichenwert zu speichern.
In diesem Tutorial speichern Sie den Watermark-Wert in einer SQL-Datenbank.Erstellen einer Pipeline mit dem folgenden Workflow:
Die Pipeline in dieser Lösung enthält die folgenden Aktivitäten:
- Erstellen Sie zwei Lookup-Aktivitäten. Verwenden Sie die erste Lookup-Aktivität, um den letzten Wasserzeichenwert abzurufen. Verwenden Sie die zweite Lookup-Aktivität, um den neuen Wasserzeichenwert abzurufen. Diese Wasserzeichenwerte werden an die Copy-Aktivität übergeben.
- Erstellen Sie eine Kopieraktivität, die Zeilen aus dem Quelldatenspeicher kopiert, bei denen der Wert der Wasserzeichenspalte größer als der alte Wasserzeichenwert und kleiner oder gleich dem neuen Wasserzeichenwert ist. Anschließend werden die Deltadaten aus dem Quelldatenspeicher als neue Datei in einen Blobspeicher kopiert.
- Erstellen Sie eine StoredProcedure-Aktivität, die den Grenzwert für die Pipeline aktualisiert, die nächstes Mal ausgeführt wird.
Wenn Sie nicht über ein Azure-Abonnement verfügen, erstellen Sie ein free Konto, bevor Sie beginnen.
Voraussetzungen
Hinweis
Es wird empfohlen, das Azure Az PowerShell-Modul für die Interaktion mit Azure zu verwenden. Informationen zu den ersten Schritten finden Sie unter Install Azure PowerShell. Informationen zum Migrieren zum Az PowerShell-Modul finden Sie unter Migrate Azure PowerShell von AzureRM zu Az.
- Azure SQL-Datenbank. Sie verwenden die Datenbank als den Quelldatenspeicher. Wenn Sie keine Datenbank in Azure SQL-Datenbank haben, lesen Sie Erstellen einer Datenbank in Azure SQL-Datenbank, um die Schritte zum Erstellen einer Datenbank zu erfahren.
- Azure Storage. Sie verwenden den Blobspeicher als Senkendatenspeicher. Wenn Sie kein Speicherkonto besitzen, finden Sie unter Erstellen eines Speicherkontos Schritte zum Erstellen eines solchen Kontos. Erstellen Sie einen Container mit dem Namen „adftutorial“.
- Azure PowerShell. Befolgen Sie die Anweisungen in Install und konfigurieren Sie Azure PowerShell.
Erstellen einer Datenquelltabelle in Ihrer SQL-Datenbank
Öffnen Sie SQL Server Management Studio. Klicken Sie im Server-Explorer mit der rechten Maustaste auf die Datenbank, und wählen Sie Neue Abfrage.
Führen Sie den folgenden SQL-Befehl für Ihre SQL-Datenbank aus, um eine Tabelle mit dem Namen
data_source_tableals Quelldatenspeicher zu erstellen.create table data_source_table ( PersonID int, Name varchar(255), LastModifytime datetime ); INSERT INTO data_source_table (PersonID, Name, LastModifytime) VALUES (1, 'aaaa','9/1/2017 12:56:00 AM'), (2, 'bbbb','9/2/2017 5:23:00 AM'), (3, 'cccc','9/3/2017 2:36:00 AM'), (4, 'dddd','9/4/2017 3:21:00 AM'), (5, 'eeee','9/5/2017 8:06:00 AM');In diesem Tutorial verwenden Sie „LastModifytime“ als die Wasserzeichenspalte. Die Daten im Quelldatenspeicher sind in der folgenden Tabelle dargestellt:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000
Erstellen Sie eine weitere Tabelle in Ihrer SQL-Datenbank, um den High Watermark-Wert zu speichern.
Führen Sie den folgenden SQL-Befehl für Ihre SQL-Datenbank aus, um eine Tabelle mit dem Namen
watermarktablezum Speichern des Grenzwerts zu erstellen:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );Legen Sie den Standardwert für den hohen Grenzwert mit dem Tabellennamen des Quelldatenspeichers fest. In diesem Tutorial lautet der Tabellenname „data_source_table“.
INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')Überprüfen Sie die Daten in der Tabelle
watermarktable.Select * from watermarktableAusgabe:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
Erstellen einer gespeicherten Prozedur in Ihrer SQL-Datenbank
Führen Sie den folgenden Befehl zum Erstellen einer gespeicherten Prozedur in Ihrer SQL-Datenbank aus.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Erstellen einer Data Factory
Definieren Sie eine Variable für den Ressourcengruppennamen zur späteren Verwendung in PowerShell-Befehlen. Kopieren Sie den folgenden Befehlstext in PowerShell, geben Sie einen Namen für die ressourcengruppe Azure in doppelte Anführungszeichen an, und führen Sie dann den Befehl aus. z. B.
"adfrg".$resourceGroupName = "ADFTutorialResourceGroup";Beachten Sie, dass die Ressourcengruppe ggf. nicht überschrieben werden soll, falls sie bereits vorhanden ist. Weisen Sie der Variablen
$resourceGroupNameeinen anderen Wert zu, und führen Sie den Befehl erneut aus.Definieren Sie eine Variable für den Speicherort der Data Factory.
$location = "East US"Führen Sie den folgenden Befehl aus, um die ressourcengruppe Azure zu erstellen:
New-AzResourceGroup $resourceGroupName $locationBeachten Sie, dass die Ressourcengruppe ggf. nicht überschrieben werden soll, falls sie bereits vorhanden ist. Weisen Sie der Variablen
$resourceGroupNameeinen anderen Wert zu, und führen Sie den Befehl erneut aus.Definieren Sie eine Variable für den Namen der Data Factory.
Wichtig
Aktualisieren Sie den Data Factory-Namen, damit er global eindeutig ist. Ein Beispiel hierfür ist ADFTutorialFactorySP1127.
$dataFactoryName = "ADFIncCopyTutorialFactory";Führen Sie zum Erstellen der Data Factory das Cmdlet Set-AzDataFactoryV2 wie folgt aus:
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "East US" -Name $dataFactoryName
Beachten Sie folgende Punkte:
Der Name der Data Factory muss global eindeutig sein. Wenn die folgende Fehlermeldung angezeigt wird, ändern Sie den Namen, und wiederholen Sie den Vorgang:
The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.Zum Erstellen von Data Factory-Instanzen muss das Benutzerkonto, mit dem Sie sich bei Azure anmelden, Mitglied von Mitwirkenden- oder Besitzerrollen oder ein Administrator des Azure-Abonnements sein.
Wählen Sie für eine Liste Azure Regionen, in denen Data Factory zurzeit verfügbar ist, die Regionen aus, die Sie auf der folgenden Seite interessieren, und erweitern Sie dann Analytics, um Data Factory: Products available by region zu finden. Die Datenspeicher (Speicher, SQL-Datenbank, Azure SQL Managed Instance usw.) und Berechnungen (Azure HDInsight usw.), die von der Datenfactory verwendet werden, können sich in anderen Regionen befinden.
Erstellen von verknüpften Diensten
Um Ihre Datenspeicher und Compute Services mit der Data Factory zu verknüpfen, können Sie verknüpfte Dienste in einer Data Factory erstellen. In diesem Abschnitt erstellen Sie verknüpfte Dienste für Ihr Speicherkonto und SQL-Datenbank.
Erstellen eines verknüpften Speicherdiensts
Erstellen Sie im Ordner „C:\ADF“ eine JSON-Datei mit dem Namen „AzureStorageLinkedService.json“ und folgendem Inhalt. (Erstellen Sie den Ordner „ADF“, wenn er noch nicht vorhanden ist.) Ersetzen Sie
<accountName>und<accountKey>durch den Namen und den Schlüssel Ihres Speicherkontos, bevor Sie die Datei speichern.{ "name": "AzureStorageLinkedService", "properties": { "type": "AzureStorage", "typeProperties": { "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>" } } }Wechseln Sie in PowerShell zum Ordner „ADF“.
Führen Sie das Cmdlet Set-AzDataFactoryV2LinkedService aus, um den verknüpften Dienst „AzureStorageLinkedService“ zu erstellen. Im folgenden Beispiel übergeben Sie Werte für die ResourceGroupName- und DataFactoryName-Parameter:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"Hier ist die Beispielausgabe:
LinkedServiceName : AzureStorageLinkedService ResourceGroupName : <resourceGroupName> DataFactoryName : <dataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
Erstellen eines mit SQL-Datenbank verknüpften Diensts
Erstellen Sie im Ordner „C:\ADF“ eine JSON-Datei mit dem Namen „AzureSQLDatabaseLinkedService.json“ und folgendem Inhalt. (Erstellen Sie den Ordner „ADF“, wenn er noch nicht vorhanden ist.) Ersetzen Sie <your-server-name> und <your-database-name> durch den Namen des Servers und der Datenbank, bevor Sie die Datei speichern. Außerdem müssen Sie Ihren Azure SQL Server so konfigurieren, dass der Zugriff auf die verwaltete Identität Ihrer Datenfactory gewährt wird
. { "name": "AzureSqlDatabaseLinkedService", "properties": { "type": "AzureSqlDatabase", "typeProperties": { "connectionString": "Server=tcp:<your-server-name>.database.windows.net,1433;Database=<your-database-name>;" }, "authenticationType": "ManagedIdentity", "annotations": [] } }Wechseln Sie in PowerShell zum Ordner „ADF“.
Führen Sie das Set-AzDataFactoryV2LinkedService-Cmdlet aus, um den verknüpften Dienst „AzureSQLDatabaseLinkedService“ zu erstellen.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"Hier ist die Beispielausgabe:
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService ProvisioningState :
Erstellen von Datasets
In diesem Schritt erstellen Sie Datasets zur Darstellung von Quell- und Senkendaten.
Erstellen eines Quelldatasets
Erstellen Sie eine JSON-Datei mit dem Namen „SourceDataset.json“ im selben Ordner und dem folgenden Inhalt:
{ "name": "SourceDataset", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "data_source_table" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }In diesem Tutorial verwenden Sie den Tabellennamen „data_source_table“. Ersetzen Sie ihn, wenn Sie eine Tabelle mit einem anderen Namen verwenden.
Führen Sie das Cmdlet Set-AzDataFactoryV2Dataset aus, um das Dataset „SourceDataset“ zu erstellen.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"Hier ist die Beispielausgabe des Cmdlets:
DatasetName : SourceDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Erstellen Sie ein Senkendataset
Erstellen Sie eine JSON-Datei mit dem Namen „SinkDataset.json“ im selben Ordner und dem folgenden Inhalt:
{ "name": "SinkDataset", "properties": { "type": "AzureBlob", "typeProperties": { "folderPath": "adftutorial/incrementalcopy", "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')", "format": { "type": "TextFormat" } }, "linkedServiceName": { "referenceName": "AzureStorageLinkedService", "type": "LinkedServiceReference" } } }Wichtig
In diesem Codeausschnitt wird davon ausgegangen, dass Ihr Blobspeicher einen Blobcontainer mit dem Namen
adftutorialenthält. Erstellen Sie den Container, wenn er noch nicht vorhanden ist, oder geben Sie den Namen eines bereits vorhandenen ein. Der Ausgabeordnerincrementalcopywird automatisch erstellt, wenn er nicht bereits im Container vorhanden ist. In diesem Tutorial wird der Dateiname dynamisch mit dem Ausdruck@CONCAT('Incremental-', pipeline().RunId, '.txt')generiert.Führen Sie das Cmdlet Set-AzDataFactoryV2Dataset aus, um das Dataset „SinkDataset“ zu erstellen.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"Hier ist die Beispielausgabe des Cmdlets:
DatasetName : SinkDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset
Erstellen eines Datensatzes für ein Wasserzeichen
In diesem Schritt erstellen Sie ein Dataset zum Speichern eines hohen Grenzwerts.
Erstellen Sie eine JSON-Datei mit dem Namen „WatermarkDataset.json“ im selben Ordner und dem folgenden Inhalt:
{ "name": " WatermarkDataset ", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "watermarktable" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }Führen Sie das Cmdlet Set-AzDataFactoryV2Dataset aus, um das Dataset „WatermarkDataset“ zu erstellen.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"Hier ist die Beispielausgabe des Cmdlets:
DatasetName : WatermarkDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Erstellen einer Pipeline
In diesem Tutorial erstellen Sie eine Pipeline mit zwei Lookup-Aktivitäten, einer Kopieraktivität und einer StoredProcedure-Aktivität, die in einer Pipeline verkettet sind.
Erstellen Sie in demselben Ordner die JSON-Datei „IncrementalCopyPipeline.json“ mit folgendem Inhalt:
{ "name": "IncrementalCopyPipeline", "properties": { "activities": [ { "name": "LookupOldWaterMarkActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select * from watermarktable" }, "dataset": { "referenceName": "WatermarkDataset", "type": "DatasetReference" } } }, { "name": "LookupNewWaterMarkActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table" }, "dataset": { "referenceName": "SourceDataset", "type": "DatasetReference" } } }, { "name": "IncrementalCopyActivity", "type": "Copy", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'" }, "sink": { "type": "BlobSink" } }, "dependsOn": [ { "activity": "LookupNewWaterMarkActivity", "dependencyConditions": [ "Succeeded" ] }, { "activity": "LookupOldWaterMarkActivity", "dependencyConditions": [ "Succeeded" ] } ], "inputs": [ { "referenceName": "SourceDataset", "type": "DatasetReference" } ], "outputs": [ { "referenceName": "SinkDataset", "type": "DatasetReference" } ] }, { "name": "StoredProceduretoWriteWatermarkActivity", "type": "SqlServerStoredProcedure", "typeProperties": { "storedProcedureName": "usp_write_watermark", "storedProcedureParameters": { "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" }, "TableName": { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"} } }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" }, "dependsOn": [ { "activity": "IncrementalCopyActivity", "dependencyConditions": [ "Succeeded" ] } ] } ] } }Führen Sie das Cmdlet Set-AzDataFactoryV2Pipeline aus, um die Pipeline „IncrementalCopyPipeline“ zu erstellen.
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"Hier ist die Beispielausgabe:
PipelineName : IncrementalCopyPipeline ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Activities : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity} Parameters :
Führen Sie die Pipeline aus.
Führen Sie mithilfe des Cmdlets Invoke-AzDataFactoryV2Pipeline die Pipeline „IncrementalCopyPipeline“ aus. Ersetzen Sie die Platzhalter durch Ihren eigenen Namen für die Ressourcengruppe und die Data Factory.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryNameÜberprüfen Sie den Status der Pipeline durch Ausführen des Get-AzDataFactoryV2ActivityRun-Cmdlets, bis Sie sehen, dass alle Aktivitäten erfolgreich ausgeführt wurden. Ersetzen Sie Platzhalter durch Ihre eigene geeignete Zeit für die Parameter RunStartedAfter und RunStartedBefore. In diesem Tutorial verwenden Sie Folgendes: -RunStartedAfter "2017/09/14" und -RunStartedBefore "2017/09/15" .
Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"Hier ist die Beispielausgabe:
ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupNewWaterMarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {NewWatermarkvalue} LinkedServiceName : ActivityRunStart : 9/14/2017 7:42:42 AM ActivityRunEnd : 9/14/2017 7:42:50 AM DurationInMs : 7777 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupOldWaterMarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {TableName, WatermarkValue} LinkedServiceName : ActivityRunStart : 9/14/2017 7:42:42 AM ActivityRunEnd : 9/14/2017 7:43:07 AM DurationInMs : 25437 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : IncrementalCopyActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, sink} Output : {dataRead, dataWritten, rowsCopied, copyDuration...} LinkedServiceName : ActivityRunStart : 9/14/2017 7:43:10 AM ActivityRunEnd : 9/14/2017 7:43:29 AM DurationInMs : 19769 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : StoredProceduretoWriteWatermarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {storedProcedureName, storedProcedureParameters} Output : {} LinkedServiceName : ActivityRunStart : 9/14/2017 7:43:32 AM ActivityRunEnd : 9/14/2017 7:43:47 AM DurationInMs : 14467 Status : Succeeded Error : {errorCode, message, failureType, target}
Überprüfen der Ergebnisse
Im Blobspeicher (Senkenspeicher) sehen Sie, dass die Daten in die im „SinkDataset“ definierte Datei kopiert wurden. Im aktuellen Tutorial ist der Dateiname
Incremental- d4bf3ce2-5d60-43f3-9318-923155f61037.txt. Öffnen Sie die Datei, und Sie können Datensätze in der Datei sehen, die mit den Daten in der SQL-Datenbank identisch sind.1,aaaa,2017-09-01 00:56:00.0000000 2,bbbb,2017-09-02 05:23:00.0000000 3,cccc,2017-09-03 02:36:00.0000000 4,dddd,2017-09-04 03:21:00.0000000 5,eeee,2017-09-05 08:06:00.0000000Überprüfen Sie den aktuellen Wert aus
watermarktable. Sie sehen, dass der Wasserzeichenwert aktualisiert wurde.Select * from watermarktableHier ist die Beispielausgabe:
TableName WatermarkValue „Data_source_table“ 2017-09-05 8:06:00.000
Einfügen von Daten in den Datenquellspeicher, um das Laden von Deltadaten zu überprüfen
Fügen Sie neue Daten in die SQL-Datenbank (Datenquellenspeicher) ein.
INSERT INTO data_source_table VALUES (6, 'newdata','9/6/2017 2:23:00 AM') INSERT INTO data_source_table VALUES (7, 'newdata','9/7/2017 9:01:00 AM')Die aktualisierten Daten in der SQL-Datenbank lauten:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000 6 | newdata | 2017-09-06 02:23:00.000 7 | newdata | 2017-09-07 09:01:00.000Führen Sie die Pipeline „IncrementalCopyPipeline“ erneut mithilfe des Invoke-AzDataFactoryV2Pipeline-Cmdlets aus. Ersetzen Sie die Platzhalter durch Ihren eigenen Namen für die Ressourcengruppe und die Data Factory.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryNameÜberprüfen Sie den Status der Pipeline durch Ausführen des Get-AzDataFactoryV2ActivityRun-Cmdlets, bis Sie sehen, dass alle Aktivitäten erfolgreich ausgeführt wurden. Ersetzen Sie Platzhalter durch Ihre eigene geeignete Zeit für die Parameter RunStartedAfter und RunStartedBefore. In diesem Tutorial verwenden Sie Folgendes: -RunStartedAfter "2017/09/14" und -RunStartedBefore "2017/09/15" .
Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"Hier ist die Beispielausgabe:
ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupNewWaterMarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {NewWatermarkvalue} LinkedServiceName : ActivityRunStart : 9/14/2017 8:52:26 AM ActivityRunEnd : 9/14/2017 8:52:58 AM DurationInMs : 31758 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupOldWaterMarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {TableName, WatermarkValue} LinkedServiceName : ActivityRunStart : 9/14/2017 8:52:26 AM ActivityRunEnd : 9/14/2017 8:52:52 AM DurationInMs : 25497 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : IncrementalCopyActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, sink} Output : {dataRead, dataWritten, rowsCopied, copyDuration...} LinkedServiceName : ActivityRunStart : 9/14/2017 8:53:00 AM ActivityRunEnd : 9/14/2017 8:53:20 AM DurationInMs : 20194 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : StoredProceduretoWriteWatermarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {storedProcedureName, storedProcedureParameters} Output : {} LinkedServiceName : ActivityRunStart : 9/14/2017 8:53:23 AM ActivityRunEnd : 9/14/2017 8:53:41 AM DurationInMs : 18502 Status : Succeeded Error : {errorCode, message, failureType, target}Im Blobspeicher sehen Sie, dass eine weitere Datei erstellt wurde. In diesem Tutorial ist der neue Dateiname
Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt. Wenn Sie diese Datei öffnen, sehen Sie zwei Zeilen mit Datensätzen.Überprüfen Sie den aktuellen Wert aus
watermarktable. Sie sehen, dass der Wasserzeichenwert erneut aktualisiert wurde.Select * from watermarktableBeispielausgabe:
TableName WatermarkValue „Data_source_table“ 2017-09-07 09:01:00.000
Zugehöriger Inhalt
In diesem Tutorial haben Sie die folgenden Schritte ausgeführt:
- Bereiten Sie den Datenspeicher zum Speichern des Wasserzeichenswertes vor.
- Erstellen einer Data Factory.
- Erstellen Sie verknüpfte Dienste.
- Erstellen Sie ein Quell-, Senken- und Grenzwertdataset.
- Erstellen einer Pipeline.
- Ausführen der Pipeline.
- Überwachen der Pipelineausführung.
In dieser Anleitung kopierte die Pipeline Daten aus einer einzelnen Tabelle in der Azure SQL-Datenbank in den Blob-Speicher. Wechseln Sie zum folgenden Tutorial, um zu erfahren, wie Sie Daten aus mehreren Tabellen in einer SQL Server-Datenbank in eine SQL-Datenbank kopieren.