RAG con Milvus y LlamaIndex Async API

Open In Colab GitHub Repository

Este tutorial muestra cómo utilizar LlamaIndex con Milvus para construir un proceso asíncrono de documentos para RAG. LlamaIndex proporciona una forma de procesar documentos y almacenarlos en una base de datos vectorial como Milvus. Aprovechando la API asíncrona de LlamaIndex y la biblioteca cliente Python de Milvus, podemos aumentar el rendimiento de la tubería para procesar e indexar de manera eficiente grandes volúmenes de datos.

En este tutorial, primero introduciremos el uso de métodos asíncronos para construir una RAG con LlamaIndex y Milvus desde un alto nivel, y luego introduciremos el uso de métodos de bajo nivel y la comparación de rendimiento entre síncronos y asíncronos.

Antes de empezar

Los fragmentos de código de esta página requieren las dependencias de pymilvus y llamaindex. Puedes instalarlas utilizando los siguientes comandos:

$ pip install -U pymilvus llama-index-vector-stores-milvus llama-index nest-asyncio

Si estás utilizando Google Colab, para habilitar las dependencias recién instaladas, es posible que tengas que reiniciar el tiempo de ejecución (haz clic en el menú "Tiempo de ejecución" en la parte superior de la pantalla, y selecciona "Reiniciar sesión" en el menú desplegable).

Utilizaremos los modelos de OpenAI. Debes preparar la clave api OPENAI_API_KEY como variable de entorno.

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

Si estás usando Jupyter Notebook, necesitas ejecutar esta línea de código antes de ejecutar el código asíncrono.

import nest_asyncio

nest_asyncio.apply()

Preparar los datos

Puedes descargar datos de ejemplo con los siguientes comandos:

$ mkdir -p 'data/'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham_essay.txt'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/uber_2021.pdf'

Construir RAG con procesamiento asíncrono

Esta sección muestra cómo construir un sistema RAG que pueda procesar documentos de forma asíncrona.

Importe las bibliotecas necesarias y defina Milvus URI y la dimensión de la incrustación.

import asyncio
import random
import time

from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.milvus import MilvusVectorStore

URI = "http://localhost:19530"
DIM = 768
  • Si tiene una gran escala de datos, puede configurar un servidor Milvus de alto rendimiento en docker o kubernetes. En esta configuración, utilice la uri del servidor, por ejemplohttp://localhost:19530, como su uri.
  • Si desea utilizar Zilliz Cloud, el servicio en la nube totalmente gestionado para Milvus, ajuste los uri y token, que corresponden al punto final público y a la clave Api en Zilliz Cloud.
  • En el caso de sistemas complejos (como la comunicación en red), el procesamiento asíncrono puede aportar una mejora del rendimiento en comparación con la sincronización. Así que pensamos que Milvus-Lite no es adecuado para utilizar interfaces asíncronas porque los escenarios utilizados no son adecuados.

Definir una función de inicialización que podamos utilizar de nuevo para reconstruir la colección Milvus.

def init_vector_store():
    return MilvusVectorStore(
        uri=URI,
        # token=TOKEN,
        dim=DIM,
        collection_name="test_collection",
        embedding_field="embedding",
        id_field="id",
        similarity_metric="COSINE",
        consistency_level="Bounded",  # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`). See https://milvus.io/docs/tune_consistency.md#Consistency-Level for more details.
        overwrite=True,  # To overwrite the collection if it already exists
    )


vector_store = init_vector_store()
2025-01-24 20:04:39,414 [DEBUG][_create_connection]: Created new connection using: faa8be8753f74288bffc7e6d38942f8a (async_milvus_client.py:600)

Utilizar SimpleDirectoryReader para envolver un objeto documento LlamaIndex del archivo paul_graham_essay.txt.

from llama_index.core import SimpleDirectoryReader

# load documents
documents = SimpleDirectoryReader(
    input_files=["./data/paul_graham_essay.txt"]
).load_data()

print("Document ID:", documents[0].doc_id)
Document ID: 41a6f99c-489f-49ff-9821-14e2561140eb

Instanciar un modelo de incrustación Hugging Face localmente. El uso de un modelo local evita el riesgo de alcanzar los límites de tasa de la API durante la inserción asíncrona de datos, ya que las solicitudes concurrentes de la API pueden sumarse rápidamente y agotar su presupuesto en la API pública. Sin embargo, si tiene un límite de tasa alto, puede optar por utilizar un servicio de modelo remoto en su lugar.

from llama_index.embeddings.huggingface import HuggingFaceEmbedding


embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")

Cree un índice e inserte el documento.

Establecemos use_async en True para habilitar el modo de inserción asíncrono.

# Create an index over the documents
from llama_index.core import VectorStoreIndex, StorageContext

storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    embed_model=embed_model,
    use_async=True,
)

Inicializa el LLM.

from llama_index.llms.openai import OpenAI

llm = OpenAI(model="gpt-3.5-turbo")

Al crear el motor de consulta, también puedes establecer el parámetro use_async en True para habilitar la búsqueda asíncrona.

query_engine = index.as_query_engine(use_async=True, llm=llm)
response = await query_engine.aquery("What did the author learn?")
print(response)
The author learned that the field of artificial intelligence, as practiced at the time, was not as promising as initially believed. The approach of using explicit data structures to represent concepts in AI was not effective in achieving true understanding of natural language. This realization led the author to shift his focus towards Lisp and eventually towards exploring the field of art.

Explorar la API asíncrona

En esta sección, presentaremos el uso de la API de bajo nivel y compararemos el rendimiento de las ejecuciones síncronas y asíncronas.

Añadir asíncrono

Reinicializar el almacén de vectores.

vector_store = init_vector_store()
2025-01-24 20:07:38,727 [DEBUG][_create_connection]: Created new connection using: 5e0d130f3b644555ad7ea6b8df5f1fc2 (async_milvus_client.py:600)

Definamos una función productora de nodos, que se utilizará para generar un gran número de nodos de prueba para el índice.

def random_id():
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def produce_nodes(num_adding):
    node_list = []
    for i in range(num_adding):
        node = TextNode(
            id_=random_id(),
            text=f"n{i}_text",
            embedding=[0.5] * (DIM - 1) + [random.random()],
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id=f"n{i+1}")},
        )
        node_list.append(node)
    return node_list

Definir una función aync para añadir documentos al almacén vectorial. Usamos la función async_add() en la instancia de Milvus vector store.

async def async_add(num_adding):
    node_list = produce_nodes(num_adding)
    start_time = time.time()
    tasks = []
    for i in range(num_adding):
        sub_nodes = node_list[i]
        task = vector_store.async_add([sub_nodes])  # use async_add()
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time
add_counts = [10, 100, 1000]

Obtener el bucle de eventos.

loop = asyncio.get_event_loop()

Añadir asíncronamente documentos al almacén de vectores.

for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(count)
        print(f"Async add for {count} took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())
Async add for 10 took 0.19 seconds
Async add for 100 took 0.48 seconds
Async add for 1000 took 3.22 seconds
vector_store = init_vector_store()
2025-01-24 20:07:45,554 [DEBUG][_create_connection]: Created new connection using: b14dde8d6d24489bba26a907593f692d (async_milvus_client.py:600)

Comparar con sync add

Defina una función sync add. A continuación, medir el tiempo de ejecución en las mismas condiciones.

def sync_add(num_adding):
    node_list = produce_nodes(num_adding)
    start_time = time.time()
    for node in node_list:
        result = vector_store.add([node])
    end_time = time.time()
    return end_time - start_time
for count in add_counts:
    sync_time = sync_add(count)
    print(f"Sync add for {count} took {sync_time:.2f} seconds")
Sync add for 10 took 0.56 seconds
Sync add for 100 took 5.85 seconds
Sync add for 1000 took 62.91 seconds

El resultado muestra que el proceso de adición sincrónico es mucho más lento que el asincrónico.

Reinicie el almacén vectorial y añada algunos documentos antes de ejecutar la búsqueda.

vector_store = init_vector_store()
node_list = produce_nodes(num_adding=1000)
inserted_ids = vector_store.add(node_list)
2025-01-24 20:08:57,982 [DEBUG][_create_connection]: Created new connection using: 351dc7ea4fb14d4386cfab02621ab7d1 (async_milvus_client.py:600)

Define una función de búsqueda asíncrona. Utilizamos la función aquery() en la instancia del almacén vectorial Milvus.

async def async_search(num_queries):
    start_time = time.time()
    tasks = []
    for _ in range(num_queries):
        query = VectorStoreQuery(
            query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
        )
        task = vector_store.aquery(query=query)  # use aquery()
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time
query_counts = [10, 100, 1000]

Búsqueda asíncrona desde el almacén Milvus.

for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())
Async search for 10 queries took 0.55 seconds
Async search for 100 queries took 1.39 seconds
Async search for 1000 queries took 8.81 seconds

Definir una función de búsqueda sincrónica. A continuación, medir el tiempo de ejecución en las mismas condiciones.

def sync_search(num_queries):
    start_time = time.time()
    for _ in range(num_queries):
        query = VectorStoreQuery(
            query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
        )
        result = vector_store.query(query=query)
    end_time = time.time()
    return end_time - start_time
for count in query_counts:
    sync_time = sync_search(count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Sync search for 10 queries took 3.29 seconds
Sync search for 100 queries took 30.80 seconds
Sync search for 1000 queries took 308.80 seconds

El resultado muestra que el proceso de búsqueda sincrónico es mucho más lento que el asincrónico.

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

¿Fue útil esta página?