Milvus와 SiliconFlow로 RAG 구축하기
SiliconFlow는 확장 가능하고 표준화된 고성능 AI 인프라 플랫폼을 구축하기 위해 노력하고 있습니다. SiliconCloud는 서비스형 모델(MaaS) 플랫폼으로 설명되는 SiliconFlow의 대표 제품 중 하나입니다. 대규모 언어 모델(LLM) 및 임베딩 모델을 비롯한 다양한 AI 모델을 배포할 수 있는 포괄적인 환경을 제공합니다. SiliconCloud는 수많은 오픈 소스 모델을 통합하여 사용자가 광범위한 인프라 설정 없이도 이러한 리소스에 쉽게 액세스하고 활용할 수 있도록 지원합니다.
이 튜토리얼에서는 Milvus와 SiliconFlow를 사용하여 RAG(검색 증강 생성) 파이프라인을 구축하는 방법을 보여드리겠습니다.
준비 사항
종속성 및 환경
$ pip install --upgrade pymilvus openai requests tqdm
Google Colab을 사용하는 경우 방금 설치한 종속 요소를 활성화하려면 런타임을 다시 시작해야 합니다(화면 상단의 '런타임' 메뉴를 클릭하고 드롭다운 메뉴에서 '세션 다시 시작'을 선택).
실리콘플로우를 사용하면 OpenAI 스타일 API를 사용할 수 있습니다. 공식 웹사이트에 로그인하여 환경 변수로 SILICON_FLOW_API_KEY
API 키를 준비할 수 있습니다.
import os
os.environ["SILICON_FLOW_API_KEY"] = "***********"
데이터 준비
Milvus 문서 2.4.x의 FAQ 페이지를 RAG의 비공개 지식으로 사용하며, 이는 간단한 RAG 파이프라인을 위한 좋은 데이터 소스입니다.
zip 파일을 다운로드하고 milvus_docs
폴더에 문서를 압축 해제합니다.
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
milvus_docs/en/faq
폴더에서 모든 마크다운 파일을 로드합니다. 각 문서에 대해 "#"를 사용하여 파일의 내용을 구분하기만 하면 마크다운 파일의 각 주요 부분의 내용을 대략적으로 구분할 수 있습니다.
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()
text_lines += file_text.split("# ")
임베딩 모델 준비
클라이언트를 초기화하여 임베딩 모델을 준비합니다. 실리콘플로우에서는 OpenAI 스타일의 API를 지원하며, 약간의 조정을 통해 동일한 API를 사용하여 임베딩 모델과 LLM을 호출할 수 있습니다.
from openai import OpenAI
siliconflow_client = OpenAI(
api_key=os.environ["SILICON_FLOW_API_KEY"], base_url="https://api.siliconflow.cn/v1"
)
클라이언트를 사용하여 텍스트 임베딩을 생성하는 함수를 정의합니다. BAAI/bge-large-en-v1.5
모델을 예로 사용합니다.
def emb_text(text):
return (
siliconflow_client.embeddings.create(input=text, model="BAAI/bge-large-en-v1.5")
.data[0]
.embedding
)
테스트 임베딩을 생성하고 해당 치수와 처음 몇 개의 요소를 인쇄합니다.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1024
[0.011475468054413795, 0.02982141077518463, 0.0038535362109541893, 0.035921916365623474, -0.0159175843000412, -0.014918108470737934, -0.018094222992658615, -0.002937349723652005, 0.030917132273316383, 0.03390815854072571]
Milvus에 데이터 로드
컬렉션 생성
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
MilvusClient
의 인수를 사용합니다:
uri
을 로컬 파일(예:./milvus.db
)로 설정하는 것이 가장 편리한 방법인데, Milvus Lite를 자동으로 활용하여 모든 데이터를 이 파일에 저장하기 때문입니다.- 데이터 규모가 큰 경우, 도커나 쿠버네티스에 더 고성능의 Milvus 서버를 설정할 수 있습니다. 이 설정에서는 서버 URL(예:
http://localhost:19530
)을uri
으로 사용하세요. - 밀버스의 완전 관리형 클라우드 서비스인 질리즈 클라우드를 사용하려면, 질리즈 클라우드의 퍼블릭 엔드포인트와 API 키에 해당하는
uri
와token
을 조정하세요.
컬렉션이 이미 존재하는지 확인하고 존재한다면 삭제합니다.
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
지정된 파라미터로 새 컬렉션을 생성합니다.
필드 정보를 지정하지 않으면 기본 키인 id
필드와 벡터 데이터를 저장할 vector
필드가 자동으로 생성됩니다. 예약된 JSON 필드는 스키마에 정의되지 않은 필드와 그 값을 저장하는 데 사용됩니다.
milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # Inner product distance
consistency_level="Strong", # Strong consistency level
)
데이터 삽입
텍스트 줄을 반복하여 임베딩을 만든 다음 데이터를 Milvus에 삽입합니다.
다음은 컬렉션 스키마에 정의되지 않은 필드인 새 필드 text
입니다. 이 필드는 상위 수준에서 일반 필드로 취급할 수 있는 예약된 JSON 동적 필드에 자동으로 추가됩니다.
from tqdm import tqdm
data = []
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": emb_text(line), "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 100%|██████████| 72/72 [00:04<00:00, 16.97it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
RAG 구축
쿼리에 대한 데이터 검색
Milvus에 대해 자주 묻는 질문을 지정해 보겠습니다.
question = "How is data stored in milvus?"
컬렉션에서 해당 질문을 검색하고 시맨틱 상위 3개 일치 항목을 검색합니다.
search_res = milvus_client.search(
collection_name=collection_name,
data=[
emb_text(question)
], # Use the `emb_text` function to convert the question to an embedding vector
limit=3, # Return top 3 results
search_params={"metric_type": "IP", "params": {}}, # Inner product distance
output_fields=["text"], # Return the text field
)
쿼리의 검색 결과를 살펴봅시다.
import json
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.833885133266449
],
[
"How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
0.812842607498169
],
[
"Does the query perform in memory? What are incremental data and historical data?\n\nYes. When a query request comes, Milvus searches both incremental data and historical data by loading them into memory. Incremental data are in the growing segments, which are buffered in memory before they reach the threshold to be persisted in storage engine, while historical data are from the sealed segments that are stored in the object storage. Incremental data and historical data together constitute the whole dataset to search.\n\n###",
0.7714196443557739
]
]
LLM을 사용하여 RAG 응답 얻기
검색된 문서를 문자열 형식으로 변환합니다.
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Lanage 모델에 대한 시스템 및 사용자 프롬프트를 정의합니다. 이 프롬프트는 Milvus에서 검색된 문서로 조립됩니다.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
SiliconCloud에서 제공하는 deepseek-ai/DeepSeek-V2.5
모델을 사용하여 프롬프트에 따라 응답을 생성합니다.
response = siliconflow_client.chat.completions.create(
model="deepseek-ai/DeepSeek-V2.5",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
In Milvus, data is stored in two main categories: inserted data and metadata.
- **Inserted Data**: This includes vector data, scalar data, and collection-specific schema, which are stored in persistent storage as incremental logs. Milvus supports various object storage backends such as MinIO, AWS S3, Google Cloud Storage (GCS), Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage (COS).
- **Metadata**: This is generated within Milvus, with each module having its own metadata stored in etcd, a distributed key-value store.
훌륭합니다! Milvus와 SiliconFlow로 RAG 파이프라인을 성공적으로 구축했습니다.