MilvusとUnstructuredでRAGを構築する
Unstructuredは、RAG(Retrieval Augmented Generation)やモデルの微調整のために非構造化ドキュメントを取り込み、処理するためのプラットフォームとツールを提供します。ノーコードUIプラットフォームとサーバーレスAPIサービスの両方を提供し、ユーザーはUnstructuredがホストする計算リソース上でデータを処理できる。
このチュートリアルでは、Unstructuredを使用してPDFドキュメントをインジェストし、Milvusを使用してRAGパイプラインを構築します。
準備
依存関係と環境
$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai
Google Colabを使用している場合、インストールしたばかりの依存関係を有効にするために、ランタイムを再起動する必要があるかもしれません(画面上部の "Runtime "メニューをクリックし、ドロップダウンメニューから "Restart session "を選択してください)。
UNSTRUCTURED_API_KEY
とUNSTRUCTURED_URL
環境変数はここから取得できます。
この例ではLLMとしてOpenAIを使います。環境変数として、api key OPENAI_API_KEY
を用意してください。
import os
os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"
os.environ["OPENAI_API_KEY"] = "***********"
MilvusクライアントとOpenAIクライアントの準備
Milvusクライアントを使用してMilvusコレクションを作成し、データを挿入することができます。
from pymilvus import MilvusClient, DataType
# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db") # TODO
引数としてMilvusClient
を指定する:
./milvus.db
のようにuri
をローカルファイルとして設定するのが最も便利である。- 100万ベクトルを超えるような大規模なデータがある場合は、DockerやKubernetes上に、よりパフォーマンスの高いMilvusサーバを構築することができます。このセットアップでは、サーバのアドレスとポートをURIとして使用してください(例:
http://localhost:19530
)。Milvusの認証機能を有効にしている場合は、トークンに"<your_username>:<your_password>"を使用してください。 - MilvusのフルマネージドクラウドサービスであるZilliz Cloudを利用する場合は、Zilliz CloudのPublic EndpointとApi keyに対応する
uri
とtoken
を調整してください。
コレクションが既に存在するかどうかを確認し、存在する場合は削除します。
collection_name = "my_rag_collection"
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
エンベッディングを生成し、レスポンスを生成する OpenAI クライアントを準備します。
from openai import OpenAI
openai_client = OpenAI()
def emb_text(text):
return (
openai_client.embeddings.create(input=text, model="text-embedding-3-small")
.data[0]
.embedding
)
テスト埋め込みを生成し、その次元と最初のいくつかの要素を表示する。
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]
Milvusコレクションの作成
以下のスキーマでコレクションを作成する:
id
各文書の一意な識別子である主キー。vector
文書の埋め込み。text
ドキュメントのテキストコンテンツ。metadata
文書のメタデータ。
そして、vector
フィールドにAUTOINDEX
インデックスを構築する。そしてコレクションを作成する。
# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="vector",
metric_type="COSINE",
index_type="AUTOINDEX",
)
milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params,
consistency_level="Strong",
)
milvus_client.load_collection(collection_name=collection_name)
Unstructuredからデータをロードする
UnstructuredはPDF、HTMLなど様々なファイルタイプを処理する柔軟で強力なインジェストパイプラインを提供します。 インジェスト機能を使ってローカルディレクトリ内のPDFファイルをパーティショニングします。そして、Milvusにデータをロードします。
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
LocalIndexerConfig,
LocalDownloaderConfig,
LocalConnectionConfig,
LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"
Pipeline.from_configs(
context=ProcessorConfig(),
indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
downloader_config=LocalDownloaderConfig(),
source_connection_config=LocalConnectionConfig(),
partitioner_config=PartitionerConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
strategy="hi_res",
additional_partition_args={
"split_pdf_page": True,
"split_pdf_concurrency_level": 15,
},
),
uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()
from unstructured.staging.base import elements_from_json
def load_processed_files(directory_path):
elements = []
for filename in os.listdir(directory_path):
if filename.endswith(".json"):
file_path = os.path.join(directory_path, filename)
try:
elements.extend(elements_from_json(filename=file_path))
except IOError:
print(f"Error: Could not read file {filename}.")
return elements
elements = load_processed_files(directory_with_results)
Milvusにデータを挿入します。
data = []
for i, element in enumerate(elements):
data.append(
{
"id": i,
"vector": emb_text(element.text),
"text": element.text,
"metadata": element.metadata.to_dict(),
}
)
milvus_client.insert(collection_name=collection_name, data=data)
レスポンスの取得と生成
Milvusから関連ドキュメントを取得する関数を定義します。
def retrieve_documents(question, top_k=3):
search_res = milvus_client.search(
collection_name=collection_name,
data=[emb_text(question)],
limit=top_k,
# search_params={"metric_type": "IP", "params": {}},
output_fields=["text"],
)
return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]
RAGパイプラインで取得したドキュメントを使用してレスポンスを生成する関数を定義します。
def generate_rag_response(question):
retrieved_docs = retrieve_documents(question)
context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
system_prompt = (
"You are an AI assistant. Provide answers based on the given context."
)
user_prompt = f"""
Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
Context:
{context}
Question: {question}
"""
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
return response.choices[0].message.content
サンプル問題でRAGパイプラインをテストしてみましょう。
question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.