适用于:✅ Fabric 数据工程和数据科学
了解如何使用用于Fabric数据工程的 Livy API 提交 Spark 批处理作业。 Livy API 目前不支持Azure服务主体(SPN)。
先决条件
Visual Studio Code、Jupyter Notebooks、PySpark 以及用于 Python 的 Microsoft 身份验证库 (MSAL) 等远程客户端。
访问 Fabric Rest API 需要 Microsoft Entra 应用令牌。 使用 Microsoft 标识平台 注册应用程序。
对于湖屋中的一些数据,此示例使用纽约市出租车和豪华轿车委员会 green_tripdata_2022_08,这是加载到湖屋中的一个 parquet 文件。
Livy API 定义用于操作的统一终结点。 按照本文中的示例操作时,请将占位符 {Entra_TenantID}、{Entra_ClientID}、{Fabric_WorkspaceID} 和 {Fabric_LakehouseID} 替换为相应的值。
为 Livy API Batch 配置Visual Studio Code
在 Fabric Lakehouse 中选择 Lakehouse Settings。
导航到“Livy 终结点”部分。
将 Batch 作业连接字符串(图像中的第二个红色框)复制到代码中。
导航到 Microsoft Entra 管理中心并将应用程序(客户端)ID 和目录(租户)ID 复制到代码。
创建 Spark Batch 代码并上传到 Lakehouse
在Visual Studio Code中创建
.ipynb笔记本并插入以下代码import sys import os from pyspark.sql import SparkSession from pyspark.conf import SparkConf from pyspark.sql.functions import col if __name__ == "__main__": #Spark session builder spark_session = (SparkSession .builder .appName("batch_demo") .getOrCreate()) spark_context = spark_session.sparkContext spark_context.setLogLevel("DEBUG") tableName = spark_context.getConf().get("spark.targetTable") if tableName is not None: print("tableName: " + str(tableName)) else: print("tableName is None") df_valid_totalPrice = spark_session.sql("SELECT * FROM green_tripdata_2022 where total_amount > 0") df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("lpep_pickup_datetime").substr(1, 4)) deltaTablePath = f"Tables/{tableName}CleanedTransactions" df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)在本地保存Python文件。 此 Python 数据包中包含两个 Spark 语句,这些语句用于处理 Lakehouse 中的数据,且需要上传到您的 Lakehouse。 在 Visual Studio Code 中运行 Livy API 批处理作业时,需要引用负载的 ABFS(Azure Blob 文件系统)路径,以及在
SELECTSQL 语句中的 Lakehouse 表名称。将Python payload 上传到 Lakehouse 的文件区域。 在 Lakehouse 资源管理器中,选择 “文件”。 然后选择“ >获取数据>上传文件”。 通过文件选取器选择文件。
文件放入 Lakehouse 中的“文件”部分后,选择载荷文件名右侧的三个点(省略号),然后选择“属性”。
将此 ABFS 路径复制到步骤 1 中的笔记本单元格。
使用Microsoft Entra用户令牌或 Microsoft Entra SPN 令牌对 Livy API Spark 批处理会话进行身份验证
使用 Microsoft Entra SPN 令牌对 Livy API Spark 批处理会话进行身份验证
在Visual Studio Code中创建
.ipynb笔记本并插入以下代码。import sys from msal import ConfidentialClientApplication # Configuration - Replace with your actual values tenant_id = "Entra_TenantID" # Microsoft Entra tenant ID client_id = "Entra_ClientID" # Service Principal Application ID # Certificate paths - Update these paths to your certificate files certificate_path = "PATH_TO_YOUR_CERTIFICATE.pem" # Public certificate file private_key_path = "PATH_TO_YOUR_PRIVATE_KEY.pem" # Private key file certificate_thumbprint = "YOUR_CERTIFICATE_THUMBPRINT" # Certificate thumbprint # OAuth settings audience = "https://analysis.windows.net/powerbi/api/.default" authority = f"https://login.windows.net/{tenant_id}" def get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint=None): """ Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow. This function uses certificate-based authentication which is more secure than client secrets. Args: client_id (str): The Service Principal's client ID audience (str): The audience for the token (resource scope) authority (str): The OAuth authority URL certificate_path (str): Path to the certificate file (.pem format) private_key_path (str): Path to the private key file (.pem format) certificate_thumbprint (str): Certificate thumbprint (optional but recommended) Returns: str: The access token for API authentication Raises: Exception: If token acquisition fails """ try: # Read the certificate from PEM file with open(certificate_path, "r", encoding="utf-8") as f: certificate_pem = f.read() # Read the private key from PEM file with open(private_key_path, "r", encoding="utf-8") as f: private_key_pem = f.read() # Create the confidential client application app = ConfidentialClientApplication( client_id=client_id, authority=authority, client_credential={ "private_key": private_key_pem, "thumbprint": certificate_thumbprint, "certificate": certificate_pem } ) # Acquire token using client credentials flow token_response = app.acquire_token_for_client(scopes=[audience]) if "access_token" in token_response: print("Successfully acquired access token") return token_response["access_token"] else: raise Exception(f"Failed to retrieve token: {token_response.get('error_description', 'Unknown error')}") except FileNotFoundError as e: print(f"Certificate file not found: {e}") sys.exit(1) except Exception as e: print(f"Error retrieving token: {e}", file=sys.stderr) sys.exit(1) # Get the access token token = get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint)运行笔记本单元格后,您应会看到返回的 Microsoft Entra 令牌。
显示运行 cell 后返回的 Microsoft Entra SPN 令牌的屏幕截图。
使用Microsoft Entra用户令牌对 Livy API Spark 会话进行身份验证
在Visual Studio Code中创建
.ipynb笔记本并插入以下代码。from msal import PublicClientApplication import requests import time # Configuration - Replace with your actual values tenant_id = "Entra_TenantID" # Microsoft Entra tenant ID client_id = "Entra_ClientID" # Application ID (can be the same as above or different) # Required scopes for Livy API access scopes = [ "https://api.fabric.microsoft.com/Lakehouse.Execute.All", # Required — execute operations in lakehouses "https://api.fabric.microsoft.com/Lakehouse.Read.All", # Required — read lakehouse metadata "https://api.fabric.microsoft.com/Code.AccessFabric.All", # Required — general Fabric API access from Spark Runtime "https://api.fabric.microsoft.com/Code.AccessStorage.All", # Required — access OneLake and Azure storage from Spark Runtime ] # Optional scopes — add these only if your Spark jobs need access to the corresponding services: # "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All" # Optional — access Azure Key Vault from Spark Runtime # "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All" # Optional — access Azure Data Lake Storage Gen1 from Spark Runtime # "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All" # Optional — access Azure Data Explorer from Spark Runtime # "https://api.fabric.microsoft.com/Code.AccessSQL.All" # Optional — access Azure SQL audience tokens from Spark Runtime def get_access_token(tenant_id, client_id, scopes): """ Get an access token using interactive authentication. This method will open a browser window for user authentication. Args: tenant_id (str): The Azure Active Directory tenant ID client_id (str): The application client ID scopes (list): List of required permission scopes Returns: str: The access token, or None if authentication fails """ app = PublicClientApplication( client_id, authority=f"https://login.microsoftonline.com/{tenant_id}" ) print("Opening browser for interactive authentication...") token_response = app.acquire_token_interactive(scopes=scopes) if "access_token" in token_response: print("Successfully authenticated") return token_response["access_token"] else: print(f"Authentication failed: {token_response.get('error_description', 'Unknown error')}") return None # Uncomment the lines below to use interactive authentication token = get_access_token(tenant_id, client_id, scopes) print("Access token acquired via interactive login")运行笔记本单元格后,浏览器中应会弹出一个窗口,允许您选择用于登录的身份。
选择要登录的标识后,需要批准Microsoft Entra应用注册 API 权限。
完成身份验证后,请关闭浏览器窗口。
在Visual Studio Code中,你应会看到已返回的Microsoft Entra令牌。
了解 Livy API 的 Code.* 作用域
通过 Livy API 运行 Spark 作业时, Code.* 范围控制 Spark 运行时可以代表经过身份验证的用户访问的外部服务。 需要两个;其余项是可选的,具体取决于工作负荷。
所需的 Code.* 范围
| Scope | 说明 |
|---|---|
Code.AccessFabric.All |
允许获取Microsoft Fabric的访问令牌。 所有 Livy API 操作都是必需的。 |
Code.AccessStorage.All |
允许获取用于 OneLake 和 Azure 存储的访问令牌。 在数据湖仓库中读取和写入数据所必需的。 |
可选 Code.* 作用域
仅当 Spark 作业需要在运行时访问相应的Azure服务时,才添加这些范围。
| Scope | 说明 | 何时使用 |
|---|---|---|
Code.AccessAzureKeyvault.All |
允许获取对 Azure 密钥保管库 的访问令牌。 | Spark 代码从Azure 密钥保管库检索机密、密钥或证书。 |
Code.AccessAzureDataLake.All |
允许获取Azure Data Lake Storage Gen1的访问令牌。 | Spark 代码读取或写入Azure Data Lake Storage Gen1帐户。 |
Code.AccessAzureDataExplorer.All |
允许获取用于访问Azure 数据资源管理器(Kusto)的令牌。 | Spark 代码查询或从Azure 数据资源管理器群集引入数据。 |
Code.AccessSQL.All |
允许获取用于Azure SQL的访问令牌。 | Spark 代码需要连接到Azure SQL数据库。 |
注释
这些 Lakehouse.Execute.All 和 Lakehouse.Read.All 范围也是必需的,但不属于 Code.* 家族。 它们分别授予对Fabric lakehouses 执行操作和读取元数据的权限。
提交 Livy Batch 并监视批处理作业。
添加另一个笔记本单元格并插入此代码。
# submit payload to existing batch session import requests import time import json api_base_url = "https://api.fabric.microsoft.com/v1" # Base URL for Fabric APIs # Fabric Resource IDs - Replace with your workspace and lakehouse IDs workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" # Construct the Livy Batch API URL # URL pattern: {base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/{api_version}/batches livy_base_url = f"{api_base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/2023-12-01/batches" # Set up authentication headers headers = {"Authorization": f"Bearer {token}"} print(f"Livy Batch API URL: {livy_base_url}") new_table_name = "TABLE_NAME" # Name for the new table # Configure the batch job print("Configuring batch job parameters...") # Batch job configuration - Modify these values for your use case payload_data = { # Job name - will appear in the Fabric UI "name": f"livy_batch_demo_{new_table_name}", # Path to your Python file in the lakehouse "file": "<ABFSS_PATH_TO_YOUR_PYTHON_FILE>", # Replace with your Python file path # Optional: Spark configuration parameters "conf": { "spark.targetTable": new_table_name, # Custom configuration for your application }, } print("Batch Job Configuration:") print(json.dumps(payload_data, indent=2)) try: # Submit the batch job print("\nSubmitting batch job...") post_batch = requests.post(livy_base_url, headers=headers, json=payload_data) if post_batch.status_code == 202: batch_info = post_batch.json() print("Livy batch job submitted successfully!") print(f"Batch Job Info: {json.dumps(batch_info, indent=2)}") # Extract batch ID for monitoring batch_id = batch_info['id'] livy_batch_get_url = f"{livy_base_url}/{batch_id}" print(f"\nBatch Job ID: {batch_id}") print(f"Monitoring URL: {livy_batch_get_url}") else: print(f"Failed to submit batch job. Status code: {post_batch.status_code}") print(f"Response: {post_batch.text}") except requests.exceptions.RequestException as e: print(f"Network error occurred: {e}") except json.JSONDecodeError as e: print(f"JSON decode error: {e}") print(f"Response text: {post_batch.text}") except Exception as e: print(f"Unexpected error: {e}")运行笔记本单元格,应会看到在创建并运行 Livy 批处理作业时打印了几行。
若要查看更改,请导航回 Lakehouse。
与Fabric环境的集成
默认情况下,此 Livy API 会话针对工作区的默认初学者池运行。 或者,可以使用 Fabric 环境 创建、配置和使用 Microsoft Fabric 中的环境来自定义 Livy API 会话使用的 Spark 作业的 Spark 池。 若要使用您的Fabric环境,请通过这一行更改来更新先前的笔记本单元格。
payload_data = {
"name":"livybatchdemo_with"+ newlakehouseName,
"file":"abfss://YourABFSPathToYourPayload.py",
"conf": {
"spark.targetLakehouse": "Fabric_LakehouseID",
"spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID"\"}" # remove this line to use starter pools instead of an environment, replace "EnvironmentID" with your environment ID
}
}
在监视中心查看作业
可以通过在左侧导航链接中选择“监视”来访问监视中心,以查看各种 Apache Spark 活动。
批处理作业处于完成状态后,可以通过导航到“监视”来查看会话状态。
选择并打开最新活动名称。
在此 Livy API 会话案例中,可以看到以前的批处理提交、运行详细信息、Spark 版本和配置。 请注意右上角的已停止状态。
若要回顾整个过程,您需要一个远程客户端,例如 Visual Studio Code、微软 Entra 应用令牌、Livy API 终结点 URL、针对 Lakehouse 中的 Spark 有效负载进行身份验证,以及最后的批处理 Livy API 会话(Batch Livy API Session)。