チュートリアルasyncioでAsyncMilvusClientを使う
AsyncMilvusClientは、asyncio経由でMilvusにノンブロッキングでアクセスするためのコルーチンベースのAPIを提供する非同期MilvusClientです。本記事では、AsyncMilvusClientが提供するAPIを呼び出すまでの流れや注意すべき点について解説します。
概要
Asyncioはasync/await構文を使用して並行コードを記述するためのライブラリであり、Milvusの高性能非同期クライアントの基盤として機能し、asyncio上で動作するコードライブラリに適合します。
AsyncMilvusClientが提供するメソッドはMilvusClientと同じパラメータセットと動作をします。唯一の違いは呼び出し方にあります。以下の表は AsyncMilvusClient で利用可能なメソッドの一覧です。
クライアント | ||
---|---|---|
| ||
コレクションとパーティション | ||
|
|
|
| ||
インデックス | ||
|
|
|
|
|
|
ベクター | ||
|
|
|
|
|
|
|
他のMilvusClientメソッドの非同期版が必要な場合は、pymilvusrepoに機能リクエストを提出してください。コードの投稿も歓迎します。
イベントループの作成
asyncioを使用するアプリケーションは通常、非同期タスクとI/Oオペレーションを管理するオーケストレータとしてイベントループを使用します。このチュートリアルでは、asyncioからイベントループを取得し、オーケストレーターとして使用します。
import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest
loop = asyncio.get_event_loop()
AsyncMilvusClientでの接続
以下の例では、Milvusを非同期で接続する方法を示します。
# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)
スキーマの作成
現在、AsyncMilvusClientではcreate_schema()
。代わりに、MilvusClientを使用してコレクションのスキーマを作成します。
schema = async_client.create_schema(
auto_id=False,
description="This is a sample schema",
)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)
AsyncMilvusClient はcreate_schema()
メソッドを同期的に呼び出します。したがって、イベントループを使用して呼び出しをオーケストレーションする必要はありません。
コレクションの作成
次に、スキーマを使用してコレクションを作成します。以下のように、AsyncMilvusClient
メソッドの呼び出しの前にawait
キーワードを付け、呼び出しをasync
関数内に配置する必要があることに注意してください。
async def create_my_collection(collection_name, schema):
if (client.has_collection(collection_name)):
await async_client.drop_collection(collection_name)
await async_client.create_collection(
collection_name=collection_name,
schema=schema
)
if (client.has_collection(collection_name)):
print("Collection created successfully")
else:
print("Failed to create collection")
# Call the above function asynchronously
loop.run_until_complete(create_my_collection("my_collection", schema))
# Output
#
# Collection created successfully
インデックスの作成
また、すべてのベクトル・フィールドとオプションのスカラー・フィールドにインデックスを作成する必要がある。上で定義したスキーマによると、コレクションには2つのベクトル・フィールドがあります。
async def create_indexes(collection_name):
index_params = client.prepare_index_params()
index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
index_params.add_index(field_name="text", index_type="AUTOINDEX")
await async_client.create_index(collection_name, index_params)
# Call the above function asynchronously
loop.run_until_complete(create_indexes("my_collection"))
コレクションのロード
コレクションは、必要なフィールドのインデックスが作成された後にロードできます。以下のコードは、コレクションを非同期にロードする方法を示します。
async def load_my_collection(collection_name):
await async_client.load_collection(collection_name)
print(client.get_load_state(collection_name))
# Call the above function asynchronously
loop.run_until_complete(load_my_collection("my_collection"))
# Output
#
# {'state': <LoadState: Loaded>}
データの挿入
pymilvusで利用可能な埋め込みモデルを使用して、テキストのベクトル埋め込みを生成できます。詳しくは埋め込み概要を参照してください。このセクションでは、ランダムに生成されたデータをコレクションに挿入します。
async def insert_sample_data(collection_name):
# Randomly generated data will be used here
rng = np.random.default_rng(42)
def generate_random_text(length):
seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
words = seed.split()
return " ".join(rng.choice(words, length))
data = [{
'id': i,
'dense_vector': rng.random(5).tolist(),
'sparse_vector': csr_matrix(rng.random(5)),
'text': generate_random_text(10)
} for i in range(10000)]
res = await async_client.insert(collection_name, data)
print(res)
# Call the above function asynchronously
loop.run_until_complete(insert_sample_data("my_collection"))
# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}
クエリ
コレクションがロードされ、データで満たされると、その中で検索やクエリを実行できます。このセクションでは、my_collection
という名前のコレクションで、random
という単語で始まるtext
フィールドのエンティティの数を検索します。
async def query_my_collection(collection_name):
# Find the number of entities with the `text` fields starting with the word "random" in the `my_collection` collection.
res = await async_client.query(
collection_name="my_collection",
filter='text like "%random%"',
output_fields=["count(*)"]
)
print(res)
# Call the above function asynchronously
loop.run_until_complete(query_my_collection("my_collection"))
# Output
#
# data: ["{'count(*)': 6802}"]
検索
このセクションでは、対象コレクションの密および疎ベクトルフィールドでベクトル検索を実行します。
async def conduct_vector_search(collection_name, type, field):
# Generate a set of three random query vectors
query_vectors = []
if type == "dense":
query_vectors = [ rng.random(5) for _ in range(3) ]
if type == "sparse":
query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]
print(query_vectors)
res = await async_client.search(
collection_name="my_collection",
data=query_vectors,
anns_field=field,
output_fields=["text", field]
)
print(res)
# To search against the dense vector field asynchronously
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))
# To search against the sparse vector field asynchronously
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))
検索出力は、指定されたクエリ・ベクトルに対応する3組の結果をリストする必要があります。
ハイブリッド検索
ハイブリッド検索は、複数の検索結果を組み合わせ、よりよい想起を得るために再ランクします。このセクションでは、密なベクトル場と疎なベクトル場を使ってハイブリッド検索を行います。
async def conduct_hybrid_search(collection_name):
req_dense = AnnSearchRequest(
data=[ rng.random(5) for _ in range(3) ],
anns_field="dense_vector",
param={"metric_type": "IP"},
limit=10
)
req_sparse = AnnSearchRequest(
data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
anns_field="sparse_vector",
param={"metric_type": "IP"},
limit=10
)
reqs = [req_dense, req_sparse]
ranker = RRFRanker()
res = await async_client.hybrid_search(
collection_name="my_collection",
reqs=reqs,
ranker=ranker,
output_fields=["text", "dense_vector", "sparse_vector"]
)
print(res)
# Call the above function asynchronously
loop.run_until_complete(conduct_hybrid_search("my_collection"))