你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于:
Azure Data Factory
Azure Synapse Analytics
提示
Microsoft Fabric 中的 Data Factory 是下一代 Azure Data Factory,具有更加简化的架构、内置人工智能和新功能。 如果不熟悉数据集成,请从Fabric数据工厂开始。 现有 ADF 工作负载可以升级到 Fabric,以跨数据科学、实时分析和报告访问新功能。
如果你不熟悉Azure Data Factory,请参阅 引入到 Azure Data Factory。
在本教程中,你将使用数据流画布创建数据流,以便分析和转换 Azure Data Lake Storage (ADLS) Gen2 中的数据并将其存储在 Delta Lake 中。
Prerequisites
- Azure 订阅。 如果没有 Azure 订阅,请在开始前创建一个免费 Azure 帐户。
- Azure存储帐户。 将 ADLS 存储用作“源”和“接收器”数据存储。 如果没有存储帐户,请参阅 创建Azure存储帐户,了解创建存储帐户的步骤。
本教程中要转换的文件 MoviesDB.csv,可 在此处找到。 若要从GitHub检索文件,请将内容复制到所选文本编辑器中,以本地另存为 .csv 文件。 若要将文件上传到存储帐户,请参阅使用 Azure 门户上传 blob。 这些示例引用名为“sample-data”的容器。
创建数据工厂
在此步骤中,你将创建数据工厂并打开数据工厂 UI,以在此数据工厂中创建管道。
打开 Microsoft Edge 或 Google Chrome。 目前,数据工厂 UI 仅在 Microsoft Edge 和 Google Chrome Web 浏览器中受支持。
在左侧菜单中,选择“创建资源>集成>数据工厂”
在“新建数据工厂”页面上,在“名称”下输入“ADFTutorialDataFactory”。
选择要在其中创建数据工厂的 Azure 订阅。
对于“资源组”,请执行以下步骤之一:
a。 选择“使用现有资源组”,并从下拉列表选择现有的资源组。
b. 选择“新建”,并输入资源组的名称。
若要了解资源组,请参阅 使用资源组来管理Azure资源。
在“版本”下选择“V2”。
在“位置”下选择数据工厂所在的位置。 下拉列表中仅显示支持的位置。 数据存储(例如,Azure Storage和 SQL 数据库)和数据工厂使用的计算(例如,Azure HDInsight)可以位于其他区域。
选择“创建”。
创建完成后,通知中心内会显示通知。 选择“转到资源”导航到“数据工厂”页。
选择“创作和监视”,在单独的选项卡中启动数据工厂 UI。
创建包含数据流活动的管道
在此步骤中,要创建一个包含数据流活动的管道。
在主页上,选择“协调”。
在管道的“常规”选项卡中,输入“DeltaLake”作为管道的名称。
在“活动”窗格中,展开“移动和转换”可折叠部分。 将Data Flow活动从窗格拖放到管道画布。
在管道画布的顶部栏中,打开数据流调试滑块。 调试模式允许针对实时 Spark 群集进行转换逻辑的交互式测试。 Data Flow群集需要 5-7 分钟才能预热,建议用户在计划进行Data Flow开发时先启用调试。 有关详细信息,请参阅 调试模式。
在数据流画布中构建转换逻辑
在本教程中,要生成两个数据流。 第一个数据流是一个简单的源到接收器数据流,用来基于电影 CSV 文件生成新的 Delta Lake。 最后,创建流设计来更新 Delta Lake 中的数据。
教程目标
- 使用先决条件中的 MoviesCSV 数据集源,并通过其形成新的 Delta Lake。
- 构建相应的逻辑,用以将 1988 年电影的评级更新为“1”。
- 删除 1950 年的所有电影。
- 通过复制 1960 年的电影为 2021 年插入新电影。
从空白数据流画布开始
选择数据流编辑器窗口顶部的源转换,然后在“源设置”窗口中选择“数据集”属性旁边的“+ 新建”:
从显示的新数据集窗口中选择Azure Data Lake Storage Gen2,然后选择Continue。
“数据集类型”选择 DelimitedText,然后再次选择“继续”。
将数据集命名为“MoviesCSV”,然后在“链接服务”下选择“+ 新建”以在文件中创建新链接服务。
提供之前在“先决条件”部分创建的存储帐户的详细信息,然后浏览并选择在那里上传的 MoviesCSV 文件。
添加链接服务后,选中“第一行作为标题”复选框,然后选择“确定”以添加源。
导航到数据流设置窗口的“投影”选项卡,然后选择“检测数据类型”。
现在,在数据流编辑器窗口中选择“源”后的 +,向下滚动,在“目标”部分下选择“接收器”,向数据流添加新接收器。
在添加接收器后显示的接收器设置的“接收器”选项卡中,为“接收器类型”选择“内联”,然后为“内联数据集类型”选择“增量”。 然后为 Linked 服务选择Azure Data Lake Storage Gen2。
在存储容器中选择你希望该服务在其中创建 Delta Lake 的文件夹名称。
最后,返回管道设计器,然后选择“调试”以在画布上仅有此数据流活动的情况在调试模式下执行管道。 这将在 Azure Data Lake Storage Gen2 中生成新的 Delta Lake。
现在,从屏幕左侧的“工厂资源”菜单中,选择 + 以添加新资源,然后选择“数据流”。
和前面一样,再次选择 MoviesCSV 文件作为源,然后从“投影”选项卡中再次选择“检测数据类型”。
这一次,在创建源后,在数据流编辑器窗口中选择 +,并在源中添加筛选器转换。
在“筛选设置”窗口中添加一个“筛选条件”,用于仅显示匹配 1950、1960 和 1988 年的电影行。
现在添加“派生列”转换,将每部 1988 电影的分级更新为“1”。
Update, insert, delete, and upsert策略是在更改行转换中创建的。 在派生列后添加更改行转换。更改行策略应如下所示。
现在已为每个“更改行”类型设置了正确的策略,接下来检查是否已在接收器转换上设置了正确的更新规则
此处我们将 Delta Lake 接收器用于 Azure Data Lake Storage Gen2 数据湖,并允许插入、更新、删除。
请注意,“键列”是一个组合键,由“电影主键”列和“年份”列组成。 这是因为我们通过复制 1960 行创建了虚假的 2021 年电影。 这可通过提供唯一性来避免在查找现有行时发生冲突。
下载已完成的示例
下面是一个用于 Delta 管道的示例解决方案,其中包含用于更新/删除 Lake 中的行的数据流。
相关内容
详细了解 数据流表达式语言。