🚀 Prova Zilliz Cloud, la versione completamente gestita di Milvus, gratuitamente—sperimenta prestazioni 10 volte più veloci! Prova Ora>>

milvus-logo
LFAI
Casa
  • Tutorial

Tutorial: Utilizzare AsyncMilvusClient con asyncio

AsyncMilvusClient è un MilvusClient asincrono che offre un'API basata su coroutine per l'accesso non bloccante a Milvus tramite asyncio. In questo articolo si illustra il processo di chiamata delle API che AsyncMilvusClient mette a disposizione e gli aspetti a cui è necessario prestare attenzione.

Panoramica

Asyncio è una libreria per la scrittura di codice concorrente che utilizza la sintassi async/await e funge da base per il client asincrono ad alte prestazioni di Milvus, che si inserisce nella libreria di codice che gira sopra asyncio.

I metodi forniti da AsyncMilvusClient hanno parametri e comportamenti identici a quelli di MilvusClient. L'unica differenza sta nel modo in cui vengono chiamati. La tabella seguente elenca i metodi disponibili in AsyncMilvusClient.

Client

close()

Raccolta e partizione

create_collection()

drop_collection()

create_partition()

drop_partition()

Indice

create_index()

drop_index()

load_collection()

release_collection()

load_partitions()

release_partitions()

Vettore

insert()

upsert()

delete()

search()

query()

hybrid_search()

get()

Se avete ancora bisogno della versione asincrona di qualsiasi altro metodo di MilvusClient, potete inviare una richiesta di funzionalità nel repo di pymilvus. Anche il contributo al codice è ben accetto.

Creare un ciclo di eventi

Le applicazioni che utilizzano asyncio utilizzano tipicamente il ciclo di eventi come orchestratore per la gestione dei compiti asincroni e delle operazioni di I/O. In questo tutorial, otterremo una versione asincrona del ciclo di eventi. In questo tutorial, otterremo un ciclo di eventi da asyncio e lo utilizzeremo come orchestratore.

import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest

loop = asyncio.get_event_loop()

Connessione con AsyncMilvusClient

L'esempio seguente mostra come connettersi a Milvus in modo asincrono.

# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

Creare lo schema

Attualmente, create_schema() non è disponibile in AsyncMilvusClient. Si utilizzerà invece MilvusClient per creare lo schema della collezione.

schema = async_client.create_schema(
    auto_id=False,
    description="This is a sample schema",
)

schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)

AsyncMilvusClient chiama il metodo create_schema() in modo sincrono; pertanto, non è necessario orchestrare la chiamata utilizzando il ciclo degli eventi.

Creare la raccolta

Ora utilizzeremo lo schema per creare un insieme. Si noti che è necessario anteporre la parola chiave await a qualsiasi chiamata ai metodi AsyncMilvusClient e collocare la chiamata all'interno di una funzione async come segue.

async def create_my_collection(collection_name, schema):
    if (client.has_collection(collection_name)):
        await async_client.drop_collection(collection_name)

    await async_client.create_collection(
        collection_name=collection_name,
        schema=schema
    )

    if (client.has_collection(collection_name)):
        print("Collection created successfully")
    else:
        print("Failed to create collection")
        
# Call the above function asynchronously 
loop.run_until_complete(create_my_collection("my_collection", schema))

# Output
#
# Collection created successfully

Crea indice

È inoltre necessario creare indici per tutti i campi vettoriali e per i campi scalari opzionali. In base allo schema definito in precedenza, ci sono due campi vettoriali nell'insieme, per i quali si creeranno gli indici come segue.

async def create_indexes(collection_name):
    index_params = client.prepare_index_params()

    index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="text", index_type="AUTOINDEX")

    await async_client.create_index(collection_name, index_params)

# Call the above function asynchronously 
loop.run_until_complete(create_indexes("my_collection"))

Caricare la collezione

Una collezione può essere caricata dopo che i campi necessari sono stati indicizzati. Il codice seguente mostra come caricare l'insieme in modo asincrono.

async def load_my_collection(collection_name):
    await async_client.load_collection(collection_name)
    print(client.get_load_state(collection_name))
    
# Call the above function asynchronously 
loop.run_until_complete(load_my_collection("my_collection"))

# Output
#
# {'state': <LoadState: Loaded>}

Inserire i dati

È possibile utilizzare i modelli di incorporamento disponibili in pymilvus per generare incorporazioni vettoriali per i testi. Per maggiori dettagli, consultare la sezione Panoramica sugli incorporamenti. In questa sezione, inseriremo nella collezione dati generati in modo casuale.

async def insert_sample_data(collection_name):
    # Randomly generated data will be used here
    rng = np.random.default_rng(42)

    def generate_random_text(length):
        seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
        words = seed.split()
        return " ".join(rng.choice(words, length))
    
    data = [{
        'id': i, 
        'dense_vector': rng.random(5).tolist(), 
        'sparse_vector': csr_matrix(rng.random(5)), 
        'text': generate_random_text(10)
    } for i in range(10000)]

    res = await async_client.insert(collection_name, data)

    print(res)

# Call the above function asynchronously 
loop.run_until_complete(insert_sample_data("my_collection"))

# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}

Interrogazione

Dopo che la collezione è stata caricata e riempita di dati, è possibile effettuare ricerche e interrogazioni al suo interno. In questa sezione, si cercherà di trovare il numero di entità nel campo text che iniziano con la parola random nella raccolta denominata my_collection.

async def query_my_collection(collection_name):
    # Find the number of entities with the `text` fields starting with the word "random" in the `my_collection` collection.

    res = await async_client.query(
        collection_name="my_collection",
        filter='text like "%random%"',
        output_fields=["count(*)"]
    )

    print(res) 
    
# Call the above function asynchronously   
loop.run_until_complete(query_my_collection("my_collection"))

# Output
#
# data: ["{'count(*)': 6802}"] 

In questa sezione si effettueranno ricerche vettoriali sui campi vettoriali densi e radi dell'insieme di destinazione.

async def conduct_vector_search(collection_name, type, field):
    # Generate a set of three random query vectors
    query_vectors = []
    if type == "dense":
        query_vectors = [ rng.random(5) for _ in range(3) ]
    
    if type == "sparse":
        query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]

    print(query_vectors)

    res = await async_client.search(
        collection_name="my_collection",
        data=query_vectors,
        anns_field=field,
        output_fields=["text", field]
    )

    print(res)
    
# To search against the dense vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))

# To search against the sparse vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))

L'output della ricerca dovrebbe elencare tre serie di risultati corrispondenti ai vettori specificati.

Una ricerca ibrida combina i risultati di più ricerche e li riordina per ottenere un richiamo migliore. In questa sezione verrà eseguita una ricerca ibrida utilizzando i campi vettoriali densi e radi.

async def conduct_hybrid_search(collection_name):
    req_dense = AnnSearchRequest(
        data=[ rng.random(5) for _ in range(3) ],
        anns_field="dense_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    req_sparse = AnnSearchRequest(
        data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
        anns_field="sparse_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    reqs = [req_dense, req_sparse]

    ranker = RRFRanker()

    res = await async_client.hybrid_search(
        collection_name="my_collection",
        reqs=reqs,
        ranker=ranker,
        output_fields=["text", "dense_vector", "sparse_vector"]
    )

    print(res)
    
# Call the above function asynchronously  
loop.run_until_complete(conduct_hybrid_search("my_collection"))

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

Questa pagina è stata utile?