RAG com Milvus e API assíncrona LlamaIndex
Este tutorial demonstra como usar o LlamaIndex com o Milvus para construir um pipeline assíncrono de processamento de documentos para o RAG. O LlamaIndex fornece uma maneira de processar documentos e armazená-los em bancos de dados vetoriais como o Milvus. Aproveitando a API assíncrona do LlamaIndex e a biblioteca cliente Python do Milvus, podemos aumentar a taxa de transferência do pipeline para processar e indexar eficientemente grandes volumes de dados.
Neste tutorial, vamos primeiro introduzir o uso de métodos assíncronos para construir um RAG com LlamaIndex e Milvus a partir de um alto nível e, em seguida, introduzir o uso de métodos de baixo nível e a comparação de desempenho entre síncrono e assíncrono.
Antes de começar
Os trechos de código nesta página requerem as dependências do pymilvus e do llamaindex. Você pode instalá-las usando os seguintes comandos:
$ pip install -U pymilvus llama-index-vector-stores-milvus llama-index nest-asyncio
Se estiver a utilizar o Google Colab, para ativar as dependências acabadas de instalar, poderá ter de reiniciar o tempo de execução (clique no menu "Tempo de execução" na parte superior do ecrã e selecione "Reiniciar sessão" no menu pendente).
Vamos utilizar os modelos do OpenAI. Deve preparar a chave api OPENAI_API_KEY como uma variável de ambiente.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Se estiver a utilizar o Jupyter Notebook, tem de executar esta linha de código antes de executar o código assíncrono.
import nest_asyncio
nest_asyncio.apply()
Preparar dados
Você pode baixar dados de amostra com os seguintes comandos:
$ mkdir -p 'data/'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham_essay.txt'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/uber_2021.pdf'
Construir RAG com processamento assíncrono
Esta secção mostra como construir um sistema RAG que pode processar documentos de forma assíncrona.
Importe as bibliotecas necessárias e defina o URI do Milvus e a dimensão da incorporação.
import asyncio
import random
import time
from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.milvus import MilvusVectorStore
URI = "http://localhost:19530"
DIM = 768
- Se tiver uma grande escala de dados, pode configurar um servidor Milvus de alto desempenho no docker ou no kubernetes. Nesta configuração, utilize o uri do servidor, por exemplo,
http://localhost:19530, como o seuuri. - Se pretender utilizar o Zilliz Cloud, o serviço de nuvem totalmente gerido para o Milvus, ajuste os endereços
urietoken, que correspondem ao Public Endpoint e à chave Api no Zilliz Cloud. - No caso de sistemas complexos (como a comunicação em rede), o processamento assíncrono pode melhorar o desempenho em comparação com a sincronização. Assim, pensamos que o Milvus-Lite não é adequado para utilizar interfaces assíncronas porque os cenários utilizados não são adequados.
Defina uma função de inicialização que possa ser usada novamente para reconstruir a coleção Milvus.
def init_vector_store():
return MilvusVectorStore(
uri=URI,
# token=TOKEN,
dim=DIM,
collection_name="test_collection",
embedding_field="embedding",
id_field="id",
similarity_metric="COSINE",
consistency_level="Bounded", # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`). See https://milvus.io/docs/tune_consistency.md#Consistency-Level for more details.
overwrite=True, # To overwrite the collection if it already exists
)
vector_store = init_vector_store()
2025-01-24 20:04:39,414 [DEBUG][_create_connection]: Created new connection using: faa8be8753f74288bffc7e6d38942f8a (async_milvus_client.py:600)
Utilize SimpleDirectoryReader para envolver um objeto de documento LlamaIndex a partir do ficheiro paul_graham_essay.txt.
from llama_index.core import SimpleDirectoryReader
# load documents
documents = SimpleDirectoryReader(
input_files=["./data/paul_graham_essay.txt"]
).load_data()
print("Document ID:", documents[0].doc_id)
Document ID: 41a6f99c-489f-49ff-9821-14e2561140eb
Instanciar um modelo de incorporação Hugging Face localmente. O uso de um modelo local evita o risco de atingir os limites de taxa da API durante a inserção assíncrona de dados, já que as solicitações simultâneas da API podem se somar rapidamente e usar seu orçamento na API pública. No entanto, se você tiver um limite de taxa alto, poderá optar por usar um serviço de modelo remoto.
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")
Crie um índice e insira o documento.
Definimos o endereço use_async como True para ativar o modo de inserção assíncrona.
# Create an index over the documents
from llama_index.core import VectorStoreIndex, StorageContext
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context,
embed_model=embed_model,
use_async=True,
)
Inicializar o LLM.
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-3.5-turbo")
Ao criar o mecanismo de consulta, você também pode definir o parâmetro use_async como True para habilitar a pesquisa assíncrona.
query_engine = index.as_query_engine(use_async=True, llm=llm)
response = await query_engine.aquery("What did the author learn?")
print(response)
The author learned that the field of artificial intelligence, as practiced at the time, was not as promising as initially believed. The approach of using explicit data structures to represent concepts in AI was not effective in achieving true understanding of natural language. This realization led the author to shift his focus towards Lisp and eventually towards exploring the field of art.
Explorar a API assíncrona
Nesta secção, apresentaremos a utilização da API de nível inferior e compararemos o desempenho de execuções síncronas e assíncronas.
Adição assíncrona
Reinicialize o armazenamento de vetores.
vector_store = init_vector_store()
2025-01-24 20:07:38,727 [DEBUG][_create_connection]: Created new connection using: 5e0d130f3b644555ad7ea6b8df5f1fc2 (async_milvus_client.py:600)
Vamos definir uma função de produção de nós, que será usada para gerar um grande número de nós de teste para o índice.
def random_id():
random_num_str = ""
for _ in range(16):
random_digit = str(random.randint(0, 9))
random_num_str += random_digit
return random_num_str
def produce_nodes(num_adding):
node_list = []
for i in range(num_adding):
node = TextNode(
id_=random_id(),
text=f"n{i}_text",
embedding=[0.5] * (DIM - 1) + [random.random()],
relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id=f"n{i+1}")},
)
node_list.append(node)
return node_list
Defina uma função aync para adicionar documentos ao armazenamento de vectores. Utilizamos a função async_add() na instância de armazenamento de vectores do Milvus.
async def async_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
tasks = []
for i in range(num_adding):
sub_nodes = node_list[i]
task = vector_store.async_add([sub_nodes]) # use async_add()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
add_counts = [10, 100, 1000]
Obter o ciclo de eventos.
loop = asyncio.get_event_loop()
Adicionar documentos de forma assíncrona ao armazenamento vetorial.
for count in add_counts:
async def measure_async_add():
async_time = await async_add(count)
print(f"Async add for {count} took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_add())
Async add for 10 took 0.19 seconds
Async add for 100 took 0.48 seconds
Async add for 1000 took 3.22 seconds
vector_store = init_vector_store()
2025-01-24 20:07:45,554 [DEBUG][_create_connection]: Created new connection using: b14dde8d6d24489bba26a907593f692d (async_milvus_client.py:600)
Comparar com a adição síncrona
Defina uma função de adição síncrona. Em seguida, meça o tempo de execução sob as mesmas condições.
def sync_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
for node in node_list:
result = vector_store.add([node])
end_time = time.time()
return end_time - start_time
for count in add_counts:
sync_time = sync_add(count)
print(f"Sync add for {count} took {sync_time:.2f} seconds")
Sync add for 10 took 0.56 seconds
Sync add for 100 took 5.85 seconds
Sync add for 1000 took 62.91 seconds
O resultado mostra que o processo de adição síncrona é muito mais lento do que o processo assíncrono.
Pesquisa assíncrona
Reinicialize o armazenamento de vectores e adicione alguns documentos antes de executar a pesquisa.
vector_store = init_vector_store()
node_list = produce_nodes(num_adding=1000)
inserted_ids = vector_store.add(node_list)
2025-01-24 20:08:57,982 [DEBUG][_create_connection]: Created new connection using: 351dc7ea4fb14d4386cfab02621ab7d1 (async_milvus_client.py:600)
Defina uma função de pesquisa assíncrona. Utilizamos a função aquery() na instância do armazenamento vetorial Milvus.
async def async_search(num_queries):
start_time = time.time()
tasks = []
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
task = vector_store.aquery(query=query) # use aquery()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
query_counts = [10, 100, 1000]
Pesquisa assíncrona a partir do repositório Milvus.
for count in query_counts:
async def measure_async_search():
async_time = await async_search(count)
print(f"Async search for {count} queries took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_search())
Async search for 10 queries took 0.55 seconds
Async search for 100 queries took 1.39 seconds
Async search for 1000 queries took 8.81 seconds
Comparar com a pesquisa síncrona
Defina uma função de pesquisa sincronizada. Em seguida, mede-se o tempo de execução sob as mesmas condições.
def sync_search(num_queries):
start_time = time.time()
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
result = vector_store.query(query=query)
end_time = time.time()
return end_time - start_time
for count in query_counts:
sync_time = sync_search(count)
print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Sync search for 10 queries took 3.29 seconds
Sync search for 100 queries took 30.80 seconds
Sync search for 1000 queries took 308.80 seconds
O resultado mostra que o processo de pesquisa sincronizada é muito mais lento do que o processo assíncrono.