LangChain-milvus統合における非同期関数
このチュートリアルでは、langchain-milvusの非同期関数を活用して高性能なアプリケーションを構築する方法について説明します。非同期メソッドを使うことで、特に大規模な検索を扱う場合に、アプリケーションのスループットと応答性を大幅に向上させることができます。リアルタイムのレコメンデーションシステムを構築する場合でも、アプリケーションにセマンティック検索を実装する場合でも、RAG(Retrieval-Augmented Generation)パイプラインを作成する場合でも、非同期操作は同時リクエストをより効率的に処理するのに役立ちます。高性能ベクトルデータベースmilvusとLangChainの強力なLLM抽象化を組み合わせることで、スケーラブルなAIアプリケーションを構築するための強固な基盤を提供することができます。
非同期APIの概要
langchain-milvusは包括的な非同期処理のサポートを提供し、大規模な並行処理シナリオにおけるパフォーマンスを大幅に向上させます。非同期APIは同期APIと一貫したインターフェース設計を維持します。
コア非同期関数
langchain-milvusで非同期操作を使用するには、メソッド名にa プレフィックスを追加するだけです。これにより、リソースの有効活用が可能になり、同時検索リクエストを処理する際のスループットが向上します。
| 操作タイプ | 同期メソッド | 非同期メソッド | 説明 |
|---|---|---|---|
| テキストの追加 | add_texts() | aadd_texts() | ベクトルストアにテキストを追加 |
| ドキュメントの追加 | add_documents() | aadd_documents() | ベクトルストアにドキュメントを追加する |
| 埋め込みベクターの追加 | add_embeddings() | aadd_embeddings() | 埋め込みベクトルを追加 |
| 類似検索 | similarity_search() | asimilarity_search() | テキストによる意味検索 |
| ベクトル検索 | similarity_search_by_vector() | asimilarity_search_by_vector() | ベクトルによる意味検索 |
| スコアによる検索 | similarity_search_with_score() | asimilarity_search_with_score() | テキストによる意味検索と類似度スコアを返す |
| スコア付きベクトル検索 | similarity_search_with_score_by_vector() | asimilarity_search_with_score_by_vector() | ベクトルによる意味検索と類似スコアを返す |
| 多様性検索 | max_marginal_relevance_search() | amax_marginal_relevance_search() | MMR検索(多様性を最適化しつつ類似したものを返す) |
| ベクトル多様性検索 | max_marginal_relevance_search_by_vector() | amax_marginal_relevance_search_by_vector() | ベクトルによるMMR検索 |
| 削除操作 | delete() | adelete() | 文書の削除 |
| Upsert操作 | upsert() | aupsert() | 文書をアップサート(既存の場合は更新、そうでない場合は挿入)する。 |
| メタデータ検索 | search_by_metadata() | asearch_by_metadata() | メタデータフィルタリングによるクエリー |
| 主キーの取得 | get_pks() | aget_pks() | 式による主キーの取得 |
| テキストからの作成 | from_texts() | afrom_texts() | テキストからベクトルストアを作成 |
これらの関数の詳細については、APIリファレンスを参照してください。
パフォーマンスの利点
非同期操作により、大量の同時リクエストを処理する際のパフォーマンスが大幅に向上します:
- ドキュメントのバッチ処理
- 高同時検索シナリオ
- プロダクションRAGアプリケーション
- 大規模データのインポート/エクスポート
このチュートリアルでは、同期操作と非同期操作の詳細な比較を通じて、これらのパフォーマンス上の利点を実証し、非同期 API を活用してアプリケーションのパフォーマンスを最適化する方法を紹介します。
始める前に
このページのコード・スニペットには、以下の依存関係が必要です:
! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio
Google Colabを使用している場合、インストールしたばかりの依存関係を有効にするために、ランタイムを再起動する必要があるかもしれません(画面上部の "Runtime "メニューをクリックし、ドロップダウンメニューから "Restart session "を選択してください)。
OpenAIのモデルを使います。api key OPENAI_API_KEY を環境変数として用意してください:
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Jupyter Notebookを使用している場合、非同期コードを実行する前にこのコードを実行する必要があります:
import nest_asyncio
nest_asyncio.apply()
非同期APIの探索とパフォーマンス比較
それでは、langchain-milvusを使った同期処理と非同期処理のパフォーマンス比較について深く掘り下げてみよう。
まず、必要なライブラリをインポートします:
import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus
# Define the Milvus URI
URI = "http://localhost:19530"
テスト関数のセットアップ
テストデータを生成するヘルパー関数を作りましょう:
def random_id():
"""Generate a random string ID"""
random_num_str = ""
for _ in range(16):
random_digit = str(random.randint(0, 9))
random_num_str += random_digit
return random_num_str
def generate_test_documents(num_docs):
"""Generate test documents for performance testing"""
docs = []
for i in range(num_docs):
content = (
f"This is test document {i} with some random content: {random.random()}"
)
metadata = {
"id": f"doc_{i}",
"score": random.random(),
"category": f"cat_{i % 5}",
}
doc = Document(page_content=content, metadata=metadata)
docs.append(doc)
return docs
ベクターストアの初期化
パフォーマンステストを実行する前に、Milvusベクターストアを初期化する必要があります。この関数により、テストごとに新しいコレクションから開始し、以前のデータからの干渉を排除します:
def init_vector_store():
"""Initialize and return a fresh vector store for testing"""
return Milvus(
embedding_function=OpenAIEmbeddings(),
collection_name="langchain_perf_test",
connection_args={"uri": URI},
auto_id=True,
drop_old=True, # Always start with a fresh collection
)
非同期と同期ドキュメントの追加
それでは、同期と非同期のドキュメント追加のパフォーマンスを比較してみましょう。これらの関数は、ベクトルストアに複数のドキュメントを追加するときに、非同期操作がどれだけ速くなるかを測定するのに役立ちます。非同期バージョンはドキュメント追加ごとにタスクを作成して同時に実行し、同期バージョンはドキュメントを1つずつ処理します:
async def async_add(milvus_store, num_adding):
"""Add documents asynchronously and measure the time"""
docs = generate_test_documents(num_adding)
start_time = time.time()
tasks = []
for doc in docs:
# Create tasks for each document addition
task = milvus_store.aadd_documents([doc])
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_add(milvus_store, num_adding):
"""Add documents synchronously and measure the time"""
docs = generate_test_documents(num_adding)
start_time = time.time()
for doc in docs:
result = milvus_store.add_documents([doc])
end_time = time.time()
return end_time - start_time
では、ドキュメント数を変えてパフォーマンステストを実行し、実際のパフォーマンスの違いを見てみましょう。負荷を変化させてテストし、非同期操作が同期操作に比べてどのようにスケールするかを理解します。このテストでは、両方のアプローチの実行時間を測定し、非同期操作のパフォーマンス上の利点を実証します:
add_counts = [10, 100]
# Get the event loop
loop = asyncio.get_event_loop()
# Create a new vector store for testing
milvus_store = init_vector_store()
# Test async document addition
for count in add_counts:
async def measure_async_add():
async_time = await async_add(milvus_store, count)
print(f"Async add for {count} documents took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_add())
# Reset vector store for sync tests
milvus_store = init_vector_store()
# Test sync document addition
for count in add_counts:
sync_time = sync_add(milvus_store, count)
print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)
Async add for 10 documents took 1.74 seconds
2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)
Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds
非同期 vs 同期検索
検索性能の比較では、まずベクターストアにデータを入れる必要がある。以下の関数は、複数の検索クエリーを同時に作成し、同期と非同期の実行時間を比較することで、検索パフォーマンスを測定するのに役立ちます:
def populate_vector_store(milvus_store, num_docs=1000):
"""Populate the vector store with test documents"""
docs = generate_test_documents(num_docs)
milvus_store.add_documents(docs)
return docs
async def async_search(milvus_store, num_queries):
"""Perform async searches and measure the time"""
start_time = time.time()
tasks = []
for i in range(num_queries):
query = f"test document {i % 50}"
task = milvus_store.asimilarity_search(query=query, k=3)
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_search(milvus_store, num_queries):
"""Perform sync searches and measure the time"""
start_time = time.time()
for i in range(num_queries):
query = f"test document {i % 50}"
result = milvus_store.similarity_search(query=query, k=3)
end_time = time.time()
return end_time - start_time
では、包括的な検索パフォーマンス・テストを実行して、非同期操作が同期操作に比べてどのようにスケールするかを見てみよう。異なるクエリーボリュームでテストを行い、特に同時処理の数が増えるほど、非同期処理のパフォーマンス上の利点が高まることを実証します:
# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
query_counts = [10, 100]
# Test async search
for count in query_counts:
async def measure_async_search():
async_time = await async_search(milvus_store, count)
print(f"Async search for {count} queries took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_search())
# Test sync search
for count in query_counts:
sync_time = sync_search(milvus_store, count)
print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)
Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds
非同期と同期の比較削除
削除操作もまた、非同期操作によってパフォーマンスが大幅に向上する重要な要素です。同期削除操作と非同期削除操作のパフォーマンスの違いを測定する関数を作ってみましょう。これらのテストは、非同期オペレーションがバッチ削除をより効率的に処理できることを示すのに役立ちます:
async def async_delete(milvus_store, num_deleting):
"""Delete documents asynchronously and measure the time"""
start_time = time.time()
tasks = []
for i in range(num_deleting):
expr = f"id == 'doc_{i}'"
task = milvus_store.adelete(expr=expr)
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_delete(milvus_store, num_deleting):
"""Delete documents synchronously and measure the time"""
start_time = time.time()
for i in range(num_deleting):
expr = f"id == 'doc_{i}'"
result = milvus_store.delete(expr=expr)
end_time = time.time()
return end_time - start_time
では、削除パフォーマンステストを実行して、パフォーマンスの違いを定量化してみましょう。まず、新しいベクターストアにテストデータを入れ、同期と非同期の両方のアプローチで削除操作を実行します:
delete_counts = [10, 100]
# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
# Test async delete
for count in delete_counts:
async def measure_async_delete():
async_time = await async_delete(milvus_store, count)
print(f"Async delete for {count} operations took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_delete())
# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
# Test sync delete
for count in delete_counts:
sync_time = sync_delete(milvus_store, count)
print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)
Async delete for 10 operations took 0.58 seconds
2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)
Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds
まとめ
このチュートリアルでは、LangChainとmilvusを使った非同期操作のパフォーマンス上の優位性を示しました。追加、検索、削除操作の同期版と非同期版を比較し、特に大規模なバッチ操作において、非同期操作がいかに大幅な速度向上をもたらすかを示しました。
主な要点
- 非同期オペレーションは、並列実行可能な多数の個別オペレーションを実行する場合に、最大のメリットをもたらす。
- より高いスループットを生成するワークロードでは、同期操作と非同期操作の性能差は拡大する。
- 非同期オペレーションは、マシンの計算能力をフルに活用します。
LangChainとmilvusでプロダクションRAGアプリケーションを構築する場合、パフォーマンスが気になる場合、特に並列処理では非同期APIの使用を検討してください。