🚀 免費嘗試 Zilliz Cloud,完全托管的 Milvus,體驗速度提升 10 倍!立即嘗試

milvus-logo
LFAI
主頁
  • 教學

教學:在 asyncio 中使用 AsyncMilvusClient

AsyncMilvusClient是一個非同步的 MilvusClient,它提供了一個基於動態程式的 API,可透過asyncio 以非阻塞方式存取 Milvus。在這篇文章中,您將學習如何呼叫 AsyncMilvusClient 所提供的 API,以及需要注意的地方。

概述

Asyncio 是一個使用async/await語法撰寫並發程式碼的函式庫,也是 Milvus 高效能異步用戶端的基礎,它將適合您在 asyncio 上執行的程式碼庫。

AsyncMilvusClient 提供的方法具有與 MilvusClient 相同的參數集與行為。唯一的差別在於您呼叫它們的方式。下表列出 AsyncMilvusClient 可用的方法。

用戶端

close()

集合與分割

create_collection()

drop_collection()

create_partition()

drop_partition()

索引

create_index()

drop_index()

load_collection()

release_collection()

load_partitions()

release_partitions()

向量

insert()

upsert()

delete()

search()

query()

hybrid_search()

get()

如果您仍然需要任何其他 MilvusClient 方法的異步版本,您可以在pymilvus套件中提交功能請求。我們也歡迎您提供程式碼。

建立事件循環

使用 asyncio 的應用程式通常會使用事件循環作為管理異步任務和 I/O 作業的協調器。在本教程中,我們將從 asyncio 獲得一個事件迴圈,並將其用作協調器。

import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest

loop = asyncio.get_event_loop()

使用 AsyncMilvusClient 連線

以下範例示範如何以異步方式連接 Milvus。

# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

建立模式

目前,AsyncMilvusClient 無法提供create_schema() 。取而代之,我們將使用 MilvusClient 來建立集合的模式。

schema = async_client.create_schema(
    auto_id=False,
    description="This is a sample schema",
)

schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)

AsyncMilvusClient 會同步呼叫create_schema() 方法;因此,您不需要使用事件循環來協調呼叫。

建立集合

現在我們將使用模式建立一個集合。請注意,您需要在任何呼叫AsyncMilvusClient 方法前加上await 關鍵字,並將呼叫置於async 函式內,如下所示。

async def create_my_collection(collection_name, schema):
    if (client.has_collection(collection_name)):
        await async_client.drop_collection(collection_name)

    await async_client.create_collection(
        collection_name=collection_name,
        schema=schema
    )

    if (client.has_collection(collection_name)):
        print("Collection created successfully")
    else:
        print("Failed to create collection")
        
# Call the above function asynchronously 
loop.run_until_complete(create_my_collection("my_collection", schema))

# Output
#
# Collection created successfully

建立索引

您也需要為所有向量欄位和可選的標量欄位建立索引。根據上面定義的模式,集合中有兩個向量欄位,您將為它們建立索引,如下所示。

async def create_indexes(collection_name):
    index_params = client.prepare_index_params()

    index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="text", index_type="AUTOINDEX")

    await async_client.create_index(collection_name, index_params)

# Call the above function asynchronously 
loop.run_until_complete(create_indexes("my_collection"))

載入集合

在為必要的欄位建立索引後,就可以載入集合。以下程式碼示範如何以異步方式載入集合。

async def load_my_collection(collection_name):
    await async_client.load_collection(collection_name)
    print(client.get_load_state(collection_name))
    
# Call the above function asynchronously 
loop.run_until_complete(load_my_collection("my_collection"))

# Output
#
# {'state': <LoadState: Loaded>}

插入資料

您可以使用 pymilvus 中可用的嵌入模型來為您的文字產生向量嵌入。詳情請參閱嵌入概述。在本節中,我們將插入隨機產生的資料到資料集中。

async def insert_sample_data(collection_name):
    # Randomly generated data will be used here
    rng = np.random.default_rng(42)

    def generate_random_text(length):
        seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
        words = seed.split()
        return " ".join(rng.choice(words, length))
    
    data = [{
        'id': i, 
        'dense_vector': rng.random(5).tolist(), 
        'sparse_vector': csr_matrix(rng.random(5)), 
        'text': generate_random_text(10)
    } for i in range(10000)]

    res = await async_client.insert(collection_name, data)

    print(res)

# Call the above function asynchronously 
loop.run_until_complete(insert_sample_data("my_collection"))

# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}

查詢

當資料集載入並填滿資料後,您就可以在其中進行搜尋和查詢。在本節中,您將找出text 欄位中以random 開頭的實體數目,該實體的集合名為my_collection

async def query_my_collection(collection_name):
    # Find the number of entities with the `text` fields starting with the word "random" in the `my_collection` collection.

    res = await async_client.query(
        collection_name="my_collection",
        filter='text like "%random%"',
        output_fields=["count(*)"]
    )

    print(res) 
    
# Call the above function asynchronously   
loop.run_until_complete(query_my_collection("my_collection"))

# Output
#
# data: ["{'count(*)': 6802}"] 

在本節中,您將對目標集合的密集與稀疏向量欄位進行向量搜尋。

async def conduct_vector_search(collection_name, type, field):
    # Generate a set of three random query vectors
    query_vectors = []
    if type == "dense":
        query_vectors = [ rng.random(5) for _ in range(3) ]
    
    if type == "sparse":
        query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]

    print(query_vectors)

    res = await async_client.search(
        collection_name="my_collection",
        data=query_vectors,
        anns_field=field,
        output_fields=["text", field]
    )

    print(res)
    
# To search against the dense vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))

# To search against the sparse vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))

搜尋輸出應列出與指定查詢向量對應的三組結果。

混合搜尋會結合多次搜尋的結果,並重新排序,以獲得更好的召回率。在本節中,您將使用密集與稀疏向量場進行混合搜尋。

async def conduct_hybrid_search(collection_name):
    req_dense = AnnSearchRequest(
        data=[ rng.random(5) for _ in range(3) ],
        anns_field="dense_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    req_sparse = AnnSearchRequest(
        data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
        anns_field="sparse_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    reqs = [req_dense, req_sparse]

    ranker = RRFRanker()

    res = await async_client.hybrid_search(
        collection_name="my_collection",
        reqs=reqs,
        ranker=ranker,
        output_fields=["text", "dense_vector", "sparse_vector"]
    )

    print(res)
    
# Call the above function asynchronously  
loop.run_until_complete(conduct_hybrid_search("my_collection"))

免費嘗試托管的 Milvus

Zilliz Cloud 無縫接入,由 Milvus 提供動力,速度提升 10 倍。

開始使用
反饋

這個頁面有幫助嗎?