Open In Colab GitHub Repository

MilvusとUnstructuredでRAGを構築する

Unstructuredは、RAG(Retrieval Augmented Generation)やモデルの微調整のために非構造化ドキュメントを取り込んで処理するためのプラットフォームとツールを提供します。ノーコードUIプラットフォームとサーバーレスAPIサービスの両方を提供し、ユーザーはUnstructuredがホストする計算リソース上でデータを処理できる。

このチュートリアルでは、Unstructuredを使用してPDFドキュメントをインジェストし、Milvusを使用してRAGパイプラインを構築します。

準備

依存関係と環境

$ pip install -qU "unstructured[pdf]" pymilvus milvus-lite openai

インストールオプション:

  • すべてのドキュメントフォーマットを処理する場合:pip install "unstructured[all-docs]"
  • 特定のフォーマット(例:PDF)の場合:pip install "unstructured[pdf]"
  • その他のインストールオプションについては、Unstructuredのドキュメントをご覧ください。

Google Colabを使用している場合、インストールしたばかりの依存関係を有効にするには、ランタイムを再起動する必要があるかもしれません(画面上部の "Runtime "メニューをクリックし、ドロップダウンメニューから "Restart session "を選択してください)。

この例では、LLMとしてOpenAIを使います。環境変数として、api key OPENAI_API_KEY を用意してください。

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

MilvusクライアントとOpenAIクライアントの準備

Milvusクライアントを使用してMilvusコレクションを作成し、そこにデータを挿入することができます。

from pymilvus import MilvusClient, DataType

# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")

引数としてMilvusClient を指定する:

  • ./milvus.db のようにuri をローカルファイルとして設定するのが最も便利である。
  • 100万ベクトルを超えるような大規模なデータをお持ちの場合は、DockerやKubernetes上に、よりパフォーマンスの高いMilvusサーバを構築することができます。このセットアップでは、サーバのアドレスとポートをURIとして使用してください(例:http://localhost:19530 )。Milvusで認証機能を有効にしている場合は、トークンに ":" を使用します。そうでない場合は、トークンを設定しないでください。
  • MilvusのフルマネージドクラウドサービスであるZilliz Cloudを利用する場合は、uritoken をZilliz CloudのPublic EndpointとApi keyに対応させてください。

コレクションが既に存在するかどうかを確認し、存在する場合は削除します。

collection_name = "my_rag_collection"

if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

エンベッディングを生成し、レスポンスを生成する OpenAI クライアントを準備します。

from openai import OpenAI

openai_client = OpenAI()


def emb_text(text):
    return (
        openai_client.embeddings.create(input=text, model="text-embedding-3-small")
        .data[0]
        .embedding
    )

テスト埋め込みを生成し、その次元と最初のいくつかの要素を表示する。

test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]

Milvusコレクションの作成

以下のスキーマでコレクションを作成する:

  • id各文書の一意な識別子である主キー。
  • vector文書の埋め込み。
  • textドキュメントのテキストコンテンツ。
  • metadata文書のメタデータ。

そして、vector フィールドにAUTOINDEX インデックスを構築する。そしてコレクションを作成する。

# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
    field_name="vector",
    metric_type="COSINE",
    index_type="AUTOINDEX",
)
milvus_client.create_collection(
    collection_name=collection_name,
    schema=schema,
    index_params=index_params,
    consistency_level="Bounded",
)

milvus_client.load_collection(collection_name=collection_name)

Unstructuredからデータをロードする

Unstructuredは、PDFやHTMLなど、様々なファイルタイプを処理するための柔軟で強力な取り込みパイプラインを提供する。 ローカルのPDFファイルを分割し、チャンクする。そしてMilvusにデータをロードします。

import warnings
from unstructured.partition.auto import partition

warnings.filterwarnings("ignore")

elements = partition(
    filename="./pdf_files/WhatisMilvus.pdf",
    strategy="hi_res",
    chunking_strategy="by_title",
)  # Replace with the path to your PDF file

PDFファイルからパーティショニングされた要素を調べてみましょう。各要素は、Unstructuredのパーティショニングプロセスによって抽出されたコンテンツのチャンクを表しています。

for element in elements:
    print(element)
    break
What is Milvus?

Milvus is a high-performance, highly scalable vector database that runs efficiently across a wide range of environments, from a laptop to large-scale distributed systems. It is available as both open-source software and a cloud service.

Milvusにデータを挿入する。

data = []
for i, element in enumerate(elements):
    data.append(
        {
            "id": i,
            "vector": emb_text(element.text),
            "text": element.text,
            "metadata": element.metadata.to_dict(),
        }
    )
milvus_client.insert(collection_name=collection_name, data=data)
{'insert_count': 29, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28], 'cost': 0}

レスポンスの取得と生成

Milvusから関連文書を取得する関数を定義します。

def retrieve_documents(question, top_k=3):
    search_res = milvus_client.search(
        collection_name=collection_name,
        data=[emb_text(question)],
        limit=top_k,
        # search_params={"metric_type": "IP", "params": {}},
        output_fields=["text"],
    )
    return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]

RAGパイプラインで取得したドキュメントを使用してレスポンスを生成する関数を定義する。

def generate_rag_response(question):
    retrieved_docs = retrieve_documents(question)
    context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
    system_prompt = (
        "You are an AI assistant. Provide answers based on the given context."
    )
    user_prompt = f"""
    Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
    
    Context:
    {context}
    
    Question: {question}
    """
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return response.choices[0].message.content

サンプル問題でRAGパイプラインをテストしてみましょう。

question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus include a wide range of in-memory and on-disk indexing/search algorithms such as IVF, HNSW, and DiskANN. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.