🚀 Testen Sie Zilliz Cloud, die vollständig verwaltete Milvus, kostenlos – erleben Sie 10x schnellere Leistung! Jetzt testen>>

milvus-logo
LFAI
Home
  • Anleitungen

Tutorial: Verwendung von AsyncMilvusClient mit asyncio

AsyncMilvusClient ist ein asynchroner MilvusClient, der eine Coroutine-basierte API für den nicht-blockierenden Zugriff auf Milvus über asyncio bietet. In diesem Artikel erfahren Sie, wie Sie die APIs, die AsyncMilvusClient zur Verfügung stellt, aufrufen können und welche Aspekte Sie dabei beachten müssen.

Überblick

Asyncio ist eine Bibliothek zum Schreiben von nebenläufigem Code unter Verwendung der async/await-Syntax und dient als Grundlage für den hochleistungsfähigen asynchronen Client von Milvus, der sich in Ihre Code-Bibliothek einfügt, die auf Asyncio aufbaut.

Die Methoden, die AsyncMilvusClient bereitstellt, haben identische Parametersätze und Verhaltensweisen wie die von MilvusClient. Der einzige Unterschied liegt in der Art und Weise, wie Sie sie aufrufen. In der folgenden Tabelle sind die in AsyncMilvusClient verfügbaren Methoden aufgeführt.

Klient

close()

Sammlung & Partition

create_collection()

drop_collection()

create_partition()

drop_partition()

Index

create_index()

drop_index()

load_collection()

release_collection()

load_partitions()

release_partitions()

Vektor

insert()

upsert()

delete()

search()

query()

hybrid_search()

get()

Wenn Sie noch die asynchrone Version einer anderen MilvusClient-Methode benötigen, können Sie einen Feature-Request im pymilvus-Repo einreichen. Codebeiträge sind ebenfalls willkommen.

Erstellen einer Ereignisschleife

Anwendungen, die Asyncio verwenden, nutzen typischerweise die Ereignisschleife als Orchestrator für die Verwaltung asynchroner Aufgaben und E/A-Operationen. In diesem Tutorial werden wir eine Ereignisschleife von asyncio erhalten und sie als Orchestrator verwenden.

import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest

loop = asyncio.get_event_loop()

Verbinden mit AsyncMilvusClient

Das folgende Beispiel zeigt, wie eine asynchrone Verbindung zu Milvus hergestellt werden kann.

# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

Schema erstellen

Derzeit ist create_schema() in AsyncMilvusClient nicht verfügbar. Stattdessen werden wir MilvusClient verwenden, um das Schema für die Sammlung zu erstellen.

schema = async_client.create_schema(
    auto_id=False,
    description="This is a sample schema",
)

schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)

AsyncMilvusClient ruft die Methode create_schema() synchron auf; daher müssen Sie den Aufruf nicht über die Ereignisschleife orchestrieren.

Sammlung erstellen

Nun werden wir das Schema verwenden, um eine Sammlung zu erstellen. Beachten Sie, dass Sie jedem Aufruf der Methoden AsyncMilvusClient das Schlüsselwort await voranstellen und den Aufruf innerhalb einer Funktion async wie folgt platzieren müssen.

async def create_my_collection(collection_name, schema):
    if (client.has_collection(collection_name)):
        await async_client.drop_collection(collection_name)

    await async_client.create_collection(
        collection_name=collection_name,
        schema=schema
    )

    if (client.has_collection(collection_name)):
        print("Collection created successfully")
    else:
        print("Failed to create collection")
        
# Call the above function asynchronously 
loop.run_until_complete(create_my_collection("my_collection", schema))

# Output
#
# Collection created successfully

Index erstellen

Sie müssen auch Indizes für alle Vektorfelder und optionalen skalaren Felder erstellen. Gemäß dem oben definierten Schema gibt es zwei Vektorfelder in der Sammlung, für die Sie wie folgt Indizes erstellen werden.

async def create_indexes(collection_name):
    index_params = client.prepare_index_params()

    index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="text", index_type="AUTOINDEX")

    await async_client.create_index(collection_name, index_params)

# Call the above function asynchronously 
loop.run_until_complete(create_indexes("my_collection"))

Sammlung laden

Eine Sammlung kann geladen werden, nachdem die erforderlichen Felder indiziert sind. Der folgende Code demonstriert, wie die Sammlung asynchron geladen wird.

async def load_my_collection(collection_name):
    await async_client.load_collection(collection_name)
    print(client.get_load_state(collection_name))
    
# Call the above function asynchronously 
loop.run_until_complete(load_my_collection("my_collection"))

# Output
#
# {'state': <LoadState: Loaded>}

Daten einfügen

Sie können die in pymilvus verfügbaren Einbettungsmodelle verwenden, um Vektoreinbettungen für Ihre Texte zu erzeugen. Details dazu finden Sie unter Einbettungsübersicht. In diesem Abschnitt werden wir zufällig generierte Daten in die Sammlung einfügen.

async def insert_sample_data(collection_name):
    # Randomly generated data will be used here
    rng = np.random.default_rng(42)

    def generate_random_text(length):
        seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
        words = seed.split()
        return " ".join(rng.choice(words, length))
    
    data = [{
        'id': i, 
        'dense_vector': rng.random(5).tolist(), 
        'sparse_vector': csr_matrix(rng.random(5)), 
        'text': generate_random_text(10)
    } for i in range(10000)]

    res = await async_client.insert(collection_name, data)

    print(res)

# Call the above function asynchronously 
loop.run_until_complete(insert_sample_data("my_collection"))

# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}

Abfrage

Nachdem die Sammlung geladen und mit Daten gefüllt ist, können Sie darin Suchen und Abfragen durchführen. In diesem Abschnitt werden Sie die Anzahl der Entitäten im Feld text finden, die mit dem Wort random in der Sammlung my_collection beginnen.

async def query_my_collection(collection_name):
    # Find the number of entities with the `text` fields starting with the word "random" in the `my_collection` collection.

    res = await async_client.query(
        collection_name="my_collection",
        filter='text like "%random%"',
        output_fields=["count(*)"]
    )

    print(res) 
    
# Call the above function asynchronously   
loop.run_until_complete(query_my_collection("my_collection"))

# Output
#
# data: ["{'count(*)': 6802}"] 

In diesem Abschnitt werden Sie Vektorsuchen in den dichten und spärlichen Vektorfeldern der Zielsammlung durchführen.

async def conduct_vector_search(collection_name, type, field):
    # Generate a set of three random query vectors
    query_vectors = []
    if type == "dense":
        query_vectors = [ rng.random(5) for _ in range(3) ]
    
    if type == "sparse":
        query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]

    print(query_vectors)

    res = await async_client.search(
        collection_name="my_collection",
        data=query_vectors,
        anns_field=field,
        output_fields=["text", field]
    )

    print(res)
    
# To search against the dense vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))

# To search against the sparse vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))

Die Suchausgabe sollte drei Gruppen von Ergebnissen auflisten, die den angegebenen Abfragevektoren entsprechen.

Eine hybride Suche kombiniert die Ergebnisse mehrerer Suchen und ordnet sie neu an, um eine bessere Trefferquote zu erzielen. In diesem Abschnitt werden Sie eine hybride Suche unter Verwendung der dichten und spärlichen Vektorfelder durchführen.

async def conduct_hybrid_search(collection_name):
    req_dense = AnnSearchRequest(
        data=[ rng.random(5) for _ in range(3) ],
        anns_field="dense_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    req_sparse = AnnSearchRequest(
        data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
        anns_field="sparse_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    reqs = [req_dense, req_sparse]

    ranker = RRFRanker()

    res = await async_client.hybrid_search(
        collection_name="my_collection",
        reqs=reqs,
        ranker=ranker,
        output_fields=["text", "dense_vector", "sparse_vector"]
    )

    print(res)
    
# Call the above function asynchronously  
loop.run_until_complete(conduct_hybrid_search("my_collection"))

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

War diese Seite hilfreich?