你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于:
Azure 数据工厂
Azure Synapse Analytics
提示
Microsoft Fabric 中的 Data Factory 是下一代 Azure 数据工厂,具有更加简化的架构、内置人工智能和新功能。 如果不熟悉数据集成,请从Fabric数据工厂开始。 现有 ADF 工作负载可以升级到 Fabric,以跨数据科学、实时分析和报告访问新功能。
本教程演示将多个表从 Azure SQL 数据库 复制到 Azure Synapse Analytics。 在其他复制方案中,也可以应用相同的模式。 例如,将表从 SQL Server/Oracle 复制到 Azure SQL 数据库/Data Warehouse/Azure Blob,将不同路径从 Blob 复制到Azure SQL 数据库表。
从较高层面讲,本教程涉及以下步骤:
- 创建数据工厂。
- 创建Azure SQL 数据库、Azure Synapse Analytics和Azure 存储链接服务。
- 创建Azure SQL 数据库和Azure Synapse Analytics数据集。
- 创建一个管道用于查找要复制的表,创建另一个管道用于执行实际复制操作。
- 启动管道运行。
- 监视管道和活动的运行情况。
本教程使用Azure PowerShell。 若要了解如何使用其他工具/SDK 创建数据工厂,请参阅快速入门。
端到端工作流
在此方案中,我们在Azure SQL 数据库中有一些表,我们希望复制到Azure Synapse Analytics。 下面是管道中发生的工作流中的逻辑步骤顺序:
- 第一个管道查找需要复制到接收器数据存储的表列表。 也可以维护一个元数据表用于列出要复制到接收器数据存储的所有表。 然后,该管道触发另一个管道,后者循环访问数据库中的每个表并执行数据复制操作。
- 第二个管道执行实际复制。 它使用表列表作为参数。 对于列表中的每个表,通过使用 分阶段复制(通过 Blob 存储和 PolyBase 将 Azure SQL 数据库 中的特定表复制到 Azure Synapse Analytics 中的相应表,以获得最佳性能。 在本示例中,第一个管道将表列表作为参数的值传递。
如果没有Azure订阅,请在开始前创建 free 帐户。
先决条件
注意
建议使用 Azure Az PowerShell 模块与Azure交互。 若要开始,请参阅 Install Azure PowerShell。 若要了解如何迁移到 Az PowerShell 模块,请参阅 Migrate Azure PowerShell从 AzureRM 迁移到 Az。
- Azure PowerShell。 按照 如何安装和配置 Azure PowerShell 中的说明。
- Azure 存储 帐户。 Azure 存储 帐户在批量复制操作中用作暂存 Blob 存储。
- Azure SQL 数据库。 此数据库包含源数据。
- Azure Synapse Analytics。 此数据仓库包含从 SQL 数据库复制的数据。
准备 SQL 数据库 和 "Azure Synapse Analytics"
准备源Azure SQL 数据库:
请参阅 在 Azure SQL 数据库服务中创建数据库 一文,在 SQL 数据库中创建包含 Adventure Works LT 示例数据的数据库。 本教程将所有表从此示例数据库复制到Azure Synapse Analytics。
准备接收器 Azure Synapse Analytics:
如果没有Azure Synapse Analytics工作区,请参阅 Get started with Azure Synapse Analytics 一文,了解创建一个工作区的步骤。
在Azure Synapse Analytics中创建相应的表架构。 在后续步骤中使用Azure 数据工厂迁移/复制数据。
Azure 服务访问 SQL 服务器
对于 SQL 数据库和 Azure Synapse Analytics,允许 Azure 服务访问 SQL 服务器。 确保服务器的“允许访问 Azure 服务”设置已切换为“打开”状态。 此设置允许数据工厂服务从Azure SQL 数据库读取数据并将数据写入Azure Synapse Analytics。 若要验证并启用此设置,请执行以下步骤:
- 单击左侧的“所有服务”,然后单击“SQL Server”。
- 选择服务器,并单击“设置”下的“防火墙”。
- 在 Firewall 设置页中,单击 ON,以允许访问 Azure 服务。
创建数据工厂
启动 PowerShell。 使Azure PowerShell保持打开状态,直到本教程结束。 如果将它关闭再重新打开,则需要再次运行下述命令。
运行以下命令,并输入用于登录到Azure门户的用户名和密码:
Connect-AzAccount运行以下命令查看此帐户的所有订阅:
Get-AzSubscription运行以下命令选择要使用的订阅。 将 SubscriptionId 替换为Azure订阅的 ID:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"运行 Set-AzDataFactoryV2 cmdlet 创建数据工厂。 执行命令之前,请先将占位符替换为您自己的值。
$resourceGroupName = "<your resource group to create the factory>" $dataFactoryName = "<specify the name of data factory to create. It must be globally unique.>" Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "East US" -Name $dataFactoryName请注意以下几点:
Azure数据工厂的名称必须全局唯一。 如果收到以下错误,请更改名称并重试。
The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.若要创建数据工厂实例,必须是Azure订阅的参与者或管理员。
有关当前可用的 Azure 区域的列表,请在下面的页面上选择您感兴趣的区域,然后展开 Analytics 以查找 Data Factory:按区域提供的产品。 数据工厂使用的数据存储(Azure 存储、Azure SQL 数据库等)和计算(HDInsight 等)可以位于其他区域。
创建链接服务
本教程分别为源、接收器和过渡 Blob 创建了三个链接服务,其中包括数据存储的连接:
创建 Azure SQL 数据库 源连接服务
在 C:\ADFv2TutorialBulkCopy 文件夹中,创建包含以下内容的名为 AzureSqlDatabaseLinkedService.json 的 JSON 文件:(如果 ADFv2TutorialBulkCopy 文件夹尚不存在,则创建该文件夹。)
重要
在保存文件之前,将 <servername>,<databasename>,<username>@<servername> 和 <password> 替换为你的 Azure SQL 数据库的值。
{ "name": "AzureSqlDatabaseLinkedService", "properties": { "type": "AzureSqlDatabase", "typeProperties": { "connectionString": "Server=tcp:<servername>.database.windows.net,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30" } } }在 Azure PowerShell 中,切换到 ADFv2TutorialBulkCopy 文件夹。
运行 Set-AzDataFactoryV2LinkedService cmdlet 来创建链接服务:AzureSqlDatabaseLinkedService。
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSqlDatabaseLinkedService" -File ".\AzureSqlDatabaseLinkedService.json"下面是示例输出:
LinkedServiceName : AzureSqlDatabaseLinkedService ResourceGroupName : <resourceGroupName> DataFactoryName : <dataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
创建接收器 Azure Synapse Analytics 链接服务
在 C:\ADFv2TutorialBulkCopy 文件夹中,创建包含以下内容的名为 AzureSqlDWLinkedService.json 的 JSON 文件:
重要
在保存文件之前,将 <servername>、<databasename>、<username>@<servername> 和 <password> 替换为您的 Azure SQL 数据库 的相应值。
{ "name": "AzureSqlDWLinkedService", "properties": { "type": "AzureSqlDW", "typeProperties": { "connectionString": "Server=tcp:<servername>.database.windows.net,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30" } } }若要创建链接服务 AzureSqlDWLinkedService,请运行 Set-AzDataFactoryV2LinkedService cmdlet。
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSqlDWLinkedService" -File ".\AzureSqlDWLinkedService.json"下面是示例输出:
LinkedServiceName : AzureSqlDWLinkedService ResourceGroupName : <resourceGroupName> DataFactoryName : <dataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDWLinkedService
创建过渡 Azure 存储链接服务
在本教程中,将使用 Azure Blob 存储作为临时暂存区域来启用 PolyBase 以提高复制性能。
在 C:\ADFv2TutorialBulkCopy 文件夹中,创建包含以下内容的名为 AzureStorageLinkedService.json 的 JSON 文件:
重要
在保存文件之前,请将 <accountName> 和 <accountKey> 替换为Azure存储帐户的名称和密钥。
{ "name": "AzureStorageLinkedService", "properties": { "type": "AzureStorage", "typeProperties": { "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>" } } }若要创建链接服务 AzureStorageLinkedService,请运行 Set-AzDataFactoryV2LinkedService cmdlet。
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"下面是示例输出:
LinkedServiceName : AzureStorageLinkedService ResourceGroupName : <resourceGroupName> DataFactoryName : <dataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
创建数据集
在本教程中创建源和接收器数据集,用于指定数据的存储位置:
为源 SQL 数据库创建数据集
在 C:\ADFv2TutorialBulkCopy 文件夹中,创建包含以下内容的名为 AzureSqlDatabaseDataset.json 的 JSON 文件。 “tableName”是一个虚构名称,因为稍后要在复制活动中使用 SQL 查询检索数据。
{ "name": "AzureSqlDatabaseDataset", "properties": { "type": "AzureSqlTable", "linkedServiceName": { "referenceName": "AzureSqlDatabaseLinkedService", "type": "LinkedServiceReference" }, "typeProperties": { "tableName": "dummy" } } }若要创建数据集 AzureSqlDatabaseDataset,请运行 Set-AzDataFactoryV2Dataset cmdlet。
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSqlDatabaseDataset" -File ".\AzureSqlDatabaseDataset.json"下面是示例输出:
DatasetName : AzureSqlDatabaseDataset ResourceGroupName : <resourceGroupname> DataFactoryName : <dataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
为Azure Synapse Analytics汇聚点创建数据集
在 C:\ADFv2TutorialBulkCopy 文件夹中,创建包含以下内容的名为 AzureSqlDWDataset.json 的 JSON 文件:将“tableName”设置为参数,稍后引用此数据集的复制活动会将实际值传递给数据集。
{ "name": "AzureSqlDWDataset", "properties": { "type": "AzureSqlDWTable", "linkedServiceName": { "referenceName": "AzureSqlDWLinkedService", "type": "LinkedServiceReference" }, "typeProperties": { "tableName": { "value": "@{dataset().DWTableName}", "type": "Expression" } }, "parameters":{ "DWTableName":{ "type":"String" } } } }若要创建数据集 AzureSqlDWDataset,请运行 Set-AzDataFactoryV2Dataset cmdlet。
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSqlDWDataset" -File ".\AzureSqlDWDataset.json"下面是示例输出:
DatasetName : AzureSqlDWDataset ResourceGroupName : <resourceGroupname> DataFactoryName : <dataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDwTableDataset
创建管道
在本教程中创建两个管道:
创建管道“IterateAndCopySQLTables”
此管道使用表列表作为参数。 对于列表中的每个表,它使用暂存复制和 PolyBase 将数据从Azure SQL 数据库中的表复制到Azure Synapse Analytics。
在 C:\ADFv2TutorialBulkCopy 文件夹中,创建包含以下内容的名为 IterateAndCopySQLTables.json 的 JSON 文件:
{ "name": "IterateAndCopySQLTables", "properties": { "activities": [ { "name": "IterateSQLTables", "type": "ForEach", "typeProperties": { "isSequential": "false", "items": { "value": "@pipeline().parameters.tableList", "type": "Expression" }, "activities": [ { "name": "CopyData", "description": "Copy data from Azure SQL Database to Azure Synapse Analytics", "type": "Copy", "inputs": [ { "referenceName": "AzureSqlDatabaseDataset", "type": "DatasetReference" } ], "outputs": [ { "referenceName": "AzureSqlDWDataset", "type": "DatasetReference", "parameters": { "DWTableName": "[@{item().TABLE_SCHEMA}].[@{item().TABLE_NAME}]" } } ], "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "SELECT * FROM [@{item().TABLE_SCHEMA}].[@{item().TABLE_NAME}]" }, "sink": { "type": "SqlDWSink", "preCopyScript": "TRUNCATE TABLE [@{item().TABLE_SCHEMA}].[@{item().TABLE_NAME}]", "allowPolyBase": true }, "enableStaging": true, "stagingSettings": { "linkedServiceName": { "referenceName": "AzureStorageLinkedService", "type": "LinkedServiceReference" } } } } ] } } ], "parameters": { "tableList": { "type": "Object" } } } }若要创建管道 IterateAndCopySQLTables,请运行 Set-AzDataFactoryV2Pipeline cmdlet。
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IterateAndCopySQLTables" -File ".\IterateAndCopySQLTables.json"下面是示例输出:
PipelineName : IterateAndCopySQLTables ResourceGroupName : <resourceGroupName> DataFactoryName : <dataFactoryName> Activities : {IterateSQLTables} Parameters : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
创建管道“GetTableListAndTriggerCopyData”
此管道执行两个步骤:
- 查找Azure SQL 数据库系统表以获取要复制的表列表。
- 触发管道“IterateAndCopySQLTables”以完成实际的数据复制任务。
在 C:\ADFv2TutorialBulkCopy 文件夹中,创建包含以下内容的名为 GetTableListAndTriggerCopyData.json 的 JSON 文件:
{ "name":"GetTableListAndTriggerCopyData", "properties":{ "activities":[ { "name": "LookupTableList", "description": "Retrieve the table list from Azure SQL database", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "SELECT TABLE_SCHEMA, TABLE_NAME FROM information_schema.TABLES WHERE TABLE_TYPE = 'BASE TABLE' and TABLE_SCHEMA = 'SalesLT' and TABLE_NAME <> 'ProductModel'" }, "dataset": { "referenceName": "AzureSqlDatabaseDataset", "type": "DatasetReference" }, "firstRowOnly": false } }, { "name": "TriggerCopy", "type": "ExecutePipeline", "typeProperties": { "parameters": { "tableList": { "value": "@activity('LookupTableList').output.value", "type": "Expression" } }, "pipeline": { "referenceName": "IterateAndCopySQLTables", "type": "PipelineReference" }, "waitOnCompletion": true }, "dependsOn": [ { "activity": "LookupTableList", "dependencyConditions": [ "Succeeded" ] } ] } ] } }若要创建管道 GetTableListAndTriggerCopyData,请运行 Set-AzDataFactoryV2Pipeline cmdlet。
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "GetTableListAndTriggerCopyData" -File ".\GetTableListAndTriggerCopyData.json"下面是示例输出:
PipelineName : GetTableListAndTriggerCopyData ResourceGroupName : <resourceGroupName> DataFactoryName : <dataFactoryName> Activities : {LookupTableList, TriggerCopy} Parameters :
启动并监视管道运行
针对“GetTableListAndTriggerCopyData”主管道启动管道运行,并捕获管道运行 ID,以便将来进行监视。 随后,此管道根据 ExecutePipeline 活动中的指定,触发管道“IterateAndCopySQLTables”的运行。
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName 'GetTableListAndTriggerCopyData'运行以下脚本以持续监测管道 GetTableListAndTriggerCopyData 的运行状态,并输出最终的管道运行结果和活动运行结果。
while ($True) { $run = Get-AzDataFactoryV2PipelineRun -ResourceGroupName $resourceGroupName -DataFactoryName $DataFactoryName -PipelineRunId $runId if ($run) { if ($run.Status -ne 'InProgress') { Write-Host "Pipeline run finished. The status is: " $run.Status -ForegroundColor "Yellow" Write-Host "Pipeline run details:" -ForegroundColor "Yellow" $run break } Write-Host "Pipeline is running...status: InProgress" -ForegroundColor "Yellow" } Start-Sleep -Seconds 15 } $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) Write-Host "Activity run details:" -ForegroundColor "Yellow" $result下面是示例运行的输出:
Pipeline run details: ResourceGroupName : <resourceGroupName> DataFactoryName : <dataFactoryName> RunId : 0000000000-00000-0000-0000-000000000000 PipelineName : GetTableListAndTriggerCopyData LastUpdated : 9/18/2017 4:08:15 PM Parameters : {} RunStart : 9/18/2017 4:06:44 PM RunEnd : 9/18/2017 4:08:15 PM DurationInMs : 90637 Status : Succeeded Message : Activity run details: ResourceGroupName : <resourceGroupName> DataFactoryName : <dataFactoryName> ActivityName : LookupTableList PipelineRunId : 0000000000-00000-0000-0000-000000000000 PipelineName : GetTableListAndTriggerCopyData Input : {source, dataset, firstRowOnly} Output : {count, value, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/18/2017 4:06:46 PM ActivityRunEnd : 9/18/2017 4:07:09 PM DurationInMs : 22995 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : <resourceGroupName> DataFactoryName : <dataFactoryName> ActivityName : TriggerCopy PipelineRunId : 0000000000-00000-0000-0000-000000000000 PipelineName : GetTableListAndTriggerCopyData Input : {pipeline, parameters, waitOnCompletion} Output : {pipelineRunId} LinkedServiceName : ActivityRunStart : 9/18/2017 4:07:11 PM ActivityRunEnd : 9/18/2017 4:08:14 PM DurationInMs : 62581 Status : Succeeded Error : {errorCode, message, failureType, target}可以获取管道“IterateAndCopySQLTables”的运行 ID,并检查详细的活动运行结果,如下所示。
Write-Host "Pipeline 'IterateAndCopySQLTables' run result:" -ForegroundColor "Yellow" ($result | Where-Object {$_.ActivityName -eq "TriggerCopy"}).Output.ToString()下面是示例运行的输出:
{ "pipelineRunId": "7514d165-14bf-41fb-b5fb-789bea6c9e58" }$result2 = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId <copy above run ID> -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) $result2连接到接收器 Azure Synapse Analytics,并确认是否已正确地从 Azure SQL 数据库复制数据。
相关内容
已在本教程中执行了以下步骤:
- 创建数据工厂。
- 创建Azure SQL 数据库、Azure Synapse Analytics和Azure 存储链接服务。
- 创建Azure SQL 数据库和Azure Synapse Analytics数据集。
- 创建一个管道用于查找要复制的表,创建另一个管道用于执行实际复制操作。
- 启动管道运行。
- 监视管道和活动的运行情况。
转到以下教程,了解如何以增量方式将数据从源复制到目标: