Milvus 및 LlamaIndex 비동기 API를 사용한 RAG
이 튜토리얼에서는 Milvus와 함께 LlamaIndex를 사용하여 RAG용 비동기 문서 처리 파이프라인을 구축하는 방법을 설명합니다. LlamaIndex는 문서를 처리하고 Milvus와 같은 벡터 DB에 저장하는 방법을 제공합니다. LlamaIndex의 비동기 API와 Milvus Python 클라이언트 라이브러리를 활용하면 파이프라인의 처리량을 늘려 대량의 데이터를 효율적으로 처리하고 색인할 수 있습니다.
이 튜토리얼에서는 먼저 비동기 메서드를 사용하여 높은 수준에서 LlamaIndex와 Milvus로 RAG를 구축하는 방법을 소개한 다음, 낮은 수준의 메서드 사용과 동기식과 비동기식의 성능 비교에 대해 소개합니다.
시작하기 전에
이 페이지의 코드 스니펫에는 pymilvus 및 llamaindex 종속성이 필요합니다. 다음 명령을 사용하여 설치할 수 있습니다:
$ pip install -U pymilvus llama-index-vector-stores-milvus llama-index nest-asyncio
Google Colab을 사용하는 경우 방금 설치한 종속 요소를 사용하려면 런타임을 다시 시작해야 할 수 있습니다(화면 상단의 '런타임' 메뉴를 클릭하고 드롭다운 메뉴에서 '세션 다시 시작'을 선택).
OpenAI의 모델을 사용합니다. 환경 변수로 OPENAI_API_KEY API 키를 준비해야 합니다.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
주피터 노트북을 사용하는 경우 비동기 코드를 실행하기 전에 이 코드 줄을 실행해야 합니다.
import nest_asyncio
nest_asyncio.apply()
데이터 준비
다음 명령어로 샘플 데이터를 다운로드할 수 있습니다:
$ mkdir -p 'data/'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham_essay.txt'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/uber_2021.pdf'
비동기 처리로 RAG 빌드
이 섹션에서는 문서를 비동기식으로 처리할 수 있는 RAG 시스템을 구축하는 방법을 보여드립니다.
필요한 라이브러리를 가져오고 Milvus URI와 임베딩의 차원을 정의합니다.
import asyncio
import random
import time
from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.milvus import MilvusVectorStore
URI = "http://localhost:19530"
DIM = 768
- 데이터 규모가 큰 경우, 도커나 쿠버네티스에 고성능 Milvus 서버를 설정할 수 있습니다. 이 설정에서는 서버 URI(예:
http://localhost:19530)를uri으로 사용하세요. - 밀버스의 완전 관리형 클라우드 서비스인 질리즈 클라우드를 사용하려면 질리즈 클라우드의 퍼블릭 엔드포인트와 API 키에 해당하는
uri와token을 조정하세요. - 네트워크 통신과 같이 복잡한 시스템의 경우 비동기 처리가 동기화 대비 성능 향상을 가져올 수 있습니다. 따라서 Milvus-Lite는 사용 시나리오가 적합하지 않아 비동기 인터페이스를 사용하기에 적합하지 않다고 생각합니다.
Milvus 컬렉션을 다시 빌드하는 데 다시 사용할 수 있는 초기화 함수를 정의합니다.
def init_vector_store():
return MilvusVectorStore(
uri=URI,
# token=TOKEN,
dim=DIM,
collection_name="test_collection",
embedding_field="embedding",
id_field="id",
similarity_metric="COSINE",
consistency_level="Bounded", # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`). See https://milvus.io/docs/tune_consistency.md#Consistency-Level for more details.
overwrite=True, # To overwrite the collection if it already exists
)
vector_store = init_vector_store()
2025-01-24 20:04:39,414 [DEBUG][_create_connection]: Created new connection using: faa8be8753f74288bffc7e6d38942f8a (async_milvus_client.py:600)
SimpleDirectoryReader를 사용하여 paul_graham_essay.txt 파일에서 LlamaIndex 문서 객체를 래핑합니다.
from llama_index.core import SimpleDirectoryReader
# load documents
documents = SimpleDirectoryReader(
input_files=["./data/paul_graham_essay.txt"]
).load_data()
print("Document ID:", documents[0].doc_id)
Document ID: 41a6f99c-489f-49ff-9821-14e2561140eb
포옹하는 얼굴 임베딩 모델을 로컬로 인스턴스화합니다. 로컬 모델을 사용하면 동시 API 요청이 빠르게 합산되어 퍼블릭 API의 예산이 소진될 수 있으므로 비동기 데이터 삽입 중에 API 속도 제한에 도달할 위험을 피할 수 있습니다. 그러나 속도 제한이 높은 경우 원격 모델 서비스를 대신 사용할 수 있습니다.
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")
색인을 생성하고 문서를 삽입합니다.
use_async 을 True 으로 설정하여 비동기 삽입 모드를 활성화합니다.
# Create an index over the documents
from llama_index.core import VectorStoreIndex, StorageContext
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context,
embed_model=embed_model,
use_async=True,
)
LLM을 초기화합니다.
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-3.5-turbo")
쿼리 엔진을 구축할 때 use_async 파라미터를 True 으로 설정하여 비동기 검색을 활성화할 수도 있습니다.
query_engine = index.as_query_engine(use_async=True, llm=llm)
response = await query_engine.aquery("What did the author learn?")
print(response)
The author learned that the field of artificial intelligence, as practiced at the time, was not as promising as initially believed. The approach of using explicit data structures to represent concepts in AI was not effective in achieving true understanding of natural language. This realization led the author to shift his focus towards Lisp and eventually towards exploring the field of art.
비동기 API 살펴보기
이 섹션에서는 하위 수준의 API 사용법을 소개하고 동기 실행과 비동기 실행의 성능을 비교합니다.
비동기 추가
벡터 저장소를 다시 초기화합니다.
vector_store = init_vector_store()
2025-01-24 20:07:38,727 [DEBUG][_create_connection]: Created new connection using: 5e0d130f3b644555ad7ea6b8df5f1fc2 (async_milvus_client.py:600)
인덱스에 대한 많은 수의 테스트 노드를 생성하는 데 사용될 노드 생성 함수를 정의해 보겠습니다.
def random_id():
random_num_str = ""
for _ in range(16):
random_digit = str(random.randint(0, 9))
random_num_str += random_digit
return random_num_str
def produce_nodes(num_adding):
node_list = []
for i in range(num_adding):
node = TextNode(
id_=random_id(),
text=f"n{i}_text",
embedding=[0.5] * (DIM - 1) + [random.random()],
relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id=f"n{i+1}")},
)
node_list.append(node)
return node_list
벡터 저장소에 문서를 추가하는 비동기 함수를 정의합니다. Milvus 벡터 저장소 인스턴스에서 async_add() 함수를 사용합니다.
async def async_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
tasks = []
for i in range(num_adding):
sub_nodes = node_list[i]
task = vector_store.async_add([sub_nodes]) # use async_add()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
add_counts = [10, 100, 1000]
이벤트 루프를 가져옵니다.
loop = asyncio.get_event_loop()
벡터 저장소에 문서를 비동기적으로 추가합니다.
for count in add_counts:
async def measure_async_add():
async_time = await async_add(count)
print(f"Async add for {count} took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_add())
Async add for 10 took 0.19 seconds
Async add for 100 took 0.48 seconds
Async add for 1000 took 3.22 seconds
vector_store = init_vector_store()
2025-01-24 20:07:45,554 [DEBUG][_create_connection]: Created new connection using: b14dde8d6d24489bba26a907593f692d (async_milvus_client.py:600)
동기 추가와 비교
동기 추가 함수를 정의합니다. 그런 다음 동일한 조건에서 실행 시간을 측정합니다.
def sync_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
for node in node_list:
result = vector_store.add([node])
end_time = time.time()
return end_time - start_time
for count in add_counts:
sync_time = sync_add(count)
print(f"Sync add for {count} took {sync_time:.2f} seconds")
Sync add for 10 took 0.56 seconds
Sync add for 100 took 5.85 seconds
Sync add for 1000 took 62.91 seconds
결과는 동기식 추가 프로세스가 비동기식 추가 프로세스보다 훨씬 느리다는 것을 보여줍니다.
비동기 검색
검색을 실행하기 전에 벡터 저장소를 다시 초기화하고 일부 문서를 추가합니다.
vector_store = init_vector_store()
node_list = produce_nodes(num_adding=1000)
inserted_ids = vector_store.add(node_list)
2025-01-24 20:08:57,982 [DEBUG][_create_connection]: Created new connection using: 351dc7ea4fb14d4386cfab02621ab7d1 (async_milvus_client.py:600)
비동기 검색 기능을 정의합니다. Milvus 벡터 저장소 인스턴스에서 aquery() 함수를 사용합니다.
async def async_search(num_queries):
start_time = time.time()
tasks = []
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
task = vector_store.aquery(query=query) # use aquery()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
query_counts = [10, 100, 1000]
Milvus 스토어에서 비동기적으로 검색합니다.
for count in query_counts:
async def measure_async_search():
async_time = await async_search(count)
print(f"Async search for {count} queries took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_search())
Async search for 10 queries took 0.55 seconds
Async search for 100 queries took 1.39 seconds
Async search for 1000 queries took 8.81 seconds
동기 검색과 비교
동기 검색 함수를 정의합니다. 그런 다음 동일한 조건에서 실행 시간을 측정합니다.
def sync_search(num_queries):
start_time = time.time()
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
result = vector_store.query(query=query)
end_time = time.time()
return end_time - start_time
for count in query_counts:
sync_time = sync_search(count)
print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Sync search for 10 queries took 3.29 seconds
Sync search for 100 queries took 30.80 seconds
Sync search for 1000 queries took 308.80 seconds
결과는 동기 검색 프로세스가 비동기 검색보다 훨씬 느리다는 것을 보여줍니다.