Milvus와 Unstructured로 RAG 구축하기
Unstructured는 검색 증강 생성(RAG)과 모델 미세 조정을 위해 비정형 문서를 수집 및 처리할 수 있는 플랫폼과 도구를 제공합니다. 코드가 필요 없는 UI 플랫폼과 서버리스 API 서비스를 모두 제공하여 사용자가 Unstructured가 호스팅하는 컴퓨팅 리소스에서 데이터를 처리할 수 있습니다.
이 튜토리얼에서는 Unstructured를 사용하여 PDF 문서를 수집한 다음 Milvus를 사용하여 RAG 파이프라인을 구축합니다.
준비 사항
종속성 및 환경
$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai
Google Colab을 사용하는 경우 방금 설치한 종속 요소를 사용하려면 런타임을 다시 시작해야 할 수 있습니다(화면 상단의 "런타임" 메뉴를 클릭하고 드롭다운 메뉴에서 "세션 다시 시작"을 선택).
여기에서 UNSTRUCTURED_API_KEY
및 UNSTRUCTURED_URL
환경 변수를 가져올 수 있습니다.
이 예제에서는 OpenAI를 LLM으로 사용하겠습니다. 환경 변수로 OPENAI_API_KEY
API 키를 준비해야 합니다.
import os
os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"
os.environ["OPENAI_API_KEY"] = "***********"
Milvus 및 OpenAI 클라이언트 준비
Milvus 클라이언트를 사용하여 Milvus 컬렉션을 생성하고 데이터를 삽입할 수 있습니다.
from pymilvus import MilvusClient, DataType
# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db") # TODO
인수는 MilvusClient
입니다:
uri
을 로컬 파일(예:./milvus.db
)로 설정하는 것이 가장 편리한 방법이며, 이 파일에 모든 데이터를 저장하기 위해 Milvus Lite를 자동으로 활용하기 때문입니다.- 백만 개 이상의 벡터와 같이 대량의 데이터가 있는 경우, Docker 또는 Kubernetes에 더 성능이 좋은 Milvus 서버를 설정할 수 있습니다. 이 설정에서는 서버 주소와 포트를 URI로 사용하세요(예:
http://localhost:19530
). Milvus에서 인증 기능을 활성화하는 경우 토큰으로 "<사용자 이름>:<사용자 비밀번호>"를 사용하고, 그렇지 않은 경우 토큰을 설정하지 마세요. - 밀버스의 완전 관리형 클라우드 서비스인 질리즈 클라우드를 사용하려면 질리즈 클라우드의 퍼블릭 엔드포인트와 API 키에 해당하는
uri
및token
을 조정하세요.
컬렉션이 이미 존재하는지 확인하고 존재한다면 삭제합니다.
collection_name = "my_rag_collection"
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
임베딩을 생성하고 응답을 생성할 OpenAI 클라이언트를 준비합니다.
from openai import OpenAI
openai_client = OpenAI()
def emb_text(text):
return (
openai_client.embeddings.create(input=text, model="text-embedding-3-small")
.data[0]
.embedding
)
테스트 임베딩을 생성하고 해당 차원과 처음 몇 개의 요소를 인쇄합니다.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]
Milvus 컬렉션 만들기
다음 스키마로 컬렉션을 생성합니다:
id
각 문서의 고유 식별자인 기본 키.vector
문서의 임베딩.text
문서의 텍스트 콘텐츠.metadata
문서의 메타데이터.
그런 다음 vector
필드에 AUTOINDEX
인덱스를 구축합니다. 그런 다음 컬렉션을 만듭니다.
# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="vector",
metric_type="COSINE",
index_type="AUTOINDEX",
)
milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params,
consistency_level="Strong",
)
milvus_client.load_collection(collection_name=collection_name)
비정형에서 데이터 로드
비정형은 PDF, HTML 등 다양한 파일 유형을 처리할 수 있는 유연하고 강력한 수집 파이프라인을 제공합니다. 수집 기능을 사용하여 로컬 디렉터리에서 PDF 파일을 분할하겠습니다. 그런 다음 Milvus에 데이터를 로드합니다.
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
LocalIndexerConfig,
LocalDownloaderConfig,
LocalConnectionConfig,
LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"
Pipeline.from_configs(
context=ProcessorConfig(),
indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
downloader_config=LocalDownloaderConfig(),
source_connection_config=LocalConnectionConfig(),
partitioner_config=PartitionerConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
strategy="hi_res",
additional_partition_args={
"split_pdf_page": True,
"split_pdf_concurrency_level": 15,
},
),
uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()
from unstructured.staging.base import elements_from_json
def load_processed_files(directory_path):
elements = []
for filename in os.listdir(directory_path):
if filename.endswith(".json"):
file_path = os.path.join(directory_path, filename)
try:
elements.extend(elements_from_json(filename=file_path))
except IOError:
print(f"Error: Could not read file {filename}.")
return elements
elements = load_processed_files(directory_with_results)
Milvus에 데이터를 삽입합니다.
data = []
for i, element in enumerate(elements):
data.append(
{
"id": i,
"vector": emb_text(element.text),
"text": element.text,
"metadata": element.metadata.to_dict(),
}
)
milvus_client.insert(collection_name=collection_name, data=data)
검색 및 응답 생성
Milvus에서 관련 문서를 검색하는 함수를 정의합니다.
def retrieve_documents(question, top_k=3):
search_res = milvus_client.search(
collection_name=collection_name,
data=[emb_text(question)],
limit=top_k,
# search_params={"metric_type": "IP", "params": {}},
output_fields=["text"],
)
return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]
RAG 파이프라인에서 검색된 문서를 사용하여 응답을 생성하는 함수를 정의합니다.
def generate_rag_response(question):
retrieved_docs = retrieve_documents(question)
context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
system_prompt = (
"You are an AI assistant. Provide answers based on the given context."
)
user_prompt = f"""
Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
Context:
{context}
Question: {question}
"""
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
return response.choices[0].message.content
샘플 질문으로 RAG 파이프라인을 테스트해 보겠습니다.
question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.