通过


你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

在 Azure 数据工厂 中使用 Spark 活动转换云中的数据

适用于: Azure 数据工厂 Azure Synapse Analytics

提示

Microsoft Fabric 中的 Data Factory 是下一代 Azure 数据工厂,具有更加简化的架构、内置人工智能和新功能。 如果不熟悉数据集成,请从Fabric数据工厂开始。 现有 ADF 工作负载可以升级到 Fabric,以跨数据科学、实时分析和报告访问新功能。

在本教程中,你将使用Azure PowerShell创建一个数据工厂管道,该管道使用 Spark 活动和按需 HDInsight 链接服务转换数据。 在本教程中执行以下步骤:

  • 创建数据工厂。
  • 创建并部署关联服务。
  • 创建并部署管道。
  • 启动管道运行。
  • 监视管道运行。

如果没有Azure订阅,请在开始前创建 free 帐户。

先决条件

注意

建议使用 Azure Az PowerShell 模块与Azure交互。 若要开始,请参阅 Install Azure PowerShell。 若要了解如何迁移到 Az PowerShell 模块,请参阅 Migrate Azure PowerShell从 AzureRM 迁移到 Az

  • Azure 存储 帐户。 创建Python脚本和输入文件,并将其上传到Azure存储。 Spark 程序的输出存储在此存储帐户中。 按需 Spark 群集使用相同的存储帐户作为其主存储。
  • Azure PowerShell。 按照 如何安装和配置 Azure PowerShell 中的说明。

将Python脚本上传到Blob 存储帐户

  1. 创建包含以下内容的名为 WordCount_Spark.py Python 文件:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. <storageAccountName> 替换为Azure 存储帐户的名称。 然后保存文件。

  3. 在Azure Blob 存储中创建名为 adftutorial 的容器(如果不存在)。

  4. 创建名为 spark 的文件夹。

  5. spark 文件夹中创建名为 script 的子文件夹。

  6. WordCount_Spark.py 文件上传到 script 子文件夹。

上传输入文件

  1. 创建包含一些文本的名为 minecraftstory.txt 的文件。 Spark 程序会统计此文本中的单词数量。
  2. inputfiles 文件夹中创建名为 spark 的子文件夹。
  3. minecraftstory.txt 上传到 inputfiles 子文件夹。

作者关联服务

在本部分中创建两个链接服务:

  • Azure 存储链接服务,用于将Azure 存储帐户链接到数据工厂。 按需即用的 HDInsight 群集使用此存储。 此存储位置还包含要执行的 Spark 脚本。
  • 一个按需的 HDInsight 关联服务。 Azure 数据工厂自动创建 HDInsight 群集,运行 Spark 程序,然后在 HDInsight 群集空闲达到预先配置的时间后将其删除。

Azure 存储链接服务

使用首选编辑器创建 JSON 文件,复制Azure 存储链接服务的以下 JSON 定义,然后将该文件保存为 MyStorageLinkedService.json

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

使用 Azure 存储帐户的名称和密钥更新 <storageAccountName> 和 <storageAccountKey>。

按需 HDInsight 联接服务

使用首选编辑器创建 JSON 文件,复制Azure HDInsight链接服务的以下 JSON 定义,并将该文件另存为 MyOnDemandSparkLinkedService.json

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

更新链接服务定义中以下属性的值:

  • hostSubscriptionId。 将 <subscriptionID> 替换为Azure订阅的 ID。 此 HDInsight 群集在该 Azure 订阅中按需创建。
  • 租户。 将 <tenantID> 替换为你的 Azure 租户的 ID。
  • servicePrincipalIdservicePrincipalKey。 将 <servicePrincipalID> 和 <servicePrincipalKey> 替换为Microsoft Entra ID中服务主体的 ID 和密钥。 此服务主体需是订阅“参与者”角色的成员,或创建群集的资源组的成员。 有关详细信息,请参阅 创建Microsoft Entra应用程序和服务主体。 服务主体 ID 等效于应用程序 ID,服务主体密钥等效于客户端密码的值 。
  • clusterResourceGroup。 将 <resourceGroupOfHDICluster> 替换为需要在其中创建资源组的 HDInsight 群集的名称。

注意

Azure HDInsight对可在它支持的每个Azure区域中使用的核心总数有限制。 对于按需 HDInsight 链接服务,HDInsight 群集将在用作主存储的Azure 存储所在的同一位置创建。 请确保有足够的核心配额,以便能够成功创建群集。 有关详细信息,请参阅使用 Hadoop、Spark、Kafka 等在 HDInsight 中设置群集

创作管道

在此步骤中,您将创建一个包含 Spark 活动的新管道。 该活动使用单词计数示例。 从此位置下载内容(如果尚未这样做)。

在偏好的编辑器中创建一个 JSON 文件,复制管道定义的以下 JSON 定义,并将该文件另存为 MySparkOnDemandPipeline.json

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

请注意以下几点:

  • rootPath 指向 adftutorial 容器的 spark 文件夹。
  • entryFilePath 指向 spark 文件夹的 script 子文件夹中的 WordCount_Spark.py 文件。

创建数据工厂

已在 JSON 文件中创作链接服务和管道定义。 现在,让我们创建一个数据工厂,并使用 PowerShell cmdlet 部署链接服务和管道 JSON 文件。 逐条运行以下 PowerShell 命令:

  1. 逐个设置变量。

    资源组名称

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    数据工厂名称。 必须全局唯一

    $dataFactoryName = "MyDataFactory09102017"
    

    管道名称

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. 启动 PowerShell。 使Azure PowerShell保持打开状态,直到本快速入门结束时。 如果将它关闭再重新打开,则需要再次运行下述命令。 有关当前可用的 Azure 区域的列表,请在下面的页面上选择您感兴趣的区域,然后展开 Analytics 以查找 Data Factory按区域提供的产品。 数据工厂使用的数据存储(Azure 存储、Azure SQL 数据库等)和计算(HDInsight 等)可以位于其他区域。

    运行以下命令,并输入用于登录到Azure门户的用户名和密码:

    Connect-AzAccount
    

    运行以下命令查看此帐户的所有订阅:

    Get-AzSubscription
    

    运行以下命令选择要使用的订阅。 将 SubscriptionId 替换为Azure订阅的 ID:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. 创建资源组:ADFTutorialResourceGroup。

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. 创建数据工厂。

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    执行以下命令查看输出:

    $df
    
  5. 切换到创建 JSON 文件的文件夹,并运行以下命令以部署Azure 存储链接服务:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. 运行以下命令部署按需 Spark 链接服务:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. 运行以下命令部署管道:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

启动并监视管道运行

  1. 启动管道运行。 该命令还会捕获管道运行 ID 用于将来的监视。

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. 运行以下脚本来持续检查管道运行状态,直到运行完成为止。

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. 下面是示例运行的输出:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. 确认是否在 adftutorial 容器的 outputfiles 文件夹中创建了包含 spark 程序的输出的、名为 spark 的文件夹。

此示例中的管道将数据从一个位置复制到Azure blob 存储中的另一个位置。 你已了解如何执行以下操作:

  • 创建数据工厂。
  • 创建并部署关联服务。
  • 创建并部署管道。
  • 启动流水线运行。
  • 监视管道运行。

转到下一教程,了解如何通过在虚拟网络中的Azure HDInsight群集上运行 Hive 脚本来转换数据。

Tutorial:在 Azure 虚拟网络 中使用 Hive 转换数据。