Freigeben über


Inkrementelles Laden von Daten aus Azure SQL-Datenbank in Azure Blob-Speicher mithilfe von PowerShell

Gilt für: Azure Data Factory Azure Synapse Analytics

Tipp

Data Factory in Microsoft Fabric ist die nächste Generation von Azure Data Factory mit einer einfacheren Architektur, integrierter KI und neuen Features. Wenn Sie mit der Datenintegration noch nicht vertraut sind, beginnen Sie mit Fabric Data Factory. Vorhandene ADF-Workloads können auf Fabric aktualisiert werden, um auf neue Funktionen in der Datenwissenschaft, Echtzeitanalysen und Berichterstellung zuzugreifen.

In diesem Tutorial verwenden Sie Azure Data Factory, um eine Pipeline zu erstellen, die Deltadaten aus einer Tabelle in Azure SQL-Datenbank in den Azure Blob-Speicher lädt.

In diesem Tutorial führen Sie die folgenden Schritte aus:

  • Bereiten Sie den Datenspeicher zum Speichern des Wasserzeichenswertes vor.
  • Erstellen einer Data Factory.
  • Erstellen Sie verknüpfte Dienste.
  • Erstellen Sie ein Quell-, Senken- und Grenzwertdataset.
  • Erstellen einer Pipeline.
  • Ausführen der Pipeline.
  • Überwachen der Pipelineausführung.

Übersicht

Allgemeines Lösungsdiagramm:

Lädt Daten inkrementell

Hier sind die wesentlichen Schritte beim Erstellen dieser Lösung aufgeführt:

  1. Select the watermark column (Wählen Sie die Wasserzeichen-Spalte aus) Wählen Sie eine Spalte im Quelldatenspeicher aus, die verwendet werden kann, um die neuen oder aktualisierten Datensätze für jede Ausführung in Segmente aufzuteilen. Normalerweise steigen die Daten in dieser ausgewählten Spalte (z.B. Last_modify_time oder ID), wenn Zeilen erstellt oder aktualisiert werden. Der maximale Wert in dieser Spalte wird als Grenzwert verwendet.

  2. Bereiten Sie einen Datenspeicher vor, um den Wasserzeichenwert zu speichern.
    In diesem Tutorial speichern Sie den Watermark-Wert in einer SQL-Datenbank.

  3. Erstellen einer Pipeline mit dem folgenden Workflow:

    Die Pipeline in dieser Lösung enthält die folgenden Aktivitäten:

    • Erstellen Sie zwei Lookup-Aktivitäten. Verwenden Sie die erste Lookup-Aktivität, um den letzten Wasserzeichenwert abzurufen. Verwenden Sie die zweite Lookup-Aktivität, um den neuen Wasserzeichenwert abzurufen. Diese Wasserzeichenwerte werden an die Copy-Aktivität übergeben.
    • Erstellen Sie eine Kopieraktivität, die Zeilen aus dem Quelldatenspeicher kopiert, bei denen der Wert der Wasserzeichenspalte größer als der alte Wasserzeichenwert und kleiner oder gleich dem neuen Wasserzeichenwert ist. Anschließend werden die Deltadaten aus dem Quelldatenspeicher als neue Datei in einen Blobspeicher kopiert.
    • Erstellen Sie eine StoredProcedure-Aktivität, die den Grenzwert für die Pipeline aktualisiert, die nächstes Mal ausgeführt wird.

Wenn Sie nicht über ein Azure-Abonnement verfügen, erstellen Sie ein free Konto, bevor Sie beginnen.

Voraussetzungen

Hinweis

Es wird empfohlen, das Azure Az PowerShell-Modul für die Interaktion mit Azure zu verwenden. Informationen zu den ersten Schritten finden Sie unter Install Azure PowerShell. Informationen zum Migrieren zum Az PowerShell-Modul finden Sie unter Migrate Azure PowerShell von AzureRM zu Az.

Erstellen einer Datenquelltabelle in Ihrer SQL-Datenbank

  1. Öffnen Sie SQL Server Management Studio. Klicken Sie im Server-Explorer mit der rechten Maustaste auf die Datenbank, und wählen Sie Neue Abfrage.

  2. Führen Sie den folgenden SQL-Befehl für Ihre SQL-Datenbank aus, um eine Tabelle mit dem Namen data_source_table als Quelldatenspeicher zu erstellen.

    create table data_source_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
    VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');
    

    In diesem Tutorial verwenden Sie „LastModifytime“ als die Wasserzeichenspalte. Die Daten im Quelldatenspeicher sind in der folgenden Tabelle dargestellt:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    

Erstellen Sie eine weitere Tabelle in Ihrer SQL-Datenbank, um den High Watermark-Wert zu speichern.

  1. Führen Sie den folgenden SQL-Befehl für Ihre SQL-Datenbank aus, um eine Tabelle mit dem Namen watermarktable zum Speichern des Grenzwerts zu erstellen:

    create table watermarktable
    (
    
    TableName varchar(255),
    WatermarkValue datetime,
    );
    
  2. Legen Sie den Standardwert für den hohen Grenzwert mit dem Tabellennamen des Quelldatenspeichers fest. In diesem Tutorial lautet der Tabellenname „data_source_table“.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Überprüfen Sie die Daten in der Tabelle watermarktable.

    Select * from watermarktable
    

    Ausgabe:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

Erstellen einer gespeicherten Prozedur in Ihrer SQL-Datenbank

Führen Sie den folgenden Befehl zum Erstellen einer gespeicherten Prozedur in Ihrer SQL-Datenbank aus.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Erstellen einer Data Factory

  1. Definieren Sie eine Variable für den Ressourcengruppennamen zur späteren Verwendung in PowerShell-Befehlen. Kopieren Sie den folgenden Befehlstext in PowerShell, geben Sie einen Namen für die ressourcengruppe Azure in doppelte Anführungszeichen an, und führen Sie dann den Befehl aus. z. B. "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Beachten Sie, dass die Ressourcengruppe ggf. nicht überschrieben werden soll, falls sie bereits vorhanden ist. Weisen Sie der Variablen $resourceGroupName einen anderen Wert zu, und führen Sie den Befehl erneut aus.

  2. Definieren Sie eine Variable für den Speicherort der Data Factory.

    $location = "East US"
    
  3. Führen Sie den folgenden Befehl aus, um die ressourcengruppe Azure zu erstellen:

    New-AzResourceGroup $resourceGroupName $location
    

    Beachten Sie, dass die Ressourcengruppe ggf. nicht überschrieben werden soll, falls sie bereits vorhanden ist. Weisen Sie der Variablen $resourceGroupName einen anderen Wert zu, und führen Sie den Befehl erneut aus.

  4. Definieren Sie eine Variable für den Namen der Data Factory.

    Wichtig

    Aktualisieren Sie den Data Factory-Namen, damit er global eindeutig ist. Ein Beispiel hierfür ist ADFTutorialFactorySP1127.

    $dataFactoryName = "ADFIncCopyTutorialFactory";
    
  5. Führen Sie zum Erstellen der Data Factory das Cmdlet Set-AzDataFactoryV2 wie folgt aus:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "East US" -Name $dataFactoryName
    

Beachten Sie folgende Punkte:

  • Der Name der Data Factory muss global eindeutig sein. Wenn die folgende Fehlermeldung angezeigt wird, ändern Sie den Namen, und wiederholen Sie den Vorgang:

    The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
    
  • Zum Erstellen von Data Factory-Instanzen muss das Benutzerkonto, mit dem Sie sich bei Azure anmelden, Mitglied von Mitwirkenden- oder Besitzerrollen oder ein Administrator des Azure-Abonnements sein.

  • Wählen Sie für eine Liste Azure Regionen, in denen Data Factory zurzeit verfügbar ist, die Regionen aus, die Sie auf der folgenden Seite interessieren, und erweitern Sie dann Analytics, um Data Factory: Products available by region zu finden. Die Datenspeicher (Speicher, SQL-Datenbank, Azure SQL Managed Instance usw.) und Berechnungen (Azure HDInsight usw.), die von der Datenfactory verwendet werden, können sich in anderen Regionen befinden.

Erstellen von verknüpften Diensten

Um Ihre Datenspeicher und Compute Services mit der Data Factory zu verknüpfen, können Sie verknüpfte Dienste in einer Data Factory erstellen. In diesem Abschnitt erstellen Sie verknüpfte Dienste für Ihr Speicherkonto und SQL-Datenbank.

Erstellen eines verknüpften Speicherdiensts

  1. Erstellen Sie im Ordner „C:\ADF“ eine JSON-Datei mit dem Namen „AzureStorageLinkedService.json“ und folgendem Inhalt. (Erstellen Sie den Ordner „ADF“, wenn er noch nicht vorhanden ist.) Ersetzen Sie <accountName> und <accountKey> durch den Namen und den Schlüssel Ihres Speicherkontos, bevor Sie die Datei speichern.

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
            }
        }
    }
    
  2. Wechseln Sie in PowerShell zum Ordner „ADF“.

  3. Führen Sie das Cmdlet Set-AzDataFactoryV2LinkedService aus, um den verknüpften Dienst „AzureStorageLinkedService“ zu erstellen. Im folgenden Beispiel übergeben Sie Werte für die ResourceGroupName- und DataFactoryName-Parameter:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    Hier ist die Beispielausgabe:

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
    

Erstellen eines mit SQL-Datenbank verknüpften Diensts

  1. Erstellen Sie im Ordner „C:\ADF“ eine JSON-Datei mit dem Namen „AzureSQLDatabaseLinkedService.json“ und folgendem Inhalt. (Erstellen Sie den Ordner „ADF“, wenn er noch nicht vorhanden ist.) Ersetzen Sie <your-server-name> und <your-database-name> durch den Namen des Servers und der Datenbank, bevor Sie die Datei speichern. Außerdem müssen Sie Ihren Azure SQL Server so konfigurieren, dass der Zugriff auf die verwaltete Identität Ihrer Datenfactory gewährt wird.

    {
    "name": "AzureSqlDatabaseLinkedService",
    "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server=tcp:<your-server-name>.database.windows.net,1433;Database=<your-database-name>;"
            },
            "authenticationType": "ManagedIdentity",
            "annotations": []
        }
    }
    
  2. Wechseln Sie in PowerShell zum Ordner „ADF“.

  3. Führen Sie das Set-AzDataFactoryV2LinkedService-Cmdlet aus, um den verknüpften Dienst „AzureSQLDatabaseLinkedService“ zu erstellen.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Hier ist die Beispielausgabe:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    ProvisioningState :
    

Erstellen von Datasets

In diesem Schritt erstellen Sie Datasets zur Darstellung von Quell- und Senkendaten.

Erstellen eines Quelldatasets

  1. Erstellen Sie eine JSON-Datei mit dem Namen „SourceDataset.json“ im selben Ordner und dem folgenden Inhalt:

    {
        "name": "SourceDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "data_source_table"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    
    

    In diesem Tutorial verwenden Sie den Tabellennamen „data_source_table“. Ersetzen Sie ihn, wenn Sie eine Tabelle mit einem anderen Namen verwenden.

  2. Führen Sie das Cmdlet Set-AzDataFactoryV2Dataset aus, um das Dataset „SourceDataset“ zu erstellen.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Hier ist die Beispielausgabe des Cmdlets:

    DatasetName       : SourceDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Erstellen Sie ein Senkendataset

  1. Erstellen Sie eine JSON-Datei mit dem Namen „SinkDataset.json“ im selben Ordner und dem folgenden Inhalt:

    {
        "name": "SinkDataset",
        "properties": {
            "type": "AzureBlob",
            "typeProperties": {
                "folderPath": "adftutorial/incrementalcopy",
                "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
                "format": {
                    "type": "TextFormat"
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureStorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }   
    

    Wichtig

    In diesem Codeausschnitt wird davon ausgegangen, dass Ihr Blobspeicher einen Blobcontainer mit dem Namen adftutorial enthält. Erstellen Sie den Container, wenn er noch nicht vorhanden ist, oder geben Sie den Namen eines bereits vorhandenen ein. Der Ausgabeordner incrementalcopy wird automatisch erstellt, wenn er nicht bereits im Container vorhanden ist. In diesem Tutorial wird der Dateiname dynamisch mit dem Ausdruck @CONCAT('Incremental-', pipeline().RunId, '.txt') generiert.

  2. Führen Sie das Cmdlet Set-AzDataFactoryV2Dataset aus, um das Dataset „SinkDataset“ zu erstellen.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Hier ist die Beispielausgabe des Cmdlets:

    DatasetName       : SinkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset    
    

Erstellen eines Datensatzes für ein Wasserzeichen

In diesem Schritt erstellen Sie ein Dataset zum Speichern eines hohen Grenzwerts.

  1. Erstellen Sie eine JSON-Datei mit dem Namen „WatermarkDataset.json“ im selben Ordner und dem folgenden Inhalt:

    {
        "name": " WatermarkDataset ",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "watermarktable"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }    
    
  2. Führen Sie das Cmdlet Set-AzDataFactoryV2Dataset aus, um das Dataset „WatermarkDataset“ zu erstellen.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Hier ist die Beispielausgabe des Cmdlets:

    DatasetName       : WatermarkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset    
    

Erstellen einer Pipeline

In diesem Tutorial erstellen Sie eine Pipeline mit zwei Lookup-Aktivitäten, einer Kopieraktivität und einer StoredProcedure-Aktivität, die in einer Pipeline verkettet sind.

  1. Erstellen Sie in demselben Ordner die JSON-Datei „IncrementalCopyPipeline.json“ mit folgendem Inhalt:

    {
        "name": "IncrementalCopyPipeline",
        "properties": {
            "activities": [
                {
                    "name": "LookupOldWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                        "type": "SqlSource",
                        "sqlReaderQuery": "select * from watermarktable"
                        },
    
                        "dataset": {
                        "referenceName": "WatermarkDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "LookupNewWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table"
                        },
    
                        "dataset": {
                        "referenceName": "SourceDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
    
                {
                    "name": "IncrementalCopyActivity",
                    "type": "Copy",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'"
                        },
                        "sink": {
                            "type": "BlobSink"
                        }
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupNewWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        },
                        {
                            "activity": "LookupOldWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
    
                    "inputs": [
                        {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "SinkDataset",
                            "type": "DatasetReference"
                        }
                    ]
                },
    
                {
                    "name": "StoredProceduretoWriteWatermarkActivity",
                    "type": "SqlServerStoredProcedure",
                    "typeProperties": {
    
                        "storedProcedureName": "usp_write_watermark",
                        "storedProcedureParameters": {
                            "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" },
                            "TableName":  { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"}
                        }
                    },
    
                    "linkedServiceName": {
                        "referenceName": "AzureSQLDatabaseLinkedService",
                        "type": "LinkedServiceReference"
                    },
    
                    "dependsOn": [
                        {
                            "activity": "IncrementalCopyActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
    
        }
    }
    
  2. Führen Sie das Cmdlet Set-AzDataFactoryV2Pipeline aus, um die Pipeline „IncrementalCopyPipeline“ zu erstellen.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Hier ist die Beispielausgabe:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : ADF
     DataFactoryName   : incrementalloadingADF
     Activities        : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity}
     Parameters        :
    

Führen Sie die Pipeline aus.

  1. Führen Sie mithilfe des Cmdlets Invoke-AzDataFactoryV2Pipeline die Pipeline „IncrementalCopyPipeline“ aus. Ersetzen Sie die Platzhalter durch Ihren eigenen Namen für die Ressourcengruppe und die Data Factory.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  2. Überprüfen Sie den Status der Pipeline durch Ausführen des Get-AzDataFactoryV2ActivityRun-Cmdlets, bis Sie sehen, dass alle Aktivitäten erfolgreich ausgeführt wurden. Ersetzen Sie Platzhalter durch Ihre eigene geeignete Zeit für die Parameter RunStartedAfter und RunStartedBefore. In diesem Tutorial verwenden Sie Folgendes: -RunStartedAfter "2017/09/14" und -RunStartedBefore "2017/09/15" .

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    Hier ist die Beispielausgabe:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:42:50 AM
    DurationInMs      : 7777
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:43:07 AM
    DurationInMs      : 25437
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:10 AM
    ActivityRunEnd    : 9/14/2017 7:43:29 AM
    DurationInMs      : 19769
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:32 AM
    ActivityRunEnd    : 9/14/2017 7:43:47 AM
    DurationInMs      : 14467
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    

Überprüfen der Ergebnisse

  1. Im Blobspeicher (Senkenspeicher) sehen Sie, dass die Daten in die im „SinkDataset“ definierte Datei kopiert wurden. Im aktuellen Tutorial ist der Dateiname Incremental- d4bf3ce2-5d60-43f3-9318-923155f61037.txt. Öffnen Sie die Datei, und Sie können Datensätze in der Datei sehen, die mit den Daten in der SQL-Datenbank identisch sind.

    1,aaaa,2017-09-01 00:56:00.0000000
    2,bbbb,2017-09-02 05:23:00.0000000
    3,cccc,2017-09-03 02:36:00.0000000
    4,dddd,2017-09-04 03:21:00.0000000
    5,eeee,2017-09-05 08:06:00.0000000
    
  2. Überprüfen Sie den aktuellen Wert aus watermarktable. Sie sehen, dass der Wasserzeichenwert aktualisiert wurde.

    Select * from watermarktable
    

    Hier ist die Beispielausgabe:

    TableName WatermarkValue
    „Data_source_table“ 2017-09-05 8:06:00.000

Einfügen von Daten in den Datenquellspeicher, um das Laden von Deltadaten zu überprüfen

  1. Fügen Sie neue Daten in die SQL-Datenbank (Datenquellenspeicher) ein.

    INSERT INTO data_source_table
    VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
    
    INSERT INTO data_source_table
    VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
    

    Die aktualisierten Daten in der SQL-Datenbank lauten:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    6 | newdata | 2017-09-06 02:23:00.000
    7 | newdata | 2017-09-07 09:01:00.000
    
  2. Führen Sie die Pipeline „IncrementalCopyPipeline“ erneut mithilfe des Invoke-AzDataFactoryV2Pipeline-Cmdlets aus. Ersetzen Sie die Platzhalter durch Ihren eigenen Namen für die Ressourcengruppe und die Data Factory.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  3. Überprüfen Sie den Status der Pipeline durch Ausführen des Get-AzDataFactoryV2ActivityRun-Cmdlets, bis Sie sehen, dass alle Aktivitäten erfolgreich ausgeführt wurden. Ersetzen Sie Platzhalter durch Ihre eigene geeignete Zeit für die Parameter RunStartedAfter und RunStartedBefore. In diesem Tutorial verwenden Sie Folgendes: -RunStartedAfter "2017/09/14" und -RunStartedBefore "2017/09/15" .

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    Hier ist die Beispielausgabe:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:58 AM
    DurationInMs      : 31758
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:52 AM
    DurationInMs      : 25497
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:00 AM
    ActivityRunEnd    : 9/14/2017 8:53:20 AM
    DurationInMs      : 20194
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:23 AM
    ActivityRunEnd    : 9/14/2017 8:53:41 AM
    DurationInMs      : 18502
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    
  4. Im Blobspeicher sehen Sie, dass eine weitere Datei erstellt wurde. In diesem Tutorial ist der neue Dateiname Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt. Wenn Sie diese Datei öffnen, sehen Sie zwei Zeilen mit Datensätzen.

  5. Überprüfen Sie den aktuellen Wert aus watermarktable. Sie sehen, dass der Wasserzeichenwert erneut aktualisiert wurde.

    Select * from watermarktable
    

    Beispielausgabe:

    TableName WatermarkValue
    „Data_source_table“ 2017-09-07 09:01:00.000

In diesem Tutorial haben Sie die folgenden Schritte ausgeführt:

  • Bereiten Sie den Datenspeicher zum Speichern des Wasserzeichenswertes vor.
  • Erstellen einer Data Factory.
  • Erstellen Sie verknüpfte Dienste.
  • Erstellen Sie ein Quell-, Senken- und Grenzwertdataset.
  • Erstellen einer Pipeline.
  • Ausführen der Pipeline.
  • Überwachen der Pipelineausführung.

In dieser Anleitung kopierte die Pipeline Daten aus einer einzelnen Tabelle in der Azure SQL-Datenbank in den Blob-Speicher. Wechseln Sie zum folgenden Tutorial, um zu erfahren, wie Sie Daten aus mehreren Tabellen in einer SQL Server-Datenbank in eine SQL-Datenbank kopieren.