notebookutils.fs 提供用于处理各种文件系统的实用工具,包括 Azure Data Lake Storage (ADLS) Gen2 和 Azure Blob 存储。 请确保正确配置对 Azure Data Lake Storage Gen2 和 Azure Blob 存储的访问。
运行以下命令以概要了解可用的方法:
notebookutils.fs.help()
下表列出了可用的文件系统方法:
| 方法 | Signature | 说明 |
|---|---|---|
ls |
ls(path: String): Array |
列出目录的内容。 |
mkdirs |
mkdirs(path: String): Boolean |
如果给定目录不存在,则创建给定目录,同时创建任何必要的父目录。 |
cp |
cp(src: String, dest: String, recurse: Boolean = false): Boolean |
可能跨文件系统复制文件或目录。 |
fastcp |
fastcp(src: String, dest: String, recurse: Boolean = true, extraConfigs: Map = None): Boolean |
使用 azcopy 工具复制文件或目录,以更好地处理大数据量,从而提高性能。 |
mv |
mv(src: String, dest: String, create_path: Boolean, overwrite: Boolean = false): Boolean |
可以在文件系统之间移动文件或目录。 |
put |
put(file: String, content: String, overwrite: Boolean = false): Boolean |
将以 UTF-8 编码的给定字符串写入文件。 |
head |
head(file: String, max_bytes: int = 1024 * 100): String |
以 UTF-8 编码的字符串形式返回给定文件的第一 max_bytes 个字节。 |
append |
append(file: String, content: String, createFileIfNotExists: Boolean = false): Boolean |
将内容追加到文件中。 |
rm |
rm(path: String, recurse: Boolean = false): Boolean |
删除文件或目录。 |
exists |
exists(path: String): Boolean |
检查文件或目录是否存在。 |
getProperties |
getProperties(path: String): Map |
获取给定路径的属性。 仅在 Python 笔记本中可用(PySpark、Scala 或 R 不支持)。 |
注释
除非另有说明,否则所有文件系统方法都可在 Python、PySpark、Scala 和 R 笔记本中使用。 Scala 使用 camelCase 参数名称(例如, createPath 而不是 create_path, maxBytes 而不是 max_bytes)。
有关装载和卸载操作,请参阅 文件装载和卸载。
注释
使用notebookutils.fs时,请记住以下约束和注意:
-
路径行为因笔记本类型而异:在 Spark 笔记本中,相对路径解析为默认 Lakehouse ABFSS 路径。 在 Python 笔记本环境中,相对路径解析为本地文件系统工作目录(
/home/trusted-service-user/work)。 -
并发写入限制:
notebookutils.fs.append()由于缺乏原子性保证,不支持notebookutils.fs.put()对同一文件的并发写入。 -
追加循环延迟:在循环中使用
notebookutils.fs.append()时,在写入之间添加 0.5-1 秒的睡眠,以保持数据完整性。 -
OneLake 快捷方式限制:对于 S3/GCS 类型的快捷方式,请使用挂载路径,而非用于
cp()和fastcp()操作的 ABFS 路径。 -
跨区域限制:
fastcp()不支持跨区域复制 OneLake 中的文件。 请改用cp()。 - 运行时版本:NotebookUtils 旨在与 Spark 3.4(运行时 v1.2)及更高版本配合使用。
-
cp()Python 笔记本中的行为:在 Python 笔记本中,cp()内部使用与fastcp()相同的基于 azcopy 的机制,因此这两种方法的行为完全相同。
NotebookUtils 以与 Spark API 相同的方式处理文件系统。 以 notebookutils.fs.mkdirs() 和 Lakehouse 的使用 为例:
| 用法 | HDFS 根目录的相对路径 | ABFS 文件系统的绝对路径 | 驱动程序节点中本地文件系统的绝对路径 |
|---|---|---|---|
| 非默认 Lakehouse | 不支持 | notebookutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") |
notebookutils.fs.mkdirs("file:/<new_dir>") |
| Default Lakehouse | “文件”或“表”下的目录: notebookutils.fs.mkdirs("Files/<new_dir>") |
notebookutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") |
notebookutils.fs.mkdirs("file:/<new_dir>") |
对于默认 Lakehouse,文件路径将装载到笔记本中,默认文件缓存超时为 120 秒。 这意味着文件缓存在笔记本的本地临时文件夹中 120 秒,即使这些文件已从 Lakehouse 中删除也是如此。 如果要更改超时规则,可以卸载默认 Lakehouse 文件路径,并使用其他
fileCacheTimeout值再次装载它们。对于非默认 Lakehouse 配置,可以在安装 Lakehouse 路径期间设置适当的
fileCacheTimeout参数。 将超时设置为 0 可确保从 Lakehouse 服务器提取最新文件。
列出文件
若要列出目录的内容,请使用 notebookutils.fs.ls('Your directory path')。 例如:
notebookutils.fs.ls("Files/tmp") # Relative path works with different base paths depending on notebook type
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # Absolute path using ABFS file system
notebookutils.fs.ls("file:/tmp") # Full path of the local file system of driver node
notebookutils.fs.ls()使用相对路径时,API 的行为有所不同,具体取决于笔记本的类型。
Spark 笔记本中:相对路径相对于默认的湖屋 ABFSS 路径。 例如,
notebookutils.fs.ls("Files")指向默认 Lakehouse 中的Files目录。例如:
notebookutils.fs.ls("Files/sample_datasets/public_holidays.parquet")在 Python 笔记本中:相对路径相对于本地文件系统的工作目录,默认情况下为
/home/trusted-service-user/work. 因此,应使用完整路径而不是相对路径notebookutils.fs.ls("/lakehouse/default/Files")来访问默认 Lakehouse 中的Files目录。例如:
notebookutils.fs.ls("/lakehouse/default/Files/sample_datasets/public_holidays.parquet")
查看文件属性
用于 notebookutils.fs.ls() 检查文件属性,如文件名、文件路径、文件大小以及项是文件还是目录。
files = notebookutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
如果需要更多可读输出,请使用 f-string:
files = notebookutils.fs.ls("Files/data")
for file in files:
print(f"Name: {file.name}, Size: {file.size}, IsDir: {file.isDir}, Path: {file.path}")
创建新目录
如果目录不存在,请创建一个目录,包括任何必需的父目录。
notebookutils.fs.mkdirs('new directory name')
notebookutils.fs.mkdirs("Files/<new_dir>") # Works with the default Lakehouse files using relative path
notebookutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # Based on ABFS file system
notebookutils.fs.mkdirs("file:/<new_dir>") # Based on local file system of driver node
复制文件
跨文件系统复制文件或目录。 将 recurse=True 设置为递归复制目录。
notebookutils.fs.cp('source file or directory', 'destination file or directory', recurse=True)
注释
Python 笔记本说明:在 Python 笔记本中, cp() 内部使用相同的基于 azcopy 的机制,为 fastcp()这两种方法提供高效的性能。
由于 OneLake 快捷方式的限制,当需要使用 notebookutils.fs.cp() 从 S3/GCS 类型快捷方式复制数据时,建议使用装载的路径而不是 abfss 路径。
小窍门
始终检查布尔返回值以验证操作是否成功。 用于 notebookutils.fs.exists() 在开始复制操作之前验证源路径。
以下示例展示如何将数据从默认的 Lakehouse 跨存储拷贝至 ADLS Gen2 帐户:
notebookutils.fs.cp(
"Files/local_data",
"abfss://<container>@<account>.dfs.core.windows.net/remote_data",
recurse=True
)
高性能复制文件
使用fastcp可以进行更高效的复制操作,尤其是在处理大量数据时。 参数 recurse 默认为 True.
notebookutils.fs.fastcp('source file or directory', 'destination file or directory', recurse=True)
小窍门
使用 fastcp() 而不是 cp() 用于大型数据传输。 该方法 fastcp 在后台使用 azcopy,为大容量文件操作提供明显更好的吞吐量。 在 Python 笔记本中,cp() 和 fastcp() 使用相同的基础机制。
请记住以下注意事项:
-
notebookutils.fs.fastcp()不支持跨区域复制 OneLake 中的文件。 在本例中,可以改用notebookutils.fs.cp()。 - 由于 OneLake 快捷方式的限制,当需要使用
notebookutils.fs.fastcp()从 S3/GCS 类型快捷方式复制数据时,建议使用装载的路径而不是 abfss 路径。
预览文件内容
以 UTF-8 字符串的形式返回文件的第一 max_bytes 个字节。
notebookutils.fs.head('file path', max_bytes)
小窍门
对于大型文件,请使用 head() 适当的 max_bytes 值来避免内存问题。 默认值为 100 KB(1024 * 100)。
以下示例读取文件的前 1,000 个字节:
content = notebookutils.fs.head("Files/data/sample.txt", 1000)
print(content)
注释
不同语言的 max_bytes 默认值不同:Python 和 Scala 笔记本使用 102400 (100 KB),而 R 笔记本使用 65535 (64 KB)。 在 Scala 中,此参数命名 maxBytes为 .
移动文件
跨文件系统移动文件或目录。
notebookutils.fs.mv('source file or directory', 'destination directory', create_path=True, overwrite=True)
重要
参数 create_path 默认值因运行时而异:
-
Spark 笔记本 (PySpark、Scala、R):默认为
False(false在 Scala 中,FALSE在 R 中)。 移动操作之前,父目录必须存在。 -
Python 笔记本:默认为
True. 如果父目录不存在,则会自动创建该目录。
若要确保跨运行时的行为一致,请在代码中显式设置 create_path 参数。 在 Scala 中,此参数命名 createPath为 .
如果需要更清晰的代码,请使用命名参数:
notebookutils.fs.mv("Files/source.csv", "Files/new_folder/dest.csv", create_path=True, overwrite=True)
写入文件
将 UTF-8 字符串写入文件。
notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it already exists
将内容追加到文件
将 UTF-8 字符串追加到文件中。
notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it doesn't exist
重要
notebookutils.fs.append() 并且 notebookutils.fs.put() 不支持并发写入同一文件,因为缺乏原子性保证。
在循环中使用 notebookutils.fs.append API 写入同一个 forsleep 文件时,应在每次重复写入之间添加约 0.5 至 1 秒的延迟。 此建议是因为 notebookutils.fs.append API 的内部 flush 操作是异步的,因此短延迟有助于确保数据完整性。
import time
for i in range(100):
notebookutils.fs.append("Files/output/data.txt", f"Line {i}\n", True)
time.sleep(0.5) # Prevent data integrity issues
删除文件或目录
删除文件或目录。 将 recurse=True 设置为递归删除目录。
notebookutils.fs.rm('file path', recurse=True)
检查文件或目录是否存在
检查指定路径中是否存在文件或目录。 如果路径存在,则返回 True ;否则返回 False。
notebookutils.fs.exists("Files/data/input.csv")
小窍门
在执行文件操作之前使用 exists() 以防止错误。 例如,在尝试复制或移动源文件之前,请检查源文件是否存在。
if notebookutils.fs.exists("Files/data/input.csv"):
notebookutils.fs.cp("Files/data/input.csv", "Files/backup/input.csv")
print("File copied successfully.")
else:
print("Source file not found.")
获取文件属性
获取路径的属性,作为名称值对的映射。 它仅支持 Azure Blob 存储路径。
注释
该方法 getProperties 仅在 Python 笔记本中可用。 Spark 笔记本(PySpark、Scala 或 R)不支持它。
参数:
| 参数 | 类型 | 必需 | 说明 |
|---|---|---|---|
path |
String | 是的 | 文件或目录的 ABFS 路径。 |
返回: 包含元数据属性(例如文件大小、创建时间、上次修改时间和内容类型)的字典(映射)。
properties = notebookutils.fs.getProperties("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")
print(properties)