MilvusとLlamaIndexの非同期APIを使ったRAG

Open In Colab GitHub Repository

このチュートリアルでは、LlamaIndexと Milvusを使ってRAGの非同期ドキュメント処理パイプラインを構築する方法を説明します。LlamaIndexはMilvusのようにドキュメントを処理してベクトルDBに保存する方法を提供します。LlamaIndexの非同期APIとMilvusのPythonクライアントライブラリを活用することで、パイプラインのスループットを向上させ、大量のデータを効率的に処理し、インデックスを作成することができます。

このチュートリアルでは、まずLlamaIndexとMilvusを使ってRAGを構築するための非同期メソッドの使い方を高いレベルから紹介し、次に低レベルメソッドの使い方と同期と非同期の性能比較について紹介します。

始める前に

このページのコードスニペットにはpymilvusとllamaindexの依存関係が必要です。以下のコマンドでインストールできます:

$ pip install -U pymilvus llama-index-vector-stores-milvus llama-index nest-asyncio

Google Colabを使用している場合、インストールした依存関係を有効にするために、ランタイムを再起動する必要があるかもしれません(画面上部の "Runtime "メニューをクリックし、ドロップダウンメニューから "Restart session "を選択してください)。

OpenAIのモデルを使います。環境変数として、api key OPENAI_API_KEY を用意してください。

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

Jupyter Notebookを使用している場合は、非同期コードを実行する前にこのコードを実行する必要があります。

import nest_asyncio

nest_asyncio.apply()

データの準備

以下のコマンドでサンプルデータをダウンロードできる:

$ mkdir -p 'data/'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham_essay.txt'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/uber_2021.pdf'

非同期処理でRAGをビルドする

このセクションでは、非同期でドキュメントを処理できるRAGシステムを構築する方法を示します。

必要なライブラリをインポートし、Milvus URIとエンベッディングの次元を定義します。

import asyncio
import random
import time

from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.milvus import MilvusVectorStore

URI = "http://localhost:19530"
DIM = 768
  • 大規模なデータを扱う場合は、dockerやkubernetes上にMilvusサーバを構築します。このセットアップでは、サーバURI、例えばhttp://localhost:19530uri として使用してください。
  • MilvusのフルマネージドクラウドサービスであるZilliz Cloudを利用する場合は、Zilliz CloudのPublic EndpointとApi keyに対応するuritoken を調整してください。
  • 複雑なシステム(ネットワーク通信など)の場合、同期処理よりも非同期処理の方が性能向上が期待できます。そのため、Milvus-Liteは非同期インタフェースを使用するシナリオには適していないと考えます。

Milvusコレクションを再構築するための初期化関数を定義する。

def init_vector_store():
    return MilvusVectorStore(
        uri=URI,
        # token=TOKEN,
        dim=DIM,
        collection_name="test_collection",
        embedding_field="embedding",
        id_field="id",
        similarity_metric="COSINE",
        consistency_level="Bounded",  # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`). See https://milvus.io/docs/tune_consistency.md#Consistency-Level for more details.
        overwrite=True,  # To overwrite the collection if it already exists
    )


vector_store = init_vector_store()
2025-01-24 20:04:39,414 [DEBUG][_create_connection]: Created new connection using: faa8be8753f74288bffc7e6d38942f8a (async_milvus_client.py:600)

ファイルpaul_graham_essay.txt からLlamaIndexドキュメントオブジェクトをラップするためにSimpleDirectoryReaderを使用する。

from llama_index.core import SimpleDirectoryReader

# load documents
documents = SimpleDirectoryReader(
    input_files=["./data/paul_graham_essay.txt"]
).load_data()

print("Document ID:", documents[0].doc_id)
Document ID: 41a6f99c-489f-49ff-9821-14e2561140eb

Hugging Face埋め込みモデルをローカルでインスタンス化する。ローカルモデルを使用することで、非同期データ挿入中にAPIレート制限に達するリスクを回避できます。同時APIリクエストはすぐに加算され、パブリックAPIの予算を使い果たしてしまうからです。しかし、レート制限が高い場合は、代わりにリモートモデルサービスを使用することもできます。

from llama_index.embeddings.huggingface import HuggingFaceEmbedding


embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")

インデックスを作成し、ドキュメントを挿入する。

非同期挿入モードを有効にするため、use_asyncTrue に設定します。

# Create an index over the documents
from llama_index.core import VectorStoreIndex, StorageContext

storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    embed_model=embed_model,
    use_async=True,
)

LLMを初期化する。

from llama_index.llms.openai import OpenAI

llm = OpenAI(model="gpt-3.5-turbo")

クエリエンジンを構築する際に、use_async パラメータをTrue に設定して、非同期検索を有効にすることもできます。

query_engine = index.as_query_engine(use_async=True, llm=llm)
response = await query_engine.aquery("What did the author learn?")
print(response)
The author learned that the field of artificial intelligence, as practiced at the time, was not as promising as initially believed. The approach of using explicit data structures to represent concepts in AI was not effective in achieving true understanding of natural language. This realization led the author to shift his focus towards Lisp and eventually towards exploring the field of art.

非同期APIを調べる

このセクションでは、より低レベルのAPIの使い方を紹介し、同期実行と非同期実行のパフォーマンスを比較します。

非同期追加

ベクトルストアを再初期化する。

vector_store = init_vector_store()
2025-01-24 20:07:38,727 [DEBUG][_create_connection]: Created new connection using: 5e0d130f3b644555ad7ea6b8df5f1fc2 (async_milvus_client.py:600)

インデックス用に大量のテスト・ノードを生成するために使用する、ノード生成関数を定義しよう。

def random_id():
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def produce_nodes(num_adding):
    node_list = []
    for i in range(num_adding):
        node = TextNode(
            id_=random_id(),
            text=f"n{i}_text",
            embedding=[0.5] * (DIM - 1) + [random.random()],
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id=f"n{i+1}")},
        )
        node_list.append(node)
    return node_list

ベクトルストアにドキュメントを追加するaync関数を定義する。Milvusベクターストアのインスタンスでasync_add()

async def async_add(num_adding):
    node_list = produce_nodes(num_adding)
    start_time = time.time()
    tasks = []
    for i in range(num_adding):
        sub_nodes = node_list[i]
        task = vector_store.async_add([sub_nodes])  # use async_add()
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time
add_counts = [10, 100, 1000]

イベントループを取得する。

loop = asyncio.get_event_loop()

ベクトルストアに文書を非同期に追加する。

for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(count)
        print(f"Async add for {count} took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())
Async add for 10 took 0.19 seconds
Async add for 100 took 0.48 seconds
Async add for 1000 took 3.22 seconds
vector_store = init_vector_store()
2025-01-24 20:07:45,554 [DEBUG][_create_connection]: Created new connection using: b14dde8d6d24489bba26a907593f692d (async_milvus_client.py:600)

同期追加との比較

同期追加関数を定義する。そして、同じ条件で実行時間を計測する。

def sync_add(num_adding):
    node_list = produce_nodes(num_adding)
    start_time = time.time()
    for node in node_list:
        result = vector_store.add([node])
    end_time = time.time()
    return end_time - start_time
for count in add_counts:
    sync_time = sync_add(count)
    print(f"Sync add for {count} took {sync_time:.2f} seconds")
Sync add for 10 took 0.56 seconds
Sync add for 100 took 5.85 seconds
Sync add for 1000 took 62.91 seconds

その結果、同期追加処理の方が非同期追加処理よりもはるかに遅いことがわかる。

検索を実行する前にベクトルストアを再初期化し、いくつかの文書を追加する。

vector_store = init_vector_store()
node_list = produce_nodes(num_adding=1000)
inserted_ids = vector_store.add(node_list)
2025-01-24 20:08:57,982 [DEBUG][_create_connection]: Created new connection using: 351dc7ea4fb14d4386cfab02621ab7d1 (async_milvus_client.py:600)

非同期検索関数を定義する。ここではMilvusベクターストアインスタンスのaquery()

async def async_search(num_queries):
    start_time = time.time()
    tasks = []
    for _ in range(num_queries):
        query = VectorStoreQuery(
            query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
        )
        task = vector_store.aquery(query=query)  # use aquery()
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time
query_counts = [10, 100, 1000]

Milvusストアから非同期検索を実行する。

for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())
Async search for 10 queries took 0.55 seconds
Async search for 100 queries took 1.39 seconds
Async search for 1000 queries took 8.81 seconds

同期検索関数を定義する。そして、同じ条件で実行時間を計測する。

def sync_search(num_queries):
    start_time = time.time()
    for _ in range(num_queries):
        query = VectorStoreQuery(
            query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
        )
        result = vector_store.query(query=query)
    end_time = time.time()
    return end_time - start_time
for count in query_counts:
    sync_time = sync_search(count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Sync search for 10 queries took 3.29 seconds
Sync search for 100 queries took 30.80 seconds
Sync search for 1000 queries took 308.80 seconds

その結果、同期検索処理は非同期検索処理よりはるかに遅いことがわかる。

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
フィードバック

このページは役に立ちましたか ?