你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于:
Azure 数据工厂
Azure Synapse Analytics
提示
Microsoft Fabric 中的 Data Factory 是下一代 Azure 数据工厂,具有更加简化的架构、内置人工智能和新功能。 如果不熟悉数据集成,请从Fabric数据工厂开始。 现有 ADF 工作负载可以升级到 Fabric,以跨数据科学、实时分析和报告访问新功能。
在本教程中,你将使用Azure PowerShell创建一个数据工厂管道,该管道使用 Spark 活动和按需 HDInsight 链接服务转换数据。 在本教程中执行以下步骤:
- 创建数据工厂。
- 创建并部署关联服务。
- 创建并部署管道。
- 启动管道运行。
- 监视管道运行。
如果没有Azure订阅,请在开始前创建 free 帐户。
先决条件
注意
建议使用 Azure Az PowerShell 模块与Azure交互。 若要开始,请参阅 Install Azure PowerShell。 若要了解如何迁移到 Az PowerShell 模块,请参阅 Migrate Azure PowerShell从 AzureRM 迁移到 Az。
- Azure 存储 帐户。 创建Python脚本和输入文件,并将其上传到Azure存储。 Spark 程序的输出存储在此存储帐户中。 按需 Spark 群集使用相同的存储帐户作为其主存储。
- Azure PowerShell。 按照 如何安装和配置 Azure PowerShell 中的说明。
将Python脚本上传到Blob 存储帐户
创建包含以下内容的名为 WordCount_Spark.py Python 文件:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()将 <storageAccountName> 替换为Azure 存储帐户的名称。 然后保存文件。
在Azure Blob 存储中创建名为 adftutorial 的容器(如果不存在)。
创建名为 spark 的文件夹。
在 spark 文件夹中创建名为 script 的子文件夹。
将 WordCount_Spark.py 文件上传到 script 子文件夹。
上传输入文件
- 创建包含一些文本的名为 minecraftstory.txt 的文件。 Spark 程序会统计此文本中的单词数量。
- 在
inputfiles文件夹中创建名为spark的子文件夹。 - 将
minecraftstory.txt上传到inputfiles子文件夹。
作者关联服务
在本部分中创建两个链接服务:
- Azure 存储链接服务,用于将Azure 存储帐户链接到数据工厂。 按需即用的 HDInsight 群集使用此存储。 此存储位置还包含要执行的 Spark 脚本。
- 一个按需的 HDInsight 关联服务。 Azure 数据工厂自动创建 HDInsight 群集,运行 Spark 程序,然后在 HDInsight 群集空闲达到预先配置的时间后将其删除。
Azure 存储链接服务
使用首选编辑器创建 JSON 文件,复制Azure 存储链接服务的以下 JSON 定义,然后将该文件保存为 MyStorageLinkedService.json。
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
使用 Azure 存储帐户的名称和密钥更新 <storageAccountName> 和 <storageAccountKey>。
按需 HDInsight 联接服务
使用首选编辑器创建 JSON 文件,复制Azure HDInsight链接服务的以下 JSON 定义,并将该文件另存为 MyOnDemandSparkLinkedService.json。
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
更新链接服务定义中以下属性的值:
- hostSubscriptionId。 将 <subscriptionID> 替换为Azure订阅的 ID。 此 HDInsight 群集在该 Azure 订阅中按需创建。
- 租户。 将 <tenantID> 替换为你的 Azure 租户的 ID。
- servicePrincipalId、servicePrincipalKey。 将 <servicePrincipalID> 和 <servicePrincipalKey> 替换为Microsoft Entra ID中服务主体的 ID 和密钥。 此服务主体需是订阅“参与者”角色的成员,或创建群集的资源组的成员。 有关详细信息,请参阅 创建Microsoft Entra应用程序和服务主体。 服务主体 ID 等效于应用程序 ID,服务主体密钥等效于客户端密码的值 。
- clusterResourceGroup。 将 <resourceGroupOfHDICluster> 替换为需要在其中创建资源组的 HDInsight 群集的名称。
注意
Azure HDInsight对可在它支持的每个Azure区域中使用的核心总数有限制。 对于按需 HDInsight 链接服务,HDInsight 群集将在用作主存储的Azure 存储所在的同一位置创建。 请确保有足够的核心配额,以便能够成功创建群集。 有关详细信息,请参阅使用 Hadoop、Spark、Kafka 等在 HDInsight 中设置群集。
创作管道
在此步骤中,您将创建一个包含 Spark 活动的新管道。 该活动使用单词计数示例。 从此位置下载内容(如果尚未这样做)。
在偏好的编辑器中创建一个 JSON 文件,复制管道定义的以下 JSON 定义,并将该文件另存为 MySparkOnDemandPipeline.json。
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
请注意以下几点:
- rootPath 指向 adftutorial 容器的 spark 文件夹。
- entryFilePath 指向 spark 文件夹的 script 子文件夹中的 WordCount_Spark.py 文件。
创建数据工厂
已在 JSON 文件中创作链接服务和管道定义。 现在,让我们创建一个数据工厂,并使用 PowerShell cmdlet 部署链接服务和管道 JSON 文件。 逐条运行以下 PowerShell 命令:
逐个设置变量。
资源组名称
$resourceGroupName = "ADFTutorialResourceGroup"数据工厂名称。 必须全局唯一
$dataFactoryName = "MyDataFactory09102017"管道名称
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline启动 PowerShell。 使Azure PowerShell保持打开状态,直到本快速入门结束时。 如果将它关闭再重新打开,则需要再次运行下述命令。 有关当前可用的 Azure 区域的列表,请在下面的页面上选择您感兴趣的区域,然后展开 Analytics 以查找 Data Factory:按区域提供的产品。 数据工厂使用的数据存储(Azure 存储、Azure SQL 数据库等)和计算(HDInsight 等)可以位于其他区域。
运行以下命令,并输入用于登录到Azure门户的用户名和密码:
Connect-AzAccount运行以下命令查看此帐户的所有订阅:
Get-AzSubscription运行以下命令选择要使用的订阅。 将 SubscriptionId 替换为Azure订阅的 ID:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"创建资源组:ADFTutorialResourceGroup。
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"创建数据工厂。
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName执行以下命令查看输出:
$df切换到创建 JSON 文件的文件夹,并运行以下命令以部署Azure 存储链接服务:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"运行以下命令部署按需 Spark 链接服务:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"运行以下命令部署管道:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
启动并监视管道运行
启动管道运行。 该命令还会捕获管道运行 ID 用于将来的监视。
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName运行以下脚本来持续检查管道运行状态,直到运行完成为止。
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"下面是示例运行的输出:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"确认是否在 adftutorial 容器的
outputfiles文件夹中创建了包含 spark 程序的输出的、名为spark的文件夹。
相关内容
此示例中的管道将数据从一个位置复制到Azure blob 存储中的另一个位置。 你已了解如何执行以下操作:
- 创建数据工厂。
- 创建并部署关联服务。
- 创建并部署管道。
- 启动流水线运行。
- 监视管道运行。
转到下一教程,了解如何通过在虚拟网络中的Azure HDInsight群集上运行 Hive 脚本来转换数据。
Tutorial:在 Azure 虚拟网络 中使用 Hive 转换数据。