你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于:
Azure 数据工厂
Azure Synapse Analytics
提示
Microsoft Fabric 中的 Data Factory 是下一代 Azure 数据工厂,具有更加简化的架构、内置人工智能和新功能。 如果不熟悉数据集成,请从Fabric数据工厂开始。 现有 ADF 工作负载可以升级到 Fabric,以跨数据科学、实时分析和报告访问新功能。
使用数据流活动通过映射数据流转换和移动数据。 如果不熟悉数据流,请参阅 Mapping 数据流 概述
使用 UI 创建数据流任务
若要在管道中使用数据流活动,请完成以下步骤:
在“管道活动”窗格中搜索 数据流,并将数据流活动拖动到管道画布。
如尚未选择画布上的新数据流活动,请选择它并打开Settings选项卡以编辑详细信息。
当数据流用于更改的数据捕获时,检查点键用于设置检查点。 可以覆盖它。 数据流活动使用 GUID 值作为检查点键,而不是“管道名称 + 活动名称”,这样即使有重命名操作,也能始终跟踪客户的变更数据捕获状态。 所有现有数据流活动都使用旧模式键实现后向兼容性。 通过已启用更改数据捕获的数据流资源发布新数据流活动后的检查点键选项如下所示。
选择现有数据流,或使用“新建”按钮创建一个新数据流。 选择所需的其他选项以完成配置。
语法
{
"name": "MyDataFlowActivity",
"type": "ExecuteDataFlow",
"typeProperties": {
"dataflow": {
"referenceName": "MyDataFlow",
"type": "DataFlowReference"
},
"compute": {
"coreCount": 8,
"computeType": "General"
},
"traceLevel": "Fine",
"runConcurrently": true,
"continueOnError": true,
"staging": {
"linkedService": {
"referenceName": "MyStagingLinkedService",
"type": "LinkedServiceReference"
},
"folderPath": "my-container/my-folder"
},
"integrationRuntime": {
"referenceName": "MyDataFlowIntegrationRuntime",
"type": "IntegrationRuntimeReference"
}
}
Type 属性
| 属性 | 说明 | 允许的值 | 必须 |
|---|---|---|---|
| 数据流 | 对正在执行的数据流的引用 | DataFlowReference | 是 |
| integrationRuntime | 运行数据流的计算环境。 如果未指定,将使用自动解析的 Azure 集成运行时。 | IntegrationRuntimeReference | 否 |
| 计算核心数:compute.coreCount | Spark 群集中使用的内核数。 只有在使用 autoresolve Azure Integration Runtime 时,才能指定。 | 8、16、32、48、80、144、272 | 否 |
| compute.computeType | Spark 群集中使用的计算类型。 仅当使用自动解析 Azure Integration Runtime 时才能指定 | “常规” | 否 |
| staging.linkedService | 如果使用的是 Azure Synapse Analytics 源或接收器,请指定用于 PolyBase 暂存的存储帐户。 如果您的 Azure 存储配置了 VNet 服务终结点,则必须在存储帐户上启用托管身份验证,并且勾选“允许信任的 Microsoft 服务通过”选项。请参阅 使用 VNet 服务终结点对 Azure 存储的影响。 另请分别了解 Azure Blob 和 Azure Data Lake Storage Gen2 所需的配置。 |
LinkedServiceReference | 仅当数据流读取或写入 Azure Synapse Analytics 时 |
| staging.folderPath | 如果使用的是 Azure Synapse Analytics 源或接收器,则为 blob 存储帐户中用于 PolyBase 暂存的文件夹路径 | 字符串 | 仅当数据流读取或写入 Azure Synapse Analytics 时 |
| traceLevel | 设置数据流活动执行过程的日志记录级别 | 精细、粗略、无 | 否 |
在运行时动态调整数据流大小
可以动态设置“核心计数”和“计算类型”属性,以在运行时调整传入源数据的大小。 使用管道活动(例如“查找”或“获取元数据”),以便查找源数据集数据的大小。 然后,在数据流活动属性中使用“添加动态内容”。 可以选择小、中或大计算大小。 (可选)选择“自定义”并手动配置计算类型和核心数。
数据流集成运行时
选择要用于数据流活动执行的Integration Runtime。 在默认情况下,服务会将自动解析 Azure Integration Runtime 与四个辅助角色核心配合使用。 此 IR 具有常规用途计算类型,并在与服务实例相同的区域中运行。 对于操作化管道,强烈建议创建自己的 Azure Integration Runtime,用于定义数据流活动执行的特定区域、计算类型、核心计数和 TTL。
对于大多数生产工作负载,建议至少使用一个具有 8+8(总计 16)个 v-Core 配置和 10 分钟生存时间的“常规用途”计算类型。 通过设置较小的 TTL,Azure IR 可以保持热群集,从而避免冷群集启动时需耗费几分钟的等待时间。 有关详细信息,请参阅 Azure 集成运行时。
重要
数据流活动中的 Integration Runtime 选择仅适用于管道的已触发执行。 调试管道时,数据流将在调试会话中指定的群集上运行。
PolyBase
如果使用 Azure Synapse Analytics 作为接收器或源,则必须为 PolyBase 批量加载选择一个暂存位置。 PolyBase 允许批量加载而不是逐行加载数据。 PolyBase 将加载时间大幅减少到Azure Synapse Analytics。
检查点键
对数据流源使用更改捕获选项时,ADF 会自动维护和管理检查点。 默认检查点键是数据流名称和管道名称的哈希。 如果对源表或文件夹使用动态模式,可能要替代此哈希并在此处设置自己的检查点键值。
日志记录级别
如果不需要数据流活动的每个管道执行完整地记录所有详细的遥测日志,则可根据需要将日志记录级别设置为“基本”或“无”。 在“详细”模式(默认)下执行数据流时,要求此服务在数据转换期间完整地记录每个分区级别的活动。 该操作成本昂贵,因此仅在进行故障排除时启用“详细”模式可优化整体数据流和管道性能。 “基本”模式仅记录转换持续时间,而“无”模式仅提供持续时间摘要。
汇属性
数据流中的分组功能既可以设置接收器的执行顺序,又可以使用相同的组号将接收器分组在一起。 为帮助管理组,可以要求此服务在同一组中运行接收器,以并行运行。 还可以将接收器组设置为继续,即使其中一个接收器遇到错误仍可这样设置。
数据流接收器的默认行为是以串行方式顺序执行每个接收器,并且在接收器中遇到错误时,数据流会失败。 此外,除非访问数据流属性并为接收器设置不同的优先级,否则所有接收器均默认为同一组。
仅第一行
此选项仅适用于已为“输出到活动”启用了缓存接收器的数据流。 注入到流水线中的数据流输出被限制为 2MB。 通过设置“仅第一行”选项,您可以在将数据流活动输出直接注入到管道时限制数据流的数据输出。
将数据流参数化
参数化数据集
如果数据流使用参数化数据集,请在“设置”选项卡中设置参数值。
参数化数据流
如果数据流已参数化,请在“参数”选项卡中设置数据流参数的动态值。可以使用管道表达式语言或数据流表达式语言来分配动态或文字参数值。 有关详细信息,请参阅 数据流 Parameters。
参数化计算属性。
如果使用 autoresolve Azure Integration Runtime 并指定 compute.coreCount 和 compute.computeType 的值,则可以参数化核心计数或计算类型。
数据流活动的管道调试
若要使用 数据流 活动来执行调试管道运行,必须通过顶部栏上的 数据流 Debug 滑块打开 数据流 调试模式。 通过调试模式,可以在活跃的 Spark 群集上运行数据流。 有关详细信息,请参阅调试模式。
调试管道针对活动调试群集运行,而不是在数据流活动设置中指定的集成运行时环境。 在启动调试模式时,可以选择调试计算环境。
监控数据流活动
数据流活动具有特殊的监视体验,你可以在其中查看分区、暂存时间和数据世系信息。 通过“操作”下的眼镜图标打开监控窗格。 有关详细信息,请参阅监视数据流。
在后续活动中使用“数据流”活动结果
数据流活动输出有关写入每个接收器的行数和从每个源读取的行数的指标。 这些结果将在活动运行结果的 output 部分中返回。 返回的指标采用以下 json 格式。
{
"runStatus": {
"metrics": {
"<your sink name1>": {
"rowsWritten": <number of rows written>,
"sinkProcessingTime": <sink processing time in ms>,
"sources": {
"<your source name1>": {
"rowsRead": <number of rows read>
},
"<your source name2>": {
"rowsRead": <number of rows read>
},
...
}
},
"<your sink name2>": {
...
},
...
}
}
}
例如,要获取在名为“dataflowActivity”的活动中写入接收器“sink1”的行数,可使用 @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten。
要获取从该接收器中使用的源“source1”读取的行数,可使用 @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead。
注意
如果接收器写入的行数为零,则它将不会显示在指标中。 可以使用 contains 函数来验证存在性。 例如,contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1') 检查是否存在写入 sink1 的任意行。
相关内容
参阅支持的控制流活动: