🚀 Coba Zilliz Cloud, Milvus yang sepenuhnya terkelola, secara gratis—rasakan performa 10x lebih cepat! Coba Sekarang>>

milvus-logo
LFAI
Beranda
  • Tutorial
  • Home
  • Docs
  • Tutorial

  • Gunakan AsyncMilvusClient dengan asyncio

Tutorial: Menggunakan AsyncMilvusClient dengan asyncio

AsyncMilvusClient adalah MilvusClient asinkron yang menawarkan API berbasis coroutine untuk akses tanpa pemblokiran ke Milvus melalui asyncio. Pada artikel ini, Anda akan mempelajari tentang proses pemanggilan API yang disediakan oleh AsyncMilvusClient dan aspek-aspek yang perlu Anda perhatikan.

Gambaran Umum

Asyncio adalah pustaka untuk menulis kode konkuren menggunakan sintaks async/await dan berfungsi sebagai fondasi untuk klien asinkron berkinerja tinggi dari Milvus, yang akan sesuai dengan pustaka kode Anda yang berjalan di atas asyncio.

Metode-metode yang disediakan oleh AsyncMilvusClient memiliki set parameter dan perilaku yang sama dengan yang dimiliki oleh MilvusClient. Satu-satunya perbedaan terletak pada cara Anda memanggilnya. Tabel berikut mencantumkan metode yang tersedia di AsyncMilvusClient.

Klien

close()

Koleksi & Partisi

create_collection()

drop_collection()

create_partition()

drop_partition()

Indeks

create_index()

drop_index()

load_collection()

release_collection()

load_partitions()

release_partitions()

Vektor

insert()

upsert()

delete()

search()

query()

hybrid_search()

get()

Jika Anda masih membutuhkan versi asinkron dari metode MilvusClient lainnya, Anda dapat mengirimkan permintaan fitur di repositori pymilvus. Kontribusi kode juga diterima.

Membuat perulangan kejadian

Aplikasi yang menggunakan asinkronisasi biasanya menggunakan event loop sebagai pengatur untuk mengelola tugas-tugas asinkron dan operasi I/O. Dalam tutorial ini, kita akan mendapatkan event loop dari asyncio dan menggunakannya sebagai orkestrator.

import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest

loop = asyncio.get_event_loop()

Terhubung dengan AsyncMilvusClient

Contoh berikut ini mendemonstrasikan cara menghubungkan Milvus secara asinkron.

# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

Membuat skema

Saat ini, create_schema() tidak tersedia di AsyncMilvusClient. Sebagai gantinya, kita akan menggunakan MilvusClient untuk membuat skema untuk koleksi.

schema = async_client.create_schema(
    auto_id=False,
    description="This is a sample schema",
)

schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)

AsyncMilvusClient memanggil metode create_schema() secara sinkron; oleh karena itu, Anda tidak perlu mengatur pemanggilan menggunakan perulangan peristiwa.

Membuat koleksi

Sekarang kita akan menggunakan skema untuk membuat koleksi. Perhatikan bahwa Anda perlu mengawali kata kunci await pada setiap pemanggilan ke metode AsyncMilvusClient dan menempatkan pemanggilan tersebut di dalam fungsi async sebagai berikut.

async def create_my_collection(collection_name, schema):
    if (client.has_collection(collection_name)):
        await async_client.drop_collection(collection_name)

    await async_client.create_collection(
        collection_name=collection_name,
        schema=schema
    )

    if (client.has_collection(collection_name)):
        print("Collection created successfully")
    else:
        print("Failed to create collection")
        
# Call the above function asynchronously 
loop.run_until_complete(create_my_collection("my_collection", schema))

# Output
#
# Collection created successfully

Membuat indeks

Anda juga perlu membuat indeks untuk semua bidang vektor dan bidang skalar opsional. Menurut skema yang didefinisikan di atas, ada dua bidang vektor dalam koleksi, dan Anda akan membuat indeks untuk mereka sebagai berikut.

async def create_indexes(collection_name):
    index_params = client.prepare_index_params()

    index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="text", index_type="AUTOINDEX")

    await async_client.create_index(collection_name, index_params)

# Call the above function asynchronously 
loop.run_until_complete(create_indexes("my_collection"))

Memuat koleksi

Koleksi dapat dimuat setelah bidang yang diperlukan diindeks. Kode berikut ini menunjukkan cara memuat koleksi secara asinkron.

async def load_my_collection(collection_name):
    await async_client.load_collection(collection_name)
    print(client.get_load_state(collection_name))
    
# Call the above function asynchronously 
loop.run_until_complete(load_my_collection("my_collection"))

# Output
#
# {'state': <LoadState: Loaded>}

Menyisipkan data

Anda dapat menggunakan model penyematan yang tersedia di pymilvus untuk menghasilkan penyematan vektor untuk teks Anda. Untuk detailnya, lihat Ikhtisar Penyematan. Pada bagian ini, kita akan menyisipkan data yang dibuat secara acak ke dalam koleksi.

async def insert_sample_data(collection_name):
    # Randomly generated data will be used here
    rng = np.random.default_rng(42)

    def generate_random_text(length):
        seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
        words = seed.split()
        return " ".join(rng.choice(words, length))
    
    data = [{
        'id': i, 
        'dense_vector': rng.random(5).tolist(), 
        'sparse_vector': csr_matrix(rng.random(5)), 
        'text': generate_random_text(10)
    } for i in range(10000)]

    res = await async_client.insert(collection_name, data)

    print(res)

# Call the above function asynchronously 
loop.run_until_complete(insert_sample_data("my_collection"))

# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}

Kueri

Setelah koleksi dimuat dan diisi dengan data, Anda dapat melakukan pencarian dan kueri di dalamnya. Pada bagian ini, Anda akan menemukan jumlah entitas di bidang text yang dimulai dengan kata random dalam koleksi bernama my_collection.

async def query_my_collection(collection_name):
    # Find the number of entities with the `text` fields starting with the word "random" in the `my_collection` collection.

    res = await async_client.query(
        collection_name="my_collection",
        filter='text like "%random%"',
        output_fields=["count(*)"]
    )

    print(res) 
    
# Call the above function asynchronously   
loop.run_until_complete(query_my_collection("my_collection"))

# Output
#
# data: ["{'count(*)': 6802}"] 

Pada bagian ini, Anda akan melakukan pencarian vektor pada bidang vektor padat dan bidang vektor jarang dari koleksi target.

async def conduct_vector_search(collection_name, type, field):
    # Generate a set of three random query vectors
    query_vectors = []
    if type == "dense":
        query_vectors = [ rng.random(5) for _ in range(3) ]
    
    if type == "sparse":
        query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]

    print(query_vectors)

    res = await async_client.search(
        collection_name="my_collection",
        data=query_vectors,
        anns_field=field,
        output_fields=["text", field]
    )

    print(res)
    
# To search against the dense vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))

# To search against the sparse vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))

Hasil pencarian harus mencantumkan tiga set hasil yang sesuai dengan vektor kueri yang ditentukan.

Pencarian hibrida menggabungkan hasil dari beberapa pencarian dan memberi peringkat ulang untuk mendapatkan pemanggilan yang lebih baik. Pada bagian ini, Anda akan melakukan pencarian hibrida menggunakan bidang vektor padat dan jarang.

async def conduct_hybrid_search(collection_name):
    req_dense = AnnSearchRequest(
        data=[ rng.random(5) for _ in range(3) ],
        anns_field="dense_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    req_sparse = AnnSearchRequest(
        data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
        anns_field="sparse_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    reqs = [req_dense, req_sparse]

    ranker = RRFRanker()

    res = await async_client.hybrid_search(
        collection_name="my_collection",
        reqs=reqs,
        ranker=ranker,
        output_fields=["text", "dense_vector", "sparse_vector"]
    )

    print(res)
    
# Call the above function asynchronously  
loop.run_until_complete(conduct_hybrid_search("my_collection"))

Coba Milvus yang Dikelola secara Gratis

Zilliz Cloud bebas masalah, didukung oleh Milvus dan 10x lebih cepat.

Mulai
Umpan balik

Apakah halaman ini bermanfaat?