Getting Started with Dynamiq and Milvus
Dynamiq is a powerful Gen AI framework that streamlines the development of AI-powered applications. With robust support for retrieval-augmented generation (RAG) and large language model (LLM) agents, Dynamiq lets developers create intelligent, dynamic systems with ease and efficiency.
In this tutorial, we will explore how to use Dynamiq seamlessly with Milvus, a high-performance vector database purpose-built for RAG workflows. Milvus excels at efficient storage, indexing, and retrieval of vector embeddings, making it an indispensable component for AI systems that demand fast, precise access to contextual data.
This step-by-step guide covers two core RAG workflows:
Document Indexing Flow: Learn how to process input files (e.g., PDFs), transform their content into vector embeddings, and store them in Milvus. Leveraging Milvus's high-performance indexing capabilities ensures your data is ready for rapid retrieval.
Document Retrieval Flow: Discover how to query Milvus for relevant document embeddings and use them with Dynamiq's LLM agents to generate insightful, context-aware responses, creating a seamless AI-powered user experience.
By the end of this tutorial, you will have a solid understanding of how Milvus and Dynamiq work together to build scalable, context-aware AI systems tailored to your needs.
Preparation
Install the required libraries
$ pip install dynamiq pymilvus
If you are using Google Colab, to enable the dependencies just installed, you may need to restart the runtime (click on the "Runtime" menu at the top of the screen, and select "Restart session" from the dropdown menu).
Set up the LLM agent
In this example, we will use OpenAI as the LLM. You should prepare the api key OPENAI_API_KEY as an environment variable.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
RAG - Document Indexing Flow
This tutorial demonstrates a Retrieval-Augmented Generation (RAG) workflow for indexing documents with Milvus as the vector database. The workflow takes input PDF files, processes them into smaller chunks, generates vector embeddings using OpenAI's embedding model, and stores the embeddings in a Milvus collection for efficient retrieval.
By the end of this workflow, you will have a scalable and efficient document indexing system that supports future RAG tasks such as semantic search and question answering.
Import required libraries and initialize the workflow
# Importing necessary libraries for the workflow
from io import BytesIO
from dynamiq import Workflow
from dynamiq.nodes import InputTransformer
from dynamiq.connections import (
OpenAI as OpenAIConnection,
Milvus as MilvusConnection,
MilvusDeploymentType,
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.writers import MilvusDocumentWriter
# Initialize the workflow
rag_wf = Workflow()
Define the PDF converter node
converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
converter_added = rag_wf.flow.add_nodes(
converter
) # Add node to the DAG (Directed Acyclic Graph)
Define the document splitter node
document_splitter = DocumentSplitter(
split_by="sentence", # Splits documents into sentences
split_length=10,
split_overlap=1,
input_transformer=InputTransformer(
selector={
"documents": f"${[converter.id]}.output.documents",
},
),
).depends_on(
converter
) # Set dependency on the PDF converter
splitter_added = rag_wf.flow.add_nodes(document_splitter) # Add to the DAG
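The split_length and split_overlap parameters control how many sentences each chunk holds and how many sentences consecutive chunks share. As a rough illustration only (not Dynamiq's actual implementation, whose boundary handling may differ), a sentence-level sliding window looks like this:

```python
def split_sentences(sentences, split_length, split_overlap):
    """Group sentences into overlapping chunks.

    Illustrative sketch only: DocumentSplitter's exact boundary
    handling may differ.
    """
    step = split_length - split_overlap  # how far the window advances
    chunks = []
    for start in range(0, len(sentences), step):
        chunks.append(sentences[start:start + split_length])
        if start + split_length >= len(sentences):
            break  # the last window already covers the tail
    return chunks


# With split_length=3 and split_overlap=1, each chunk repeats the
# last sentence of the previous one:
chunks = split_sentences(["s1", "s2", "s3", "s4", "s5"], 3, 1)
print(chunks)  # [['s1', 's2', 's3'], ['s3', 's4', 's5']]
```

The overlap keeps a little shared context between neighboring chunks so that a sentence near a chunk boundary is still retrievable with its surroundings.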
Define the embedding node
embedder = OpenAIDocumentEmbedder(
connection=OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"]),
input_transformer=InputTransformer(
selector={
"documents": f"${[document_splitter.id]}.output.documents",
},
),
).depends_on(
document_splitter
) # Set dependency on the splitter
document_embedder_added = rag_wf.flow.add_nodes(embedder) # Add to the DAG
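The selector strings built with f-strings above may look unusual: interpolating a one-element list renders Python's list repr, producing a bracketed, JSONPath-style reference to the upstream node's output. A plain-Python sketch of what the f-string evaluates to (the node id below is a made-up placeholder):

```python
# Hypothetical node id, standing in for converter.id / document_splitter.id
node_id = "6eb42b1f-7637-407b-a3ac-4167bcf3b5c4"

# {[node_id]} interpolates the single-element list [node_id], whose
# str() form is "['<id>']", so the selector becomes a bracketed path.
selector = f"${[node_id]}.output.documents"
print(selector)  # $['6eb42b1f-7637-407b-a3ac-4167bcf3b5c4'].output.documents
```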
Define the Milvus vector store node
vector_store = (
MilvusDocumentWriter(
connection=MilvusConnection(
deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
),
index_name="my_milvus_collection",
dimension=1536,
create_if_not_exist=True,
metric_type="COSINE",
)
.inputs(documents=embedder.outputs.documents) # Connect to embedder output
.depends_on(embedder) # Set dependency on the embedder
)
milvus_writer_added = rag_wf.flow.add_nodes(vector_store) # Add to the DAG
2024-11-19 22:14:03 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:03 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:04 - DEBUG - Created new connection using: 0bef2849fdb1458a85df8bb9dd27f51d
2024-11-19 22:14:04 - INFO - Collection my_milvus_collection does not exist. Creating a new collection.
2024-11-19 22:14:04 - DEBUG - Successfully created collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
Milvus offers two deployment types, catering to different use cases:
- MilvusDeploymentType.FILE
  - Ideal for local prototyping or small-scale data storage.
  - Set the uri to a local file path (e.g., ./milvus.db) to leverage Milvus Lite, which automatically stores all data in the specified file.
  - This is a convenient option for quick setup and experimentation.
- MilvusDeploymentType.HOST
  - Designed for large-scale data scenarios, such as managing over a million vectors.
  - Self-hosted server
    - Deploy a high-performance Milvus server using Docker or Kubernetes.
    - Configure the server's address and port as the uri (e.g., http://localhost:19530).
    - If authentication is enabled: provide <your_username>:<your_password> as the token.
    - If authentication is disabled: leave the token unset.
  - Zilliz Cloud (managed service)
    - For a fully managed, cloud-based Milvus experience, use Zilliz Cloud.
    - Set the uri and token according to the Public Endpoint and API key provided in the Zilliz Cloud console.
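The options above boil down to three (uri, token) combinations. Sketched below as plain dictionaries to summarize the parameter choices (endpoints and credentials are placeholders, not real values):

```python
# Milvus Lite: a local file path; no token needed
file_config = {"uri": "./milvus.db"}

# Self-hosted server: host address and port; include a token only
# when authentication is enabled
host_config = {
    "uri": "http://localhost:19530",
    "token": "<your_username>:<your_password>",  # omit if auth is disabled
}

# Zilliz Cloud: Public Endpoint and API key from the console
# (placeholder endpoint shown)
zilliz_config = {
    "uri": "https://<your-endpoint>.zillizcloud.com",
    "token": "<your_api_key>",
}
```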
Define input data and run the workflow
file_paths = ["./pdf_files/WhatisMilvus.pdf"]
input_data = {
"files": [BytesIO(open(path, "rb").read()) for path in file_paths],
"metadata": [{"filename": path} for path in file_paths],
}
# Run the workflow with the prepared input data
inserted_data = rag_wf.run(input_data=input_data)
/var/folders/09/d0hx80nj35sb5hxb5cpc1q180000gn/T/ipykernel_31319/3145804345.py:4: ResourceWarning: unclosed file <_io.BufferedReader name='./pdf_files/WhatisMilvus.pdf'>
BytesIO(open(path, "rb").read()) for path in file_paths
ResourceWarning: Enable tracemalloc to get the object allocation traceback
2024-11-19 22:14:09 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution started.
2024-11-19 22:14:09 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution succeeded in 58ms.
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution started.
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/websockets/legacy/__init__.py:6: DeprecationWarning: websockets.legacy is deprecated; see https://websockets.readthedocs.io/en/stable/howto/upgrade.html for upgrade instructions
warnings.warn( # deprecated in 14.0 - 2024-11-09
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/pydantic/fields.py:804: PydanticDeprecatedSince20: Using extra keyword arguments on `Field` is deprecated and will be removed. Use `json_schema_extra` instead. (Extra keys: 'is_accessible_to_agent'). Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.7/migration/
warn(
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution succeeded in 104ms.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution started.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution succeeded in 724ms.
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution succeeded in 66ms.
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution succeeded in 961ms.
2024-11-19 22:14:10 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 1.3s.
2024-11-19 22:14:10 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution succeeded in 1.3s.
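The ResourceWarning in the output above comes from calling open() without closing the file handles. A small sketch of building the same input_data with a context manager, so each handle is closed as soon as its bytes are read:

```python
from io import BytesIO


def load_input_data(file_paths):
    """Read each file into an in-memory BytesIO, closing handles promptly."""
    files = []
    for path in file_paths:
        # The with-block closes the handle on exit, avoiding the
        # unclosed-file ResourceWarning.
        with open(path, "rb") as f:
            files.append(BytesIO(f.read()))
    return {
        "files": files,
        "metadata": [{"filename": path} for path in file_paths],
    }


# input_data = load_input_data(["./pdf_files/WhatisMilvus.pdf"])
```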
With this workflow, we have successfully implemented a document indexing pipeline using Milvus as the vector database and OpenAI's embedding model for semantic representation. This setup enables fast, accurate vector-based retrieval, laying the foundation for RAG workflows such as semantic search, document retrieval, and contextual AI-driven interactions.
With Milvus's scalable storage capabilities and Dynamiq's orchestration, this solution is ready for both prototyping and large-scale production deployments. You can now extend this pipeline with additional tasks such as retrieval-based question answering or AI-driven content generation.
RAG Document Retrieval Flow
In this tutorial, we will implement a Retrieval-Augmented Generation (RAG) document retrieval workflow. The workflow takes a user query, generates a vector embedding for it, retrieves the most relevant documents from the Milvus vector database, and uses a large language model (LLM) to generate a detailed, context-aware answer based on the retrieved documents.
By following this workflow, you will build an end-to-end solution for semantic search and question answering, combining the power of vector-based document retrieval with the capabilities of OpenAI's advanced LLMs. This approach enables efficient and intelligent responses to user queries by leveraging the knowledge stored in your document database.
Import required libraries and initialize the workflow
from dynamiq import Workflow
from dynamiq.connections import (
OpenAI as OpenAIConnection,
Milvus as MilvusConnection,
MilvusDeploymentType,
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.retrievers import MilvusDocumentRetriever
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Message, Prompt
# Initialize the workflow
retrieval_wf = Workflow()
Define the OpenAI connection and text embedder
# Establish OpenAI connection
openai_connection = OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"])
# Define the text embedder node
embedder = OpenAITextEmbedder(
connection=openai_connection,
model="text-embedding-3-small",
)
# Add the embedder node to the workflow
embedder_added = retrieval_wf.flow.add_nodes(embedder)
Define the Milvus document retriever
document_retriever = (
MilvusDocumentRetriever(
connection=MilvusConnection(
deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
),
index_name="my_milvus_collection",
dimension=1536,
top_k=5,
)
.inputs(embedding=embedder.outputs.embedding) # Connect to embedder output
.depends_on(embedder) # Dependency on the embedder node
)
# Add the retriever node to the workflow
milvus_retriever_added = retrieval_wf.flow.add_nodes(document_retriever)
2024-11-19 22:14:19 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:19 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:19 - DEBUG - Created new connection using: 98d1132773af4298a894ad5925845fd2
2024-11-19 22:14:19 - INFO - Collection my_milvus_collection already exists. Skipping creation.
Define the prompt template
# Define the prompt template for the LLM
prompt_template = """
Please answer the question based on the provided context.
Question: {{ query }}
Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""
# Create the prompt object
prompt = Prompt(messages=[Message(content=prompt_template, role="user")])
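The template uses Jinja2 syntax, so you can preview how it expands before wiring it into the workflow. A sketch rendering it with the jinja2 library directly (the sample query and document contents are made up for illustration):

```python
from jinja2 import Template

prompt_template = """
Please answer the question based on the provided context.
Question: {{ query }}
Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""

# Jinja2 resolves `document.content` via attribute-or-item lookup,
# so plain dicts work as stand-ins for document objects here.
rendered = Template(prompt_template).render(
    query="What is Milvus?",
    documents=[
        {"content": "Milvus is a vector database."},
        {"content": "It supports ANN search."},
    ],
)
print(rendered)
```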
Define the answer generator
answer_generator = (
OpenAI(
connection=openai_connection,
model="gpt-4o",
prompt=prompt,
)
.inputs(
documents=document_retriever.outputs.documents,
query=embedder.outputs.query,
)
.depends_on(
[document_retriever, embedder]
) # Dependencies on retriever and embedder
)
# Add the answer generator node to the workflow
answer_generator_added = retrieval_wf.flow.add_nodes(answer_generator)
Run the workflow
# Run the workflow with a sample query
sample_query = "What is the Advanced Search Algorithms in Milvus?"
result = retrieval_wf.run(input_data={"query": sample_query})
answer = result.output.get(answer_generator.id).get("output", {}).get("content")
print(answer)
2024-11-19 22:14:22 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution started.
2024-11-19 22:14:22 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:22 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution started.
2024-11-19 22:14:23 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:23 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution succeeded in 474ms.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution started.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution succeeded in 23ms.
2024-11-19 22:14:23 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution started.
2024-11-19 22:14:24 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2024-11-19 22:14:24 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution succeeded in 1.8s.
2024-11-19 22:14:25 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 2.4s.
2024-11-19 22:14:25 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution succeeded in 2.4s.
The advanced search algorithms in Milvus include a variety of in-memory and on-disk indexing/search algorithms such as IVF (Inverted File), HNSW (Hierarchical Navigable Small World), and DiskANN. These algorithms have been deeply optimized to enhance performance, delivering 30%-70% better performance compared to popular implementations like FAISS and HNSWLib. These optimizations are part of Milvus's design to ensure high efficiency and scalability in handling vector data.
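The chained .get() calls used earlier to pull the answer out of result.output fail silently (returning None) if any key is missing. A defensive sketch of the same lookup, assuming the nested-dict shape implied by that access pattern:

```python
def extract_answer(output, node_id):
    """Walk output[node_id]["output"]["content"], raising a clear
    error instead of silently returning None when a level is missing."""
    node_result = output.get(node_id)
    if node_result is None:
        raise KeyError(f"no result for node {node_id}")
    content = node_result.get("output", {}).get("content")
    if content is None:
        raise KeyError(f"node {node_id} produced no content")
    return content


# Usage (the result object and node id come from the workflow run):
# answer = extract_answer(result.output, answer_generator.id)
```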