Construir una RAG con Milvus y Unstructured
Unstructured proporciona una plataforma y herramientas para ingerir y procesar documentos no estructurados para la Retrieval Augmented Generation (RAG) y el ajuste de modelos. Ofrece tanto una plataforma de interfaz de usuario sin código como servicios de API sin servidor, lo que permite a los usuarios procesar datos en recursos informáticos alojados en Unstructured.
En este tutorial, utilizaremos Unstructured para ingerir documentos PDF y, a continuación, utilizaremos Milvus para construir una canalización RAG.
Preparación
Dependencias y entorno
$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai
Si utilizas Google Colab, para activar las dependencias que acabas de instalar, es posible que tengas que reiniciar el tiempo de ejecución (haz clic en el menú "Tiempo de ejecución" en la parte superior de la pantalla y selecciona "Reiniciar sesión" en el menú desplegable).
Puedes obtener tus variables de entorno UNSTRUCTURED_API_KEY
y UNSTRUCTURED_URL
desde aquí.
En este ejemplo utilizaremos OpenAI como LLM. Debes preparar la clave api OPENAI_API_KEY
como variable de entorno.
import os
os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"
os.environ["OPENAI_API_KEY"] = "***********"
Prepare los clientes Milvus y OpenAI
Puede utilizar el cliente Milvus para crear una colección Milvus e insertar datos en ella.
from pymilvus import MilvusClient, DataType
# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db") # TODO
En cuanto al argumento de MilvusClient
:
- Establecer el
uri
como un archivo local, por ejemplo./milvus.db
, es el método más conveniente, ya que utiliza automáticamente Milvus Lite para almacenar todos los datos en este archivo. - Si tiene una gran escala de datos, digamos más de un millón de vectores, puede configurar un servidor Milvus más eficiente en Docker o Kubernetes. En esta configuración, por favor utilice la dirección del servidor y el puerto como su uri, por ejemplo
http://localhost:19530
. Si habilita la función de autenticación en Milvus, utilice "<su_nombre_de_usuario>:<su_contraseña>" como token, de lo contrario no configure el token. - Si desea utilizar Zilliz Cloud, el servicio en la nube totalmente gestionado para Milvus, ajuste
uri
ytoken
, que corresponden al punto final público y a la clave Api en Zilliz Cloud.
Comprueba si la colección ya existe y elimínala si es así.
collection_name = "my_rag_collection"
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Prepara un cliente OpenAI para generar embeddings y generar respuestas.
from openai import OpenAI
openai_client = OpenAI()
def emb_text(text):
return (
openai_client.embeddings.create(input=text, model="text-embedding-3-small")
.data[0]
.embedding
)
Genera una incrustación de prueba e imprime su dimensión y sus primeros elementos.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]
Crear la colección Milvus
Crearemos una colección con el siguiente esquema:
id
: la clave primaria, que es un identificador único para cada documento.vector
La incrustación del documento.text
: el contenido textual del documento.metadata
los metadatos del documento.
Luego construimos un índice AUTOINDEX
sobre el campo vector
. Y luego creamos la colección.
# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="vector",
metric_type="COSINE",
index_type="AUTOINDEX",
)
milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params,
consistency_level="Strong",
)
milvus_client.load_collection(collection_name=collection_name)
Cargar datos desde Unstructured
Unstructured proporciona una canalización de ingesta flexible y potente para procesar varios tipos de archivos, incluidos PDF, HTML, etc. Utilizaremos la funcionalidad de ingesta para particionar archivos PDF en un directorio local. A continuación, cargaremos los datos en Milvus.
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
LocalIndexerConfig,
LocalDownloaderConfig,
LocalConnectionConfig,
LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"
Pipeline.from_configs(
context=ProcessorConfig(),
indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
downloader_config=LocalDownloaderConfig(),
source_connection_config=LocalConnectionConfig(),
partitioner_config=PartitionerConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
strategy="hi_res",
additional_partition_args={
"split_pdf_page": True,
"split_pdf_concurrency_level": 15,
},
),
uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()
from unstructured.staging.base import elements_from_json
def load_processed_files(directory_path):
elements = []
for filename in os.listdir(directory_path):
if filename.endswith(".json"):
file_path = os.path.join(directory_path, filename)
try:
elements.extend(elements_from_json(filename=file_path))
except IOError:
print(f"Error: Could not read file {filename}.")
return elements
elements = load_processed_files(directory_with_results)
Insertar datos en Milvus.
data = []
for i, element in enumerate(elements):
data.append(
{
"id": i,
"vector": emb_text(element.text),
"text": element.text,
"metadata": element.metadata.to_dict(),
}
)
milvus_client.insert(collection_name=collection_name, data=data)
Recuperar y generar respuesta
Definir una función para recuperar documentos relevantes de Milvus.
def retrieve_documents(question, top_k=3):
search_res = milvus_client.search(
collection_name=collection_name,
data=[emb_text(question)],
limit=top_k,
# search_params={"metric_type": "IP", "params": {}},
output_fields=["text"],
)
return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]
Definir una función para generar una respuesta utilizando los documentos recuperados en la canalización RAG.
def generate_rag_response(question):
retrieved_docs = retrieve_documents(question)
context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
system_prompt = (
"You are an AI assistant. Provide answers based on the given context."
)
user_prompt = f"""
Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
Context:
{context}
Question: {question}
"""
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
return response.choices[0].message.content
Probemos la canalización RAG con una pregunta de ejemplo.
question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.