🚀 Experimente o Zilliz Cloud, o Milvus totalmente gerenciado, gratuitamente—experimente um desempenho 10x mais rápido! Experimente Agora>>

milvus-logo
LFAI
Home
  • Tutoriais

Tutorial: Usar AsyncMilvusClient com asyncio

O AsyncMilvusClient é um MilvusClient assíncrono que oferece uma API baseada em corrotinas para acesso não bloqueante ao Milvus via asyncio. Neste artigo, ficará a conhecer o processo de chamada das APIs que o AsyncMilvusClient fornece e os aspectos a que deve prestar atenção.

Visão geral

O Asyncio é uma biblioteca para escrever código concorrente usando a sintaxe async/await e serve como base para o cliente assíncrono de alto desempenho do Milvus, que se encaixará na sua biblioteca de código executada sobre o Asyncio.

Os métodos que o AsyncMilvusClient fornece têm conjuntos de parâmetros e comportamentos idênticos aos do MilvusClient. A única diferença está na forma como são chamados. A tabela seguinte lista os métodos disponíveis no AsyncMilvusClient.

Cliente

close()

Coleção e partição

create_collection()

drop_collection()

create_partition()

drop_partition()

Índice

create_index()

drop_index()

load_collection()

release_collection()

load_partitions()

release_partitions()

Vetor

insert()

upsert()

delete()

search()

query()

hybrid_search()

get()

Se ainda precisar da versão assíncrona de qualquer outro método MilvusClient, pode submeter um pedido de funcionalidade no repositório pymilvus. Contribuições de código também são bem-vindas.

Criar um ciclo de eventos

As aplicações que usam asyncio normalmente usam o loop de eventos como orquestrador para gerenciar tarefas assíncronas e operações de E/S. Neste tutorial, vamos obter um loop de eventos do asyncio e usá-lo como orquestrador.

import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest

loop = asyncio.get_event_loop()

Conectar com AsyncMilvusClient

O exemplo a seguir demonstra como conectar o Milvus de forma assíncrona.

# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

Criar esquema

Atualmente, o create_schema() não está disponível no AsyncMilvusClient. Em vez disso, utilizaremos o MilvusClient para criar o esquema para a coleção.

schema = async_client.create_schema(
    auto_id=False,
    description="This is a sample schema",
)

schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)

O AsyncMilvusClient chama o método create_schema() de forma síncrona; por conseguinte, não é necessário orquestrar a chamada utilizando o ciclo de eventos.

Criar coleção

Agora vamos utilizar o esquema para criar uma coleção. Tenha em atenção que tem de colocar a palavra-chave await como prefixo em qualquer chamada aos métodos AsyncMilvusClient e colocar a chamada dentro de uma função async da seguinte forma.

async def create_my_collection(collection_name, schema):
    if (client.has_collection(collection_name)):
        await async_client.drop_collection(collection_name)

    await async_client.create_collection(
        collection_name=collection_name,
        schema=schema
    )

    if (client.has_collection(collection_name)):
        print("Collection created successfully")
    else:
        print("Failed to create collection")
        
# Call the above function asynchronously 
loop.run_until_complete(create_my_collection("my_collection", schema))

# Output
#
# Collection created successfully

Criar índice

Também é necessário criar índices para todos os campos vectoriais e campos escalares opcionais. De acordo com o esquema definido acima, existem dois campos vectoriais na coleção, pelo que deve criar índices para eles da seguinte forma.

async def create_indexes(collection_name):
    index_params = client.prepare_index_params()

    index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="text", index_type="AUTOINDEX")

    await async_client.create_index(collection_name, index_params)

# Call the above function asynchronously 
loop.run_until_complete(create_indexes("my_collection"))

Carregar coleção

Uma coleção pode ser carregada depois de os campos necessários estarem indexados. O código seguinte demonstra como carregar a coleção de forma assíncrona.

async def load_my_collection(collection_name):
    await async_client.load_collection(collection_name)
    print(client.get_load_state(collection_name))
    
# Call the above function asynchronously 
loop.run_until_complete(load_my_collection("my_collection"))

# Output
#
# {'state': <LoadState: Loaded>}

Inserir dados

Pode utilizar os modelos de incorporação disponíveis no pymilvus para gerar incorporação de vectores para os seus textos. Para obter detalhes, consulte Visão geral da incorporação. Nesta secção, iremos inserir dados gerados aleatoriamente na coleção.

async def insert_sample_data(collection_name):
    # Randomly generated data will be used here
    rng = np.random.default_rng(42)

    def generate_random_text(length):
        seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
        words = seed.split()
        return " ".join(rng.choice(words, length))
    
    data = [{
        'id': i, 
        'dense_vector': rng.random(5).tolist(), 
        'sparse_vector': csr_matrix(rng.random(5)), 
        'text': generate_random_text(10)
    } for i in range(10000)]

    res = await async_client.insert(collection_name, data)

    print(res)

# Call the above function asynchronously 
loop.run_until_complete(insert_sample_data("my_collection"))

# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}

Consulta

Depois de a coleção ser carregada e preenchida com dados, pode efetuar pesquisas e consultas na mesma. Nesta secção, vai encontrar o número de entidades no campo text que começa com a palavra random na coleção denominada my_collection.

async def query_my_collection(collection_name):
    # Find the number of entities with the `text` fields starting with the word "random" in the `my_collection` collection.

    res = await async_client.query(
        collection_name="my_collection",
        filter='text like "%random%"',
        output_fields=["count(*)"]
    )

    print(res) 
    
# Call the above function asynchronously   
loop.run_until_complete(query_my_collection("my_collection"))

# Output
#
# data: ["{'count(*)': 6802}"] 

Nesta secção, irá efetuar pesquisas vectoriais nos campos vectoriais densos e esparsos da coleção de destino.

async def conduct_vector_search(collection_name, type, field):
    # Generate a set of three random query vectors
    query_vectors = []
    if type == "dense":
        query_vectors = [ rng.random(5) for _ in range(3) ]
    
    if type == "sparse":
        query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]

    print(query_vectors)

    res = await async_client.search(
        collection_name="my_collection",
        data=query_vectors,
        anns_field=field,
        output_fields=["text", field]
    )

    print(res)
    
# To search against the dense vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))

# To search against the sparse vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))

A saída da pesquisa deve listar três conjuntos de resultados correspondentes aos vectores de consulta especificados.

Uma pesquisa híbrida combina os resultados de várias pesquisas e reordena-os para obter uma melhor recuperação. Nesta secção, vai realizar uma pesquisa híbrida utilizando os campos de vectores densos e esparsos.

async def conduct_hybrid_search(collection_name):
    req_dense = AnnSearchRequest(
        data=[ rng.random(5) for _ in range(3) ],
        anns_field="dense_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    req_sparse = AnnSearchRequest(
        data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
        anns_field="sparse_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    reqs = [req_dense, req_sparse]

    ranker = RRFRanker()

    res = await async_client.hybrid_search(
        collection_name="my_collection",
        reqs=reqs,
        ranker=ranker,
        output_fields=["text", "dense_vector", "sparse_vector"]
    )

    print(res)
    
# Call the above function asynchronously  
loop.run_until_complete(conduct_hybrid_search("my_collection"))

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

Esta página foi útil?