RAG mit Milvus und LlamaIndex Async API
Dieses Tutorial zeigt, wie man LlamaIndex mit Milvus verwendet, um eine asynchrone Dokumentenverarbeitungspipeline für RAG zu erstellen. LlamaIndex bietet eine Möglichkeit, Dokumente zu verarbeiten und in einer Vektor-Datenbank wie Milvus zu speichern. Durch die Nutzung der asynchronen API von LlamaIndex und der Python-Client-Bibliothek von Milvus können wir den Durchsatz der Pipeline erhöhen, um große Datenmengen effizient zu verarbeiten und zu indizieren.
In diesem Tutorial werden wir zunächst die Verwendung asynchroner Methoden vorstellen, um eine RAG mit LlamaIndex und Milvus von einer hohen Ebene aus aufzubauen, und dann die Verwendung von Low-Level-Methoden und den Leistungsvergleich von synchron und asynchron vorstellen.
Bevor Sie beginnen
Die Codeschnipsel auf dieser Seite erfordern die Abhängigkeiten von pymilvus und llamaindex. Sie können diese mit den folgenden Befehlen installieren:
$ pip install -U pymilvus llama-index-vector-stores-milvus llama-index nest-asyncio
Wenn Sie Google Colab verwenden, müssen Sie möglicherweise die Runtime neu starten, um die soeben installierten Abhängigkeiten zu aktivieren (klicken Sie auf das Menü "Runtime" oben auf dem Bildschirm und wählen Sie "Restart session" aus dem Dropdown-Menü).
Wir werden die Modelle von OpenAI verwenden. Sie sollten den api-Schlüssel OPENAI_API_KEY als Umgebungsvariable vorbereiten.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Wenn Sie Jupyter Notebook verwenden, müssen Sie diese Codezeile ausführen, bevor Sie den asynchronen Code starten.
import nest_asyncio
nest_asyncio.apply()
Daten vorbereiten
Sie können Beispieldaten mit den folgenden Befehlen herunterladen:
$ mkdir -p 'data/'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham_essay.txt'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/uber_2021.pdf'
RAG mit asynchroner Verarbeitung erstellen
Dieser Abschnitt zeigt, wie man ein RAG-System baut, das Dokumente asynchron verarbeiten kann.
Importieren Sie die notwendigen Bibliotheken und definieren Sie Milvus URI und die Dimension der Einbettung.
import asyncio
import random
import time
from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.milvus import MilvusVectorStore
URI = "http://localhost:19530"
DIM = 768
- Wenn Sie große Datenmengen haben, können Sie einen performanten Milvus-Server auf Docker oder Kubernetes einrichten. Bei dieser Einrichtung verwenden Sie bitte die Server-URI, z. B.
http://localhost:19530, alsuri. - Wenn Sie Zilliz Cloud, den vollständig verwalteten Cloud-Service für Milvus, nutzen möchten, passen Sie
uriundtokenan, die dem Public Endpoint und Api-Schlüssel in Zilliz Cloud entsprechen. - Bei komplexen Systemen (z.B. Netzwerkkommunikation) kann die asynchrone Verarbeitung im Vergleich zur Synchronisation eine Leistungssteigerung bringen. Wir denken also, dass Milvus-Lite für die Verwendung asynchroner Schnittstellen nicht geeignet ist, da die verwendeten Szenarien nicht geeignet sind.
Definieren Sie eine Initialisierungsfunktion, die wir wieder verwenden können, um die Milvus-Sammlung neu aufzubauen.
def init_vector_store():
return MilvusVectorStore(
uri=URI,
# token=TOKEN,
dim=DIM,
collection_name="test_collection",
embedding_field="embedding",
id_field="id",
similarity_metric="COSINE",
consistency_level="Bounded", # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`). See https://milvus.io/docs/tune_consistency.md#Consistency-Level for more details.
overwrite=True, # To overwrite the collection if it already exists
)
vector_store = init_vector_store()
2025-01-24 20:04:39,414 [DEBUG][_create_connection]: Created new connection using: faa8be8753f74288bffc7e6d38942f8a (async_milvus_client.py:600)
Verwenden Sie SimpleDirectoryReader, um ein LlamaIndex-Dokumentenobjekt aus der Datei paul_graham_essay.txt zu wickeln.
from llama_index.core import SimpleDirectoryReader
# load documents
documents = SimpleDirectoryReader(
input_files=["./data/paul_graham_essay.txt"]
).load_data()
print("Document ID:", documents[0].doc_id)
Document ID: 41a6f99c-489f-49ff-9821-14e2561140eb
Instanziieren Sie ein Hugging Face Einbettungsmodell lokal. Die Verwendung eines lokalen Modells vermeidet das Risiko, bei der asynchronen Dateneinfügung an die Grenzen der API-Rate zu stoßen, da sich gleichzeitige API-Anfragen schnell summieren und Ihr Budget für die öffentliche API aufbrauchen können. Wenn Sie jedoch ein hohes Ratenlimit haben, können Sie stattdessen einen Remote-Modelldienst verwenden.
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")
Erstellen Sie einen Index und fügen Sie das Dokument ein.
Wir setzen die use_async auf True, um den asynchronen Einfügemodus zu aktivieren.
# Create an index over the documents
from llama_index.core import VectorStoreIndex, StorageContext
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context,
embed_model=embed_model,
use_async=True,
)
Initialisieren Sie den LLM.
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-3.5-turbo")
Bei der Erstellung der Query Engine können Sie auch den Parameter use_async auf True setzen, um die asynchrone Suche zu aktivieren.
query_engine = index.as_query_engine(use_async=True, llm=llm)
response = await query_engine.aquery("What did the author learn?")
print(response)
The author learned that the field of artificial intelligence, as practiced at the time, was not as promising as initially believed. The approach of using explicit data structures to represent concepts in AI was not effective in achieving true understanding of natural language. This realization led the author to shift his focus towards Lisp and eventually towards exploring the field of art.
Erkunden Sie die asynchrone API
In diesem Abschnitt werden wir die Verwendung der API auf niedrigerer Ebene vorstellen und die Leistung von synchronen und asynchronen Läufen vergleichen.
Asynchrones Hinzufügen
Neuinitialisierung des Vektorspeichers.
vector_store = init_vector_store()
2025-01-24 20:07:38,727 [DEBUG][_create_connection]: Created new connection using: 5e0d130f3b644555ad7ea6b8df5f1fc2 (async_milvus_client.py:600)
Definieren wir eine Knotenerzeugungsfunktion, die zur Erzeugung einer großen Anzahl von Testknoten für den Index verwendet wird.
def random_id():
random_num_str = ""
for _ in range(16):
random_digit = str(random.randint(0, 9))
random_num_str += random_digit
return random_num_str
def produce_nodes(num_adding):
node_list = []
for i in range(num_adding):
node = TextNode(
id_=random_id(),
text=f"n{i}_text",
embedding=[0.5] * (DIM - 1) + [random.random()],
relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id=f"n{i+1}")},
)
node_list.append(node)
return node_list
Definieren Sie eine asynchrone Funktion zum Hinzufügen von Dokumenten zum Vektorspeicher. Wir verwenden die Funktion async_add() in der Milvus-Vektorspeicherinstanz.
async def async_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
tasks = []
for i in range(num_adding):
sub_nodes = node_list[i]
task = vector_store.async_add([sub_nodes]) # use async_add()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
add_counts = [10, 100, 1000]
Holen Sie die Ereignisschleife.
loop = asyncio.get_event_loop()
Asynchrones Hinzufügen von Dokumenten zum Vektorspeicher.
for count in add_counts:
async def measure_async_add():
async_time = await async_add(count)
print(f"Async add for {count} took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_add())
Async add for 10 took 0.19 seconds
Async add for 100 took 0.48 seconds
Async add for 1000 took 3.22 seconds
vector_store = init_vector_store()
2025-01-24 20:07:45,554 [DEBUG][_create_connection]: Created new connection using: b14dde8d6d24489bba26a907593f692d (async_milvus_client.py:600)
Vergleich mit synchronem Hinzufügen
Definieren Sie eine synchrone Hinzufügungsfunktion. Messen Sie dann die Laufzeit unter der gleichen Bedingung.
def sync_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
for node in node_list:
result = vector_store.add([node])
end_time = time.time()
return end_time - start_time
for count in add_counts:
sync_time = sync_add(count)
print(f"Sync add for {count} took {sync_time:.2f} seconds")
Sync add for 10 took 0.56 seconds
Sync add for 100 took 5.85 seconds
Sync add for 1000 took 62.91 seconds
Das Ergebnis zeigt, dass der synchrone Addiervorgang viel langsamer ist als der asynchrone.
Asynchrone Suche
Re-initialisieren Sie den Vektorspeicher und fügen Sie einige Dokumente hinzu, bevor Sie die Suche starten.
vector_store = init_vector_store()
node_list = produce_nodes(num_adding=1000)
inserted_ids = vector_store.add(node_list)
2025-01-24 20:08:57,982 [DEBUG][_create_connection]: Created new connection using: 351dc7ea4fb14d4386cfab02621ab7d1 (async_milvus_client.py:600)
Definieren Sie eine asynchrone Suchfunktion. Wir verwenden die Funktion aquery() in der Milvus-Vektorspeicherinstanz.
async def async_search(num_queries):
start_time = time.time()
tasks = []
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
task = vector_store.aquery(query=query) # use aquery()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
query_counts = [10, 100, 1000]
Asynchrone Suche im Milvus-Speicher.
for count in query_counts:
async def measure_async_search():
async_time = await async_search(count)
print(f"Async search for {count} queries took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_search())
Async search for 10 queries took 0.55 seconds
Async search for 100 queries took 1.39 seconds
Async search for 1000 queries took 8.81 seconds
Vergleich mit synchroner Suche
Definieren Sie eine synchrone Suchfunktion. Messen Sie dann die Laufzeit unter den gleichen Bedingungen.
def sync_search(num_queries):
start_time = time.time()
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
result = vector_store.query(query=query)
end_time = time.time()
return end_time - start_time
for count in query_counts:
sync_time = sync_search(count)
print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Sync search for 10 queries took 3.29 seconds
Sync search for 100 queries took 30.80 seconds
Sync search for 1000 queries took 308.80 seconds
Das Ergebnis zeigt, dass der synchrone Suchprozess viel langsamer ist als der asynchrone.