🚀 Prueba Zilliz Cloud, el Milvus completamente gestionado, gratis—¡experimenta un rendimiento 10 veces más rápido! Prueba Ahora>>

milvus-logo
LFAI
Home
  • Tutoriales

Tutorial: Utilizar AsyncMilvusClient con asyncio

AsyncMilvusClient es un MilvusClient asíncrono que ofrece una API basada en coroutinas para el acceso no bloqueante a Milvus a través de asyncio. En este artículo, conocerá el proceso de llamada a las API que proporciona AsyncMilvusClient y los aspectos a los que debe prestar atención.

Visión general

Asyncio es una biblioteca para escribir código concurrente utilizando la sintaxis async/await y sirve como base para el cliente asíncrono de alto rendimiento de Milvus, que encajará en su biblioteca de código que se ejecuta sobre asyncio.

Los métodos que proporciona AsyncMilvusClient tienen idénticos conjuntos de parámetros y comportamientos que los de MilvusClient. La única diferencia radica en la forma de llamarlos. La siguiente tabla enumera los métodos disponibles en AsyncMilvusClient.

Cliente

close()

Colección y partición

create_collection()

drop_collection()

create_partition()

drop_partition()

Índice

create_index()

drop_index()

load_collection()

release_collection()

load_partitions()

release_partitions()

Vector

insert()

upsert()

delete()

search()

query()

hybrid_search()

get()

Si todavía necesita la versión asíncrona de cualquier otro método MilvusClient, puede enviar una solicitud de función en el repositorio pymilvus. La contribución de código también es bienvenida.

Crear un bucle de eventos

Las aplicaciones que utilizan asyncio suelen utilizar el bucle de eventos como orquestador para gestionar las tareas asíncronas y las operaciones de E/S. En este tutorial, obtendremos un bucle de eventos de asyncio y lo utilizaremos como orquestador.

import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest

loop = asyncio.get_event_loop()

Conectar con AsyncMilvusClient

El siguiente ejemplo demuestra cómo conectar Milvus de forma asíncrona.

# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

Crear esquema

Actualmente, create_schema() no está disponible en AsyncMilvusClient. En su lugar, utilizaremos MilvusClient para crear el esquema de la colección.

schema = async_client.create_schema(
    auto_id=False,
    description="This is a sample schema",
)

schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)

AsyncMilvusClient llama al método create_schema() de forma sincrónica; por lo tanto, no es necesario orquestar la llamada utilizando el bucle de eventos.

Crear colección

Ahora utilizaremos el esquema para crear una colección. Ten en cuenta que necesitas anteponer la palabra clave await a cualquier llamada a los métodos AsyncMilvusClient y colocar la llamada dentro de una función async como se indica a continuación.

async def create_my_collection(collection_name, schema):
    if (client.has_collection(collection_name)):
        await async_client.drop_collection(collection_name)

    await async_client.create_collection(
        collection_name=collection_name,
        schema=schema
    )

    if (client.has_collection(collection_name)):
        print("Collection created successfully")
    else:
        print("Failed to create collection")
        
# Call the above function asynchronously 
loop.run_until_complete(create_my_collection("my_collection", schema))

# Output
#
# Collection created successfully

Crear índice

También es necesario crear índices para todos los campos vectoriales y campos escalares opcionales. Según el esquema definido anteriormente, hay dos campos vectoriales en la colección, y crearás índices para ellos de la siguiente manera.

async def create_indexes(collection_name):
    index_params = client.prepare_index_params()

    index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="text", index_type="AUTOINDEX")

    await async_client.create_index(collection_name, index_params)

# Call the above function asynchronously 
loop.run_until_complete(create_indexes("my_collection"))

Cargar colección

Una colección se puede cargar después de haber indexado los campos necesarios. El siguiente código muestra cómo cargar la colección de forma asíncrona.

async def load_my_collection(collection_name):
    await async_client.load_collection(collection_name)
    print(client.get_load_state(collection_name))
    
# Call the above function asynchronously 
loop.run_until_complete(load_my_collection("my_collection"))

# Output
#
# {'state': <LoadState: Loaded>}

Insertar datos

Puede utilizar los modelos de incrustación disponibles en pymilvus para generar incrustaciones vectoriales para sus textos. Para obtener más información, consulte Visión general de la incrustación. En esta sección, insertaremos datos generados aleatoriamente en la colección.

async def insert_sample_data(collection_name):
    # Randomly generated data will be used here
    rng = np.random.default_rng(42)

    def generate_random_text(length):
        seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
        words = seed.split()
        return " ".join(rng.choice(words, length))
    
    data = [{
        'id': i, 
        'dense_vector': rng.random(5).tolist(), 
        'sparse_vector': csr_matrix(rng.random(5)), 
        'text': generate_random_text(10)
    } for i in range(10000)]

    res = await async_client.insert(collection_name, data)

    print(res)

# Call the above function asynchronously 
loop.run_until_complete(insert_sample_data("my_collection"))

# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}

Consulta

Una vez que la colección está cargada y llena de datos, puede realizar búsquedas y consultas en ella. En esta sección, va a buscar el número de entidades del campo text que empiezan por la palabra random en la colección denominada my_collection.

async def query_my_collection(collection_name):
    # Find the number of entities with the `text` fields starting with the word "random" in the `my_collection` collection.

    res = await async_client.query(
        collection_name="my_collection",
        filter='text like "%random%"',
        output_fields=["count(*)"]
    )

    print(res) 
    
# Call the above function asynchronously   
loop.run_until_complete(query_my_collection("my_collection"))

# Output
#
# data: ["{'count(*)': 6802}"] 

En esta sección, realizará búsquedas vectoriales en los campos vectoriales densos y dispersos de la colección de destino.

async def conduct_vector_search(collection_name, type, field):
    # Generate a set of three random query vectors
    query_vectors = []
    if type == "dense":
        query_vectors = [ rng.random(5) for _ in range(3) ]
    
    if type == "sparse":
        query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]

    print(query_vectors)

    res = await async_client.search(
        collection_name="my_collection",
        data=query_vectors,
        anns_field=field,
        output_fields=["text", field]
    )

    print(res)
    
# To search against the dense vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))

# To search against the sparse vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))

El resultado de la búsqueda mostrará tres conjuntos de resultados correspondientes a los vectores de consulta especificados.

Una búsqueda híbrida combina los resultados de varias búsquedas y los reordena para obtener una mejor recuperación. En esta sección, va a realizar una búsqueda híbrida utilizando los campos de vectores densos y dispersos.

async def conduct_hybrid_search(collection_name):
    req_dense = AnnSearchRequest(
        data=[ rng.random(5) for _ in range(3) ],
        anns_field="dense_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    req_sparse = AnnSearchRequest(
        data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
        anns_field="sparse_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    reqs = [req_dense, req_sparse]

    ranker = RRFRanker()

    res = await async_client.hybrid_search(
        collection_name="my_collection",
        reqs=reqs,
        ranker=ranker,
        output_fields=["text", "dense_vector", "sparse_vector"]
    )

    print(res)
    
# Call the above function asynchronously  
loop.run_until_complete(conduct_hybrid_search("my_collection"))

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

¿Fue útil esta página?