Construir una RAG con Milvus y Unstructured
Unstructured proporciona una plataforma y herramientas para ingerir y procesar documentos no estructurados para la Retrieval Augmented Generation (RAG) y el ajuste de modelos. Ofrece tanto una plataforma de interfaz de usuario sin código como servicios de API sin servidor, lo que permite a los usuarios procesar datos en recursos informáticos alojados en Unstructured.
En este tutorial, utilizaremos Unstructured para ingerir documentos PDF y, a continuación, utilizaremos Milvus para construir una canalización RAG.
Preparación
Dependencias y entorno
$ pip install -qU "unstructured[pdf]" pymilvus milvus-lite openai
Opciones de instalación:
- Para procesar todos los formatos de documentos:
pip install "unstructured[all-docs]" - Para formatos específicos (por ejemplo, PDF):
pip install "unstructured[pdf]" - Para más opciones de instalación, consulte la documentación de Unstructured
Si utilizas Google Colab, para habilitar las dependencias que acabas de instalar, es posible que tengas que reiniciar el tiempo de ejecución (haz clic en el menú "Tiempo de ejecución" en la parte superior de la pantalla, y selecciona "Reiniciar sesión" en el menú desplegable).
En este ejemplo utilizaremos OpenAI como LLM. Deberá preparar la clave api OPENAI_API_KEY como variable de entorno.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Prepare los clientes Milvus y OpenAI
Puede utilizar el cliente Milvus para crear una colección Milvus e insertar datos en ella.
from pymilvus import MilvusClient, DataType
# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")
En cuanto al argumento de MilvusClient:
- Establecer el
uricomo un archivo local, por ejemplo./milvus.db, es el método más conveniente, ya que utiliza automáticamente Milvus Lite para almacenar todos los datos en este archivo. - Si tiene una gran escala de datos, digamos más de un millón de vectores, puede configurar un servidor Milvus más eficiente en Docker o Kubernetes. En esta configuración, por favor utilice la dirección del servidor y el puerto como su uri, por ejemplo
http://localhost:19530. Si habilita la función de autenticación en Milvus, utilice ": " como token, de lo contrario no establezca el token. - Si desea utilizar Zilliz Cloud, el servicio en la nube totalmente gestionado para Milvus, ajuste los
uriytoken, que corresponden al Public Endpoint y a la Api key en Zilliz Cloud.
Comprueba si la colección ya existe y elimínala si es así.
collection_name = "my_rag_collection"
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Prepara un cliente OpenAI para generar embeddings y generar respuestas.
from openai import OpenAI
openai_client = OpenAI()
def emb_text(text):
return (
openai_client.embeddings.create(input=text, model="text-embedding-3-small")
.data[0]
.embedding
)
Genera una incrustación de prueba e imprime su dimensión y sus primeros elementos.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]
Crear la colección Milvus
Crearemos una colección con el siguiente esquema:
id: la clave primaria, que es un identificador único para cada documento.vectorLa incrustación del documento.text: el contenido textual del documento.metadatalos metadatos del documento.
Luego construimos un índice AUTOINDEX sobre el campo vector. Y luego creamos la colección.
# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="vector",
metric_type="COSINE",
index_type="AUTOINDEX",
)
milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params,
consistency_level="Bounded",
)
milvus_client.load_collection(collection_name=collection_name)
Cargar datos desde Unstructured
Unstructured proporciona una canalización de ingesta flexible y potente para procesar varios tipos de archivos, incluidos PDF, HTML, etc. Particionaremos y fragmentaremos un archivo PDF local. Y luego cargaremos los datos en Milvus.
import warnings
from unstructured.partition.auto import partition
warnings.filterwarnings("ignore")
elements = partition(
filename="./pdf_files/WhatisMilvus.pdf",
strategy="hi_res",
chunking_strategy="by_title",
) # Replace with the path to your PDF file
Examinemos los elementos particionados del archivo PDF. Cada elemento representa un fragmento de contenido extraído por el proceso de partición de Unstructured.
for element in elements:
print(element)
break
What is Milvus?
Milvus is a high-performance, highly scalable vector database that runs efficiently across a wide range of environments, from a laptop to large-scale distributed systems. It is available as both open-source software and a cloud service.
Insertar datos en Milvus.
data = []
for i, element in enumerate(elements):
data.append(
{
"id": i,
"vector": emb_text(element.text),
"text": element.text,
"metadata": element.metadata.to_dict(),
}
)
milvus_client.insert(collection_name=collection_name, data=data)
{'insert_count': 29, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28], 'cost': 0}
Recuperar y generar respuesta
Defina una función para recuperar documentos relevantes de Milvus.
def retrieve_documents(question, top_k=3):
search_res = milvus_client.search(
collection_name=collection_name,
data=[emb_text(question)],
limit=top_k,
# search_params={"metric_type": "IP", "params": {}},
output_fields=["text"],
)
return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]
Definir una función para generar una respuesta utilizando los documentos recuperados en la canalización RAG.
def generate_rag_response(question):
retrieved_docs = retrieve_documents(question)
context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
system_prompt = (
"You are an AI assistant. Provide answers based on the given context."
)
user_prompt = f"""
Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
Context:
{context}
Question: {question}
"""
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
return response.choices[0].message.content
Probemos la canalización RAG con una pregunta de ejemplo.
question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus include a wide range of in-memory and on-disk indexing/search algorithms such as IVF, HNSW, and DiskANN. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.