MilvusとFireworks AIでRAGを構築する
Fireworks AIは、生成AI推論プラットフォームであり、業界をリードするスピードと、モデルの実行とカスタマイズのためのプロダクションレディネスを提供します。 Fireworks AIは、サーバーレスモデル、オンデマンドデプロイメント、微調整機能を含む、様々な生成AIサービスを提供します。大規模言語モデル(LLM)やエンベッディングモデルなど、さまざまなAIモデルをデプロイするための包括的な環境を提供する。Fireworks AIは多数のモデルを集約しているため、ユーザーは大規模なインフラを構築することなく、これらのリソースに簡単にアクセスして利用することができます。
このチュートリアルでは、MilvusとFireworks AIを使ってRAG(Retrieval-Augmented Generation)パイプラインを構築する方法をご紹介します。
準備
依存関係と環境
$ pip install --upgrade pymilvus openai requests tqdm
Google Colabを使用している場合、インストールしたばかりの依存関係を有効にするために、ランタイムを再起動する必要があるかもしれません(画面上部の "Runtime "メニューをクリックし、ドロップダウンメニューから "Restart session "を選択します)。
Fireworks AIは、OpenAIスタイルのAPIを有効にします。公式サイトにログインし、api key FIREWORKS_API_KEY
を環境変数として用意します。
import os
os.environ["FIREWORKS_API_KEY"] = "***********"
データの準備
Milvusドキュメント2.4.xのFAQページをRAGのプライベートナレッジとして使用する。
zipファイルをダウンロードし、milvus_docs
フォルダにドキュメントを展開する。
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
フォルダmilvus_docs/en/faq
からすべてのマークダウン・ファイルをロードする。各ドキュメントについて、単に "# "を使ってファイル内のコンテンツを区切るだけで、マークダウン・ファイルの各主要部分のコンテンツを大まかに区切ることができる。
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()
text_lines += file_text.split("# ")
LLMと埋め込みモデルの準備
クライアントを初期化して、LLMと埋め込みモデルを準備します。Fireworks AIでは、OpenAIスタイルのAPIが使えますので、同じAPIを微調整して使うことで、埋め込みモデルとLLMを呼び出すことができます。
from openai import OpenAI
fireworks_client = OpenAI(
api_key=os.environ["FIREWORKS_API_KEY"],
base_url="https://api.fireworks.ai/inference/v1",
)
クライアントを使ってテキスト埋め込みを生成する関数を定義します。例としてnomic-ai/nomic-embed-text-v1.5
モデルを使います。
def emb_text(text):
return (
fireworks_client.embeddings.create(
input=text, model="nomic-ai/nomic-embed-text-v1.5"
)
.data[0]
.embedding
)
テスト埋め込みを生成し、その次元と最初のいくつかの要素を表示します。
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
768
[0.04815673828125, 0.0261993408203125, -0.1749267578125, -0.03131103515625, 0.068115234375, -0.00621795654296875, 0.03955078125, -0.0210723876953125, 0.039703369140625, -0.0286102294921875]
Milvusにデータをロードする。
コレクションの作成
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
MilvusClient
の引数については、次のとおりです:
uri
をローカルファイル、例えば./milvus.db
とするのが最も便利な方法です。- データ規模が大きい場合は、dockerやkubernetes上に、よりパフォーマンスの高いMilvusサーバを構築することができます。このセットアップでは、サーバの uri、例えば
http://localhost:19530
をuri
として使用してください。 - MilvusのフルマネージドクラウドサービスであるZilliz Cloudを利用する場合は、Zilliz CloudのPublic EndpointとApi keyに対応する
uri
とtoken
を調整してください。
コレクションが既に存在するか確認し、存在する場合は削除します。
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
指定したパラメータで新しいコレクションを作成します。
フィールド情報を指定しない場合、Milvusは自動的にプライマリキー用のデフォルトid
フィールドと、ベクトルデータを格納するためのvector
フィールドを作成します。予約されたJSONフィールドは、スキーマで定義されていないフィールドとその値を格納するために使用されます。
milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # Inner product distance
consistency_level="Strong", # Strong consistency level
)
データの挿入
テキスト行を繰り返し、エンベッディングを作成し、milvusにデータを挿入します。
ここに新しいフィールドtext
、コレクションスキーマで定義されていないフィールドです。これは予約されたJSONダイナミックフィールドに自動的に追加され、高レベルでは通常のフィールドとして扱うことができる。
from tqdm import tqdm
data = []
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": emb_text(line), "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 100%|██████████| 72/72 [00:28<00:00, 2.51it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
RAGの構築
クエリのデータを取得する
Milvusに関するよくある質問を指定してみましょう。
question = "How is data stored in milvus?"
コレクションで質問を検索し、セマンティックトップ3マッチを取得します。
search_res = milvus_client.search(
collection_name=collection_name,
data=[
emb_text(question)
], # Use the `emb_text` function to convert the question to an embedding vector
limit=3, # Return top 3 results
search_params={"metric_type": "IP", "params": {}}, # Inner product distance
output_fields=["text"], # Return the text field
)
クエリの検索結果を見てみましょう。
import json
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.8334928750991821
],
[
"How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
0.746377170085907
],
[
"What is the maximum dataset size Milvus can handle?\n\n \nTheoretically, the maximum dataset size Milvus can handle is determined by the hardware it is run on, specifically system memory and storage:\n\n- Milvus loads all specified collections and partitions into memory before running queries. Therefore, memory size determines the maximum amount of data Milvus can query.\n- When new entities and and collection-related schema (currently only MinIO is supported for data persistence) are added to Milvus, system storage determines the maximum allowable size of inserted data.\n\n###",
0.7328270673751831
]
]
LLMを使ってRAGレスポンスを取得する
検索されたドキュメントを文字列フォーマットに変換する。
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
ラネージ・モデルのシステム・プロンプトとユーザー・プロンプトを定義する。このプロンプトはmilvusから検索された文書で組み立てられる。
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
Fireworksが提供するllama-v3p1-405b-instruct
モデルを使って、プロンプトに基づいたレスポンスを生成する。
response = fireworks_client.chat.completions.create(
model="accounts/fireworks/models/llama-v3p1-405b-instruct",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
According to the provided context, Milvus stores data in two ways:
1. Inserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental logs. This can be done using multiple object storage backends such as MinIO, AWS S3, Google Cloud Storage, Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage.
2. Metadata, which are generated within Milvus, are stored in etcd, with each Milvus module having its own metadata.
Additionally, when data is inserted, it is first loaded into a message queue, and then written to persistent storage as incremental logs by the data node. The `flush()` function can be used to force the data node to write all data in the message queue to persistent storage immediately.
素晴らしい!MilvusとFireworks AIによるRAGパイプラインの構築に成功しました。