milvus-logo
LFAI
Home
  • Integraciones

Construir una RAG con Milvus y Unstructured

Open In Colab GitHub Repository

Unstructured proporciona una plataforma y herramientas para ingerir y procesar documentos no estructurados para la Retrieval Augmented Generation (RAG) y el ajuste de modelos. Ofrece tanto una plataforma de interfaz de usuario sin código como servicios de API sin servidor, lo que permite a los usuarios procesar datos en recursos informáticos alojados en Unstructured.

En este tutorial, utilizaremos Unstructured para ingerir documentos PDF y, a continuación, utilizaremos Milvus para construir una canalización RAG.

Preparación

Dependencias y entorno

$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai

Si utilizas Google Colab, para habilitar las dependencias que acabas de instalar, es posible que tengas que reiniciar el tiempo de ejecución (haz clic en el menú "Tiempo de ejecución" en la parte superior de la pantalla y selecciona "Reiniciar sesión" en el menú desplegable).

Puedes obtener tus variables de entorno UNSTRUCTURED_API_KEY y UNSTRUCTURED_URL desde aquí.

En este ejemplo utilizaremos OpenAI como LLM. Debes preparar la clave api OPENAI_API_KEY como variable de entorno.

import os


os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"

os.environ["OPENAI_API_KEY"] = "***********"

Prepare los clientes Milvus y OpenAI

Puede utilizar el cliente Milvus para crear una colección Milvus e insertar datos en ella.

from pymilvus import MilvusClient, DataType

# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")  # TODO

En cuanto al argumento de MilvusClient:

  • Establecer el uri como un archivo local, por ejemplo./milvus.db, es el método más conveniente, ya que utiliza automáticamente Milvus Lite para almacenar todos los datos en este archivo.
  • Si tiene una gran escala de datos, digamos más de un millón de vectores, puede configurar un servidor Milvus más eficiente en Docker o Kubernetes. En esta configuración, por favor utilice la dirección del servidor y el puerto como su uri, por ejemplohttp://localhost:19530. Si habilita la función de autenticación en Milvus, utilice "<su_nombre_de_usuario>:<su_contraseña>" como token, de lo contrario no configure el token.
  • Si desea utilizar Zilliz Cloud, el servicio en la nube totalmente gestionado para Milvus, ajuste uri y token, que corresponden al punto final público y a la clave Api en Zilliz Cloud.

Comprueba si la colección ya existe y elimínala si es así.

collection_name = "my_rag_collection"

if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

Prepara un cliente OpenAI para generar embeddings y generar respuestas.

from openai import OpenAI

openai_client = OpenAI()


def emb_text(text):
    return (
        openai_client.embeddings.create(input=text, model="text-embedding-3-small")
        .data[0]
        .embedding
    )

Genera una incrustación de prueba e imprime su dimensión y sus primeros elementos.

test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]

Crear la colección Milvus

Crearemos una colección con el siguiente esquema:

  • id: la clave primaria, que es un identificador único para cada documento.
  • vectorLa incrustación del documento.
  • text: el contenido textual del documento.
  • metadatalos metadatos del documento.

Luego construimos un índice AUTOINDEX sobre el campo vector. Y luego creamos la colección.

# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
    field_name="vector",
    metric_type="COSINE",
    index_type="AUTOINDEX",
)
milvus_client.create_collection(
    collection_name=collection_name,
    schema=schema,
    index_params=index_params,
    consistency_level="Strong",
)

milvus_client.load_collection(collection_name=collection_name)

Cargar datos desde Unstructured

Unstructured proporciona una canalización de ingesta flexible y potente para procesar varios tipos de archivos, incluidos PDF, HTML, etc. Utilizaremos la funcionalidad de ingesta para particionar archivos PDF en un directorio local. A continuación, cargaremos los datos en Milvus.

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"

Pipeline.from_configs(
    context=ProcessorConfig(),
    indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
    downloader_config=LocalDownloaderConfig(),
    source_connection_config=LocalConnectionConfig(),
    partitioner_config=PartitionerConfig(
        partition_by_api=True,
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
        strategy="hi_res",
        additional_partition_args={
            "split_pdf_page": True,
            "split_pdf_concurrency_level": 15,
        },
    ),
    uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()


from unstructured.staging.base import elements_from_json


def load_processed_files(directory_path):
    elements = []
    for filename in os.listdir(directory_path):
        if filename.endswith(".json"):
            file_path = os.path.join(directory_path, filename)
            try:
                elements.extend(elements_from_json(filename=file_path))
            except IOError:
                print(f"Error: Could not read file {filename}.")

    return elements


elements = load_processed_files(directory_with_results)

Insertar datos en Milvus.

data = []
for i, element in enumerate(elements):
    data.append(
        {
            "id": i,
            "vector": emb_text(element.text),
            "text": element.text,
            "metadata": element.metadata.to_dict(),
        }
    )
milvus_client.insert(collection_name=collection_name, data=data)

Recuperar y generar respuesta

Definir una función para recuperar documentos relevantes de Milvus.

def retrieve_documents(question, top_k=3):
    search_res = milvus_client.search(
        collection_name=collection_name,
        data=[emb_text(question)],
        limit=top_k,
        # search_params={"metric_type": "IP", "params": {}},
        output_fields=["text"],
    )
    return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]

Definir una función para generar una respuesta utilizando los documentos recuperados en la canalización RAG.

def generate_rag_response(question):
    retrieved_docs = retrieve_documents(question)
    context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
    system_prompt = (
        "You are an AI assistant. Provide answers based on the given context."
    )
    user_prompt = f"""
    Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
    
    Context:
    {context}
    
    Question: {question}
    """
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return response.choices[0].message.content

Probemos la canalización RAG con una pregunta de ejemplo.

question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.

Traducido porDeepL

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

¿Fue útil esta página?