Tutorial: Menggunakan AsyncMilvusClient dengan asyncio
AsyncMilvusClient adalah MilvusClient asinkron yang menawarkan API berbasis coroutine untuk akses tanpa pemblokiran ke Milvus melalui asyncio. Pada artikel ini, Anda akan mempelajari tentang proses pemanggilan API yang disediakan oleh AsyncMilvusClient dan aspek-aspek yang perlu Anda perhatikan.
Gambaran Umum
Asyncio adalah pustaka untuk menulis kode konkuren menggunakan sintaks async/await dan berfungsi sebagai fondasi untuk klien asinkron berkinerja tinggi dari Milvus, yang akan sesuai dengan pustaka kode Anda yang berjalan di atas asyncio.
Metode-metode yang disediakan oleh AsyncMilvusClient memiliki set parameter dan perilaku yang sama dengan yang dimiliki oleh MilvusClient. Satu-satunya perbedaan terletak pada cara Anda memanggilnya. Tabel berikut mencantumkan metode yang tersedia di AsyncMilvusClient.
Klien | ||
---|---|---|
| ||
Koleksi & Partisi | ||
|
|
|
| ||
Indeks | ||
|
|
|
|
|
|
Vektor | ||
|
|
|
|
|
|
|
Jika Anda masih membutuhkan versi asinkron dari metode MilvusClient lainnya, Anda dapat mengirimkan permintaan fitur di repositori pymilvus. Kontribusi kode juga diterima.
Membuat perulangan kejadian
Aplikasi yang menggunakan asinkronisasi biasanya menggunakan event loop sebagai pengatur untuk mengelola tugas-tugas asinkron dan operasi I/O. Dalam tutorial ini, kita akan mendapatkan event loop dari asyncio dan menggunakannya sebagai orkestrator.
import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest
loop = asyncio.get_event_loop()
Terhubung dengan AsyncMilvusClient
Contoh berikut ini mendemonstrasikan cara menghubungkan Milvus secara asinkron.
# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)
Membuat skema
Saat ini, create_schema()
tidak tersedia di AsyncMilvusClient. Sebagai gantinya, kita akan menggunakan MilvusClient untuk membuat skema untuk koleksi.
schema = async_client.create_schema(
auto_id=False,
description="This is a sample schema",
)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)
AsyncMilvusClient memanggil metode create_schema()
secara sinkron; oleh karena itu, Anda tidak perlu mengatur pemanggilan menggunakan perulangan peristiwa.
Membuat koleksi
Sekarang kita akan menggunakan skema untuk membuat koleksi. Perhatikan bahwa Anda perlu mengawali kata kunci await
pada setiap pemanggilan ke metode AsyncMilvusClient
dan menempatkan pemanggilan tersebut di dalam fungsi async
sebagai berikut.
async def create_my_collection(collection_name, schema):
if (client.has_collection(collection_name)):
await async_client.drop_collection(collection_name)
await async_client.create_collection(
collection_name=collection_name,
schema=schema
)
if (client.has_collection(collection_name)):
print("Collection created successfully")
else:
print("Failed to create collection")
# Call the above function asynchronously
loop.run_until_complete(create_my_collection("my_collection", schema))
# Output
#
# Collection created successfully
Membuat indeks
Anda juga perlu membuat indeks untuk semua bidang vektor dan bidang skalar opsional. Menurut skema yang didefinisikan di atas, ada dua bidang vektor dalam koleksi, dan Anda akan membuat indeks untuk mereka sebagai berikut.
async def create_indexes(collection_name):
index_params = client.prepare_index_params()
index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
index_params.add_index(field_name="text", index_type="AUTOINDEX")
await async_client.create_index(collection_name, index_params)
# Call the above function asynchronously
loop.run_until_complete(create_indexes("my_collection"))
Memuat koleksi
Koleksi dapat dimuat setelah bidang yang diperlukan diindeks. Kode berikut ini menunjukkan cara memuat koleksi secara asinkron.
async def load_my_collection(collection_name):
await async_client.load_collection(collection_name)
print(client.get_load_state(collection_name))
# Call the above function asynchronously
loop.run_until_complete(load_my_collection("my_collection"))
# Output
#
# {'state': <LoadState: Loaded>}
Menyisipkan data
Anda dapat menggunakan model penyematan yang tersedia di pymilvus untuk menghasilkan penyematan vektor untuk teks Anda. Untuk detailnya, lihat Ikhtisar Penyematan. Pada bagian ini, kita akan menyisipkan data yang dibuat secara acak ke dalam koleksi.
async def insert_sample_data(collection_name):
# Randomly generated data will be used here
rng = np.random.default_rng(42)
def generate_random_text(length):
seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
words = seed.split()
return " ".join(rng.choice(words, length))
data = [{
'id': i,
'dense_vector': rng.random(5).tolist(),
'sparse_vector': csr_matrix(rng.random(5)),
'text': generate_random_text(10)
} for i in range(10000)]
res = await async_client.insert(collection_name, data)
print(res)
# Call the above function asynchronously
loop.run_until_complete(insert_sample_data("my_collection"))
# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}
Kueri
Setelah koleksi dimuat dan diisi dengan data, Anda dapat melakukan pencarian dan kueri di dalamnya. Pada bagian ini, Anda akan menemukan jumlah entitas di bidang text
yang dimulai dengan kata random
dalam koleksi bernama my_collection
.
async def query_my_collection(collection_name):
# Find the number of entities with the `text` fields starting with the word "random" in the `my_collection` collection.
res = await async_client.query(
collection_name="my_collection",
filter='text like "%random%"',
output_fields=["count(*)"]
)
print(res)
# Call the above function asynchronously
loop.run_until_complete(query_my_collection("my_collection"))
# Output
#
# data: ["{'count(*)': 6802}"]
Pencarian
Pada bagian ini, Anda akan melakukan pencarian vektor pada bidang vektor padat dan bidang vektor jarang dari koleksi target.
async def conduct_vector_search(collection_name, type, field):
# Generate a set of three random query vectors
query_vectors = []
if type == "dense":
query_vectors = [ rng.random(5) for _ in range(3) ]
if type == "sparse":
query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]
print(query_vectors)
res = await async_client.search(
collection_name="my_collection",
data=query_vectors,
anns_field=field,
output_fields=["text", field]
)
print(res)
# To search against the dense vector field asynchronously
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))
# To search against the sparse vector field asynchronously
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))
Hasil pencarian harus mencantumkan tiga set hasil yang sesuai dengan vektor kueri yang ditentukan.
Pencarian Hibrida
Pencarian hibrida menggabungkan hasil dari beberapa pencarian dan memberi peringkat ulang untuk mendapatkan pemanggilan yang lebih baik. Pada bagian ini, Anda akan melakukan pencarian hibrida menggunakan bidang vektor padat dan jarang.
async def conduct_hybrid_search(collection_name):
req_dense = AnnSearchRequest(
data=[ rng.random(5) for _ in range(3) ],
anns_field="dense_vector",
param={"metric_type": "IP"},
limit=10
)
req_sparse = AnnSearchRequest(
data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
anns_field="sparse_vector",
param={"metric_type": "IP"},
limit=10
)
reqs = [req_dense, req_sparse]
ranker = RRFRanker()
res = await async_client.hybrid_search(
collection_name="my_collection",
reqs=reqs,
ranker=ranker,
output_fields=["text", "dense_vector", "sparse_vector"]
)
print(res)
# Call the above function asynchronously
loop.run_until_complete(conduct_hybrid_search("my_collection"))