Tutoriel : Utiliser AsyncMilvusClient avec asyncio
AsyncMilvusClient est un MilvusClient asynchrone qui offre une API basée sur des coroutines pour un accès non bloquant à Milvus via asyncio. Dans cet article, vous découvrirez le processus d'appel des API fournies par AsyncMilvusClient et les aspects auxquels vous devez prêter attention.
Vue d'ensemble
Asyncio est une bibliothèque permettant d'écrire du code concurrent à l'aide de la syntaxe async/await et sert de base au client asynchrone hautes performances de Milvus, qui s'intégrera dans votre bibliothèque de code s'exécutant au-dessus d'asyncio.
Les méthodes fournies par AsyncMilvusClient ont des jeux de paramètres et des comportements identiques à ceux de MilvusClient. La seule différence réside dans la manière dont vous les appelez. Le tableau suivant répertorie les méthodes disponibles dans AsyncMilvusClient.
Client | ||
---|---|---|
| ||
Collecte et partition | ||
|
|
|
| ||
Index | ||
|
|
|
|
|
|
Vecteur | ||
|
|
|
|
|
|
|
Si vous avez toujours besoin de la version asynchrone d'une autre méthode MilvusClient, vous pouvez soumettre une demande de fonctionnalité dans le repo pymilvus. Les contributions au code sont également les bienvenues.
Créer une boucle d'événements
Les applications utilisant asyncio utilisent généralement la boucle d'événements comme orchestrateur pour gérer les tâches asynchrones et les opérations d'E/S. Dans ce tutoriel, nous allons obtenir une boucle d'événement à partir d'asyncio et l'utiliser comme orchestrateur.
import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest
loop = asyncio.get_event_loop()
Connexion avec AsyncMilvusClient
L'exemple suivant montre comment se connecter à Milvus de manière asynchrone.
# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)
Créer un schéma
Actuellement, create_schema()
n'est pas disponible dans AsyncMilvusClient. Nous utiliserons donc MilvusClient pour créer le schéma de la collection.
schema = async_client.create_schema(
auto_id=False,
description="This is a sample schema",
)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)
AsyncMilvusClient appelle la méthode create_schema()
de manière synchrone ; il n'est donc pas nécessaire d'orchestrer l'appel à l'aide de la boucle d'événements.
Création de la collection
Nous allons maintenant utiliser le schéma pour créer une collection. Notez que vous devez préfixer le mot-clé await
à tout appel aux méthodes AsyncMilvusClient
et placer l'appel à l'intérieur d'une fonction async
comme suit.
async def create_my_collection(collection_name, schema):
if (client.has_collection(collection_name)):
await async_client.drop_collection(collection_name)
await async_client.create_collection(
collection_name=collection_name,
schema=schema
)
if (client.has_collection(collection_name)):
print("Collection created successfully")
else:
print("Failed to create collection")
# Call the above function asynchronously
loop.run_until_complete(create_my_collection("my_collection", schema))
# Output
#
# Collection created successfully
Créer un index
Vous devez également créer des index pour tous les champs vectoriels et les champs scalaires facultatifs. D'après le schéma défini ci-dessus, il y a deux champs vectoriels dans la collection, et vous allez créer des index pour eux comme suit.
async def create_indexes(collection_name):
index_params = client.prepare_index_params()
index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
index_params.add_index(field_name="text", index_type="AUTOINDEX")
await async_client.create_index(collection_name, index_params)
# Call the above function asynchronously
loop.run_until_complete(create_indexes("my_collection"))
Chargement de la collection
Une collection peut être chargée après que les champs nécessaires ont été indexés. Le code suivant montre comment charger la collection de manière asynchrone.
async def load_my_collection(collection_name):
await async_client.load_collection(collection_name)
print(client.get_load_state(collection_name))
# Call the above function asynchronously
loop.run_until_complete(load_my_collection("my_collection"))
# Output
#
# {'state': <LoadState: Loaded>}
Insérer des données
Vous pouvez utiliser les modèles d'intégration disponibles dans pymilvus pour générer des intégrations vectorielles pour vos textes. Pour plus de détails, reportez-vous à la section Vue d'ensemble de l'intégration. Dans cette section, nous allons insérer des données générées aléatoirement dans la collection.
async def insert_sample_data(collection_name):
# Randomly generated data will be used here
rng = np.random.default_rng(42)
def generate_random_text(length):
seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
words = seed.split()
return " ".join(rng.choice(words, length))
data = [{
'id': i,
'dense_vector': rng.random(5).tolist(),
'sparse_vector': csr_matrix(rng.random(5)),
'text': generate_random_text(10)
} for i in range(10000)]
res = await async_client.insert(collection_name, data)
print(res)
# Call the above function asynchronously
loop.run_until_complete(insert_sample_data("my_collection"))
# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}
Requête
Une fois la collection chargée et remplie de données, vous pouvez y effectuer des recherches et des requêtes. Dans cette section, vous allez trouver le nombre d'entités dans le champ text
commençant par le mot random
dans la collection nommée my_collection
.
async def query_my_collection(collection_name):
# Find the number of entities with the `text` fields starting with the word "random" in the `my_collection` collection.
res = await async_client.query(
collection_name="my_collection",
filter='text like "%random%"',
output_fields=["count(*)"]
)
print(res)
# Call the above function asynchronously
loop.run_until_complete(query_my_collection("my_collection"))
# Output
#
# data: ["{'count(*)': 6802}"]
Recherche
Dans cette section, vous allez effectuer des recherches vectorielles sur les champs vectoriels denses et épars de la collection cible.
async def conduct_vector_search(collection_name, type, field):
# Generate a set of three random query vectors
query_vectors = []
if type == "dense":
query_vectors = [ rng.random(5) for _ in range(3) ]
if type == "sparse":
query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]
print(query_vectors)
res = await async_client.search(
collection_name="my_collection",
data=query_vectors,
anns_field=field,
output_fields=["text", field]
)
print(res)
# To search against the dense vector field asynchronously
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))
# To search against the sparse vector field asynchronously
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))
Le résultat de la recherche doit répertorier trois ensembles de résultats correspondant aux vecteurs d'interrogation spécifiés.
Recherche hybride
Une recherche hybride combine les résultats de plusieurs recherches et les réorganise pour obtenir un meilleur rappel. Dans cette section, vous allez effectuer une recherche hybride en utilisant les champs de vecteurs denses et épars.
async def conduct_hybrid_search(collection_name):
req_dense = AnnSearchRequest(
data=[ rng.random(5) for _ in range(3) ],
anns_field="dense_vector",
param={"metric_type": "IP"},
limit=10
)
req_sparse = AnnSearchRequest(
data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
anns_field="sparse_vector",
param={"metric_type": "IP"},
limit=10
)
reqs = [req_dense, req_sparse]
ranker = RRFRanker()
res = await async_client.hybrid_search(
collection_name="my_collection",
reqs=reqs,
ranker=ranker,
output_fields=["text", "dense_vector", "sparse_vector"]
)
print(res)
# Call the above function asynchronously
loop.run_until_complete(conduct_hybrid_search("my_collection"))