Dynamiq 및 Milvus 시작하기
Dynamiq은 AI 기반 애플리케이션의 개발을 간소화하는 강력한 Gen AI 프레임워크입니다. 검색 증강 생성(RAG) 및 대규모 언어 모델(LLM) 에이전트에 대한 강력한 지원을 통해 개발자가 쉽고 효율적으로 지능적이고 동적인 시스템을 만들 수 있도록 도와주는 Dynamiq입니다.
이 튜토리얼에서는 RAG 워크플로우를 위해 특별히 제작된 고성능 벡터 데이터베이스인 Milvus와 함께 Dynamiq을 원활하게 사용하는 방법을 살펴봅니다. Milvus는 벡터 임베딩의 효율적인 저장, 인덱싱 및 검색에 탁월하여 빠르고 정확한 컨텍스트 데이터 액세스가 필요한 AI 시스템에 없어서는 안 될 구성 요소입니다.
이 단계별 가이드에서는 두 가지 핵심 RAG 워크플로우를 다룹니다:
문서 색인 흐름: 입력 파일(예: PDF)을 처리하고, 그 콘텐츠를 벡터 임베딩으로 변환하여 Milvus에 저장하는 방법을 알아보세요. Milvus의 고성능 인덱싱 기능을 활용하면 데이터를 신속하게 검색할 수 있습니다.
문서 검색 흐름: Milvus에서 관련 문서 임베딩을 쿼리하고 이를 사용하여 Dynamiq의 LLM 에이전트로 통찰력 있는 컨텍스트 인식 응답을 생성하여 원활한 AI 기반 사용자 환경을 구축하는 방법을 알아보세요.
이 튜토리얼을 마치면 Milvus와 Dynamiq이 어떻게 협력하여 필요에 맞게 확장 가능한 상황 인식 AI 시스템을 구축하는지 확실히 이해할 수 있을 것입니다.
준비 사항
필요한 라이브러리 다운로드
$ pip install dynamiq pymilvus
Google Colab을 사용하는 경우 방금 설치한 종속 요소를 사용하려면 런타임을 다시 시작해야 할 수 있습니다(화면 상단의 '런타임' 메뉴를 클릭하고 드롭다운 메뉴에서 '세션 다시 시작'을 선택).
LLM 에이전트 구성
이 예제에서는 OpenAI를 LLM으로 사용하겠습니다. 환경 변수로 OPENAI_API_KEY
API 키를 준비해야 합니다.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
RAG - 문서 색인 흐름
이 튜토리얼에서는 Milvus를 벡터 데이터베이스로 사용하여 문서를 색인하는 검색 증강 생성(RAG) 워크플로우를 보여드립니다. 이 워크플로에서는 입력 PDF 파일을 가져와서 더 작은 덩어리로 처리하고, OpenAI의 임베딩 모델을 사용해 벡터 임베딩을 생성한 다음, 효율적인 검색을 위해 임베딩을 Milvus 컬렉션에 저장합니다.
이 워크플로우가 끝나면 시맨틱 검색과 질문 답변과 같은 향후 RAG 작업을 지원하는 확장 가능하고 효율적인 문서 색인 시스템을 갖추게 됩니다.
필요한 라이브러리 가져오기 및 워크플로우 초기화
# Importing necessary libraries for the workflow
from io import BytesIO
from dynamiq import Workflow
from dynamiq.nodes import InputTransformer
from dynamiq.connections import (
OpenAI as OpenAIConnection,
Milvus as MilvusConnection,
MilvusDeploymentType,
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.writers import MilvusDocumentWriter
# Initialize the workflow
rag_wf = Workflow()
PDF 변환기 노드 정의
converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
converter_added = rag_wf.flow.add_nodes(
converter
) # Add node to the DAG (Directed Acyclic Graph)
문서 분할기 노드 정의
document_splitter = DocumentSplitter(
split_by="sentence", # Splits documents into sentences
split_length=10,
split_overlap=1,
input_transformer=InputTransformer(
selector={
"documents": f"${[converter.id]}.output.documents",
},
),
).depends_on(
converter
) # Set dependency on the PDF converter
splitter_added = rag_wf.flow.add_nodes(document_splitter) # Add to the DAG
임베딩 노드 정의
embedder = OpenAIDocumentEmbedder(
connection=OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"]),
input_transformer=InputTransformer(
selector={
"documents": f"${[document_splitter.id]}.output.documents",
},
),
).depends_on(
document_splitter
) # Set dependency on the splitter
document_embedder_added = rag_wf.flow.add_nodes(embedder) # Add to the DAG
Milvus 벡터 저장소 노드 정의하기
vector_store = (
MilvusDocumentWriter(
connection=MilvusConnection(
deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
),
index_name="my_milvus_collection",
dimension=1536,
create_if_not_exist=True,
metric_type="COSINE",
)
.inputs(documents=embedder.outputs.documents) # Connect to embedder output
.depends_on(embedder) # Set dependency on the embedder
)
milvus_writer_added = rag_wf.flow.add_nodes(vector_store) # Add to the DAG
2024-11-19 22:14:03 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:03 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:04 - DEBUG - Created new connection using: 0bef2849fdb1458a85df8bb9dd27f51d
2024-11-19 22:14:04 - INFO - Collection my_milvus_collection does not exist. Creating a new collection.
2024-11-19 22:14:04 - DEBUG - Successfully created collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
Milvus는 다양한 사용 사례에 맞는 두 가지 배포 유형을 제공합니다:
- MilvusDeploymentType.FILE
- 로컬 프로토타이핑 또는 소규모 데이터 저장에 이상적입니다.
uri
을 로컬 파일 경로(예:./milvus.db
)로 설정하면 지정된 파일에 모든 데이터를 자동으로 저장하는 Milvus Lite를 활용할 수 있습니다.- 이 옵션은 빠른 설정과 실험을 위한 편리한 옵션입니다.
- MilvusDeploymentType.HOST
백만 개 이상의 벡터 관리와 같은 대규모 데이터 시나리오를 위해 설계되었습니다.
자체 호스팅 서버
- 도커 또는 쿠버네티스를 사용하여 고성능 Milvus 서버를 배포합니다.
- 서버의 주소와 포트를
uri
(예:http://localhost:19530
)로 구성합니다. - 인증이 활성화된 경우:
<your_username>:<your_password>
을token
으로 입력합니다.- 인증을 사용하지 않는 경우:
token
을 설정하지 않은 상태로 둡니다.
질리즈 클라우드(관리형 서비스)
- 클라우드 기반의 완전 관리형 밀버스 환경을 사용하려면 Zilliz Cloud를 사용하세요.
- 질리즈 클라우드 콘솔에서 제공하는 퍼블릭 엔드포인트와 API 키에 따라
uri
및token
을 설정합니다.
입력 데이터 정의 및 워크플로우 실행
file_paths = ["./pdf_files/WhatisMilvus.pdf"]
input_data = {
"files": [BytesIO(open(path, "rb").read()) for path in file_paths],
"metadata": [{"filename": path} for path in file_paths],
}
# Run the workflow with the prepared input data
inserted_data = rag_wf.run(input_data=input_data)
/var/folders/09/d0hx80nj35sb5hxb5cpc1q180000gn/T/ipykernel_31319/3145804345.py:4: ResourceWarning: unclosed file <_io.BufferedReader name='./pdf_files/WhatisMilvus.pdf'>
BytesIO(open(path, "rb").read()) for path in file_paths
ResourceWarning: Enable tracemalloc to get the object allocation traceback
2024-11-19 22:14:09 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution started.
2024-11-19 22:14:09 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution succeeded in 58ms.
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution started.
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/websockets/legacy/__init__.py:6: DeprecationWarning: websockets.legacy is deprecated; see https://websockets.readthedocs.io/en/stable/howto/upgrade.html for upgrade instructions
warnings.warn( # deprecated in 14.0 - 2024-11-09
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/pydantic/fields.py:804: PydanticDeprecatedSince20: Using extra keyword arguments on `Field` is deprecated and will be removed. Use `json_schema_extra` instead. (Extra keys: 'is_accessible_to_agent'). Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.7/migration/
warn(
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution succeeded in 104ms.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution started.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution succeeded in 724ms.
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution succeeded in 66ms.
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution succeeded in 961ms.
2024-11-19 22:14:10 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 1.3s.
2024-11-19 22:14:10 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution succeeded in 1.3s.
이 워크플로우를 통해 Milvus를 벡터 데이터베이스로, OpenAI의 의미 표현을 위한 임베딩 모델을 사용하여 문서 인덱싱 파이프라인을 성공적으로 구현했습니다. 이 설정은 빠르고 정확한 벡터 기반 검색을 가능하게 하여 시맨틱 검색, 문서 검색, 문맥 AI 기반 상호 작용과 같은 RAG 워크플로우의 토대를 형성합니다.
Milvus의 확장 가능한 스토리지 기능과 Dynamiq의 오케스트레이션을 통해 이 솔루션은 프로토타이핑과 대규모 프로덕션 배포에 모두 사용할 수 있습니다. 이제 이 파이프라인을 확장하여 검색 기반 질문 답변 또는 AI 기반 콘텐츠 생성과 같은 추가 작업을 포함할 수 있습니다.
RAG 문서 검색 흐름
이 튜토리얼에서는 검색 증강 생성(RAG) 문서 검색 워크플로우를 구현합니다. 이 워크플로에서는 사용자 쿼리를 받아 벡터 임베딩을 생성하고, Milvus 벡터 데이터베이스에서 가장 관련성이 높은 문서를 검색한 다음, 검색된 문서를 기반으로 LLM(대규모 언어 모델)을 사용하여 문맥을 인식하는 상세한 답변을 생성합니다.
이 워크플로우를 따르면 벡터 기반 문서 검색의 강력한 기능과 OpenAI의 고급 LLM의 기능을 결합한 시맨틱 검색 및 질문 답변을 위한 엔드투엔드 솔루션을 만들 수 있습니다. 이 접근 방식을 사용하면 문서 데이터베이스에 저장된 지식을 활용하여 사용자 쿼리에 효율적이고 지능적으로 응답할 수 있습니다.
필요한 라이브러리 가져오기 및 워크플로우 초기화
from dynamiq import Workflow
from dynamiq.connections import (
OpenAI as OpenAIConnection,
Milvus as MilvusConnection,
MilvusDeploymentType,
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.retrievers import MilvusDocumentRetriever
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Message, Prompt
# Initialize the workflow
retrieval_wf = Workflow()
OpenAI 연결 및 텍스트 임베더 정의
# Establish OpenAI connection
openai_connection = OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"])
# Define the text embedder node
embedder = OpenAITextEmbedder(
connection=openai_connection,
model="text-embedding-3-small",
)
# Add the embedder node to the workflow
embedder_added = retrieval_wf.flow.add_nodes(embedder)
Milvus 문서 리트리버 정의
document_retriever = (
MilvusDocumentRetriever(
connection=MilvusConnection(
deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
),
index_name="my_milvus_collection",
dimension=1536,
top_k=5,
)
.inputs(embedding=embedder.outputs.embedding) # Connect to embedder output
.depends_on(embedder) # Dependency on the embedder node
)
# Add the retriever node to the workflow
milvus_retriever_added = retrieval_wf.flow.add_nodes(document_retriever)
2024-11-19 22:14:19 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:19 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:19 - DEBUG - Created new connection using: 98d1132773af4298a894ad5925845fd2
2024-11-19 22:14:19 - INFO - Collection my_milvus_collection already exists. Skipping creation.
프롬프트 템플릿 정의
# Define the prompt template for the LLM
prompt_template = """
Please answer the question based on the provided context.
Question: {{ query }}
Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""
# Create the prompt object
prompt = Prompt(messages=[Message(content=prompt_template, role="user")])
답변 생성기 정의하기
answer_generator = (
OpenAI(
connection=openai_connection,
model="gpt-4o",
prompt=prompt,
)
.inputs(
documents=document_retriever.outputs.documents,
query=embedder.outputs.query,
)
.depends_on(
[document_retriever, embedder]
) # Dependencies on retriever and embedder
)
# Add the answer generator node to the workflow
answer_generator_added = retrieval_wf.flow.add_nodes(answer_generator)
워크플로 실행하기
# Run the workflow with a sample query
sample_query = "What is the Advanced Search Algorithms in Milvus?"
result = retrieval_wf.run(input_data={"query": sample_query})
answer = result.output.get(answer_generator.id).get("output", {}).get("content")
print(answer)
2024-11-19 22:14:22 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution started.
2024-11-19 22:14:22 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:22 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution started.
2024-11-19 22:14:23 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:23 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution succeeded in 474ms.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution started.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution succeeded in 23ms.
2024-11-19 22:14:23 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution started.
2024-11-19 22:14:24 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2024-11-19 22:14:24 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution succeeded in 1.8s.
2024-11-19 22:14:25 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 2.4s.
2024-11-19 22:14:25 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution succeeded in 2.4s.
The advanced search algorithms in Milvus include a variety of in-memory and on-disk indexing/search algorithms such as IVF (Inverted File), HNSW (Hierarchical Navigable Small World), and DiskANN. These algorithms have been deeply optimized to enhance performance, delivering 30%-70% better performance compared to popular implementations like FAISS and HNSWLib. These optimizations are part of Milvus's design to ensure high efficiency and scalability in handling vector data.