Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Este notebook demonstra como utilizar o SDK Python de Pesquisa de Vetores, que fornece um VectorSearchClient como API principal para trabalhar com a Pesquisa de Vetores.
Este notebook usa APIs de Modelo do Databricks Foundation para acessar o modelo de inserções GTE para gerar inserções.
%pip install --upgrade --force-reinstall databricks-vectorsearch
dbutils.library.restartPython()
from databricks.vector_search.client import VectorSearchClient
vsc = VectorSearchClient(disable_notice=True)
help(VectorSearchClient)
Carregar conjunto de dados de toy na tabela Delta de origem
A seguir, cria a tabela Delta de origem.
# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA and CREATE_TABLE privileges on the schema.
# Change the catalog and schema here if necessary.
catalog_name = "main"
schema_name = "default"
source_table_name = "wiki_articles_demo"
source_table_fullname = f"{catalog_name}.{schema_name}.{source_table_name}"
# Uncomment if you want to start from scratch.
# spark.sql(f"DROP TABLE {source_table_fullname}")
source_df = spark.read.parquet("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet").limit(10)
display(source_df)
Conjunto de dados de exemplo em blocos
Agrupar o conjunto de dados de exemplo ajuda você a evitar exceder o limite de contexto do modelo de inserção. O modelo GTE dá suporte a até 8.192 tokens. No entanto, o Databricks recomenda que você divida os dados em partes de contexto menores para que você possa alimentar uma variedade maior de exemplos no modelo de raciocínio para seu aplicativo RAG.
import tiktoken
import pandas as pd
# The GTE model has been trained on a max context lenth of 8192 tokens.
max_chunk_tokens = 8192
encoding = tiktoken.get_encoding("cl100k_base")
def chunk_text(text):
# Encode and then decode within the UDF
tokens = encoding.encode(text)
chunks = []
while tokens:
chunk_tokens = tokens[:max_chunk_tokens]
chunk_text = encoding.decode(chunk_tokens)
chunks.append(chunk_text)
tokens = tokens[max_chunk_tokens:]
return chunks
# Process the data and store in a new list
pandas_df = source_df.toPandas()
processed_data = []
for index, row in pandas_df.iterrows():
text_chunks = chunk_text(row['text'])
chunk_no = 0
for chunk in text_chunks:
row_data = row.to_dict()
# replace the id column with a new unique chunk id
# and the text column with the text chunk
row_data['id'] = f"{row['id']}_{chunk_no}"
row_data['text'] = chunk
processed_data.append(row_data)
chunk_no += 1
chunked_pandas_df = pd.DataFrame(processed_data)
chunked_spark_df = spark.createDataFrame(chunked_pandas_df)
# Write the chunked DataFrame to a Delta table
spark.sql(f"DROP TABLE IF EXISTS {source_table_fullname}")
chunked_spark_df.write.format("delta") \
.option("delta.enableChangeDataFeed", "true") \
.saveAsTable(source_table_fullname)
display(spark.sql(f"SELECT * FROM {source_table_fullname}"))
Criar ponto de extremidade de pesquisa de vetor
vector_search_endpoint_name = "vector-search-demo-endpoint"
vsc.create_endpoint(
name=vector_search_endpoint_name,
endpoint_type="STANDARD" # or "STORAGE_OPTIMIZED"
)
vsc.get_endpoint(
name=vector_search_endpoint_name
)
Criar índice de vetor
# Vector index
vs_index = f"{source_table_name}_gte_index"
vs_index_fullname = f"{catalog_name}.{schema_name}.{vs_index}"
embedding_model_endpoint = "databricks-gte-large-en"
index = vsc.create_delta_sync_index(
endpoint_name=vector_search_endpoint_name,
source_table_name=source_table_fullname,
index_name=vs_index_fullname,
pipeline_type='TRIGGERED',
primary_key="id",
embedding_source_column="text",
embedding_model_endpoint_name=embedding_model_endpoint
)
index.describe()['status']['message']
# Wait for index to come online. Expect this command to take several minutes.
# You can also track the status of the index build in Catalog Explorer in the
# Overview tab for the vector index.
import time
index = vsc.get_index(endpoint_name=vector_search_endpoint_name,index_name=vs_index_fullname)
while not index.describe().get('status')['ready']:
print("Waiting for index to be ready...")
time.sleep(30)
print("Index is ready!")
index.describe()
Pesquisa de similaridade
As células a seguir mostram como consultar o Índice de Vetores para localizar documentos semelhantes.
results = index.similarity_search(
query_text="Greek myths",
columns=["id", "text", "title"],
num_results=5
)
rows = results['result']['data_array']
for (id, text, title, score) in rows:
if len(text) > 32:
# trim text output for readability
text = text[0:32] + "..."
print(f"id: {id} title: {title} text: '{text}' score: {score}")
# Search with a filter. Note that the syntax depends on the endpoint type.
# Standard endpoint syntax
results = index.similarity_search(
query_text="Greek myths",
columns=["id", "text", "title"],
num_results=5,
filters={"title NOT": "Hercules"}
)
# Storage-optimized endpoint syntax
# results = index.similarity_search(
# query_text="Greek myths",
# columns=["id", "text", "title"],
# num_results=5,
# filters='title != "Hercules"'
# )
rows = results['result']['data_array']
for (id, text, title, score) in rows:
if len(text) > 32:
# trim text output for readability
text = text[0:32] + "..."
print(f"id: {id} title: {title} text: '{text}' score: {score}")
Excluir índice de vetor
vsc.delete_index(
endpoint_name=vector_search_endpoint_name,
index_name=vs_index_fullname
)