Build RAG with Milvus and Fireworks AI
Fireworks AI is a generative AI inference platform offering industry-leading speed and production readiness for running and customizing models. It provides a range of generative AI services, including serverless models, on-demand deployments, and fine-tuning capabilities, and offers a comprehensive environment for deploying various AI models, including large language models (LLMs) and embedding models. Fireworks AI aggregates numerous models, enabling users to easily access and use these resources without extensive infrastructure setup.
In this tutorial, we will show you how to build a RAG (Retrieval-Augmented Generation) pipeline with Milvus and Fireworks AI.
Preparation
Dependencies and Environment
$ pip install --upgrade pymilvus openai requests tqdm
If you are using Google Colab, to enable the dependencies just installed, you may need to restart the runtime (click on the "Runtime" menu at the top of the screen, and select "Restart session" from the dropdown menu).
Fireworks AI enables an OpenAI-style API. You can log in to its official website and prepare the API key FIREWORKS_API_KEY as an environment variable.
import os
os.environ["FIREWORKS_API_KEY"] = "***********"
Prepare the Data
We use the FAQ pages from the Milvus Documentation 2.4.x as the private knowledge in our RAG, which is a good data source for a simple RAG pipeline.
Download the zip file and extract the documents to the folder milvus_docs.
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
We load all markdown files from the folder milvus_docs/en/faq. For each document, we simply use "# " to separate the content in the file, which can roughly separate the content of each main part of the markdown file.
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
    with open(file_path, "r") as file:
        file_text = file.read()
    text_lines += file_text.split("# ")
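To see what this naive "# " split produces, here is a self-contained sketch on an inline sample document (made-up content; note that the split also fires on deeper headings such as "## "):

```python
# A tiny markdown string standing in for one FAQ file (made-up content).
sample = "# Performance FAQ\n\nSome intro text.\n\n# How fast is it?\n\nQuite fast.\n"

# The same naive split used above: every "# " starts a new chunk;
# empty fragments (e.g. before the first heading) are dropped.
chunks = [c for c in sample.split("# ") if c.strip()]

print(len(chunks))     # 2
print(chunks[0][:15])  # Performance FAQ
```

Each chunk keeps its heading text at the front, which gives the embedding model useful context for free.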
Prepare the LLM and Embedding Model
We initialize a client to prepare the LLM and embedding model. Fireworks AI enables an OpenAI-style API, so you can use the same API with minor adjustments to call both the embedding model and the LLM.
from openai import OpenAI
fireworks_client = OpenAI(
    api_key=os.environ["FIREWORKS_API_KEY"],
    base_url="https://api.fireworks.ai/inference/v1",
)
Define a function to generate text embeddings using the client. We use the nomic-ai/nomic-embed-text-v1.5 model as an example.
def emb_text(text):
    return (
        fireworks_client.embeddings.create(
            input=text, model="nomic-ai/nomic-embed-text-v1.5"
        )
        .data[0]
        .embedding
    )
Generate a test embedding and print its dimension and first few elements.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
768
[0.04815673828125, 0.0261993408203125, -0.1749267578125, -0.03131103515625, 0.068115234375, -0.00621795654296875, 0.03955078125, -0.0210723876953125, 0.039703369140625, -0.0286102294921875]
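A quick aside: the collection below will use inner-product (IP) distance, which matches cosine-similarity ranking only when vectors are L2-normalized. Whether the embedding service returns normalized vectors is worth checking on your own output; this minimal sketch (toy vector, not a real embedding) shows the norm check and normalization:

```python
import math

def l2_norm(vec):
    """Euclidean length of a vector."""
    return math.sqrt(sum(x * x for x in vec))

def normalize(vec):
    """Scale a vector to unit length."""
    n = l2_norm(vec)
    return [x / n for x in vec]

vec = [3.0, 4.0]  # toy stand-in for test_embedding
unit = normalize(vec)
print(round(l2_norm(unit), 6))  # 1.0
```

Running `l2_norm(test_embedding)` on the real vector tells you whether normalization is needed before insertion.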
Load Data into Milvus
Create the Collection
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
As for the argument of MilvusClient:
- Setting the uri as a local file, e.g. ./milvus.db, is the most convenient method, as it automatically utilizes Milvus Lite to store all data in this file.
- If you have a large amount of data, you can set up a more performant Milvus server on Docker or Kubernetes. In this setup, please use the server URI, e.g. http://localhost:19530, as your uri.
- If you want to use Zilliz Cloud, the fully managed cloud service for Milvus, adjust the uri and token, which correspond to the Public Endpoint and API key in Zilliz Cloud.
Check if the collection already exists and drop it if it does.
if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)
Create a new collection with the specified parameters. If we don't specify any field information, Milvus will automatically create a default id field for the primary key and a vector field to store the vector data. A reserved JSON field is used to store non-schema-defined fields and their values.
milvus_client.create_collection(
    collection_name=collection_name,
    dimension=embedding_dim,
    metric_type="IP",  # Inner product distance
    consistency_level="Strong",  # Strong consistency level
)
Insert Data
Iterate through the text lines, create embeddings, and then insert the data into Milvus.
Here is a new field text, which is a non-defined field in the collection schema. It will be automatically added to the reserved JSON dynamic field, which can be treated as a normal field at a high level.
from tqdm import tqdm
data = []
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
    data.append({"id": i, "vector": emb_text(line), "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 100%|██████████| 72/72 [00:28<00:00, 2.51it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
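Embedding one line per request is the simplest approach, but it makes one API round trip per chunk. Below is a hedged sketch of batching; `embed_batch` is a stub here, and whether the Fireworks embeddings endpoint accepts a list `input` in a single call is an assumption to verify before swapping in a real implementation:

```python
def chunked(items, size):
    """Yield successive fixed-size slices of a list."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

def embed_batch(lines):
    # Stub: one fake 3-d vector per line. Replace with a real batched
    # embeddings call if the API supports list inputs (an assumption).
    return [[float(len(line)), 0.0, 0.0] for line in lines]

lines = [f"doc {i}" for i in range(10)]
data = []
for batch in chunked(lines, 4):
    for text, vec in zip(batch, embed_batch(batch)):
        data.append({"id": len(data), "vector": vec, "text": text})

print(len(data))  # 10 rows, in the same shape milvus_client.insert expects
```

The row dictionaries have the same shape as above, so the resulting `data` list drops straight into `milvus_client.insert`.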
Build RAG
Retrieve Data for a Query
Let's specify a frequent question about Milvus.
question = "How is data stored in milvus?"
Search for the question in the collection and retrieve the semantic top-3 matches.
search_res = milvus_client.search(
    collection_name=collection_name,
    data=[
        emb_text(question)
    ],  # Use the `emb_text` function to convert the question to an embedding vector
    limit=3,  # Return top 3 results
    search_params={"metric_type": "IP", "params": {}},  # Inner product distance
    output_fields=["text"],  # Return the text field
)
Let's take a look at the search results of the query.
import json
retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
    [
        " Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
        0.8334928750991821
    ],
    [
        "How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
        0.746377170085907
    ],
    [
        "What is the maximum dataset size Milvus can handle?\n\n \nTheoretically, the maximum dataset size Milvus can handle is determined by the hardware it is run on, specifically system memory and storage:\n\n- Milvus loads all specified collections and partitions into memory before running queries. Therefore, memory size determines the maximum amount of data Milvus can query.\n- When new entities and and collection-related schema (currently only MinIO is supported for data persistence) are added to Milvus, system storage determines the maximum allowable size of inserted data.\n\n###",
        0.7328270673751831
    ]
]
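Conceptually, the search above scores every stored vector against the query by inner product and keeps the top-k (Milvus adds indexing to avoid the full scan). A self-contained sketch with toy 3-d vectors:

```python
def inner_product(a, b):
    """Inner-product similarity: higher means more similar."""
    return sum(x * y for x, y in zip(a, b))

# Toy corpus of named vectors (made-up values).
corpus = {
    "doc_a": [0.1, 0.9, 0.2],
    "doc_b": [0.8, 0.1, 0.1],
    "doc_c": [0.2, 0.8, 0.3],
}
query = [0.1, 1.0, 0.2]

# Score everything, sort descending, keep top 3 -- the brute-force
# equivalent of `limit=3` with metric_type="IP".
ranked = sorted(
    corpus, key=lambda name: inner_product(query, corpus[name]), reverse=True
)
print(ranked[:3])  # ['doc_a', 'doc_c', 'doc_b']
```

The distances printed by Milvus above are exactly these inner-product scores, which is why higher values appear first.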
Use LLM to Get a RAG Response
Convert the retrieved documents into a string format.
context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Define the system and user prompts for the language model. This prompt is assembled with the retrieved documents from Milvus.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
Use the llama-v3p1-405b-instruct model provided by Fireworks to generate a response based on the prompts.
response = fireworks_client.chat.completions.create(
    model="accounts/fireworks/models/llama-v3p1-405b-instruct",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": USER_PROMPT},
    ],
)
print(response.choices[0].message.content)
According to the provided context, Milvus stores data in two ways:
1. Inserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental logs. This can be done using multiple object storage backends such as MinIO, AWS S3, Google Cloud Storage, Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage.
2. Metadata, which are generated within Milvus, are stored in etcd, with each Milvus module having its own metadata.
Additionally, when data is inserted, it is first loaded into a message queue, and then written to persistent storage as incremental logs by the data node. The `flush()` function can be used to force the data node to write all data in the message queue to persistent storage immediately.
Great! We have successfully built a RAG pipeline with Milvus and Fireworks AI.