RAG con Milvus e l'API asincrona LlamaIndex

Open In Colab GitHub Repository

Questo tutorial mostra come utilizzare LlamaIndex con Milvus per costruire una pipeline asincrona di elaborazione dei documenti per RAG. LlamaIndex fornisce un modo per elaborare i documenti e memorizzarli in un db vettoriale come Milvus. Sfruttando le API asincrone di LlamaIndex e la libreria client Python di Milvus, possiamo aumentare il throughput della pipeline per elaborare e indicizzare in modo efficiente grandi volumi di dati.

In questo tutorial, introdurremo prima l'uso dei metodi asincroni per costruire una RAG con LlamaIndex e Milvus da un livello alto, e poi introdurremo l'uso dei metodi di basso livello e il confronto delle prestazioni tra sincrono e asincrono.

Prima di iniziare

Gli snippet di codice di questa pagina richiedono le dipendenze di pymilvus e llamaindex. È possibile installarle utilizzando i seguenti comandi:

$ pip install -U pymilvus llama-index-vector-stores-milvus llama-index nest-asyncio

Se si utilizza Google Colab, per abilitare le dipendenze appena installate potrebbe essere necessario riavviare il runtime (fare clic sul menu "Runtime" nella parte superiore dello schermo e selezionare "Restart session" dal menu a discesa).

Utilizzeremo i modelli di OpenAI. È necessario preparare la chiave api OPENAI_API_KEY come variabile d'ambiente.

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

Se si utilizza Jupyter Notebook, è necessario eseguire questa riga di codice prima di eseguire il codice asincrono.

import nest_asyncio

nest_asyncio.apply()

Preparare i dati

È possibile scaricare i dati di esempio con i seguenti comandi:

$ mkdir -p 'data/'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham_essay.txt'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/uber_2021.pdf'

Costruire RAG con l'elaborazione asincrona

Questa sezione mostra come costruire un sistema RAG in grado di elaborare i documenti in modo asincrono.

Importare le librerie necessarie e definire l'URI di Milvus e la dimensione dell'incorporazione.

import asyncio
import random
import time

from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.milvus import MilvusVectorStore

URI = "http://localhost:19530"
DIM = 768
  • Se si dispone di una grande quantità di dati, è possibile configurare un server Milvus performante su docker o kubernetes. In questa configurazione, utilizzare l'URI del server, ad esempiohttp://localhost:19530, come uri.
  • Se si desidera utilizzare Zilliz Cloud, il servizio cloud completamente gestito da Milvus, è necessario impostare uri e token, che corrispondono all'endpoint pubblico e alla chiave Api di Zilliz Cloud.
  • Nel caso di sistemi complessi (come la comunicazione di rete), l'elaborazione asincrona può migliorare le prestazioni rispetto alla sincronizzazione. Riteniamo quindi che Milvus-Lite non sia adatto all'uso di interfacce asincrone perché gli scenari utilizzati non sono adatti.

Definire una funzione di inizializzazione da utilizzare nuovamente per ricostruire la collezione Milvus.

def init_vector_store():
    return MilvusVectorStore(
        uri=URI,
        # token=TOKEN,
        dim=DIM,
        collection_name="test_collection",
        embedding_field="embedding",
        id_field="id",
        similarity_metric="COSINE",
        consistency_level="Bounded",  # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`). See https://milvus.io/docs/tune_consistency.md#Consistency-Level for more details.
        overwrite=True,  # To overwrite the collection if it already exists
    )


vector_store = init_vector_store()
2025-01-24 20:04:39,414 [DEBUG][_create_connection]: Created new connection using: faa8be8753f74288bffc7e6d38942f8a (async_milvus_client.py:600)

Utilizzare SimpleDirectoryReader per avvolgere un oggetto documento LlamaIndex dal file paul_graham_essay.txt.

from llama_index.core import SimpleDirectoryReader

# load documents
documents = SimpleDirectoryReader(
    input_files=["./data/paul_graham_essay.txt"]
).load_data()

print("Document ID:", documents[0].doc_id)
Document ID: 41a6f99c-489f-49ff-9821-14e2561140eb

Istanziare localmente un modello di incorporazione Hugging Face. L'uso di un modello locale evita il rischio di raggiungere i limiti di velocità dell'API durante l'inserimento asincrono dei dati, poiché le richieste API simultanee possono sommarsi rapidamente e consumare il budget dell'API pubblica. Tuttavia, se il limite di velocità è elevato, si può scegliere di utilizzare un servizio di modello remoto.

from llama_index.embeddings.huggingface import HuggingFaceEmbedding


embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")

Creare un indice e inserire il documento.

Impostiamo use_async su True per abilitare la modalità di inserimento asincrono.

# Create an index over the documents
from llama_index.core import VectorStoreIndex, StorageContext

storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    embed_model=embed_model,
    use_async=True,
)

Inizializzare l'LLM.

from llama_index.llms.openai import OpenAI

llm = OpenAI(model="gpt-3.5-turbo")

Quando si costruisce il motore di query, si può anche impostare il parametro use_async su True per abilitare la ricerca asincrona.

query_engine = index.as_query_engine(use_async=True, llm=llm)
response = await query_engine.aquery("What did the author learn?")
print(response)
The author learned that the field of artificial intelligence, as practiced at the time, was not as promising as initially believed. The approach of using explicit data structures to represent concepts in AI was not effective in achieving true understanding of natural language. This realization led the author to shift his focus towards Lisp and eventually towards exploring the field of art.

Esplorare l'API asincrona

In questa sezione, introdurremo l'uso delle API di livello inferiore e confronteremo le prestazioni delle esecuzioni sincrone e asincrone.

Aggiunta asincrona

Reinizializzare l'archivio vettoriale.

vector_store = init_vector_store()
2025-01-24 20:07:38,727 [DEBUG][_create_connection]: Created new connection using: 5e0d130f3b644555ad7ea6b8df5f1fc2 (async_milvus_client.py:600)

Definiamo una funzione di produzione di nodi, che sarà utilizzata per generare un gran numero di nodi di prova per l'indice.

def random_id():
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def produce_nodes(num_adding):
    node_list = []
    for i in range(num_adding):
        node = TextNode(
            id_=random_id(),
            text=f"n{i}_text",
            embedding=[0.5] * (DIM - 1) + [random.random()],
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id=f"n{i+1}")},
        )
        node_list.append(node)
    return node_list

Definire una funzione asincrona per aggiungere documenti all'archivio vettoriale. Utilizziamo la funzione async_add() nell'istanza dell'archivio vettoriale Milvus.

async def async_add(num_adding):
    node_list = produce_nodes(num_adding)
    start_time = time.time()
    tasks = []
    for i in range(num_adding):
        sub_nodes = node_list[i]
        task = vector_store.async_add([sub_nodes])  # use async_add()
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time
add_counts = [10, 100, 1000]

Ottenere il ciclo di eventi.

loop = asyncio.get_event_loop()

Aggiungere in modo asincrono i documenti all'archivio vettoriale.

for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(count)
        print(f"Async add for {count} took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())
Async add for 10 took 0.19 seconds
Async add for 100 took 0.48 seconds
Async add for 1000 took 3.22 seconds
vector_store = init_vector_store()
2025-01-24 20:07:45,554 [DEBUG][_create_connection]: Created new connection using: b14dde8d6d24489bba26a907593f692d (async_milvus_client.py:600)

Confronto con l'aggiunta sincrona

Definire una funzione di aggiunta sincrona. Misurare il tempo di esecuzione nelle stesse condizioni.

def sync_add(num_adding):
    node_list = produce_nodes(num_adding)
    start_time = time.time()
    for node in node_list:
        result = vector_store.add([node])
    end_time = time.time()
    return end_time - start_time
for count in add_counts:
    sync_time = sync_add(count)
    print(f"Sync add for {count} took {sync_time:.2f} seconds")
Sync add for 10 took 0.56 seconds
Sync add for 100 took 5.85 seconds
Sync add for 1000 took 62.91 seconds

Il risultato mostra che il processo di aggiunta sincrona è molto più lento di quello asincrono.

Reinizializzare l'archivio vettoriale e aggiungere alcuni documenti prima di eseguire la ricerca.

vector_store = init_vector_store()
node_list = produce_nodes(num_adding=1000)
inserted_ids = vector_store.add(node_list)
2025-01-24 20:08:57,982 [DEBUG][_create_connection]: Created new connection using: 351dc7ea4fb14d4386cfab02621ab7d1 (async_milvus_client.py:600)

Definire una funzione di ricerca asincrona. Utilizziamo la funzione aquery() nell'istanza di Milvus vector store.

async def async_search(num_queries):
    start_time = time.time()
    tasks = []
    for _ in range(num_queries):
        query = VectorStoreQuery(
            query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
        )
        task = vector_store.aquery(query=query)  # use aquery()
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time
query_counts = [10, 100, 1000]

Ricerca asincrona dal negozio Milvus.

for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())
Async search for 10 queries took 0.55 seconds
Async search for 100 queries took 1.39 seconds
Async search for 1000 queries took 8.81 seconds

Definire una funzione di ricerca sincrona. Misurare poi il tempo di esecuzione nelle stesse condizioni.

def sync_search(num_queries):
    start_time = time.time()
    for _ in range(num_queries):
        query = VectorStoreQuery(
            query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
        )
        result = vector_store.query(query=query)
    end_time = time.time()
    return end_time - start_time
for count in query_counts:
    sync_time = sync_search(count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Sync search for 10 queries took 3.29 seconds
Sync search for 100 queries took 30.80 seconds
Sync search for 1000 queries took 308.80 seconds

Il risultato mostra che il processo di ricerca sincrono è molto più lento di quello asincrono.

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

Questa pagina è stata utile?