RAG con Milvus e l'API asincrona LlamaIndex
Questo tutorial mostra come utilizzare LlamaIndex con Milvus per costruire una pipeline asincrona di elaborazione dei documenti per RAG. LlamaIndex fornisce un modo per elaborare i documenti e memorizzarli in un db vettoriale come Milvus. Sfruttando le API asincrone di LlamaIndex e la libreria client Python di Milvus, possiamo aumentare il throughput della pipeline per elaborare e indicizzare in modo efficiente grandi volumi di dati.
In questo tutorial, introdurremo prima l'uso dei metodi asincroni per costruire una RAG con LlamaIndex e Milvus da un livello alto, e poi introdurremo l'uso dei metodi di basso livello e il confronto delle prestazioni tra sincrono e asincrono.
Prima di iniziare
Gli snippet di codice di questa pagina richiedono le dipendenze di pymilvus e llamaindex. È possibile installarle utilizzando i seguenti comandi:
$ pip install -U pymilvus llama-index-vector-stores-milvus llama-index nest-asyncio
Se si utilizza Google Colab, per abilitare le dipendenze appena installate potrebbe essere necessario riavviare il runtime (fare clic sul menu "Runtime" nella parte superiore dello schermo e selezionare "Restart session" dal menu a discesa).
Utilizzeremo i modelli di OpenAI. È necessario preparare la chiave api OPENAI_API_KEY come variabile d'ambiente.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Se si utilizza Jupyter Notebook, è necessario eseguire questa riga di codice prima di eseguire il codice asincrono.
import nest_asyncio
nest_asyncio.apply()
Preparare i dati
È possibile scaricare i dati di esempio con i seguenti comandi:
$ mkdir -p 'data/'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham_essay.txt'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/uber_2021.pdf'
Costruire RAG con l'elaborazione asincrona
Questa sezione mostra come costruire un sistema RAG in grado di elaborare i documenti in modo asincrono.
Importare le librerie necessarie e definire l'URI di Milvus e la dimensione dell'incorporazione.
import asyncio
import random
import time
from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.milvus import MilvusVectorStore
URI = "http://localhost:19530"
DIM = 768
- Se si dispone di una grande quantità di dati, è possibile configurare un server Milvus performante su docker o kubernetes. In questa configurazione, utilizzare l'URI del server, ad esempio
http://localhost:19530, comeuri. - Se si desidera utilizzare Zilliz Cloud, il servizio cloud completamente gestito da Milvus, è necessario impostare
urietoken, che corrispondono all'endpoint pubblico e alla chiave Api di Zilliz Cloud. - Nel caso di sistemi complessi (come la comunicazione di rete), l'elaborazione asincrona può migliorare le prestazioni rispetto alla sincronizzazione. Riteniamo quindi che Milvus-Lite non sia adatto all'uso di interfacce asincrone perché gli scenari utilizzati non sono adatti.
Definire una funzione di inizializzazione da utilizzare nuovamente per ricostruire la collezione Milvus.
def init_vector_store():
return MilvusVectorStore(
uri=URI,
# token=TOKEN,
dim=DIM,
collection_name="test_collection",
embedding_field="embedding",
id_field="id",
similarity_metric="COSINE",
consistency_level="Bounded", # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`). See https://milvus.io/docs/tune_consistency.md#Consistency-Level for more details.
overwrite=True, # To overwrite the collection if it already exists
)
vector_store = init_vector_store()
2025-01-24 20:04:39,414 [DEBUG][_create_connection]: Created new connection using: faa8be8753f74288bffc7e6d38942f8a (async_milvus_client.py:600)
Utilizzare SimpleDirectoryReader per avvolgere un oggetto documento LlamaIndex dal file paul_graham_essay.txt.
from llama_index.core import SimpleDirectoryReader
# load documents
documents = SimpleDirectoryReader(
input_files=["./data/paul_graham_essay.txt"]
).load_data()
print("Document ID:", documents[0].doc_id)
Document ID: 41a6f99c-489f-49ff-9821-14e2561140eb
Istanziare localmente un modello di incorporazione Hugging Face. L'uso di un modello locale evita il rischio di raggiungere i limiti di velocità dell'API durante l'inserimento asincrono dei dati, poiché le richieste API simultanee possono sommarsi rapidamente e consumare il budget dell'API pubblica. Tuttavia, se il limite di velocità è elevato, si può scegliere di utilizzare un servizio di modello remoto.
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")
Creare un indice e inserire il documento.
Impostiamo use_async su True per abilitare la modalità di inserimento asincrono.
# Create an index over the documents
from llama_index.core import VectorStoreIndex, StorageContext
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context,
embed_model=embed_model,
use_async=True,
)
Inizializzare l'LLM.
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-3.5-turbo")
Quando si costruisce il motore di query, si può anche impostare il parametro use_async su True per abilitare la ricerca asincrona.
query_engine = index.as_query_engine(use_async=True, llm=llm)
response = await query_engine.aquery("What did the author learn?")
print(response)
The author learned that the field of artificial intelligence, as practiced at the time, was not as promising as initially believed. The approach of using explicit data structures to represent concepts in AI was not effective in achieving true understanding of natural language. This realization led the author to shift his focus towards Lisp and eventually towards exploring the field of art.
Esplorare l'API asincrona
In questa sezione, introdurremo l'uso delle API di livello inferiore e confronteremo le prestazioni delle esecuzioni sincrone e asincrone.
Aggiunta asincrona
Reinizializzare l'archivio vettoriale.
vector_store = init_vector_store()
2025-01-24 20:07:38,727 [DEBUG][_create_connection]: Created new connection using: 5e0d130f3b644555ad7ea6b8df5f1fc2 (async_milvus_client.py:600)
Definiamo una funzione di produzione di nodi, che sarà utilizzata per generare un gran numero di nodi di prova per l'indice.
def random_id():
random_num_str = ""
for _ in range(16):
random_digit = str(random.randint(0, 9))
random_num_str += random_digit
return random_num_str
def produce_nodes(num_adding):
node_list = []
for i in range(num_adding):
node = TextNode(
id_=random_id(),
text=f"n{i}_text",
embedding=[0.5] * (DIM - 1) + [random.random()],
relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id=f"n{i+1}")},
)
node_list.append(node)
return node_list
Definire una funzione asincrona per aggiungere documenti all'archivio vettoriale. Utilizziamo la funzione async_add() nell'istanza dell'archivio vettoriale Milvus.
async def async_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
tasks = []
for i in range(num_adding):
sub_nodes = node_list[i]
task = vector_store.async_add([sub_nodes]) # use async_add()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
add_counts = [10, 100, 1000]
Ottenere il ciclo di eventi.
loop = asyncio.get_event_loop()
Aggiungere in modo asincrono i documenti all'archivio vettoriale.
for count in add_counts:
async def measure_async_add():
async_time = await async_add(count)
print(f"Async add for {count} took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_add())
Async add for 10 took 0.19 seconds
Async add for 100 took 0.48 seconds
Async add for 1000 took 3.22 seconds
vector_store = init_vector_store()
2025-01-24 20:07:45,554 [DEBUG][_create_connection]: Created new connection using: b14dde8d6d24489bba26a907593f692d (async_milvus_client.py:600)
Confronto con l'aggiunta sincrona
Definire una funzione di aggiunta sincrona. Misurare il tempo di esecuzione nelle stesse condizioni.
def sync_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
for node in node_list:
result = vector_store.add([node])
end_time = time.time()
return end_time - start_time
for count in add_counts:
sync_time = sync_add(count)
print(f"Sync add for {count} took {sync_time:.2f} seconds")
Sync add for 10 took 0.56 seconds
Sync add for 100 took 5.85 seconds
Sync add for 1000 took 62.91 seconds
Il risultato mostra che il processo di aggiunta sincrona è molto più lento di quello asincrono.
Ricerca asincrona
Reinizializzare l'archivio vettoriale e aggiungere alcuni documenti prima di eseguire la ricerca.
vector_store = init_vector_store()
node_list = produce_nodes(num_adding=1000)
inserted_ids = vector_store.add(node_list)
2025-01-24 20:08:57,982 [DEBUG][_create_connection]: Created new connection using: 351dc7ea4fb14d4386cfab02621ab7d1 (async_milvus_client.py:600)
Definire una funzione di ricerca asincrona. Utilizziamo la funzione aquery() nell'istanza di Milvus vector store.
async def async_search(num_queries):
start_time = time.time()
tasks = []
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
task = vector_store.aquery(query=query) # use aquery()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
query_counts = [10, 100, 1000]
Ricerca asincrona dal negozio Milvus.
for count in query_counts:
async def measure_async_search():
async_time = await async_search(count)
print(f"Async search for {count} queries took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_search())
Async search for 10 queries took 0.55 seconds
Async search for 100 queries took 1.39 seconds
Async search for 1000 queries took 8.81 seconds
Confronto con la ricerca sincrona
Definire una funzione di ricerca sincrona. Misurare poi il tempo di esecuzione nelle stesse condizioni.
def sync_search(num_queries):
start_time = time.time()
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
result = vector_store.query(query=query)
end_time = time.time()
return end_time - start_time
for count in query_counts:
sync_time = sync_search(count)
print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Sync search for 10 queries took 3.29 seconds
Sync search for 100 queries took 30.80 seconds
Sync search for 1000 queries took 308.80 seconds
Il risultato mostra che il processo di ricerca sincrono è molto più lento di quello asincrono.