Tutorial: Verwendung von AsyncMilvusClient mit asyncio
AsyncMilvusClient ist ein asynchroner MilvusClient, der eine Coroutine-basierte API für den nicht-blockierenden Zugriff auf Milvus über asyncio bietet. In diesem Artikel erfahren Sie, wie Sie die APIs, die AsyncMilvusClient zur Verfügung stellt, aufrufen können und welche Aspekte Sie dabei beachten müssen.
Überblick
Asyncio ist eine Bibliothek zum Schreiben von nebenläufigem Code unter Verwendung der async/await-Syntax und dient als Grundlage für den hochleistungsfähigen asynchronen Client von Milvus, der sich in Ihre Code-Bibliothek einfügt, die auf Asyncio aufbaut.
Die Methoden, die AsyncMilvusClient bereitstellt, haben identische Parametersätze und Verhaltensweisen wie die von MilvusClient. Der einzige Unterschied liegt in der Art und Weise, wie Sie sie aufrufen. In der folgenden Tabelle sind die in AsyncMilvusClient verfügbaren Methoden aufgeführt.
Klient | ||
---|---|---|
| ||
Sammlung & Partition | ||
|
|
|
| ||
Index | ||
|
|
|
|
|
|
Vektor | ||
|
|
|
|
|
|
|
Wenn Sie noch die asynchrone Version einer anderen MilvusClient-Methode benötigen, können Sie einen Feature-Request im pymilvus-Repo einreichen. Codebeiträge sind ebenfalls willkommen.
Erstellen einer Ereignisschleife
Anwendungen, die Asyncio verwenden, nutzen typischerweise die Ereignisschleife als Orchestrator für die Verwaltung asynchroner Aufgaben und E/A-Operationen. In diesem Tutorial werden wir eine Ereignisschleife von asyncio erhalten und sie als Orchestrator verwenden.
import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest
loop = asyncio.get_event_loop()
Verbinden mit AsyncMilvusClient
Das folgende Beispiel zeigt, wie eine asynchrone Verbindung zu Milvus hergestellt werden kann.
# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)
Schema erstellen
Derzeit ist create_schema()
in AsyncMilvusClient nicht verfügbar. Stattdessen werden wir MilvusClient verwenden, um das Schema für die Sammlung zu erstellen.
schema = async_client.create_schema(
auto_id=False,
description="This is a sample schema",
)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)
AsyncMilvusClient ruft die Methode create_schema()
synchron auf; daher müssen Sie den Aufruf nicht über die Ereignisschleife orchestrieren.
Sammlung erstellen
Nun werden wir das Schema verwenden, um eine Sammlung zu erstellen. Beachten Sie, dass Sie jedem Aufruf der Methoden AsyncMilvusClient
das Schlüsselwort await
voranstellen und den Aufruf innerhalb einer Funktion async
wie folgt platzieren müssen.
async def create_my_collection(collection_name, schema):
if (client.has_collection(collection_name)):
await async_client.drop_collection(collection_name)
await async_client.create_collection(
collection_name=collection_name,
schema=schema
)
if (client.has_collection(collection_name)):
print("Collection created successfully")
else:
print("Failed to create collection")
# Call the above function asynchronously
loop.run_until_complete(create_my_collection("my_collection", schema))
# Output
#
# Collection created successfully
Index erstellen
Sie müssen auch Indizes für alle Vektorfelder und optionalen skalaren Felder erstellen. Gemäß dem oben definierten Schema gibt es zwei Vektorfelder in der Sammlung, für die Sie wie folgt Indizes erstellen werden.
async def create_indexes(collection_name):
index_params = client.prepare_index_params()
index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
index_params.add_index(field_name="text", index_type="AUTOINDEX")
await async_client.create_index(collection_name, index_params)
# Call the above function asynchronously
loop.run_until_complete(create_indexes("my_collection"))
Sammlung laden
Eine Sammlung kann geladen werden, nachdem die erforderlichen Felder indiziert sind. Der folgende Code demonstriert, wie die Sammlung asynchron geladen wird.
async def load_my_collection(collection_name):
await async_client.load_collection(collection_name)
print(client.get_load_state(collection_name))
# Call the above function asynchronously
loop.run_until_complete(load_my_collection("my_collection"))
# Output
#
# {'state': <LoadState: Loaded>}
Daten einfügen
Sie können die in pymilvus verfügbaren Einbettungsmodelle verwenden, um Vektoreinbettungen für Ihre Texte zu erzeugen. Details dazu finden Sie unter Einbettungsübersicht. In diesem Abschnitt werden wir zufällig generierte Daten in die Sammlung einfügen.
async def insert_sample_data(collection_name):
# Randomly generated data will be used here
rng = np.random.default_rng(42)
def generate_random_text(length):
seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
words = seed.split()
return " ".join(rng.choice(words, length))
data = [{
'id': i,
'dense_vector': rng.random(5).tolist(),
'sparse_vector': csr_matrix(rng.random(5)),
'text': generate_random_text(10)
} for i in range(10000)]
res = await async_client.insert(collection_name, data)
print(res)
# Call the above function asynchronously
loop.run_until_complete(insert_sample_data("my_collection"))
# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}
Abfrage
Nachdem die Sammlung geladen und mit Daten gefüllt ist, können Sie darin Suchen und Abfragen durchführen. In diesem Abschnitt werden Sie die Anzahl der Entitäten im Feld text
finden, die mit dem Wort random
in der Sammlung my_collection
beginnen.
async def query_my_collection(collection_name):
# Find the number of entities with the `text` fields starting with the word "random" in the `my_collection` collection.
res = await async_client.query(
collection_name="my_collection",
filter='text like "%random%"',
output_fields=["count(*)"]
)
print(res)
# Call the above function asynchronously
loop.run_until_complete(query_my_collection("my_collection"))
# Output
#
# data: ["{'count(*)': 6802}"]
Suche
In diesem Abschnitt werden Sie Vektorsuchen in den dichten und spärlichen Vektorfeldern der Zielsammlung durchführen.
async def conduct_vector_search(collection_name, type, field):
# Generate a set of three random query vectors
query_vectors = []
if type == "dense":
query_vectors = [ rng.random(5) for _ in range(3) ]
if type == "sparse":
query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]
print(query_vectors)
res = await async_client.search(
collection_name="my_collection",
data=query_vectors,
anns_field=field,
output_fields=["text", field]
)
print(res)
# To search against the dense vector field asynchronously
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))
# To search against the sparse vector field asynchronously
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))
Die Suchausgabe sollte drei Gruppen von Ergebnissen auflisten, die den angegebenen Abfragevektoren entsprechen.
Hybride Suche
Eine hybride Suche kombiniert die Ergebnisse mehrerer Suchen und ordnet sie neu an, um eine bessere Trefferquote zu erzielen. In diesem Abschnitt werden Sie eine hybride Suche unter Verwendung der dichten und spärlichen Vektorfelder durchführen.
async def conduct_hybrid_search(collection_name):
req_dense = AnnSearchRequest(
data=[ rng.random(5) for _ in range(3) ],
anns_field="dense_vector",
param={"metric_type": "IP"},
limit=10
)
req_sparse = AnnSearchRequest(
data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
anns_field="sparse_vector",
param={"metric_type": "IP"},
limit=10
)
reqs = [req_dense, req_sparse]
ranker = RRFRanker()
res = await async_client.hybrid_search(
collection_name="my_collection",
reqs=reqs,
ranker=ranker,
output_fields=["text", "dense_vector", "sparse_vector"]
)
print(res)
# Call the above function asynchronously
loop.run_until_complete(conduct_hybrid_search("my_collection"))