برنامج تعليمي: استخدام AsyncMilvusClient مع asyncio
AsyncMilvusClient هو عميل MilvusClient غير متزامن يوفر واجهة برمجة تطبيقات قائمة على كوروتين للوصول غير المتوقف إلى Milvus عبر asyncio. ستتعرف في هذه المقالة على عملية استدعاء واجهات برمجة التطبيقات التي يوفرها AsyncMilvusClient والجوانب التي تحتاج إلى الانتباه إليها.
نظرة عامة
Asyncio هي مكتبة لكتابة الشيفرة البرمجية المتزامنة باستخدام صيغة مزامنة/انتظار وتعمل كأساس لعميل Milvus غير المتزامن عالي الأداء، والذي سيتناسب مع مكتبة الشيفرة الخاصة بك التي تعمل على رأس Asyncio.
تحتوي الأساليب التي يوفرها AsyncMilvusClient على مجموعات معلمات وسلوكيات مماثلة لتلك الخاصة بـ MilvusClient. يكمن الاختلاف الوحيد في طريقة استدعائها. يسرد الجدول التالي الطرق المتاحة في AsyncMilvusClient.
العميل | ||
---|---|---|
| ||
التجميع والتقسيم | ||
|
|
|
| ||
الفهرس | ||
|
|
|
|
|
|
المتجه | ||
|
|
|
|
|
|
|
إذا كنت لا تزال بحاجة إلى الإصدار غير المتزامن من أي أسلوب MilvusClient آخر، يمكنك إرسال طلب ميزة في مستودع pymilvus repo. نرحب أيضًا بالمساهمة بالرموز.
إنشاء حلقة حدث
عادةً ما تستخدم التطبيقات التي تستخدم asyncio حلقة الحدث كمنظم لإدارة المهام غير المتزامنة وعمليات الإدخال/الإخراج. في هذا البرنامج التعليمي، سنحصل في هذا البرنامج التعليمي على حلقة حدث من asyncio ونستخدمها كمنظِّم.
import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest
loop = asyncio.get_event_loop()
الاتصال باستخدام AsyncMilvusClient
يوضح المثال التالي كيفية توصيل ميلفوس بطريقة غير متزامنة.
# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)
إنشاء مخطط
حاليًا، create_schema()
غير متوفر في AsyncMilvusClient. بدلاً من ذلك، سنستخدم MilvusClient لإنشاء مخطط للمجموعة.
schema = async_client.create_schema(
auto_id=False,
description="This is a sample schema",
)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)
يقوم AsyncMilvusClient باستدعاء الأسلوب create_schema()
بشكل متزامن؛ وبالتالي، لا تحتاج إلى تنظيم الاستدعاء باستخدام حلقة الحدث.
إنشاء مجموعة
سنستخدم الآن المخطط لإنشاء مجموعة. لاحظ أنك تحتاج إلى وضع بادئة للكلمة الأساسية await
لأي استدعاء إلى طرق AsyncMilvusClient
ووضع الاستدعاء داخل دالة async
على النحو التالي.
async def create_my_collection(collection_name, schema):
if (client.has_collection(collection_name)):
await async_client.drop_collection(collection_name)
await async_client.create_collection(
collection_name=collection_name,
schema=schema
)
if (client.has_collection(collection_name)):
print("Collection created successfully")
else:
print("Failed to create collection")
# Call the above function asynchronously
loop.run_until_complete(create_my_collection("my_collection", schema))
# Output
#
# Collection created successfully
إنشاء فهرس
تحتاج أيضًا إلى إنشاء فهارس لجميع الحقول المتجهة والحقول القياسية الاختيارية. وفقًا للمخطط المحدد أعلاه، هناك حقلان متجهان في المجموعة، وستقوم بإنشاء فهارس لهما على النحو التالي.
async def create_indexes(collection_name):
index_params = client.prepare_index_params()
index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
index_params.add_index(field_name="text", index_type="AUTOINDEX")
await async_client.create_index(collection_name, index_params)
# Call the above function asynchronously
loop.run_until_complete(create_indexes("my_collection"))
تحميل مجموعة
يمكن تحميل المجموعة بعد فهرسة الحقول الضرورية. توضح الشيفرة التالية كيفية تحميل المجموعة بشكل غير متزامن.
async def load_my_collection(collection_name):
await async_client.load_collection(collection_name)
print(client.get_load_state(collection_name))
# Call the above function asynchronously
loop.run_until_complete(load_my_collection("my_collection"))
# Output
#
# {'state': <LoadState: Loaded>}
إدراج البيانات
يمكنك استخدام نماذج التضمين المتوفرة في pymilvus لإنشاء تضمينات متجهة لنصوصك. للحصول على التفاصيل، راجع نظرة عامة على التضمين. في هذا القسم، سنقوم بإدراج البيانات التي تم إنشاؤها عشوائيًا في المجموعة.
async def insert_sample_data(collection_name):
# Randomly generated data will be used here
rng = np.random.default_rng(42)
def generate_random_text(length):
seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
words = seed.split()
return " ".join(rng.choice(words, length))
data = [{
'id': i,
'dense_vector': rng.random(5).tolist(),
'sparse_vector': csr_matrix(rng.random(5)),
'text': generate_random_text(10)
} for i in range(10000)]
res = await async_client.insert(collection_name, data)
print(res)
# Call the above function asynchronously
loop.run_until_complete(insert_sample_data("my_collection"))
# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}
الاستعلام
بعد تحميل المجموعة وتعبئتها بالبيانات، يمكنك إجراء عمليات البحث والاستعلام فيها. في هذا القسم، ستعثر في هذا القسم على عدد الكيانات في الحقل text
بدءًا من الكلمة random
في المجموعة المسماة my_collection
.
async def query_my_collection(collection_name):
# Find the number of entities with the `text` fields starting with the word "random" in the `my_collection` collection.
res = await async_client.query(
collection_name="my_collection",
filter='text like "%random%"',
output_fields=["count(*)"]
)
print(res)
# Call the above function asynchronously
loop.run_until_complete(query_my_collection("my_collection"))
# Output
#
# data: ["{'count(*)': 6802}"]
البحث
في هذا القسم، ستقوم بإجراء عمليات بحث متجهية على حقول المتجهات الكثيفة والمتناثرة في المجموعة المستهدفة.
async def conduct_vector_search(collection_name, type, field):
# Generate a set of three random query vectors
query_vectors = []
if type == "dense":
query_vectors = [ rng.random(5) for _ in range(3) ]
if type == "sparse":
query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]
print(query_vectors)
res = await async_client.search(
collection_name="my_collection",
data=query_vectors,
anns_field=field,
output_fields=["text", field]
)
print(res)
# To search against the dense vector field asynchronously
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))
# To search against the sparse vector field asynchronously
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))
يجب أن تسرد مخرجات البحث ثلاث مجموعات من النتائج المطابقة لمتجهات الاستعلام المحددة.
البحث الهجين
يجمع البحث المختلط بين نتائج عمليات البحث المتعددة ويعيد ترتيبها للحصول على استدعاء أفضل. في هذا القسم، ستقوم بإجراء بحث هجين باستخدام حقول المتجهات الكثيفة والمتناثرة.
async def conduct_hybrid_search(collection_name):
req_dense = AnnSearchRequest(
data=[ rng.random(5) for _ in range(3) ],
anns_field="dense_vector",
param={"metric_type": "IP"},
limit=10
)
req_sparse = AnnSearchRequest(
data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
anns_field="sparse_vector",
param={"metric_type": "IP"},
limit=10
)
reqs = [req_dense, req_sparse]
ranker = RRFRanker()
res = await async_client.hybrid_search(
collection_name="my_collection",
reqs=reqs,
ranker=ranker,
output_fields=["text", "dense_vector", "sparse_vector"]
)
print(res)
# Call the above function asynchronously
loop.run_until_complete(conduct_hybrid_search("my_collection"))