RAG avec Milvus et LlamaIndex Async API
Ce tutoriel montre comment utiliser LlamaIndex avec Milvus pour construire un pipeline de traitement de documents asynchrone pour RAG. LlamaIndex fournit un moyen de traiter les documents et de les stocker dans une base de données vectorielle comme Milvus. En exploitant l'API asynchrone de LlamaIndex et la bibliothèque client Python de Milvus, nous pouvons augmenter le débit du pipeline pour traiter et indexer efficacement de grands volumes de données.
Dans ce tutoriel, nous allons d'abord présenter l'utilisation des méthodes asynchrones pour construire un RAG avec LlamaIndex et Milvus à partir d'un niveau élevé, puis nous introduirons l'utilisation des méthodes de bas niveau et la comparaison de performance entre synchrone et asynchrone.
Avant de commencer
Les extraits de code de cette page nécessitent les dépendances pymilvus et llamaindex. Vous pouvez les installer à l'aide des commandes suivantes :
$ pip install -U pymilvus llama-index-vector-stores-milvus llama-index nest-asyncio
Si vous utilisez Google Colab, pour activer les dépendances qui viennent d'être installées, vous devrez peut-être redémarrer le runtime (cliquez sur le menu "Runtime" en haut de l'écran, et sélectionnez "Restart session" dans le menu déroulant).
Nous utiliserons les modèles d'OpenAI. Vous devez préparer la clé api OPENAI_API_KEY comme variable d'environnement.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Si vous utilisez Jupyter Notebook, vous devez exécuter cette ligne de code avant d'exécuter le code asynchrone.
import nest_asyncio
nest_asyncio.apply()
Préparer les données
Vous pouvez télécharger des échantillons de données à l'aide des commandes suivantes :
$ mkdir -p 'data/'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham_essay.txt'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/uber_2021.pdf'
Construire un RAG avec un traitement asynchrone
Cette section montre comment construire un système RAG qui peut traiter des documents de manière asynchrone.
Importez les bibliothèques nécessaires et définissez l'URI Milvus et la dimension de l'intégration.
import asyncio
import random
import time
from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.milvus import MilvusVectorStore
URI = "http://localhost:19530"
DIM = 768
- Si vous disposez de données à grande échelle, vous pouvez mettre en place un serveur Milvus performant sur docker ou kubernetes. Dans cette configuration, veuillez utiliser l'uri du serveur, par exemple
http://localhost:19530, comme votreuri. - Si vous souhaitez utiliser Zilliz Cloud, le service cloud entièrement géré pour Milvus, adaptez les adresses
uriettoken, qui correspondent au point final public et à la clé Api dans Zilliz Cloud. - Dans le cas de systèmes complexes (tels que la communication réseau), le traitement asynchrone peut améliorer les performances par rapport à la synchronisation. Nous pensons donc que Milvus-Lite n'est pas adapté à l'utilisation d'interfaces asynchrones car les scénarios utilisés ne sont pas adaptés.
Définir une fonction d'initialisation que nous pourrons réutiliser pour reconstruire la collection Milvus.
def init_vector_store():
return MilvusVectorStore(
uri=URI,
# token=TOKEN,
dim=DIM,
collection_name="test_collection",
embedding_field="embedding",
id_field="id",
similarity_metric="COSINE",
consistency_level="Bounded", # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`). See https://milvus.io/docs/tune_consistency.md#Consistency-Level for more details.
overwrite=True, # To overwrite the collection if it already exists
)
vector_store = init_vector_store()
2025-01-24 20:04:39,414 [DEBUG][_create_connection]: Created new connection using: faa8be8753f74288bffc7e6d38942f8a (async_milvus_client.py:600)
Utiliser SimpleDirectoryReader pour envelopper un objet document LlamaIndex à partir du fichier paul_graham_essay.txt.
from llama_index.core import SimpleDirectoryReader
# load documents
documents = SimpleDirectoryReader(
input_files=["./data/paul_graham_essay.txt"]
).load_data()
print("Document ID:", documents[0].doc_id)
Document ID: 41a6f99c-489f-49ff-9821-14e2561140eb
Instanciez localement un modèle d'intégration Hugging Face. L'utilisation d'un modèle local évite le risque d'atteindre les limites de débit de l'API pendant l'insertion asynchrone de données, car les demandes d'API simultanées peuvent rapidement s'additionner et épuiser votre budget dans l'API publique. Toutefois, si vous avez une limite de débit élevée, vous pouvez choisir d'utiliser un service de modèle distant à la place.
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")
Créez un index et insérez le document.
Nous avons défini use_async sur True pour activer le mode d'insertion asynchrone.
# Create an index over the documents
from llama_index.core import VectorStoreIndex, StorageContext
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context,
embed_model=embed_model,
use_async=True,
)
Initialiser le LLM.
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-3.5-turbo")
Lors de la construction du moteur de requête, vous pouvez également définir le paramètre use_async à True pour activer la recherche asynchrone.
query_engine = index.as_query_engine(use_async=True, llm=llm)
response = await query_engine.aquery("What did the author learn?")
print(response)
The author learned that the field of artificial intelligence, as practiced at the time, was not as promising as initially believed. The approach of using explicit data structures to represent concepts in AI was not effective in achieving true understanding of natural language. This realization led the author to shift his focus towards Lisp and eventually towards exploring the field of art.
Explorer l'API asynchrone
Dans cette section, nous allons présenter l'utilisation de l'API de bas niveau et comparer les performances des exécutions synchrones et asynchrones.
Ajout asynchrone
Réinitialiser le magasin de vecteurs.
vector_store = init_vector_store()
2025-01-24 20:07:38,727 [DEBUG][_create_connection]: Created new connection using: 5e0d130f3b644555ad7ea6b8df5f1fc2 (async_milvus_client.py:600)
Définissons une fonction de production de nœuds, qui sera utilisée pour générer un grand nombre de nœuds de test pour l'index.
def random_id():
random_num_str = ""
for _ in range(16):
random_digit = str(random.randint(0, 9))
random_num_str += random_digit
return random_num_str
def produce_nodes(num_adding):
node_list = []
for i in range(num_adding):
node = TextNode(
id_=random_id(),
text=f"n{i}_text",
embedding=[0.5] * (DIM - 1) + [random.random()],
relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id=f"n{i+1}")},
)
node_list.append(node)
return node_list
Définissons une fonction aync pour ajouter des documents au magasin de vecteurs. Nous utilisons la fonction async_add() dans l'instance de magasin vectoriel Milvus.
async def async_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
tasks = []
for i in range(num_adding):
sub_nodes = node_list[i]
task = vector_store.async_add([sub_nodes]) # use async_add()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
add_counts = [10, 100, 1000]
Obtenir la boucle d'événements.
loop = asyncio.get_event_loop()
Ajouter de manière asynchrone des documents au magasin vectoriel.
for count in add_counts:
async def measure_async_add():
async_time = await async_add(count)
print(f"Async add for {count} took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_add())
Async add for 10 took 0.19 seconds
Async add for 100 took 0.48 seconds
Async add for 1000 took 3.22 seconds
vector_store = init_vector_store()
2025-01-24 20:07:45,554 [DEBUG][_create_connection]: Created new connection using: b14dde8d6d24489bba26a907593f692d (async_milvus_client.py:600)
Comparaison avec l'ajout synchrone
Définir une fonction d'ajout synchrone. Mesurer ensuite le temps d'exécution dans les mêmes conditions.
def sync_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
for node in node_list:
result = vector_store.add([node])
end_time = time.time()
return end_time - start_time
for count in add_counts:
sync_time = sync_add(count)
print(f"Sync add for {count} took {sync_time:.2f} seconds")
Sync add for 10 took 0.56 seconds
Sync add for 100 took 5.85 seconds
Sync add for 1000 took 62.91 seconds
Le résultat montre que le processus d'ajout synchrone est beaucoup plus lent que le processus asynchrone.
Recherche asynchrone
Réinitialisez le magasin de vecteurs et ajoutez quelques documents avant de lancer la recherche.
vector_store = init_vector_store()
node_list = produce_nodes(num_adding=1000)
inserted_ids = vector_store.add(node_list)
2025-01-24 20:08:57,982 [DEBUG][_create_connection]: Created new connection using: 351dc7ea4fb14d4386cfab02621ab7d1 (async_milvus_client.py:600)
Définissez une fonction de recherche asynchrone. Nous utilisons la fonction aquery() dans l'instance de magasin vectoriel Milvus.
async def async_search(num_queries):
start_time = time.time()
tasks = []
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
task = vector_store.aquery(query=query) # use aquery()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
query_counts = [10, 100, 1000]
Recherche asynchrone à partir du magasin Milvus.
for count in query_counts:
async def measure_async_search():
async_time = await async_search(count)
print(f"Async search for {count} queries took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_search())
Async search for 10 queries took 0.55 seconds
Async search for 100 queries took 1.39 seconds
Async search for 1000 queries took 8.81 seconds
Comparaison avec la recherche synchrone
Définir une fonction de recherche synchrone. Mesurez ensuite le temps d'exécution dans les mêmes conditions.
def sync_search(num_queries):
start_time = time.time()
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
result = vector_store.query(query=query)
end_time = time.time()
return end_time - start_time
for count in query_counts:
sync_time = sync_search(count)
print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Sync search for 10 queries took 3.29 seconds
Sync search for 100 queries took 30.80 seconds
Sync search for 1000 queries took 308.80 seconds
Le résultat montre que le processus de recherche synchrone est beaucoup plus lent que le processus asynchrone.