milvus-logo
LFAI
首页
  • 集成
    • LLMs

使用 Milvus 和 Fireworks AI 构建 RAG

Open In Colab GitHub Repository

Fireworks AI是一个生成式人工智能推理平台,为运行和定制模型提供业界领先的速度和生产就绪状态。 Fireworks AI 提供各种生成式人工智能服务,包括无服务器模型、按需部署和微调功能。它为部署各种人工智能模型(包括大型语言模型(LLMs)和 Embeddings 模型)提供了全面的环境。Fireworks AI 聚合了众多模型,使用户能够轻松访问和利用这些资源,而无需进行大量的基础架构设置。

在本教程中,我们将向您展示如何使用 Milvus 和 Fireworks AI 构建 RAG(检索-增强生成)管道。

准备工作

依赖和环境

$ pip install --upgrade pymilvus openai requests tqdm

如果使用的是 Google Colab,要启用刚安装的依赖项,可能需要重启运行时(点击屏幕上方的 "Runtime "菜单,从下拉菜单中选择 "Restart session")。

Fireworks AI 支持 OpenAI 风格的 API。您可以登录其官方网站,并将api key FIREWORKS_API_KEY 作为环境变量。

import os

os.environ["FIREWORKS_API_KEY"] = "***********"

准备数据

我们使用Milvus 文档 2.4.x中的常见问题页面作为 RAG 中的私有知识,这对于简单的 RAG 管道来说是一个很好的数据源。

下载 zip 文件并将文档解压缩到milvus_docs 文件夹中。

$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs

我们从milvus_docs/en/faq 文件夹中加载所有标记文件。对于每个文档,我们只需简单地使用 "#"来分隔文件中的内容,这样就能大致分隔出 markdown 文件中每个主要部分的内容。

from glob import glob

text_lines = []

for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
    with open(file_path, "r") as file:
        file_text = file.read()

    text_lines += file_text.split("# ")

准备 LLM 和嵌入模型

我们初始化一个客户端来准备 LLM 和 Embeddings 模型。Fireworks AI 启用了 OpenAI 风格的 API,你可以稍作调整后使用相同的 API 来调用嵌入模型和 LLM。

from openai import OpenAI

fireworks_client = OpenAI(
    api_key=os.environ["FIREWORKS_API_KEY"],
    base_url="https://api.fireworks.ai/inference/v1",
)

定义一个函数,使用客户端生成文本嵌入。我们以nomic-ai/nomic-embed-text-v1.5 模型为例。

def emb_text(text):
    return (
        fireworks_client.embeddings.create(
            input=text, model="nomic-ai/nomic-embed-text-v1.5"
        )
        .data[0]
        .embedding
    )

生成一个测试嵌入并打印其维度和前几个元素。

test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
768
[0.04815673828125, 0.0261993408203125, -0.1749267578125, -0.03131103515625, 0.068115234375, -0.00621795654296875, 0.03955078125, -0.0210723876953125, 0.039703369140625, -0.0286102294921875]

将数据载入 Milvus

创建 Collections

from pymilvus import MilvusClient

milvus_client = MilvusClient(uri="./milvus_demo.db")

collection_name = "my_rag_collection"

至于MilvusClient 的参数:

  • uri 设置为本地文件,如./milvus.db ,是最方便的方法,因为它会自动利用Milvus Lite将所有数据存储在此文件中。
  • 如果数据规模较大,可以在docker 或 kubernetes 上设置性能更强的 Milvus 服务器。在此设置中,请使用服务器 uri,例如http://localhost:19530 ,作为您的uri
  • 如果你想使用Zilliz Cloud(Milvus 的全托管云服务),请调整uritoken ,它们与 Zilliz Cloud 中的公共端点和 Api 密钥相对应。

检查 Collections 是否已存在,如果已存在,则将其删除。

if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

使用指定参数创建新 Collections。

如果我们没有指定任何字段信息,Milvus 会自动创建一个主键的默认id 字段,以及一个存储向量数据的vector 字段。保留的 JSON 字段用于存储非 Schema 定义的字段及其值。

milvus_client.create_collection(
    collection_name=collection_name,
    dimension=embedding_dim,
    metric_type="IP",  # Inner product distance
    consistency_level="Strong",  # Strong consistency level
)

插入数据

遍历文本行,创建 Embeddings,然后将数据插入 Milvus。

这里有一个新字段text ,它是 Collections Schema 中的一个非定义字段。它将自动添加到保留的 JSON 动态字段中,在高层次上可将其视为普通字段。

from tqdm import tqdm

data = []

for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
    data.append({"id": i, "vector": emb_text(line), "text": line})

milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 100%|██████████| 72/72 [00:28<00:00,  2.51it/s]





{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}

构建 RAG

为查询检索数据

让我们指定一个关于 Milvus 的常见问题。

question = "How is data stored in milvus?"

在 Collections 中搜索该问题,并检索语义前 3 个匹配项。

search_res = milvus_client.search(
    collection_name=collection_name,
    data=[
        emb_text(question)
    ],  # Use the `emb_text` function to convert the question to an embedding vector
    limit=3,  # Return top 3 results
    search_params={"metric_type": "IP", "params": {}},  # Inner product distance
    output_fields=["text"],  # Return the text field
)

让我们看看查询的搜索结果

import json

retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
    [
        " Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
        0.8334928750991821
    ],
    [
        "How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
        0.746377170085907
    ],
    [
        "What is the maximum dataset size Milvus can handle?\n\n  \nTheoretically, the maximum dataset size Milvus can handle is determined by the hardware it is run on, specifically system memory and storage:\n\n- Milvus loads all specified collections and partitions into memory before running queries. Therefore, memory size determines the maximum amount of data Milvus can query.\n- When new entities and and collection-related schema (currently only MinIO is supported for data persistence) are added to Milvus, system storage determines the maximum allowable size of inserted data.\n\n###",
        0.7328270673751831
    ]
]

使用 LLM 获取 RAG 响应

将检索到的文档转换为字符串格式。

context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)

为 Lanage 模型定义系统和用户提示。该提示与从 Milvus 检索到的文档组装在一起。

SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""

使用 Fireworks 提供的llama-v3p1-405b-instruct 模型,根据提示生成响应。

response = fireworks_client.chat.completions.create(
    model="accounts/fireworks/models/llama-v3p1-405b-instruct",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": USER_PROMPT},
    ],
)
print(response.choices[0].message.content)
According to the provided context, Milvus stores data in two ways:

1. Inserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental logs. This can be done using multiple object storage backends such as MinIO, AWS S3, Google Cloud Storage, Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage.
2. Metadata, which are generated within Milvus, are stored in etcd, with each Milvus module having its own metadata.

Additionally, when data is inserted, it is first loaded into a message queue, and then written to persistent storage as incremental logs by the data node. The `flush()` function can be used to force the data node to write all data in the message queue to persistent storage immediately.

很好!我们利用 Milvus 和 Fireworks AI 成功构建了一个 RAG 管道。

翻译自DeepLogo

想要更快、更简单、更好用的 Milvus SaaS服务 ?

Zilliz Cloud是基于Milvus的全托管向量数据库,拥有更高性能,更易扩展,以及卓越性价比

免费试用 Zilliz Cloud
反馈

此页对您是否有帮助?