ベクター検索外部埋め込みモデル (OpenAI) の例

このノートブックでは、Vector Search Python SDK を使用する方法を示します。この SDK は、ベクター検索を操作するためのプライマリ API として VectorSearchClient を提供します。

このノートブックでは、 外部モデルの Databricks サポート を使用して OpenAI 埋め込みモデルにアクセスし、埋め込みを生成します。

%pip install --upgrade --force-reinstall databricks-vectorsearch tiktoken
dbutils.library.restartPython()
from databricks.vector_search.client import VectorSearchClient

vsc = VectorSearchClient(disable_notice=True)
# Display help for the Vector Search Client
help(VectorSearchClient)

ソース Delta テーブルにおもちゃのデータセットを読み込む

次に、ソース Delta テーブルを作成します。

# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA and CREATE_TABLE privileges on the schema.
# Change the catalog and schema here if necessary.

catalog_name = "main"
schema_name = "default"

source_table_name = "wiki_articles_demo"
source_table_fullname = f"{catalog_name}.{schema_name}.{source_table_name}"
# Uncomment the following line if you want to start from scratch.

# spark.sql(f"DROP TABLE {source_table_fullname}")
source_df = spark.read.parquet("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet").limit(10)
display(source_df)

ブロックサンプルデータセット

サンプル データセットをチャンクすると、埋め込みモデルのコンテキスト制限を超えないようにできます。 OpenAI モデルでは、最大 8192 個のトークンがサポートされます。 ただし、Databricks では、さまざまな例を RAG アプリケーションの推論モデルにフィードできるように、データをより小さなコンテキスト チャンクに分割することをお勧めします。

import tiktoken
import pandas as pd


max_chunk_tokens = 1024
encoding = tiktoken.get_encoding("cl100k_base")


def chunk_text(text):
    # Encode and then decode within the UDF
    tokens = encoding.encode(text)
    chunks = []
    while tokens:
        chunk_tokens = tokens[:max_chunk_tokens]
        chunk_text = encoding.decode(chunk_tokens)
        chunks.append(chunk_text)
        tokens = tokens[max_chunk_tokens:]
    return chunks

# Process the data and store in a new list
pandas_df = source_df.toPandas()
processed_data = []
for index, row in pandas_df.iterrows():
    text_chunks = chunk_text(row['text'])
    chunk_no = 0
    for chunk in text_chunks:
        row_data = row.to_dict()

        # Replace the id column with a new unique chunk id
        # and the text column with the text chunk
        row_data['id'] = f"{row['id']}_{chunk_no}"
        row_data['text'] = chunk

        processed_data.append(row_data)
        chunk_no += 1

chunked_pandas_df = pd.DataFrame(processed_data)
chunked_spark_df = spark.createDataFrame(chunked_pandas_df)

# Write the chunked DataFrame to a Delta table
spark.sql(f"DROP TABLE IF EXISTS {source_table_fullname}")
chunked_spark_df.write.format("delta") \
    .option("delta.enableChangeDataFeed", "true") \
    .saveAsTable(source_table_fullname)
display(spark.sql(f"SELECT * FROM {source_table_fullname}"))

ベクター検索エンドポイントを作成する

vector_search_endpoint_name = "vector-search-demo-endpoint"
vsc.create_endpoint(
    name=vector_search_endpoint_name,
    endpoint_type="STANDARD" # or "STORAGE_OPTIMIZED"
)
vsc.get_endpoint(
  name=vector_search_endpoint_name
)

OpenAI 埋め込みモデル エンドポイントを登録する

使用方法の詳細については、 OpenAI エンドポイントの構成に関する外部モデルのドキュメントを参照してください。

資格情報を指定するには、 Databricks シークレット マネージャーを使用します

embedding_model_endpoint_name = "openai-embedding-endpoint"
import mlflow.deployments

mlflow_deploy_client = mlflow.deployments.get_deploy_client("databricks")

# Configure the secret manager with the OpenAPI key and provide the
# correct scope and key name below.

mlflow_deploy_client.create_endpoint(
    name=embedding_model_endpoint_name,
    config={
        "served_entities": [{
            "external_model": {
                "name": "text-embedding-ada-002",
                "provider": "openai",
                "task": "llm/v1/embeddings",
                "openai_config": {
                    "openai_api_key": "{{secrets/demo/openai-api-key}}" # CHANGE ME
                }
            }
    }]
    }
)

ベクター インデックスを作成する

# Vector index
vs_index = f"{source_table_name}_openai_index"
vs_index_fullname = f"{catalog_name}.{schema_name}.{vs_index}"
index = vsc.create_delta_sync_index(
  endpoint_name=vector_search_endpoint_name,
  source_table_name=source_table_fullname,
  index_name=vs_index_fullname,
  pipeline_type='TRIGGERED',
  primary_key="id",
  embedding_source_column="text",
  embedding_model_endpoint_name=embedding_model_endpoint_name
)
index.describe()['status']['message']
# Wait for index to come online. Expect this command to take several minutes.
# You can also track the status of the index build in Catalog Explorer in the
# Overview tab for the vector index.

import time
index = vsc.get_index(endpoint_name=vector_search_endpoint_name,index_name=vs_index_fullname)
while not index.describe().get('status')['ready']:
  print("Waiting for index to be ready...")
  time.sleep(30)
print("Index is ready!")
index.describe()

次のセルは、ベクター インデックスに対してクエリを実行して同様のドキュメントを検索する方法を示しています。

results = index.similarity_search(
  query_text="Greek myths",
  columns=["id", "text", "title"],
  num_results=5
  )
rows = results['result']['data_array']
for (id, text, title, score) in rows:
  if len(text) > 32:
    # trim text output for readability
    text = text[0:32] + "..."
  print(f"id: {id}  title: {title} text: '{text}' score: {score}")
# Search with a filter. Note that the syntax depends on the endpoint type.

# Standard endpoint syntax
results = index.similarity_search(
  query_text="Greek myths",
  columns=["id", "text", "title"],
  num_results=5,
  filters={"title NOT": "Hercules"}
)

# Storage-optimized endpoint syntax
# results = index.similarity_search(
#   query_text="Greek myths",
#   columns=["id", "text", "title"],
#   num_results=5,
#   filters='title != "Hercules"'
#   )

rows = results['result']['data_array']
for (id, text, title, score) in rows:
  if len(text) > 32:
    # trim text output for readability
    text = text[0:32] + "..."
  print(f"id: {id}  title: {title} text: '{text}' score: {score}")

ベクター インデックスを削除する

vsc.delete_index(
  endpoint_name=vector_search_endpoint_name,
  index_name=vs_index_fullname
)

ノートブックの例

ベクター検索外部埋め込みモデル (OpenAI) の例

ノートブックを入手