通过


适用于 Fabric 的 NotebookUtils 文件系统实用工具

notebookutils.fs 提供用于处理各种文件系统的实用工具,包括 Azure Data Lake Storage (ADLS) Gen2 和 Azure Blob 存储。 请确保正确配置对 Azure Data Lake Storage Gen2Azure Blob 存储的访问。

运行以下命令以概要了解可用的方法:

notebookutils.fs.help()

下表列出了可用的文件系统方法:

方法 Signature 说明
ls ls(path: String): Array 列出目录的内容。
mkdirs mkdirs(path: String): Boolean 如果给定目录不存在,则创建给定目录,同时创建任何必要的父目录。
cp cp(src: String, dest: String, recurse: Boolean = false): Boolean 可能跨文件系统复制文件或目录。
fastcp fastcp(src: String, dest: String, recurse: Boolean = true, extraConfigs: Map = None): Boolean 使用 azcopy 工具复制文件或目录,以更好地处理大数据量,从而提高性能。
mv mv(src: String, dest: String, create_path: Boolean, overwrite: Boolean = false): Boolean 可以在文件系统之间移动文件或目录。
put put(file: String, content: String, overwrite: Boolean = false): Boolean 将以 UTF-8 编码的给定字符串写入文件。
head head(file: String, max_bytes: int = 1024 * 100): String 以 UTF-8 编码的字符串形式返回给定文件的第一 max_bytes 个字节。
append append(file: String, content: String, createFileIfNotExists: Boolean = false): Boolean 将内容追加到文件中。
rm rm(path: String, recurse: Boolean = false): Boolean 删除文件或目录。
exists exists(path: String): Boolean 检查文件或目录是否存在。
getProperties getProperties(path: String): Map 获取给定路径的属性。 仅在 Python 笔记本中可用(PySpark、Scala 或 R 不支持)。

注释

除非另有说明,否则所有文件系统方法都可在 Python、PySpark、Scala 和 R 笔记本中使用。 Scala 使用 camelCase 参数名称(例如, createPath 而不是 create_pathmaxBytes 而不是 max_bytes)。

有关装载和卸载操作,请参阅 文件装载和卸载

注释

使用notebookutils.fs时,请记住以下约束和注意:

  • 路径行为因笔记本类型而异:在 Spark 笔记本中,相对路径解析为默认 Lakehouse ABFSS 路径。 在 Python 笔记本环境中,相对路径解析为本地文件系统工作目录(/home/trusted-service-user/work)。
  • 并发写入限制notebookutils.fs.append() 由于缺乏原子性保证,不支持 notebookutils.fs.put() 对同一文件的并发写入。
  • 追加循环延迟:在循环中使用 notebookutils.fs.append() 时,在写入之间添加 0.5-1 秒的睡眠,以保持数据完整性。
  • OneLake 快捷方式限制:对于 S3/GCS 类型的快捷方式,请使用挂载路径,而非用于 cp()fastcp() 操作的 ABFS 路径。
  • 跨区域限制fastcp() 不支持跨区域复制 OneLake 中的文件。 请改用 cp()
  • 运行时版本:NotebookUtils 旨在与 Spark 3.4(运行时 v1.2)及更高版本配合使用。
  • cp() Python 笔记本中的行为:在 Python 笔记本中,cp() 内部使用与 fastcp() 相同的基于 azcopy 的机制,因此这两种方法的行为完全相同。

NotebookUtils 以与 Spark API 相同的方式处理文件系统。 以 notebookutils.fs.mkdirs() 和 Lakehouse 的使用 为例:

用法 HDFS 根目录的相对路径 ABFS 文件系统的绝对路径 驱动程序节点中本地文件系统的绝对路径
非默认 Lakehouse 不支持 notebookutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") notebookutils.fs.mkdirs("file:/<new_dir>")
Default Lakehouse “文件”或“表”下的目录: notebookutils.fs.mkdirs("Files/<new_dir>") notebookutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") notebookutils.fs.mkdirs("file:/<new_dir>")
  • 对于默认 Lakehouse,文件路径将装载到笔记本中,默认文件缓存超时为 120 秒。 这意味着文件缓存在笔记本的本地临时文件夹中 120 秒,即使这些文件已从 Lakehouse 中删除也是如此。 如果要更改超时规则,可以卸载默认 Lakehouse 文件路径,并使用其他 fileCacheTimeout 值再次装载它们。

  • 对于非默认 Lakehouse 配置,可以在安装 Lakehouse 路径期间设置适当的 fileCacheTimeout 参数。 将超时设置为 0 可确保从 Lakehouse 服务器提取最新文件。

列出文件

若要列出目录的内容,请使用 notebookutils.fs.ls('Your directory path')。 例如:

notebookutils.fs.ls("Files/tmp") # Relative path works with different base paths depending on notebook type
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # Absolute path using ABFS file system
notebookutils.fs.ls("file:/tmp")  # Full path of the local file system of driver node

notebookutils.fs.ls()使用相对路径时,API 的行为有所不同,具体取决于笔记本的类型。

  • Spark 笔记本中:相对路径相对于默认的湖屋 ABFSS 路径。 例如,notebookutils.fs.ls("Files") 指向默认 Lakehouse 中的 Files 目录。

    例如:

    notebookutils.fs.ls("Files/sample_datasets/public_holidays.parquet")
    
  • 在 Python 笔记本中:相对路径相对于本地文件系统的工作目录,默认情况下为 /home/trusted-service-user/work. 因此,应使用完整路径而不是相对路径 notebookutils.fs.ls("/lakehouse/default/Files") 来访问默认 Lakehouse 中的 Files 目录。

    例如:

    notebookutils.fs.ls("/lakehouse/default/Files/sample_datasets/public_holidays.parquet")
    

查看文件属性

用于 notebookutils.fs.ls() 检查文件属性,如文件名、文件路径、文件大小以及项是文件还是目录。

files = notebookutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

如果需要更多可读输出,请使用 f-string:

files = notebookutils.fs.ls("Files/data")
for file in files:
    print(f"Name: {file.name}, Size: {file.size}, IsDir: {file.isDir}, Path: {file.path}")

创建新目录

如果目录不存在,请创建一个目录,包括任何必需的父目录。

notebookutils.fs.mkdirs('new directory name')  
notebookutils.fs.mkdirs("Files/<new_dir>")  # Works with the default Lakehouse files using relative path 
notebookutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # Based on ABFS file system 
notebookutils.fs.mkdirs("file:/<new_dir>")  # Based on local file system of driver node 

复制文件

跨文件系统复制文件或目录。 将 recurse=True 设置为递归复制目录。

notebookutils.fs.cp('source file or directory', 'destination file or directory', recurse=True)

注释

Python 笔记本说明:在 Python 笔记本中, cp() 内部使用相同的基于 azcopy 的机制,为 fastcp()这两种方法提供高效的性能。 由于 OneLake 快捷方式的限制,当需要使用 notebookutils.fs.cp() 从 S3/GCS 类型快捷方式复制数据时,建议使用装载的路径而不是 abfss 路径。

小窍门

始终检查布尔返回值以验证操作是否成功。 用于 notebookutils.fs.exists() 在开始复制操作之前验证源路径。

以下示例展示如何将数据从默认的 Lakehouse 跨存储拷贝至 ADLS Gen2 帐户:

notebookutils.fs.cp(
    "Files/local_data",
    "abfss://<container>@<account>.dfs.core.windows.net/remote_data",
    recurse=True
)

高性能复制文件

使用fastcp可以进行更高效的复制操作,尤其是在处理大量数据时。 参数 recurse 默认为 True.

notebookutils.fs.fastcp('source file or directory', 'destination file or directory', recurse=True)

小窍门

使用 fastcp() 而不是 cp() 用于大型数据传输。 该方法 fastcp 在后台使用 azcopy,为大容量文件操作提供明显更好的吞吐量。 在 Python 笔记本中,cp()fastcp() 使用相同的基础机制。

请记住以下注意事项:

  • notebookutils.fs.fastcp() 不支持跨区域复制 OneLake 中的文件。 在本例中,可以改用 notebookutils.fs.cp()
  • 由于 OneLake 快捷方式的限制,当需要使用 notebookutils.fs.fastcp() 从 S3/GCS 类型快捷方式复制数据时,建议使用装载的路径而不是 abfss 路径。

预览文件内容

以 UTF-8 字符串的形式返回文件的第一 max_bytes 个字节。

notebookutils.fs.head('file path', max_bytes)

小窍门

对于大型文件,请使用 head() 适当的 max_bytes 值来避免内存问题。 默认值为 100 KB(1024 * 100)。

以下示例读取文件的前 1,000 个字节:

content = notebookutils.fs.head("Files/data/sample.txt", 1000)
print(content)

注释

不同语言的 max_bytes 默认值不同:Python 和 Scala 笔记本使用 102400 (100 KB),而 R 笔记本使用 65535 (64 KB)。 在 Scala 中,此参数命名 maxBytes为 .

移动文件

跨文件系统移动文件或目录。

notebookutils.fs.mv('source file or directory', 'destination directory', create_path=True, overwrite=True)

重要

参数 create_path 默认值因运行时而异:

  • Spark 笔记本 (PySpark、Scala、R):默认为 Falsefalse 在 Scala 中, FALSE 在 R 中)。 移动操作之前,父目录必须存在。
  • Python 笔记本:默认为 True. 如果父目录不存在,则会自动创建该目录。

若要确保跨运行时的行为一致,请在代码中显式设置 create_path 参数。 在 Scala 中,此参数命名 createPath为 .

如果需要更清晰的代码,请使用命名参数:

notebookutils.fs.mv("Files/source.csv", "Files/new_folder/dest.csv", create_path=True, overwrite=True)

写入文件

将 UTF-8 字符串写入文件。

notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it already exists

将内容追加到文件

将 UTF-8 字符串追加到文件中。

notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it doesn't exist

重要

notebookutils.fs.append() 并且 notebookutils.fs.put() 不支持并发写入同一文件,因为缺乏原子性保证。

在循环中使用 notebookutils.fs.append API 写入同一个 forsleep 文件时,应在每次重复写入之间添加约 0.5 至 1 秒的延迟。 此建议是因为 notebookutils.fs.append API 的内部 flush 操作是异步的,因此短延迟有助于确保数据完整性。

import time

for i in range(100):
    notebookutils.fs.append("Files/output/data.txt", f"Line {i}\n", True)
    time.sleep(0.5)  # Prevent data integrity issues

删除文件或目录

删除文件或目录。 将 recurse=True 设置为递归删除目录。

notebookutils.fs.rm('file path', recurse=True) 

检查文件或目录是否存在

检查指定路径中是否存在文件或目录。 如果路径存在,则返回 True ;否则返回 False

notebookutils.fs.exists("Files/data/input.csv")

小窍门

在执行文件操作之前使用 exists() 以防止错误。 例如,在尝试复制或移动源文件之前,请检查源文件是否存在。

if notebookutils.fs.exists("Files/data/input.csv"):
    notebookutils.fs.cp("Files/data/input.csv", "Files/backup/input.csv")
    print("File copied successfully.")
else:
    print("Source file not found.")

获取文件属性

获取路径的属性,作为名称值对的映射。 它仅支持 Azure Blob 存储路径。

注释

该方法 getProperties 仅在 Python 笔记本中可用。 Spark 笔记本(PySpark、Scala 或 R)不支持它。

参数:

参数 类型 必需 说明
path String 是的 文件或目录的 ABFS 路径。

返回: 包含元数据属性(例如文件大小、创建时间、上次修改时间和内容类型)的字典(映射)。

properties = notebookutils.fs.getProperties("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")
print(properties)