你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

复制数据并发送有关成功和失败的电子邮件通知

适用于: Azure Data Factory Azure Synapse Analytics

提示

Microsoft Fabric 中的 Data Factory 是下一代 Azure Data Factory,具有更加简化的架构、内置人工智能和新功能。 如果不熟悉数据集成,请从Fabric数据工厂开始。 现有 ADF 工作负载可以升级到 Fabric,以跨数据科学、实时分析和报告访问新功能。

在本教程中,我们将创建一个数据工厂管道来展示某些控制流功能。 此管道执行从Azure Blob Storage容器到同一存储帐户中的另一个容器的简单复制。 如果复制活动成功,该管道会在告知成功结果的电子邮件中发送成功复制操作的详细信息(例如写入的数据量)。 如果复制活动失败,该管道会在告知失败结果的电子邮件中发送复制失败的详细信息(例如错误消息)。 整个教程讲解了如何传递参数。

方案概述:图表显示了 Azure Blob Storage,它是复制操作的目标。在成功时,会发送包含详细信息的电子邮件;而在失败时,会发送包含错误详细信息的电子邮件。

在本教程中执行以下步骤:

  • 创建数据工厂。
  • 创建Azure Storage链接服务。
  • 创建Azure Blob 数据集
  • 创建包含“复制”活动和“Web”活动的管道
  • 将活动输出发送至后续活动
  • 利用参数传递和系统变量
  • 启动管道运行
  • 监视管道和活动运行

本教程使用Azure门户。 可以使用其他机制来与Azure Data Factory交互,请参阅目录中的“快速入门”。

先决条件

  • Azure 订阅。 如果没有Azure订阅,请在开始前创建 free 帐户。
  • Azure Storage 帐户。 可将 Blob 存储用作数据存储。 如果没有Azure存储帐户,请参阅 创建存储帐户一文,了解创建存储帐户的步骤。
  • Azure SQL Database。 将数据库用作接收器数据存储。 如果没有Azure SQL Database中的数据库,请参阅 在 Azure SQL Database一文,了解创建一个数据库的步骤。

创建 Blob 表

  1. 启动记事本。 复制以下文本并在磁盘上将其另存为 input.txt 文件。

    John,Doe
    Jane,Doe
    
  2. 使用 Azure Storage Explorer等工具执行以下步骤:

    1. 创建 adfv2branch 容器。
    2. adfv2branch 容器中创建 input 文件夹。
    3. input.txt 文件上传到该容器。

创建电子邮件工作流终结点

若要触发从管道发送电子邮件,请使用 Azure Logic Apps 定义工作流。 有关创建逻辑应用工作流的详细信息,请参阅创建示例消耗逻辑应用工作流

成功电子邮件工作流

创建名为“CopySuccessEmail”消耗逻辑应用工作流。 添加名为收到 HTTP 请求时的请求触发器,并添加名为发送电子邮件Office 365 Outlook操作。 如果系统提示,请登录到Office 365 Outlook帐户。

显示成功电子邮件工作流的屏幕截图。

对于请求触发器,请在请求正文 JSON 架构框中填写以下 JSON:

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

工作流设计器中的“请求”触发器应如下图所示:

显示包含“请求”触发器的工作流设计器的屏幕截图。

对于“发送电子邮件”操作,请利用请求正文 JSON 架构中传入的属性,自定义您希望采用的电子邮件格式。 以下是示例:

显示包含名为“发送电子邮件”操作的工作流设计器的屏幕截图。

保存工作流。 记下成功电子邮件工作流的 HTTP Post 请求 URL:

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

失败电子邮件工作流

遵循相同的步骤创建另一个名为“CopyFailEmail”的逻辑应用工作流。 在“请求”触发器中,请求正文 JSON 架构值是相同的。 将电子邮件的格式更改为类似于Subject的格式,以便针对失败的邮件进行定制。 以下是示例:

显示包含失败电子邮件工作流的工作流设计器的屏幕截图。

保存工作流。 记下失败电子邮件工作流的 HTTP Post 请求 URL:

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

现在,您应当得到两个工作流链接 (URL):

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

创建数据工厂

  1. 启动 Microsoft Edge Google Chrome Web 浏览器。 目前,数据工厂 UI 仅在 Microsoft Edge 和 Google Chrome Web 浏览器中受支持。

  2. 展开左上角的菜单,选择“创建资源”。 然后选择 “分析>数据工厂 ” :

    显示“新建”窗格中的数据工厂选择的屏幕截图。

  3. 在“新建数据工厂”页中,输入 ADFTutorialDataFactory 作为名称

    新数据工厂页面

    Azure数据工厂的名称必须全球唯一。 如果收到错误,请更改数据工厂的名称(例如改为 yournameADFTutorialDataFactory),并重新尝试创建。 有关数据工厂项目命名规则,请参阅数据工厂 - 命名规则一文。

    数据工厂名“ADFTutorialDataFactory”不可用

  4. 选择要在其中创建数据工厂的 Azure 订阅

  5. 对于资源组,请执行以下步骤之一:

    • 选择“使用现有资源组”,并从下拉列表选择现有的资源组。

    • 选择“新建”,并输入资源组的名称。

      若要了解资源组,请参阅 使用资源组来管理Azure资源

  6. 选择V2作为版本

  7. 选择数据工厂的位置。 下拉列表中仅显示支持的位置。 数据工厂使用的数据存储(Azure Storage、Azure SQL Database等)和计算(HDInsight 等)可以位于其他区域。

  8. 选择“固定到仪表板”。

  9. 单击创建

  10. 创建完成后,可以看到图中所示的“数据工厂”页。

    显示数据工厂主页的屏幕截图。

  11. 单击 Open Azure Data Factory Studio 磁贴,在单独的选项卡中启动Azure Data Factory用户界面(UI)。

创建管道

在此步骤中,你将创建一个管道,其中包含一个复制活动和两个Web活动。 使用以下功能创建管道:

  • 数据集访问的管道参数。
  • 用于调用逻辑应用工作流发送成功/失败电子邮件的 Web 活动。
  • 将一个活动与另一个活动相连接(无论成败)
  • 使用一个活动的输出作为后续活动的输入
  1. 在数据工厂 UI 的主页中,单击“协调”磁贴。

    显示数据工厂主页的屏幕截图,其中突出显示了“协调”磁贴。

  2. 在管道的属性窗口中切换到“参数”选项卡,并使用“新建”按钮添加字符串类型的以下三个参数:sourceBlobContainer、sinkBlobContainer 和 receiver。

    • sourceBlobContainer – 源 Blob 数据集使用的管道中的参数。
    • sinkBlobContainer – 接收器 Blob 数据集使用的管道中的参数
    • receiver - 此参数由管道中的两个 Web 活动用来向其电子邮件地址已通过此参数指定的接收方发送成功或失败电子邮件。

    显示“新建管道”菜单的屏幕截图。

  3. 在“活动”工具箱中搜索“复制”,将“复制”活动拖放到管道设计器图面。

    显示的屏幕截图演示如何将复制活动拖放到管道设计器。

  4. 选择拖到管道设计器图面上的“复制”活动。 在底部“复制”活动的“属性”窗口中切换到“源”选项卡,然后单击“+ 新建”。 您在此步骤中创建复制活动的源数据集。

    屏幕截图,显示如何创建复制活动的源数据集。

  5. 新建数据集窗口中,选择顶部的Azure选项卡,然后选择Azure Blob Storage,然后选择Continue

    显示选择Azure Blob存储按钮的屏幕截图。

  6. 在“选择格式”窗口中,选择“DelimitedText”,然后选择“继续”。

    屏幕截图显示“选择格式”窗口,其中突出显示了 DelimitedText 格式。

  7. 你会看到一个新的选项卡,标题为“设置属性”。 将数据集的名称更改为 SourceBlobDataset。 选择“链接服务”下拉列表,然后选择“+新建”以创建源数据集的新链接服务。

    屏幕截图显示数据集“设置属性”窗口,其中突出显示了“链接服务”下拉列表下的“+新建”按钮。**

  8. 此时会出现“新建链接服务”窗口,你可以在其中填写链接服务所需的属性。

    显示数据集连接窗口的屏幕截图,其中突出显示了“新建链接服务”按钮。

  9. 在“新建链接服务” 窗口中完成以下步骤:

    1. 输入 AzureStorageLinkedService 作为名称
    2. 为 Azure 存储帐户选择 Storage account name
    3. 单击创建
  10. 在接下来显示的“设置属性”窗口中,选择“打开此数据集”以输入文件名的参数化值。

    屏幕截图显示数据集“设置属性”窗口,其中突出显示了“打开此数据集”链接。

  11. 输入 @pipeline().parameters.sourceBlobContainer 作为文件夹,输入 emp.txt 作为文件名。

    显示源数据集设置的屏幕截图。

  12. 切换回“管道”选项卡(或单击左侧树视图中的管道),然后选择设计器上的“复制”活动。 确认为“源数据集”选择了新数据集。

    显示源数据集的屏幕截图。

  13. 在属性窗口中切换到“接收器”选项卡,针对“接收器数据集”单击“+ 新建”。 像创建源数据集一样,在此步骤中创建复制活动的接收器数据集。

    显示“新建接收器数据集”按钮的屏幕截图

  14. 在“新建数据集窗口中,选择Azure Blob Storage,然后单击Continue, 然后在 Select 格式窗口中再次选择 DelimitedText,然后再次单击 Continue

  15. 在数据集的“设置属性”页中,输入“SinkBlobDataset”作为“名称”,然后为 LinkedService 选择“AzureStorageLinkedService”

  16. 展开属性页的“高级”部分,然后选择“打开此数据集”。

  17. 在数据集的“连接”选项卡上,编辑“文件路径”。 输入 @pipeline().parameters.sinkBlobContainer 作为文件夹,输入 @concat(pipeline().RunId, '.txt') 作为文件名。 该表达式使用当前管道运行的 ID 作为文件名。 有关支持的系统变量和表达式列表,请参阅系统变量表达式语言

    显示接收器数据集设置的屏幕截图。

  18. 切换回顶部的“管道”选项卡。 在搜索框中搜索“Web”,并将“Web”活动拖放到管道设计器图面。 将活动的名称设置为 SendSuccessEmailActivity。 Web 活动允许调用任何 REST 终结点。 有关该活动的详细信息,请参阅 Web 活动。 此管道使用 Web 活动调用逻辑应用电子邮件工作流。

    显示的屏幕截图演示如何拖放第一个 Web 活动。

  19. 从“常规”选项卡切换到“设置”选项卡,然后执行以下步骤:

    1. 对于URL,请指定用于在逻辑应用工作流中发送成功电子邮件的URL。

    2. 请为方法选择POST

    3. 在“标头”部分中单击“+ 添加标头”链接。

    4. 添加标头 Content-Type 并将其设置为 application/json

    5. Body指定以下 JSON。

      {
          "message": "@{activity('Copy1').output.dataWritten}",
          "dataFactoryName": "@{pipeline().DataFactory}",
          "pipelineName": "@{pipeline().Pipeline}",
          "receiver": "@pipeline().parameters.receiver"
      }
      

      消息正文包含以下属性:

      • 消息 – 传递 @{activity('Copy1').output.dataWritten 的值。 访问前一复制活动的属性,并传递 dataWritten 的值。 失败时,传递错误输出而不是 @{activity('CopyBlobtoBlob').error.message

      • 数据工厂名称 - @{pipeline().DataFactory} 的传递值。这是一个系统变量,用于访问相应的数据工厂名称。 有关系统变量的列表,请参阅系统变量一文。

      • 管道名称 – 传递 @{pipeline().Pipeline} 的值。 这也是系统变量,用于访问相应的管道名称。

      • 接收方 – 传递 "@pipeline().parameters.receiver") 的值。 访问管道参数。

        显示第一个 Web 活动的设置页面的屏幕截图。

  20. 将“复制”活动旁边的绿色复选框按钮拖放到“Web”活动,以便将“复制”活动连接到“Web”活动。

     演示了如何将Copy activity与第一个 Web 活动连接起来的屏幕截图。

  21. 将另一个Web活动从活动工具箱拖放到管道设计器表面,并将名称设置为SendFailureEmailActivity

    显示第二个 Web 活动的名称的屏幕截图。

  22. 切换到“设置”选项卡,然后执行以下步骤:

    1. 对于URL,请指定逻辑应用程序的工作流的URL,该工作流用于发送失败通知电子邮件。

    2. 请为方法选择POST

    3. 在“标头”部分中单击“+ 添加标头”链接。

    4. 添加标头 Content-Type 并将其设置为 application/json

    5. Body指定以下 JSON。

      {
          "message": "@{activity('Copy1').error.message}",
          "dataFactoryName": "@{pipeline().DataFactory}",
          "pipelineName": "@{pipeline().Pipeline}",
          "receiver": "@pipeline().parameters.receiver"
      }
      

      显示第二个 Web 活动的设置的屏幕截图。

  23. 选择管道设计器中“复制”活动右侧的红色“X”按钮,然后将其拖放到刚创建的 SendFailureEmailActivity 上。

    这是一张截图,展示如何在管道设计器的复制活动中选择“错误”。

  24. 若要验证管道,请单击工具栏中的“验证”按钮。 单击 按钮关闭“管道验证输出”窗口。

    显示“验证管道”按钮的屏幕截图。

  25. 若要将实体(数据集、管道等)发布到数据工厂服务,请选择“全部发布”。 等待直到您看到“已成功发布”消息。

    显示数据工厂门户中“发布”按钮的屏幕截图。

触发成功的管道运行

  1. 若要触发某个管道运行,请在工具栏中单击“触发器”,然后单击“立即触发”。

    显示“立即触发”按钮的屏幕截图。

  2. 在“管道运行”窗口中执行以下步骤:

    1. 输入adftutorial/adfv2branch/input作为sourceBlobContainer参数。

    2. sinkBlobContainer 参数输入 adftutorial/adfv2branch/output

    3. 输入接收方电子邮件地址

    4. 单击“完成”

      管道运行参数

监视成功的管道运行

  1. 若要监视管道运行,请切换到左侧的“监视”选项卡。 可以看到手动触发的管道运行。 使用“刷新”按钮刷新列表。

    成功的管道运行

  2. 若要查看与此管道运行关联的活动运行,请单击“操作”列中的第一个链接。 您可以通过单击顶部的管道切换回到之前的视角。 使用“刷新”按钮刷新列表。

    屏幕截图展示了如何查看活动运行列表。

触发失败的管道运行

  1. 在左侧切换到“编辑”选项卡。

  2. 若要触发某个管道运行,请在工具栏中单击“触发器”,然后单击“立即触发”。

  3. 在“管道运行”窗口中执行以下步骤:

    1. sourceBlobContainer 参数输入 adftutorial/dummy/input。 请确保 adftutorial 容器中不存在 dummy 文件夹。
    2. sinkBlobContainer 参数输入 adftutorial/dummy/output
    3. 输入接收方电子邮件地址
    4. 单击“完成”。

监视失败的管道运行

  1. 若要监视管道运行,请切换到左侧的“监视”选项卡。 可以看到手动触发的管道运行。 使用“刷新”按钮刷新列表。

    管道运行失败

  2. 单击管道运行对应的“错误”链接可查看有关错误的详细信息。

    管道错误

  3. 若要查看与此管道运行关联的活动运行,请单击“操作”列中的第一个链接。 使用“刷新”按钮刷新列表。 请注意,管道中的“复制”活动失败。 “Web”活动已成功将失败电子邮件发送到指定的接收方。

    活动运行

  4. 在“操作”列中单击“错误”链接可查看有关错误的详细信息。

    活动运行错误

已在本教程中执行了以下步骤:

  • 创建数据工厂。
  • 创建Azure Storage链接服务。
  • 创建Azure Blob 数据集
  • 创建包含复制活动和 Web 活动的管道
  • 将活动输出发送至后续活动
  • 利用参数传递和系统变量
  • 启动管道运行
  • 监视管道和活动运行

现在可以转到“概念”部分,了解有关Azure Data Factory的详细信息。