Build RAG with Milvus and Gemini
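The Gemini API and Google AI Studio help you start working with Google's latest models and turn your ideas into applications that scale. Gemini provides access to powerful language models such as Gemini-1.5-Flash, Gemini-1.5-Flash-8B, and Gemini-1.5-Pro for tasks like text generation, document processing, vision, and audio analysis. Through the API you can supply long contexts of millions of tokens, fine-tune models for specific tasks, generate structured output such as JSON, and use capabilities like semantic retrieval and code execution.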
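In this tutorial, we will show you how to build a RAG (Retrieval-Augmented Generation) pipeline with Milvus and Gemini. We will use Milvus to store and retrieve the document text that serves as context, and a Gemini model to generate an answer to a given query from that retrieved context.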
Preparation
Dependencies and Environment
$ pip install --upgrade pymilvus google-generativeai requests tqdm
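If you are using Google Colab, you may need to restart the runtime to enable the dependencies you just installed (click the "Runtime" menu at the top of the screen and select "Restart session" from the dropdown menu).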
First, log in to Google AI Studio, create an API key, and set it as the environment variable GEMINI_API_KEY.
import os
os.environ["GEMINI_API_KEY"] = "***********"
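Hardcoding the key in a notebook makes it easy to leak when the notebook is shared. A minimal sketch, assuming you export GEMINI_API_KEY in your shell before starting Python, reads the key from the environment and fails early with a clear message instead. The helper name require_env is hypothetical.

```python
import os


def require_env(name: str) -> str:
    """Return the value of environment variable `name`, or raise if it is unset or empty."""
    value = os.environ.get(name, "")
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value


# Example (assumes `export GEMINI_API_KEY=...` was run in the shell beforehand):
# api_key = require_env("GEMINI_API_KEY")
```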
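Prepare the data
We use the FAQ pages from the Milvus Documentation 2.4.x as the private knowledge in our RAG. They are a good data source for a simple RAG pipeline.
Download the zip file and extract the documents to the folder milvus_docs.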
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
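We load all Markdown files from the folder milvus_docs/en/faq. For each document, we simply split the file content on "# ", which roughly separates the content of each main section of the Markdown file.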
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
    with open(file_path, "r") as file:
        file_text = file.read()
    text_lines += file_text.split("# ")
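Splitting on every "# " is a deliberate simplification: it leaves an empty first chunk when a file starts with a heading, and deeper headings such as "#### Question" leave a "###" fragment at the end of the previous chunk (this is why the passages retrieved later end in "###"). The sketch below demonstrates this on an in-memory sample string and shows a hypothetical alternative, split_on_headings, that splits only at Markdown heading lines. It is one possible design choice, not what the tutorial requires; the simple split works well enough for retrieval here.

```python
import re

sample = (
    "# FAQ\n\n"
    "#### Where does Milvus store data?\n\nIn object storage.\n\n"
    "#### Is search in memory?\n\nYes.\n"
)

# The tutorial's approach: split on every "# " occurrence.
naive_chunks = sample.split("# ")
# -> ['', 'FAQ\n\n###', 'Where does Milvus store data?\n\nIn object storage.\n\n###', 'Is search in memory?\n\nYes.\n']


def split_on_headings(text: str) -> list[str]:
    """Hypothetical alternative: split only at heading lines (# to ######) and drop empty chunks."""
    parts = re.split(r"(?m)^#{1,6} ", text)
    return [part.strip() for part in parts if part.strip()]


clean_chunks = split_on_headings(sample)
# -> ['FAQ', 'Where does Milvus store data?\n\nIn object storage.', 'Is search in memory?\n\nYes.']
```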
Prepare the LLM and embedding model
We use gemini-1.5-flash as the LLM and text-embedding-004 as the embedding model.
Let's try generating a test response from the LLM:
import google.generativeai as genai
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
gemini_model = genai.GenerativeModel("gemini-1.5-flash")
response = gemini_model.generate_content("who are you")
print(response.text)
I am a large language model, trained by Google. I am an AI and don't have a personal identity or consciousness. My purpose is to process information and respond to a wide range of prompts and questions in a helpful and informative way.
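Generate test embeddings for two sentences, then print the embedding dimension and the first few elements of the first embedding.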
test_embeddings = genai.embed_content(
model="models/text-embedding-004", content=["This is a test1", "This is a test2"]
)["embedding"]
embedding_dim = len(test_embeddings[0])
print(embedding_dim)
print(test_embeddings[0][:10])
768
[0.013588584, -0.004361838, -0.08481652, -0.039724775, 0.04723794, -0.0051557426, 0.026071774, 0.045514572, -0.016867816, 0.039378334]
Load data into Milvus
Create the collection
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
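For the arguments of MilvusClient:
- Setting the uri to a local file, such as ./milvus.db, is the most convenient method, because it automatically uses Milvus Lite to store all data in that file.
- If you have a large amount of data, you can set up a more performant Milvus server on Docker or Kubernetes. In this setup, use the server URI, e.g. http://localhost:19530, as your uri.
- If you want to use Zilliz Cloud, the fully managed cloud service for Milvus, adjust the uri and token, which correspond to the Public Endpoint and API key in Zilliz Cloud.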
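To switch between these three setups without editing code, one option is to read the connection settings from the environment. A minimal sketch, assuming the hypothetical variable names MILVUS_URI and MILVUS_TOKEN (they are not defined by Milvus), builds the keyword arguments for MilvusClient and falls back to a local Milvus Lite file when no URI is set.

```python
import os


def milvus_client_kwargs(env=os.environ) -> dict:
    """Build MilvusClient keyword arguments from hypothetical MILVUS_URI / MILVUS_TOKEN variables."""
    # A local file path selects Milvus Lite; an http(s) URI targets a server or Zilliz Cloud.
    uri = env.get("MILVUS_URI", "./milvus_demo.db")
    kwargs = {"uri": uri}
    token = env.get("MILVUS_TOKEN")
    if token:
        # Needed for Zilliz Cloud (API key) or a self-hosted server with authentication enabled.
        kwargs["token"] = token
    return kwargs


# Usage with pymilvus installed:
# from pymilvus import MilvusClient
# milvus_client = MilvusClient(**milvus_client_kwargs())
```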
Check whether the collection already exists, and drop it if it does.
if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)
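Create a new collection with the specified parameters.
If we don't specify any field information, Milvus automatically creates a default id field as the primary key and a vector field to store the vector data. A reserved JSON field is used to store fields that are not defined in the schema, along with their values.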
milvus_client.create_collection(
    collection_name=collection_name,
    dimension=embedding_dim,
    metric_type="IP",  # Inner product distance
    consistency_level="Strong",  # Strong consistency level
)
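The collection uses metric_type="IP" (inner product). Inner product equals cosine similarity only when both vectors have unit length; otherwise the score also grows with vector magnitude. Whether text-embedding-004 outputs are already normalized is not stated in this tutorial, so normalizing the vectors yourself, or choosing Milvus's "COSINE" metric type, are options if that matters for your data. The small pure-Python sketch below shows the relationship on two toy vectors.

```python
import math


def inner_product(a, b):
    """Sum of element-wise products, the score used by metric_type="IP"."""
    return sum(x * y for x, y in zip(a, b))


def l2_normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(inner_product(v, v))
    return [x / norm for x in v]


def cosine_similarity(a, b):
    """Inner product divided by the product of the vector lengths."""
    return inner_product(a, b) / (math.sqrt(inner_product(a, a)) * math.sqrt(inner_product(b, b)))


a = [3.0, 4.0]
b = [4.0, 3.0]

print(inner_product(a, b))                              # 24.0, depends on magnitude
print(cosine_similarity(a, b))                          # 0.96
print(inner_product(l2_normalize(a), l2_normalize(b)))  # approximately 0.96, matches cosine
```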
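Insert data
Create embeddings for all text lines, then iterate through them and insert the data into Milvus.
Here is a new field, text, which is not defined in the collection schema. It is automatically added to the reserved JSON dynamic field, and at a high level it can be treated as a normal field.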
from tqdm import tqdm
data = []
doc_embeddings = genai.embed_content(
    model="models/text-embedding-004", content=text_lines
)["embedding"]
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
    data.append({"id": i, "vector": doc_embeddings[i], "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
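The code above embeds all 72 text lines in a single request. For a larger corpus, the embedding endpoint may cap how many inputs one request accepts (the exact limit is not stated here; check the Gemini API documentation), so sending texts in batches is a common precaution. The sketch below adds two hypothetical helpers: batched, which slices a sequence, and build_rows, which pairs texts with vectors and checks counts and dimensions before insertion. BATCH_SIZE in the usage comment is an assumed value, not a documented limit.

```python
from typing import Iterator, List, Sequence


def batched(items: Sequence, size: int) -> Iterator[Sequence]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def build_rows(texts: Sequence[str], vectors: Sequence[List[float]], dim: int) -> List[dict]:
    """Pair each text with its vector, checking counts and dimensions before insertion."""
    if len(texts) != len(vectors):
        raise ValueError(f"{len(texts)} texts but {len(vectors)} vectors")
    rows = []
    for i, (text, vector) in enumerate(zip(texts, vectors)):
        if len(vector) != dim:
            raise ValueError(f"row {i}: expected dimension {dim}, got {len(vector)}")
        # "text" is not in the schema, so Milvus stores it in the dynamic JSON field.
        rows.append({"id": i, "vector": vector, "text": text})
    return rows


# Usage with the tutorial's objects (BATCH_SIZE is an assumption):
# BATCH_SIZE = 50
# doc_embeddings = []
# for chunk in batched(text_lines, BATCH_SIZE):
#     doc_embeddings += genai.embed_content(
#         model="models/text-embedding-004", content=list(chunk)
#     )["embedding"]
# milvus_client.insert(
#     collection_name=collection_name,
#     data=build_rows(text_lines, doc_embeddings, embedding_dim),
# )
```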
Creating embeddings: 100%|██████████| 72/72 [00:00<00:00, 468201.38it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
Build RAG
Retrieve data for a query
Let's specify a frequently asked question about Milvus.
question = "How is data stored in milvus?"
Search for the question in the collection and retrieve the top 3 semantic matches.
question_embedding = genai.embed_content(
    model="models/text-embedding-004", content=question
)["embedding"]
search_res = milvus_client.search(
    collection_name=collection_name,
    data=[question_embedding],
    limit=3,  # Return top 3 results
    search_params={"metric_type": "IP", "params": {}},  # Inner product distance
    output_fields=["text"],  # Return the text field
)
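Conceptually, a search with limit=3 and the IP metric returns the three stored vectors with the highest inner product against the query, sorted from best to worst. Milvus relies on its own index and search machinery to do this efficiently at scale; the brute-force sketch below is only a reference implementation of the same ranking, useful for sanity-checking results on tiny toy data. The function name top_k_inner_product is hypothetical.

```python
import heapq
from typing import List, Sequence, Tuple


def top_k_inner_product(
    query: Sequence[float], vectors: Sequence[Sequence[float]], k: int
) -> List[Tuple[int, float]]:
    """Exact search: score every vector by inner product and keep the k highest (index, score) pairs."""
    scores = ((i, sum(q * v for q, v in zip(query, vec))) for i, vec in enumerate(vectors))
    return heapq.nlargest(k, scores, key=lambda pair: pair[1])


corpus = [[1.0, 0.0], [0.6, 0.8], [0.0, 1.0], [-1.0, 0.0]]
results = top_k_inner_product([0.8, 0.6], corpus, k=3)
print([i for i, _ in results])  # [1, 0, 2]
```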
Let's take a look at the search results for the query.
import json
retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
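The top 3 hits are always returned, even when a passage is only loosely related to the question. One optional refinement, shown here as a hypothetical helper, is to drop hits whose score falls below a cutoff before building the context. The mock_hits list is made-up stand-in data that only mirrors the keys the code above reads ("entity" → "text" and "distance"), and the 0.7 cutoff is an arbitrary example value that would need tuning for real data.

```python
def filter_hits(hits, min_score):
    """Keep (text, score) pairs whose inner-product score is at least min_score."""
    return [
        (hit["entity"]["text"], hit["distance"])
        for hit in hits
        if hit["distance"] >= min_score
    ]


# Made-up stand-in for search_res[0], using the same keys as the tutorial code.
mock_hits = [
    {"distance": 0.80, "entity": {"text": "Where does Milvus store data?"}},
    {"distance": 0.75, "entity": {"text": "Does the query perform in memory?"}},
    {"distance": 0.40, "entity": {"text": "Unrelated passage"}},
]
print(filter_hits(mock_hits, min_score=0.7))
# [('Where does Milvus store data?', 0.8), ('Does the query perform in memory?', 0.75)]
```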
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.8048275113105774
],
[
"Does the query perform in memory? What are incremental data and historical data?\n\nYes. When a query request comes, Milvus searches both incremental data and historical data by loading them into memory. Incremental data are in the growing segments, which are buffered in memory before they reach the threshold to be persisted in storage engine, while historical data are from the sealed segments that are stored in the object storage. Incremental data and historical data together constitute the whole dataset to search.\n\n###",
0.7574886679649353
],
[
"What is the maximum dataset size Milvus can handle?\n\n \nTheoretically, the maximum dataset size Milvus can handle is determined by the hardware it is run on, specifically system memory and storage:\n\n- Milvus loads all specified collections and partitions into memory before running queries. Therefore, memory size determines the maximum amount of data Milvus can query.\n- When new entities and and collection-related schema (currently only MinIO is supported for data persistence) are added to Milvus, system storage determines the maximum allowable size of inserted data.\n\n###",
0.7453608512878418
]
]
Use the LLM to get a RAG response
Convert the retrieved documents into a single string.
context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Define the system and user prompts for the language model. The user prompt is assembled with the documents retrieved from Milvus.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
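Because USER_PROMPT is an f-string, it is filled in once, with whatever context and question exist at that moment. To answer several questions in a row, a small function that rebuilds the prompt each time is more convenient. The sketch below is a hypothetical helper, build_user_prompt, that joins the retrieved passages and reproduces the same tag layout as the template above.

```python
def build_user_prompt(passages, question):
    """Join retrieved passages and wrap them, with the question, in <context>/<question> tags."""
    context = "\n".join(passages)
    return (
        "Use the following pieces of information enclosed in <context> tags to provide an answer "
        "to the question enclosed in <question> tags.\n"
        f"<context>\n{context}\n</context>\n"
        f"<question>\n{question}\n</question>\n"
    )


prompt = build_user_prompt(["Passage one.", "Passage two."], "How is data stored in milvus?")
print(prompt)
```

With the tutorial's objects, the call would be build_user_prompt([text for text, _ in retrieved_lines_with_distances], question).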
Use Gemini to generate a response based on the prompts.
gemini_model = genai.GenerativeModel(
    "gemini-1.5-flash", system_instruction=SYSTEM_PROMPT
)
response = gemini_model.generate_content(USER_PROMPT)
print(response.text)
Milvus stores data in two ways: Inserted data (vector data, scalar data, and collection-specific schema) is stored as an incremental log in persistent storage using object storage backends such as MinIO, AWS S3, Google Cloud Storage, Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage. Metadata, generated by each Milvus module, is stored in etcd.
Great! We have successfully built a RAG pipeline with Milvus and Gemini.