LangChain-milvus統合における非同期関数

Open In Colab GitHub Repository

このチュートリアルでは、langchain-milvusの非同期関数を活用して高性能なアプリケーションを構築する方法について説明します。非同期メソッドを使うことで、特に大規模な検索を扱う場合に、アプリケーションのスループットと応答性を大幅に向上させることができます。リアルタイムのレコメンデーションシステムを構築する場合でも、アプリケーションにセマンティック検索を実装する場合でも、RAG(Retrieval-Augmented Generation)パイプラインを作成する場合でも、非同期操作は同時リクエストをより効率的に処理するのに役立ちます。高性能ベクトルデータベースmilvusとLangChainの強力なLLM抽象化を組み合わせることで、スケーラブルなAIアプリケーションを構築するための強固な基盤を提供することができます。

非同期APIの概要

langchain-milvusは包括的な非同期処理のサポートを提供し、大規模な並行処理シナリオにおけるパフォーマンスを大幅に向上させます。非同期APIは同期APIと一貫したインターフェース設計を維持します。

コア非同期関数

langchain-milvusで非同期操作を使用するには、メソッド名にa プレフィックスを追加するだけです。これにより、リソースの有効活用が可能になり、同時検索リクエストを処理する際のスループットが向上します。

操作タイプ同期メソッド非同期メソッド説明
テキストの追加add_texts()aadd_texts()ベクトルストアにテキストを追加
ドキュメントの追加add_documents()aadd_documents()ベクトルストアにドキュメントを追加する
埋め込みベクターの追加add_embeddings()aadd_embeddings()埋め込みベクトルを追加
類似検索similarity_search()asimilarity_search()テキストによる意味検索
ベクトル検索similarity_search_by_vector()asimilarity_search_by_vector()ベクトルによる意味検索
スコアによる検索similarity_search_with_score()asimilarity_search_with_score()テキストによる意味検索と類似度スコアを返す
スコア付きベクトル検索similarity_search_with_score_by_vector()asimilarity_search_with_score_by_vector()ベクトルによる意味検索と類似スコアを返す
多様性検索max_marginal_relevance_search()amax_marginal_relevance_search()MMR検索(多様性を最適化しつつ類似したものを返す)
ベクトル多様性検索max_marginal_relevance_search_by_vector()amax_marginal_relevance_search_by_vector()ベクトルによるMMR検索
削除操作delete()adelete()文書の削除
Upsert操作upsert()aupsert()文書をアップサート(既存の場合は更新、そうでない場合は挿入)する。
メタデータ検索search_by_metadata()asearch_by_metadata()メタデータフィルタリングによるクエリー
主キーの取得get_pks()aget_pks()式による主キーの取得
テキストからの作成from_texts()afrom_texts()テキストからベクトルストアを作成

これらの関数の詳細については、APIリファレンスを参照してください。

パフォーマンスの利点

非同期操作により、大量の同時リクエストを処理する際のパフォーマンスが大幅に向上します:

  • ドキュメントのバッチ処理
  • 高同時検索シナリオ
  • プロダクションRAGアプリケーション
  • 大規模データのインポート/エクスポート

このチュートリアルでは、同期操作と非同期操作の詳細な比較を通じて、これらのパフォーマンス上の利点を実証し、非同期 API を活用してアプリケーションのパフォーマンスを最適化する方法を紹介します。

始める前に

このページのコード・スニペットには、以下の依存関係が必要です:

! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio

Google Colabを使用している場合、インストールしたばかりの依存関係を有効にするために、ランタイムを再起動する必要があるかもしれません(画面上部の "Runtime "メニューをクリックし、ドロップダウンメニューから "Restart session "を選択してください)。

OpenAIのモデルを使います。api key OPENAI_API_KEY を環境変数として用意してください:

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

Jupyter Notebookを使用している場合、非同期コードを実行する前にこのコードを実行する必要があります:

import nest_asyncio

nest_asyncio.apply()

非同期APIの探索とパフォーマンス比較

それでは、langchain-milvusを使った同期処理と非同期処理のパフォーマンス比較について深く掘り下げてみよう。

まず、必要なライブラリをインポートします:

import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus

# Define the Milvus URI
URI = "http://localhost:19530"

テスト関数のセットアップ

テストデータを生成するヘルパー関数を作りましょう:

def random_id():
    """Generate a random string ID"""
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def generate_test_documents(num_docs):
    """Generate test documents for performance testing"""
    docs = []
    for i in range(num_docs):
        content = (
            f"This is test document {i} with some random content: {random.random()}"
        )
        metadata = {
            "id": f"doc_{i}",
            "score": random.random(),
            "category": f"cat_{i % 5}",
        }
        doc = Document(page_content=content, metadata=metadata)
        docs.append(doc)
    return docs

ベクターストアの初期化

パフォーマンステストを実行する前に、Milvusベクターストアを初期化する必要があります。この関数により、テストごとに新しいコレクションから開始し、以前のデータからの干渉を排除します:

def init_vector_store():
    """Initialize and return a fresh vector store for testing"""
    return Milvus(
        embedding_function=OpenAIEmbeddings(),
        collection_name="langchain_perf_test",
        connection_args={"uri": URI},
        auto_id=True,
        drop_old=True,  # Always start with a fresh collection
    )

非同期と同期ドキュメントの追加

それでは、同期と非同期のドキュメント追加のパフォーマンスを比較してみましょう。これらの関数は、ベクトルストアに複数のドキュメントを追加するときに、非同期操作がどれだけ速くなるかを測定するのに役立ちます。非同期バージョンはドキュメント追加ごとにタスクを作成して同時に実行し、同期バージョンはドキュメントを1つずつ処理します:

async def async_add(milvus_store, num_adding):
    """Add documents asynchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    tasks = []
    for doc in docs:
        # Create tasks for each document addition
        task = milvus_store.aadd_documents([doc])
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_add(milvus_store, num_adding):
    """Add documents synchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    for doc in docs:
        result = milvus_store.add_documents([doc])
    end_time = time.time()
    return end_time - start_time

では、ドキュメント数を変えてパフォーマンステストを実行し、実際のパフォーマンスの違いを見てみましょう。負荷を変化させてテストし、非同期操作が同期操作に比べてどのようにスケールするかを理解します。このテストでは、両方のアプローチの実行時間を測定し、非同期操作のパフォーマンス上の利点を実証します:

add_counts = [10, 100]

# Get the event loop
loop = asyncio.get_event_loop()

# Create a new vector store for testing
milvus_store = init_vector_store()

# Test async document addition
for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(milvus_store, count)
        print(f"Async add for {count} documents took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())

# Reset vector store for sync tests
milvus_store = init_vector_store()

# Test sync document addition
for count in add_counts:
    sync_time = sync_add(milvus_store, count)
    print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)


Async add for 10 documents took 1.74 seconds


2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)


Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds

検索性能の比較では、まずベクターストアにデータを入れる必要がある。以下の関数は、複数の検索クエリーを同時に作成し、同期と非同期の実行時間を比較することで、検索パフォーマンスを測定するのに役立ちます:

def populate_vector_store(milvus_store, num_docs=1000):
    """Populate the vector store with test documents"""
    docs = generate_test_documents(num_docs)
    milvus_store.add_documents(docs)
    return docs


async def async_search(milvus_store, num_queries):
    """Perform async searches and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_queries):
        query = f"test document {i % 50}"
        task = milvus_store.asimilarity_search(query=query, k=3)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_search(milvus_store, num_queries):
    """Perform sync searches and measure the time"""
    start_time = time.time()
    for i in range(num_queries):
        query = f"test document {i % 50}"
        result = milvus_store.similarity_search(query=query, k=3)
    end_time = time.time()
    return end_time - start_time

では、包括的な検索パフォーマンス・テストを実行して、非同期操作が同期操作に比べてどのようにスケールするかを見てみよう。異なるクエリーボリュームでテストを行い、特に同時処理の数が増えるほど、非同期処理のパフォーマンス上の利点が高まることを実証します:

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

query_counts = [10, 100]

# Test async search
for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(milvus_store, count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())

# Test sync search
for count in query_counts:
    sync_time = sync_search(milvus_store, count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)


Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds

非同期と同期の比較削除

削除操作もまた、非同期操作によってパフォーマンスが大幅に向上する重要な要素です。同期削除操作と非同期削除操作のパフォーマンスの違いを測定する関数を作ってみましょう。これらのテストは、非同期オペレーションがバッチ削除をより効率的に処理できることを示すのに役立ちます:

async def async_delete(milvus_store, num_deleting):
    """Delete documents asynchronously and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        task = milvus_store.adelete(expr=expr)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_delete(milvus_store, num_deleting):
    """Delete documents synchronously and measure the time"""
    start_time = time.time()
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        result = milvus_store.delete(expr=expr)
    end_time = time.time()
    return end_time - start_time

では、削除パフォーマンステストを実行して、パフォーマンスの違いを定量化してみましょう。まず、新しいベクターストアにテストデータを入れ、同期と非同期の両方のアプローチで削除操作を実行します:

delete_counts = [10, 100]

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test async delete
for count in delete_counts:

    async def measure_async_delete():
        async_time = await async_delete(milvus_store, count)
        print(f"Async delete for {count} operations took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_delete())

# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test sync delete
for count in delete_counts:
    sync_time = sync_delete(milvus_store, count)
    print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)


Async delete for 10 operations took 0.58 seconds


2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)


Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds

まとめ

このチュートリアルでは、LangChainとmilvusを使った非同期操作のパフォーマンス上の優位性を示しました。追加、検索、削除操作の同期版と非同期版を比較し、特に大規模なバッチ操作において、非同期操作がいかに大幅な速度向上をもたらすかを示しました。

主な要点

  1. 非同期オペレーションは、並列実行可能な多数の個別オペレーションを実行する場合に、最大のメリットをもたらす。
  2. より高いスループットを生成するワークロードでは、同期操作と非同期操作の性能差は拡大する。
  3. 非同期オペレーションは、マシンの計算能力をフルに活用します。

LangChainとmilvusでプロダクションRAGアプリケーションを構築する場合、パフォーマンスが気になる場合、特に並列処理では非同期APIの使用を検討してください。

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
フィードバック

このページは役に立ちましたか ?