Condividi tramite


Caricare dati incrementali da database SQL di Azure ad Archiviazione BLOB di Azure usando PowerShell

APPLICABILE A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Data Factory in Microsoft Fabric è la nuova generazione di Azure Data Factory, con un'architettura più semplice, un'intelligenza artificiale predefinita e nuove funzionalità. Se non si ha familiarità con l'integrazione dei dati, iniziare con Fabric Data Factory. I carichi di lavoro di Azure Data Factory esistenti possono eseguire l'aggiornamento a Fabric per accedere a nuove funzionalità tra data science, analisi in tempo reale e creazione di report.

In questa esercitazione si usa Azure Data Factory per creare una pipeline che carica dati delta da una tabella in database SQL di Azure nell'archivio BLOB di Azure.

In questa esercitazione vengono completati i passaggi seguenti:

  • Preparare l'archivio dati per l'archiviazione del valore di watermark.
  • Creare una fabbrica di dati.
  • Creare servizi collegati.
  • Creare i set di dati di origine, sink e limite.
  • Creare una pipeline.
  • Esegui la pipeline.
  • Monitorare l'esecuzione della pipeline.

Panoramica

Il diagramma generale della soluzione è il seguente:

Caricare i dati in modo incrementale

Di seguito sono descritti i passaggi fondamentali per la creazione di questa soluzione:

  1. Selezionare la colonna del watermark. Selezionare una colonna nell'archivio dati di origine, che può essere usata per suddividere i record nuovi o aggiornati ad ogni esecuzione. I dati della colonna selezionata (ad esempio last_modify_time o ID) continuano in genere ad aumentare quando le righe vengono create o aggiornate. Il valore massimo di questa colonna viene usato come limite.

  2. Preparare un archivio dati per l'archiviazione del valore limite.
    In questa esercitazione si archivia il valore di filigrana in un database SQL.

  3. Creare una pipeline con il flusso di lavoro seguente:

    La pipeline di questa soluzione contiene le attività seguenti:

    • Creare due attività di ricerca. Usare la prima attività di ricerca per recuperare l'ultimo valore limite. Usare la seconda attività di ricerca per recuperare il nuovo valore limite. Questi valori di watermark vengono passati all'attività di copia.
    • Creare un attività Copy che copia le righe dall'archivio dati di origine con il valore della colonna filigrana maggiore del valore limite precedente e minore o uguale al nuovo valore limite. L'attività copierà quindi i dati delta dall'archivio dati di origine a un archivio BLOB come nuovo file.
    • Creare un'attività StoredProcedure che aggiorni il valore di riferimento per la pipeline che verrà eseguita alla prossima esecuzione.

Se non si ha una sottoscrizione Azure, creare un account free prima di iniziare.

Prerequisiti

Nota

È consigliabile usare il modulo Az PowerShell Azure per interagire con Azure. Per iniziare, vedere Installare Azure PowerShell. Per informazioni su come eseguire la migrazione al modulo Az PowerShell, vedere Migrate Azure PowerShell da AzureRM ad Az.

Creare una tabella di origine dati nel database SQL

  1. Aprire SQL Server Management Studio. In Esplora server fare clic con il pulsante destro del mouse sul database e scegliere Nuova query.

  2. Eseguire questo comando SQL sul database SQL per creare una tabella denominata data_source_table come archivio dell'origine dati:

    create table data_source_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
    VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');
    

    In questa esercitazione, si usa LastModifytime come colonna watermark. I dati nell'archivio dell'origine dati sono visualizzati nella tabella seguente:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    

Creare un'altra tabella nel database SQL per archiviare il valore del limite massimo

  1. Eseguire questo comando SQL sul database SQL per creare una tabella denominata watermarktable in cui archiviare il valore limite:

    create table watermarktable
    (
    
    TableName varchar(255),
    WatermarkValue datetime,
    );
    
  2. Definire il valore predefinito del watermark con il nome della tabella dell'archivio dati di origine. In questa esercitazione, il nome della tabella è data_source_table.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Esaminare i dati nella tabella watermarktable.

    Select * from watermarktable
    

    Output:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

Creare una procedura memorizzata nel database SQL

Esegui il seguente comando per creare una stored procedure nel database SQL:

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Creare una data factory

  1. Definire una variabile per il nome del gruppo di risorse usato in seguito nei comandi di PowerShell. Copiare il testo del comando seguente in PowerShell, specificare un nome per il gruppo di risorse Azure tra virgolette doppie e quindi eseguire il comando. Un esempio è "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Se il gruppo di risorse esiste già, potrebbe essere preferibile non sovrascriverlo. Assegnare un valore diverso alla variabile $resourceGroupName ed eseguire di nuovo il comando.

  2. Definire una variabile per la località della data factory.

    $location = "East US"
    
  3. Per creare il gruppo di risorse Azure, eseguire il comando seguente:

    New-AzResourceGroup $resourceGroupName $location
    

    Se il gruppo di risorse esiste già, potrebbe essere preferibile non sovrascriverlo. Assegnare un valore diverso alla variabile $resourceGroupName ed eseguire di nuovo il comando.

  4. Definire una variabile per il nome della data factory.

    Importante

    Aggiornare il nome della data factory in modo che sia univoco a livello globale, ad esempio ADFTutorialFactorySP1127.

    $dataFactoryName = "ADFIncCopyTutorialFactory";
    
  5. Per creare la data factory, eseguire questo cmdlet Set-AzDataFactoryV2:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "East US" -Name $dataFactoryName
    

Notare i punti seguenti:

  • Il nome della data factory deve essere univoco a livello globale. Se viene visualizzato l'errore seguente, modificare il nome e riprovare:

    The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
    
  • Per creare istanze di Data Factory, l'account utente usato per accedere a Azure deve essere membro dei ruoli collaboratore o proprietario o amministratore della sottoscrizione Azure.

  • Per un elenco di aree Azure in cui Data Factory è attualmente disponibile, selezionare le aree a cui si è interessati nella pagina seguente e quindi espandere Analytics per individuare Data Factory: Products disponibile in base all'area. Gli archivi dati (archiviazione, database SQL, Istanza gestita di SQL di Azure e così via) e i calcoli (Azure HDInsight e così via) usati dalla data factory possono trovarsi in altre aree.

Creare servizi collegati

Si creano servizi collegati in una data factory per collegare gli archivi dati e i servizi di calcolo alla data factory. In questa sezione vengono creati i servizi collegati all'account di archiviazione e al database SQL.

Creare un servizio collegato di archiviazione

  1. Creare un file JSON denominato AzureStorageLinkedService.json nella cartella C:\ADF con il contenuto seguente. Se non esiste già, creare la cartella ADF. Sostituire <accountName> e <accountKey> con il nome e la chiave dell'account di archiviazione prima di salvare il file.

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
            }
        }
    }
    
  2. In PowerShell passare alla cartella ADF.

  3. Eseguire il cmdlet Set-AzDataFactoryV2LinkedService per creare il servizio collegato AzureStorageLinkedService. Nell'esempio seguente si passano i valori per i parametri ResourceGroupName e DataFactoryName:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    Di seguito è riportato l'output di esempio:

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
    

Creare un servizio collegato per il database SQL

  1. Creare un file JSON denominato AzureSQLDatabaseLinkedService.json nella cartella C:\ADF con il contenuto seguente. Se non esiste già, creare la cartella ADF. Sostituire <your-server-name> e <your-database-name> con il nome del server e del database prima di salvare il file. È anche necessario configurare l'SQL Server di Azure per concedere l'accesso all'identità gestita della data factory.

    {
    "name": "AzureSqlDatabaseLinkedService",
    "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server=tcp:<your-server-name>.database.windows.net,1433;Database=<your-database-name>;"
            },
            "authenticationType": "ManagedIdentity",
            "annotations": []
        }
    }
    
  2. In PowerShell passare alla cartella ADF.

  3. Eseguire il cmdlet Set-AzDataFactoryV2LinkedService per creare il servizio collegato AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Di seguito è riportato l'output di esempio:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    ProvisioningState :
    

Creare i set di dati

In questo passaggio vengono creati set di dati per rappresentare i dati di source e sink.

Creare set di dati di origine

  1. Creare un file JSON denominato SourceDataset.json nella stessa cartella con il contenuto seguente:

    {
        "name": "SourceDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "data_source_table"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    
    

    In questa esercitazione si utilizza il nome della tabella data_source_table. Sostituiscilo se usi una tabella con un nome diverso.

  2. Eseguire il cmdlet Set-AzDataFactoryV2Dataset per creare il set di dati SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Ecco l'output di esempio del cmdlet:

    DatasetName       : SourceDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Creare un set di dati sink

  1. Creare un file JSON denominato SinkDataset.json nella stessa cartella con il contenuto seguente:

    {
        "name": "SinkDataset",
        "properties": {
            "type": "AzureBlob",
            "typeProperties": {
                "folderPath": "adftutorial/incrementalcopy",
                "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
                "format": {
                    "type": "TextFormat"
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureStorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }   
    

    Importante

    Questo frammento di codice presuppone che nell'archivio BLOB sia presente un contenitore BLOB denominato adftutorial. Creare il contenitore se non esiste oppure impostare il nome di un contenitore esistente. La cartella di output incrementalcopy viene creata automaticamente se non esiste nel contenitore. In questa esercitazione, il nome del file viene generato in modo dinamico con l'espressione @CONCAT('Incremental-', pipeline().RunId, '.txt').

  2. Eseguire il cmdlet Set-AzDataFactoryV2Dataset per creare il set di dati SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Ecco l'output di esempio del cmdlet:

    DatasetName       : SinkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset    
    

Creare un set di dati per una filigrana

In questo passaggio si crea un set di dati per l'archiviazione di un valore limite massimo.

  1. Nella stessa cartella creare un file JSON denominato WatermarkDataset.json con il contenuto seguente:

    {
        "name": " WatermarkDataset ",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "watermarktable"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }    
    
  2. Eseguire il cmdlet Set-AzDataFactoryV2Dataset per creare il set di dati WatermarkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Ecco l'output di esempio del cmdlet:

    DatasetName       : WatermarkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset    
    

Creare una pipeline

In questa esercitazione, crei una pipeline con due attività di ricerca, un'attività di copia e un'attività StoredProcedure concatenate in una pipeline.

  1. Creare un file JSON denominato IncrementalCopyPipeline.json nella stessa cartella con il contenuto seguente:

    {
        "name": "IncrementalCopyPipeline",
        "properties": {
            "activities": [
                {
                    "name": "LookupOldWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                        "type": "SqlSource",
                        "sqlReaderQuery": "select * from watermarktable"
                        },
    
                        "dataset": {
                        "referenceName": "WatermarkDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "LookupNewWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table"
                        },
    
                        "dataset": {
                        "referenceName": "SourceDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
    
                {
                    "name": "IncrementalCopyActivity",
                    "type": "Copy",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'"
                        },
                        "sink": {
                            "type": "BlobSink"
                        }
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupNewWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        },
                        {
                            "activity": "LookupOldWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
    
                    "inputs": [
                        {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "SinkDataset",
                            "type": "DatasetReference"
                        }
                    ]
                },
    
                {
                    "name": "StoredProceduretoWriteWatermarkActivity",
                    "type": "SqlServerStoredProcedure",
                    "typeProperties": {
    
                        "storedProcedureName": "usp_write_watermark",
                        "storedProcedureParameters": {
                            "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" },
                            "TableName":  { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"}
                        }
                    },
    
                    "linkedServiceName": {
                        "referenceName": "AzureSQLDatabaseLinkedService",
                        "type": "LinkedServiceReference"
                    },
    
                    "dependsOn": [
                        {
                            "activity": "IncrementalCopyActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
    
        }
    }
    
  2. Eseguire il cmdlet Set-AzDataFactoryV2Pipeline per creare la pipeline IncrementalCopyPipeline.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Di seguito è riportato l'output di esempio:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : ADF
     DataFactoryName   : incrementalloadingADF
     Activities        : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity}
     Parameters        :
    

Eseguire la pipeline

  1. Eseguire la pipeline IncrementalCopyPipeline usando il cmdlet Invoke-AzDataFactoryV2Pipeline. Sostituire i segnaposto con il nome del gruppo di risorse e di Data Factory.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  2. Verificare lo stato della pipeline eseguendo il cmdlet Get-AzDataFactoryV2ActivityRun fino a visualizzare tutte le attività correttamente in esecuzione. Sostituire i segnaposto con l'orario appropriato per i parametri RunStartedAfter e RunStartedBefore. In questa esercitazione si usa -RunStartedAfter "2017/09/14" e -RunStartedBefore "2017/09/15".

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    Di seguito è riportato l'output di esempio:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:42:50 AM
    DurationInMs      : 7777
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:43:07 AM
    DurationInMs      : 25437
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:10 AM
    ActivityRunEnd    : 9/14/2017 7:43:29 AM
    DurationInMs      : 19769
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:32 AM
    ActivityRunEnd    : 9/14/2017 7:43:47 AM
    DurationInMs      : 14467
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    

Esaminare i risultati

  1. Nell'archivio BLOB (archivio sink), si noterà che i dati sono stati copiati nel file definito in SinkDataset. Nell'esercitazione corrente il nome del file è Incremental- d4bf3ce2-5d60-43f3-9318-923155f61037.txt. Aprendo il file si può notare che i record corrispondono ai dati presenti nel database SQL.

    1,aaaa,2017-09-01 00:56:00.0000000
    2,bbbb,2017-09-02 05:23:00.0000000
    3,cccc,2017-09-03 02:36:00.0000000
    4,dddd,2017-09-04 03:21:00.0000000
    5,eeee,2017-09-05 08:06:00.0000000
    
  2. Verificare il valore più recente da watermarktable. Si noterà che il valore limite è stato aggiornato.

    Select * from watermarktable
    

    Di seguito è riportato l'output di esempio:

    TableName WatermarkValue
    data_source_table 2017-09-05 8:06:00.000

Inserire i dati nell'archivio dei dati sorgente per verificare il caricamento dei dati delta

  1. Inserire nuovi dati nel database SQL (archivio dell'origine dati).

    INSERT INTO data_source_table
    VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
    
    INSERT INTO data_source_table
    VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
    

    I dati aggiornati nel database SQL sono i seguenti:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    6 | newdata | 2017-09-06 02:23:00.000
    7 | newdata | 2017-09-07 09:01:00.000
    
  2. Eseguire di nuovo la pipeline IncrementalCopyPipeline usando il cmdlet Invoke-AzDataFactoryV2Pipeline. Sostituire i segnaposto con quelli del proprio gruppo di risorse e del proprio data factory.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  3. Verificare lo stato della pipeline eseguendo il cmdlet Get-AzDataFactoryV2ActivityRun fino a visualizzare tutte le attività correttamente in esecuzione. Sostituire i segnaposto con l'orario appropriato per i parametri RunStartedAfter e RunStartedBefore. In questa esercitazione si usa -RunStartedAfter "2017/09/14" e -RunStartedBefore "2017/09/15".

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    Di seguito è riportato l'output di esempio:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:58 AM
    DurationInMs      : 31758
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:52 AM
    DurationInMs      : 25497
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:00 AM
    ActivityRunEnd    : 9/14/2017 8:53:20 AM
    DurationInMs      : 20194
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:23 AM
    ActivityRunEnd    : 9/14/2017 8:53:41 AM
    DurationInMs      : 18502
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    
  4. Nell'archivio BLOB è stato creato un altro file. In questa esercitazione, il nome del nuovo file è Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt. Apri quel file e vedi due righe di dati.

  5. Verificare il valore più recente da watermarktable. Si noterà che il valore limite è stato aggiornato di nuovo.

    Select * from watermarktable
    

    Output di esempio:

    NomeTabella WatermarkValue
    tabella_sorgente_dati 2017-09-07 09:01:00.000

In questa esercitazione sono stati eseguiti i passaggi seguenti:

  • Preparare l'archivio dati per l'archiviazione del valore di watermark.
  • Creare una fabbrica di dati.
  • Creare servizi collegati.
  • Creare i set di dati di origine, sink e limite.
  • Creare una pipeline.
  • Esegui la pipeline.
  • Monitorare l'esecuzione della pipeline.

In questa esercitazione la pipeline ha copiato i dati da una singola tabella in database SQL di Azure all'archivio BLOB. Avanza al tutorial successivo per imparare come copiare dati da più tabelle in un database SQL Server a SQL Database.