🚀 جرب Zilliz Cloud، الـ Milvus المدارة بالكامل، مجاناً — تجربة أداء أسرع بـ 10 أضعاف! جرب الآن>>

milvus-logo
LFAI
الصفحة الرئيسية
  • البرامج التعليمية
  • Home
  • Docs
  • البرامج التعليمية

  • استخدام AsyncMilvusClient مع asyncio

برنامج تعليمي: استخدام AsyncMilvusClient مع asyncio

AsyncMilvusClient هو عميل MilvusClient غير متزامن يوفر واجهة برمجة تطبيقات قائمة على كوروتين للوصول غير المتوقف إلى Milvus عبر asyncio. ستتعرف في هذه المقالة على عملية استدعاء واجهات برمجة التطبيقات التي يوفرها AsyncMilvusClient والجوانب التي تحتاج إلى الانتباه إليها.

نظرة عامة

Asyncio هي مكتبة لكتابة الشيفرة البرمجية المتزامنة باستخدام صيغة مزامنة/انتظار وتعمل كأساس لعميل Milvus غير المتزامن عالي الأداء، والذي سيتناسب مع مكتبة الشيفرة الخاصة بك التي تعمل على رأس Asyncio.

تحتوي الأساليب التي يوفرها AsyncMilvusClient على مجموعات معلمات وسلوكيات مماثلة لتلك الخاصة بـ MilvusClient. يكمن الاختلاف الوحيد في طريقة استدعائها. يسرد الجدول التالي الطرق المتاحة في AsyncMilvusClient.

العميل

close()

التجميع والتقسيم

create_collection()

drop_collection()

create_partition()

drop_partition()

الفهرس

create_index()

drop_index()

load_collection()

release_collection()

load_partitions()

release_partitions()

المتجه

insert()

upsert()

delete()

search()

query()

hybrid_search()

get()

إذا كنت لا تزال بحاجة إلى الإصدار غير المتزامن من أي أسلوب MilvusClient آخر، يمكنك إرسال طلب ميزة في مستودع pymilvus repo. نرحب أيضًا بالمساهمة بالرموز.

إنشاء حلقة حدث

عادةً ما تستخدم التطبيقات التي تستخدم asyncio حلقة الحدث كمنظم لإدارة المهام غير المتزامنة وعمليات الإدخال/الإخراج. في هذا البرنامج التعليمي، سنحصل في هذا البرنامج التعليمي على حلقة حدث من asyncio ونستخدمها كمنظِّم.

import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest

loop = asyncio.get_event_loop()

الاتصال باستخدام AsyncMilvusClient

يوضح المثال التالي كيفية توصيل ميلفوس بطريقة غير متزامنة.

# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

إنشاء مخطط

حاليًا، create_schema() غير متوفر في AsyncMilvusClient. بدلاً من ذلك، سنستخدم MilvusClient لإنشاء مخطط للمجموعة.

schema = async_client.create_schema(
    auto_id=False,
    description="This is a sample schema",
)

schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)

يقوم AsyncMilvusClient باستدعاء الأسلوب create_schema() بشكل متزامن؛ وبالتالي، لا تحتاج إلى تنظيم الاستدعاء باستخدام حلقة الحدث.

إنشاء مجموعة

سنستخدم الآن المخطط لإنشاء مجموعة. لاحظ أنك تحتاج إلى وضع بادئة للكلمة الأساسية await لأي استدعاء إلى طرق AsyncMilvusClient ووضع الاستدعاء داخل دالة async على النحو التالي.

async def create_my_collection(collection_name, schema):
    if (client.has_collection(collection_name)):
        await async_client.drop_collection(collection_name)

    await async_client.create_collection(
        collection_name=collection_name,
        schema=schema
    )

    if (client.has_collection(collection_name)):
        print("Collection created successfully")
    else:
        print("Failed to create collection")
        
# Call the above function asynchronously 
loop.run_until_complete(create_my_collection("my_collection", schema))

# Output
#
# Collection created successfully

إنشاء فهرس

تحتاج أيضًا إلى إنشاء فهارس لجميع الحقول المتجهة والحقول القياسية الاختيارية. وفقًا للمخطط المحدد أعلاه، هناك حقلان متجهان في المجموعة، وستقوم بإنشاء فهارس لهما على النحو التالي.

async def create_indexes(collection_name):
    index_params = client.prepare_index_params()

    index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="text", index_type="AUTOINDEX")

    await async_client.create_index(collection_name, index_params)

# Call the above function asynchronously 
loop.run_until_complete(create_indexes("my_collection"))

تحميل مجموعة

يمكن تحميل المجموعة بعد فهرسة الحقول الضرورية. توضح الشيفرة التالية كيفية تحميل المجموعة بشكل غير متزامن.

async def load_my_collection(collection_name):
    await async_client.load_collection(collection_name)
    print(client.get_load_state(collection_name))
    
# Call the above function asynchronously 
loop.run_until_complete(load_my_collection("my_collection"))

# Output
#
# {'state': <LoadState: Loaded>}

إدراج البيانات

يمكنك استخدام نماذج التضمين المتوفرة في pymilvus لإنشاء تضمينات متجهة لنصوصك. للحصول على التفاصيل، راجع نظرة عامة على التضمين. في هذا القسم، سنقوم بإدراج البيانات التي تم إنشاؤها عشوائيًا في المجموعة.

async def insert_sample_data(collection_name):
    # Randomly generated data will be used here
    rng = np.random.default_rng(42)

    def generate_random_text(length):
        seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
        words = seed.split()
        return " ".join(rng.choice(words, length))
    
    data = [{
        'id': i, 
        'dense_vector': rng.random(5).tolist(), 
        'sparse_vector': csr_matrix(rng.random(5)), 
        'text': generate_random_text(10)
    } for i in range(10000)]

    res = await async_client.insert(collection_name, data)

    print(res)

# Call the above function asynchronously 
loop.run_until_complete(insert_sample_data("my_collection"))

# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}

الاستعلام

بعد تحميل المجموعة وتعبئتها بالبيانات، يمكنك إجراء عمليات البحث والاستعلام فيها. في هذا القسم، ستعثر في هذا القسم على عدد الكيانات في الحقل text بدءًا من الكلمة random في المجموعة المسماة my_collection.

async def query_my_collection(collection_name):
    # Find the number of entities with the `text` fields starting with the word "random" in the `my_collection` collection.

    res = await async_client.query(
        collection_name="my_collection",
        filter='text like "%random%"',
        output_fields=["count(*)"]
    )

    print(res) 
    
# Call the above function asynchronously   
loop.run_until_complete(query_my_collection("my_collection"))

# Output
#
# data: ["{'count(*)': 6802}"] 

في هذا القسم، ستقوم بإجراء عمليات بحث متجهية على حقول المتجهات الكثيفة والمتناثرة في المجموعة المستهدفة.

async def conduct_vector_search(collection_name, type, field):
    # Generate a set of three random query vectors
    query_vectors = []
    if type == "dense":
        query_vectors = [ rng.random(5) for _ in range(3) ]
    
    if type == "sparse":
        query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]

    print(query_vectors)

    res = await async_client.search(
        collection_name="my_collection",
        data=query_vectors,
        anns_field=field,
        output_fields=["text", field]
    )

    print(res)
    
# To search against the dense vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))

# To search against the sparse vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))

يجب أن تسرد مخرجات البحث ثلاث مجموعات من النتائج المطابقة لمتجهات الاستعلام المحددة.

يجمع البحث المختلط بين نتائج عمليات البحث المتعددة ويعيد ترتيبها للحصول على استدعاء أفضل. في هذا القسم، ستقوم بإجراء بحث هجين باستخدام حقول المتجهات الكثيفة والمتناثرة.

async def conduct_hybrid_search(collection_name):
    req_dense = AnnSearchRequest(
        data=[ rng.random(5) for _ in range(3) ],
        anns_field="dense_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    req_sparse = AnnSearchRequest(
        data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
        anns_field="sparse_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    reqs = [req_dense, req_sparse]

    ranker = RRFRanker()

    res = await async_client.hybrid_search(
        collection_name="my_collection",
        reqs=reqs,
        ranker=ranker,
        output_fields=["text", "dense_vector", "sparse_vector"]
    )

    print(res)
    
# Call the above function asynchronously  
loop.run_until_complete(conduct_hybrid_search("my_collection"))

جرب Managed Milvus مجاناً

Zilliz Cloud خالي من المتاعب، ويعمل بواسطة Milvus ويعمل بسرعة 10 أضعاف.

ابدأ
التعليقات

هل كانت هذه الصفحة مفيدة؟