Connect agents to unstructured data

AI agents often need to query unstructured data, such as document collections, knowledge bases, or text corpora, to answer questions and provide context-aware responses.

Databricks provides multiple ways to connect agents to unstructured data in Vector Search indexes and external vector stores. Use prebuilt MCP servers for immediate access to Databricks Vector Search indexes, use the AI Bridge packages to develop retriever tools locally, or build custom retriever functions for specialized workflows.

Query Databricks Vector Search indexes using MCP

If your agent needs to query a Databricks Vector Search index, use the Databricks-managed MCP server. Before you begin, create a vector search index that uses Databricks-managed embeddings. See Create a vector search endpoint and index.

The managed MCP URL for Vector Search is: https://<workspace-hostname>/api/2.0/mcp/vector-search/{catalog}/{schema}/{index_name}
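The URL is assembled from the workspace hostname and the three-part index name. As a quick sanity check, here is a minimal sketch of that string construction (the vector_search_mcp_url helper is ours, for illustration only, not part of any SDK):

```python
def vector_search_mcp_url(workspace_host: str, index_full_name: str) -> str:
    """Build the managed MCP server URL for a vector search index.

    index_full_name is the three-part Unity Catalog name: 'catalog.schema.index_name'.
    """
    catalog, schema, index_name = index_full_name.split(".")
    # Normalize the host so joining never produces a double slash.
    host = workspace_host.rstrip("/")
    return f"{host}/api/2.0/mcp/vector-search/{catalog}/{schema}/{index_name}"

print(vector_search_mcp_url("https://my-workspace.cloud.databricks.com", "main.default.docs_index"))
# https://my-workspace.cloud.databricks.com/api/2.0/mcp/vector-search/main/default/docs_index
```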

The following examples show how to connect an agent to a vector search index. Replace <catalog>, <schema>, and <index-name> with the names for your vector search index.

OpenAI Agents SDK (Apps)

from agents import Agent, Runner
from databricks.sdk import WorkspaceClient
from databricks_openai.agents import McpServer

workspace_client = WorkspaceClient()

async with McpServer.from_vector_search(
    catalog="<catalog>",
    schema="<schema>",
    index_name="<index-name>",
    workspace_client=workspace_client,
    name="vector-search",
) as vs_server:
    agent = Agent(
        name="Research assistant",
        instructions="You are a research assistant. Use the vector search tool to find relevant documents and answer questions.",
        model="databricks-claude-sonnet-4-5",
        mcp_servers=[vs_server],
    )
    result = await Runner.run(agent, "What is the return policy?")
    print(result.final_output)

Grant the app access to the vector search index in databricks.yml:

resources:
  apps:
    my_agent_app:
      resources:
        - name: 'my_vector_index'
          uc_securable:
            securable_full_name: '<catalog>.<schema>.<index-name>'
            securable_type: 'TABLE'
            permission: 'SELECT'

LangGraph (Apps)

from databricks.sdk import WorkspaceClient
from databricks_langchain import ChatDatabricks, DatabricksMCPServer, DatabricksMultiServerMCPClient
from langgraph.prebuilt import create_react_agent

workspace_client = WorkspaceClient()
host = workspace_client.config.host

mcp_client = DatabricksMultiServerMCPClient([
    DatabricksMCPServer(
        name="vector-search",
        url=f"{host}/api/2.0/mcp/vector-search/<catalog>/<schema>/<index-name>",
        workspace_client=workspace_client,
    ),
])

async with mcp_client:
    tools = await mcp_client.get_tools()
    agent = create_react_agent(
        ChatDatabricks(endpoint="databricks-claude-sonnet-4-5"),
        tools=tools,
    )
    result = await agent.ainvoke(
        {"messages": [{"role": "user", "content": "What is the return policy?"}]}
    )
    print(result["messages"][-1].content)

Grant the app access to the vector search index in databricks.yml:

resources:
  apps:
    my_agent_app:
      resources:
        - name: 'my_vector_index'
          uc_securable:
            securable_full_name: '<catalog>.<schema>.<index-name>'
            securable_type: 'TABLE'
            permission: 'SELECT'

Model Serving

from databricks.sdk import WorkspaceClient
from databricks_mcp import DatabricksMCPClient
import mlflow

workspace_client = WorkspaceClient()
host = workspace_client.config.host

# Connect to the Vector Search MCP server
mcp_client = DatabricksMCPClient(
    server_url=f"{host}/api/2.0/mcp/vector-search/<catalog>/<schema>/<index-name>",
    workspace_client=workspace_client,
)

# List available tools from the Vector Search index
tools = mcp_client.list_tools()

# Log the agent with the required resources for deployment
mlflow.pyfunc.log_model(
    "agent",
    python_model=my_agent,
    resources=mcp_client.get_databricks_resources(),
)

To deploy the agent, see Deploy an agent for generative AI applications (Model Serving). For more information about logging agents with MCP resources, see Use Databricks managed MCP servers.

Other methods

Query a vector search index hosted outside of Databricks

If your vector index is hosted outside of Azure Databricks, you can create a Unity Catalog connection to the external service and use the connection in your agent code. See Connect agents to external services.

The following example creates a retriever for a PyFunc-flavored agent that calls a vector index hosted outside of Databricks.

  1. Create a Unity Catalog connection to an external service, in this case Azure.

    CREATE CONNECTION ${connection_name}
    TYPE HTTP
    OPTIONS (
      host 'https://example.search.windows.net',
      base_path '/',
      bearer_token secret ('<secret-scope>','<secret-key>')
    );
    
  2. Define a retriever tool in your agent code using the Unity Catalog connection. This example uses MLflow decorators to enable agent tracing.

    Note

    To conform to the MLflow retriever schema, retriever functions should return a List[Document] and use the metadata field in the Document class to add additional attributes to the returned documents, such as doc_uri and similarity_score. See the MLflow documentation.

    import mlflow

    from mlflow.entities import Document
    from typing import List, Any
    from dataclasses import asdict

    connection_name = "<connection-name>"  # Name of the Unity Catalog connection created in step 1

    class VectorSearchRetriever:
      """
      Class using Databricks Vector Search to retrieve relevant documents.
      """

      def __init__(self):
        self.azure_search_index = "hotels_vector_index"

      @mlflow.trace(span_type="RETRIEVER", name="vector_search")
      def __call__(self, query_vector: List[Any], score_threshold=None) -> List[Document]:
        """
        Performs vector search to retrieve relevant chunks.
        Args:
          query_vector: Embedding vector for the search query.
          score_threshold: Minimum similarity score; results below it are dropped.

        Returns:
          List of retrieved Documents.
        """
        import requests
        from databricks.sdk import WorkspaceClient

        w = WorkspaceClient()
        payload = {
          "count": True,
          "select": "HotelId, HotelName, Description, Category",
          "vectorQueries": [
            {
              "vector": query_vector,
              "k": 7,
              "fields": "DescriptionVector",
              "kind": "vector",
              "exhaustive": True,
            }
          ],
        }

        response = requests.post(
          f"{w.config.host}/api/2.0/unity-catalog/connections/{connection_name}/proxy/indexes/{self.azure_search_index}/docs/search?api-version=2023-07-01-Preview",
          headers={
            **w.config.authenticate(),
            "Content-Type": "application/json",
          },
          json=payload,
        ).json()

        documents = self.convert_vector_search_to_documents(response, score_threshold)
        return [asdict(doc) for doc in documents]

      @mlflow.trace(span_type="PARSER")
      def convert_vector_search_to_documents(
        self, vs_results, score_threshold
      ) -> List[Document]:
        docs = []

        for item in vs_results.get("value", []):
          score = item.get("@search.score", 0)

          if score_threshold is None or score >= score_threshold:
            metadata = {
              "score": score,
              "HotelName": item.get("HotelName"),
              "Category": item.get("Category"),
            }

            doc = Document(
              page_content=item.get("Description", ""),
              metadata=metadata,
              id=item.get("HotelId"),
            )
            docs.append(doc)

        return docs
    
  3. Run the following Python code to run the retriever. You can optionally include vector search filters in the request to filter the results.

    retriever = VectorSearchRetriever()
    query = [0.01944167, 0.0040178085, ..., -0.017496133]  # TRIMMED FOR BREVITY
    results = retriever(query, score_threshold=0.1)
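The score-threshold parsing step can be exercised locally without calling the external service. The following standalone sketch mirrors the filtering logic of convert_vector_search_to_documents on a canned response shaped like an Azure AI Search payload (the data and the filter_by_score helper are hypothetical, for illustration only):

```python
# Canned response shaped like an Azure AI Search result payload (hypothetical data).
canned_response = {
    "value": [
        {"@search.score": 0.82, "HotelId": "1", "HotelName": "Seaside Inn",
         "Category": "Budget", "Description": "A quiet hotel near the beach."},
        {"@search.score": 0.05, "HotelId": "2", "HotelName": "City Tower",
         "Category": "Luxury", "Description": "A downtown high-rise."},
    ]
}

def filter_by_score(vs_results: dict, score_threshold: float) -> list:
    """Keep only hits at or above the threshold, mirroring the parser above."""
    docs = []
    for item in vs_results.get("value", []):
        score = item.get("@search.score", 0)
        if score >= score_threshold:
            docs.append({
                "page_content": item.get("Description", ""),
                "metadata": {
                    "score": score,
                    "HotelName": item.get("HotelName"),
                    "Category": item.get("Category"),
                },
                "id": item.get("HotelId"),
            })
    return docs

hits = filter_by_score(canned_response, score_threshold=0.1)
print([d["id"] for d in hits])  # ['1'] -- the low-scoring hit is dropped
```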
    
Develop retrievers locally using AI Bridge

To build Databricks Vector Search retriever tools locally, use the Databricks AI Bridge packages, databricks-langchain and databricks-openai. These packages include helper functions such as from_vector_search for creating retrievers from existing Databricks resources.

LangChain/LangGraph

Install the latest version of databricks-langchain, which includes Databricks AI Bridge.

%pip install --upgrade databricks-langchain

The following code prototypes a retriever tool that queries a hypothetical vector search index and binds it to an LLM locally so you can test its tool-calling behavior.

Provide a descriptive tool_description to help the agent understand the tool and determine when to invoke it.

from databricks_langchain import VectorSearchRetrieverTool, ChatDatabricks

# Initialize the retriever tool.
vs_tool = VectorSearchRetrieverTool(
  index_name="catalog.schema.my_databricks_docs_index",
  tool_name="databricks_docs_retriever",
  tool_description="Retrieves information about Databricks products from official Databricks documentation."
)

# Run a query against the vector search index locally for testing
vs_tool.invoke("Databricks Agent Framework?")

# Bind the retriever tool to your Langchain LLM of choice
llm = ChatDatabricks(endpoint="databricks-claude-sonnet-4-5")
llm_with_tools = llm.bind_tools([vs_tool])

# Chat with your LLM to test the tool calling functionality
llm_with_tools.invoke("Based on the Databricks documentation, what is Databricks Agent Framework?")

For scenarios using direct-access indexes or Delta Sync indexes with self-managed embeddings, you must configure the VectorSearchRetrieverTool and specify a custom embedding model and text column. See Options to provide embeddings.

The following example shows how to configure a VectorSearchRetrieverTool using the columns and embedding keys:

from databricks_langchain import VectorSearchRetrieverTool
from databricks_langchain import DatabricksEmbeddings

embedding_model = DatabricksEmbeddings(
    endpoint="databricks-bge-large-en",
)

vs_tool = VectorSearchRetrieverTool(
  index_name="catalog.schema.index_name", # Index name in the format 'catalog.schema.index'
  num_results=5, # Max number of documents to return
  columns=["primary_key", "text_column"], # List of columns to include in the search
  filters={"text_column LIKE": "Databricks"}, # Filters to apply to the query
  query_type="ANN", # Query type ("ANN" or "HYBRID").
  tool_name="name of the tool", # Used by the LLM to understand the purpose of the tool
  tool_description="Purpose of the tool", # Used by the LLM to understand the purpose of the tool
  text_column="text_column", # Specify text column for embeddings. Required for direct-access index or delta-sync index with self-managed embeddings.
  embedding=embedding_model # The embedding model. Required for direct-access index or delta-sync index with self-managed embeddings.
)

For more details, see the API documentation for VectorSearchRetrieverTool.

OpenAI

Install the latest version of databricks-openai, which includes Databricks AI Bridge.

%pip install --upgrade databricks-openai

The following code prototypes a retriever that queries a hypothetical vector search index and integrates it with OpenAI's GPT models.

Provide a descriptive tool_description to help the agent understand the tool and determine when to invoke it.

For more information on OpenAI recommendations for tools, see the OpenAI function calling documentation.

from databricks_openai import VectorSearchRetrieverTool
from openai import OpenAI
import json

# Initialize OpenAI client
client = OpenAI(api_key="<your_API_key>")

# Initialize the retriever tool
dbvs_tool = VectorSearchRetrieverTool(
  index_name="catalog.schema.my_databricks_docs_index",
  tool_name="databricks_docs_retriever",
  tool_description="Retrieves information about Databricks products from official Databricks documentation"
)

messages = [
  {"role": "system", "content": "You are a helpful assistant."},
  {
    "role": "user",
    "content": "Using the Databricks documentation, answer what is Spark?"
  }
]
first_response = client.chat.completions.create(
  model="gpt-4o",
  messages=messages,
  tools=[dbvs_tool.tool]
)

# Execute function code and parse the model's response and handle function calls.
tool_call = first_response.choices[0].message.tool_calls[0]
args = json.loads(tool_call.function.arguments)
result = dbvs_tool.execute(query=args["query"])  # For self-managed embeddings, optionally pass in openai_client=client

# Supply model with results – so it can incorporate them into its final response.
messages.append(first_response.choices[0].message)
messages.append({
  "role": "tool",
  "tool_call_id": tool_call.id,
  "content": json.dumps(result)
})
second_response = client.chat.completions.create(
  model="gpt-4o",
  messages=messages,
  tools=[dbvs_tool.tool]
)

For scenarios using direct-access indexes or Delta Sync indexes with self-managed embeddings, you must configure the VectorSearchRetrieverTool and specify a custom embedding model and text column. See Options to provide embeddings.

The following example shows how to configure a VectorSearchRetrieverTool using the columns and embedding_model_name keys:

from databricks_openai import VectorSearchRetrieverTool

vs_tool = VectorSearchRetrieverTool(
    index_name="catalog.schema.index_name", # Index name in the format 'catalog.schema.index'
    num_results=5, # Max number of documents to return
    columns=["primary_key", "text_column"], # List of columns to include in the search
    filters={"text_column LIKE": "Databricks"}, # Filters to apply to the query
    query_type="ANN", # Query type ("ANN" or "HYBRID").
    tool_name="name of the tool", # Used by the LLM to understand the purpose of the tool
    tool_description="Purpose of the tool", # Used by the LLM to understand the purpose of the tool
    text_column="text_column", # Specify text column for embeddings. Required for direct-access index or delta-sync index with self-managed embeddings.
    embedding_model_name="databricks-bge-large-en" # The embedding model. Required for direct-access index or delta-sync index with self-managed embeddings.
)

For more details, see the API documentation for VectorSearchRetrieverTool.

After your tool is ready locally, you can productionize it directly as part of your agent code, or migrate it to a Unity Catalog function, which provides better discoverability and governance but has certain limitations.

Query Databricks Vector Search using UC functions (deprecated)

Note

Databricks recommends MCP servers for most agent tools, but defining tools with Unity Catalog functions remains available for prototyping.

You can create a Unity Catalog function that wraps a Mosaic AI Vector Search index query. This approach:

  • Supports production use cases with governance and discoverability
  • Uses the vector_search() SQL function under the hood
  • Supports automatic MLflow tracing
    • Requires aliasing the function's output as page_content to conform to the MLflow retriever schema.
    • Requires adding any additional metadata columns to the metadata column, rather than as top-level output keys.

Run the following code in a notebook or SQL editor to create the function:

CREATE OR REPLACE FUNCTION main.default.databricks_docs_vector_search (
  -- The agent uses this comment to determine how to generate the query string parameter.
  query STRING
  COMMENT 'The query string for searching Databricks documentation.'
) RETURNS TABLE
-- The agent uses this comment to determine when to call this tool. It describes the types of documents and information contained within the index.
COMMENT 'Executes a search on Databricks documentation to retrieve text documents most relevant to the input query.' RETURN
SELECT
  chunked_text as page_content,
  map('doc_uri', url, 'chunk_id', chunk_id) as metadata
FROM
  vector_search(
    -- Specify your Vector Search index name here
    index => 'catalog.schema.databricks_docs_index',
    query => query,
    num_results => 5
  )

To use this retriever tool in an AI agent, wrap it with UCFunctionToolkit. This enables automatic tracing through MLflow by automatically generating the RETRIEVER span type in MLflow logs.

from unitycatalog.ai.langchain.toolkit import UCFunctionToolkit

toolkit = UCFunctionToolkit(
    function_names=[
        "main.default.databricks_docs_vector_search"
    ]
)
tools = toolkit.tools

Unity Catalog retriever tools have the following caveats:

  • SQL clients may limit the maximum number of rows or bytes returned. To prevent data truncation, truncate the column values returned by the UDF. For example, use substring(chunked_text, 0, 8192) to reduce the size of large content columns and avoid row truncation during execution.
  • Because this tool is a wrapper around the vector_search() function, it is subject to the same limitations as vector_search(). See Limitations.

For more information about UCFunctionToolkit, see the Unity Catalog documentation.

Add tracing to retriever tools

Add MLflow tracing to monitor and debug your retriever. Tracing lets you view the inputs, outputs, and metadata for each step of execution.

The previous example adds the @mlflow.trace decorator to the __call__ and parsing methods. The decorator creates a span that starts when the function is invoked and ends when it returns. MLflow automatically records the function's input and output, along with any exceptions raised.

Note

In addition to defining traces manually with decorators, users of the LangChain, LlamaIndex, and OpenAI libraries can use MLflow autologging. See Add tracing to your app: automatic and manual tracing.

import mlflow
from mlflow.entities import Document

# This code snippet has been truncated for brevity. See the full retriever example above.
class VectorSearchRetriever:
  ...

  # Create a RETRIEVER span. The span name must match the retriever schema name.
  @mlflow.trace(span_type="RETRIEVER", name="vector_search")
  def __call__(...) -> List[Document]:
    ...

  # Create a PARSER span.
  @mlflow.trace(span_type="PARSER")
  def parse_results(...) -> List[Document]:
    ...

To verify that downstream applications such as agent evaluation and AI Playground render retriever traces correctly, make sure the decorator satisfies the following requirements:

  • Use the MLflow retriever span schema and verify that the function returns a List[Document] object.
  • The trace name and the retriever_schema name must match for the trace to be configured correctly. See the following section to learn how to set the retriever schema.

Set the retriever schema to ensure MLflow compatibility

If the traces returned from your retriever or span_type="RETRIEVER" don't conform to MLflow's standard retriever schema, you must manually map the returned schema to MLflow's expected fields. This ensures that MLflow can properly trace your retriever and render traces in downstream applications.

To set the retriever schema manually:

  1. Call mlflow.models.set_retriever_schema when you define your agent. Use set_retriever_schema to map the column names in the returned table to MLflow's expected fields, such as primary_key, text_column, and doc_uri.

    # Define the retriever's schema by providing your column names
    mlflow.models.set_retriever_schema(
      name="vector_search",
      primary_key="chunk_id",
      text_column="text_column",
      doc_uri="doc_uri"
      # other_columns=["column1", "column2"],
    )
    
  2. Specify additional columns in your retriever's schema by providing a list of column names in the other_columns field.

  3. If you have multiple retrievers, you can define multiple schemas by using unique names for each retriever schema.

The retriever schema set during agent creation affects downstream applications and workflows, such as the review app and evaluation sets. Specifically, the doc_uri column serves as the primary identifier for documents returned by the retriever.

  • The review app displays the doc_uri to help reviewers assess responses and trace document origins. See Review App UI.
  • Evaluation sets use doc_uri to compare retriever results against predefined evaluation datasets to determine the retriever's recall and precision. See Evaluation sets (MLflow 2).

Read files from Unity Catalog volumes

If your agent needs to read unstructured files stored in Unity Catalog volumes (text documents, reports, configuration files, and so on), you can create tools that use the Databricks SDK Files API to list and read files directly.

The following examples create two tools the agent can use:

  • list_volume_files: Lists the files and directories in the volume.
  • read_volume_file: Reads the contents of a text file from the volume.
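Both tools resolve the three-part volume name to a /Volumes path before calling the Files API. That path logic can be checked in isolation; the volume_path helper below is ours, for illustration only, not part of the Databricks SDK:

```python
def volume_path(volume: str, relative_path: str = "") -> str:
    """Convert a 'catalog.schema.volume' name plus an optional relative path
    into the corresponding /Volumes filesystem path."""
    base = f"/Volumes/{volume.replace('.', '/')}"
    return f"{base}/{relative_path.lstrip('/')}" if relative_path else base

print(volume_path("main.default.docs", "reports/q1_summary.txt"))
# /Volumes/main/default/docs/reports/q1_summary.txt
```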

LangChain/LangGraph

Install the latest version of databricks-langchain, which includes Databricks AI Bridge.

%pip install --upgrade databricks-langchain
from databricks.sdk import WorkspaceClient
from langchain_core.tools import tool

VOLUME = "<catalog>.<schema>.<volume>"  # TODO: Replace with your volume
w = WorkspaceClient()


@tool
def list_volume_files(directory: str = "") -> str:
    """Lists files and directories in the Unity Catalog volume.
    Provide a relative directory path, or leave empty to list the volume root."""
    base = f"/Volumes/{VOLUME.replace('.', '/')}"
    path = f"{base}/{directory.lstrip('/')}" if directory else base
    entries = []
    for f in w.files.list_directory_contents(path):
        kind = "dir" if f.is_directory else "file"
        size = f" ({f.file_size} bytes)" if not f.is_directory else ""
        entries.append(f"  [{kind}] {f.name}{size}")
    return "\n".join(entries) if entries else "No files found."


@tool
def read_volume_file(file_path: str) -> str:
    """Reads a text file from the Unity Catalog volume.
    Provide the path relative to the volume root, for example 'reports/q1_summary.txt'."""
    base = f"/Volumes/{VOLUME.replace('.', '/')}"
    full_path = f"{base}/{file_path.lstrip('/')}"
    resp = w.files.download(full_path)
    return resp.contents.read().decode("utf-8")

Bind the tools to the LLM and run a tool-calling loop:

from databricks_langchain import ChatDatabricks
from langchain_core.messages import HumanMessage, ToolMessage

llm = ChatDatabricks(endpoint="databricks-claude-sonnet-4-5")
llm_with_tools = llm.bind_tools([list_volume_files, read_volume_file])

messages = [HumanMessage(content="What files are in the volume? Can you read about_databricks.txt and summarize it in 2 sentences?")]
tool_map = {"list_volume_files": list_volume_files, "read_volume_file": read_volume_file}

for _ in range(5):  # max iterations
    response = llm_with_tools.invoke(messages)
    messages.append(response)
    if not response.tool_calls:
        break
    for tc in response.tool_calls:
        result = tool_map[tc["name"]].invoke(tc["args"])
        messages.append(ToolMessage(content=result, tool_call_id=tc["id"]))

print(response.content)

OpenAI

Install the latest version of databricks-openai, which includes Databricks AI Bridge.

%pip install --upgrade databricks-openai
from databricks.sdk import WorkspaceClient
from databricks_openai import DatabricksOpenAI
import json

VOLUME = "<catalog>.<schema>.<volume>"  # TODO: Replace with your volume
w = WorkspaceClient()
client = DatabricksOpenAI()

# Define the tool specifications
tools = [
    {
        "type": "function",
        "function": {
            "name": "list_volume_files",
            "description": "Lists files and directories in the Unity Catalog volume. Provide a relative directory path, or leave empty to list the volume root.",
            "parameters": {
                "type": "object",
                "properties": {
                    "directory": {
                        "type": "string",
                        "description": "Relative directory path within the volume. Leave empty for root.",
                    }
                },
                "required": [],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "read_volume_file",
            "description": "Reads a text file from the Unity Catalog volume. Provide the path relative to the volume root, for example 'reports/q4_summary.txt'.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to the file relative to the volume root.",
                    }
                },
                "required": ["file_path"],
            },
        },
    },
]


def execute_tool(name: str, args: dict) -> str:
    base = f"/Volumes/{VOLUME.replace('.', '/')}"
    if name == "list_volume_files":
        directory = args.get("directory", "")
        path = f"{base}/{directory.lstrip('/')}" if directory else base
        entries = []
        for f in w.files.list_directory_contents(path):
            kind = "dir" if f.is_directory else "file"
            size = f" ({f.file_size} bytes)" if not f.is_directory else ""
            entries.append(f"[{kind}] {f.name}{size}")
        return "\n".join(entries) if entries else "No files found."
    elif name == "read_volume_file":
        full_path = f"{base}/{args['file_path'].lstrip('/')}"
        resp = w.files.download(full_path)
        return resp.contents.read().decode("utf-8")
    return f"Unknown tool: {name}"


# Call the model with tools
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "List the files in the volume, then read about_databricks.txt and summarize it."},
]

response = client.chat.completions.create(
    model="databricks-claude-sonnet-4-5", messages=messages, tools=tools
)

# Execute tool calls and send results back
while response.choices[0].finish_reason == "tool_calls":
    messages.append(response.choices[0].message)
    for tool_call in response.choices[0].message.tool_calls:
        args = json.loads(tool_call.function.arguments)
        result = execute_tool(tool_call.function.name, args)
        messages.append(
            {"role": "tool", "tool_call_id": tool_call.id, "content": result}
        )
    response = client.chat.completions.create(
        model="databricks-claude-sonnet-4-5", messages=messages, tools=tools
    )

print(response.choices[0].message.content)