Tutorial: Usar AsyncMilvusClient com asyncio
O AsyncMilvusClient é um MilvusClient assíncrono que oferece uma API baseada em corrotinas para acesso não bloqueante ao Milvus via asyncio. Neste artigo, ficará a conhecer o processo de chamada das APIs que o AsyncMilvusClient fornece e os aspectos a que deve prestar atenção.
Visão geral
O Asyncio é uma biblioteca para escrever código concorrente usando a sintaxe async/await e serve como base para o cliente assíncrono de alto desempenho do Milvus, que se encaixará na sua biblioteca de código executada sobre o Asyncio.
Os métodos que o AsyncMilvusClient fornece têm conjuntos de parâmetros e comportamentos idênticos aos do MilvusClient. A única diferença está na forma como são chamados. A tabela seguinte lista os métodos disponíveis no AsyncMilvusClient.
Cliente | ||
---|---|---|
| ||
Coleção e partição | ||
|
|
|
| ||
Índice | ||
|
|
|
|
|
|
Vetor | ||
|
|
|
|
|
|
|
Se ainda precisar da versão assíncrona de qualquer outro método MilvusClient, pode submeter um pedido de funcionalidade no repositório pymilvus. Contribuições de código também são bem-vindas.
Criar um ciclo de eventos
As aplicações que usam asyncio normalmente usam o loop de eventos como orquestrador para gerenciar tarefas assíncronas e operações de E/S. Neste tutorial, vamos obter um loop de eventos do asyncio e usá-lo como orquestrador.
import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest
loop = asyncio.get_event_loop()
Conectar com AsyncMilvusClient
O exemplo a seguir demonstra como conectar o Milvus de forma assíncrona.
# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)
Criar esquema
Atualmente, o create_schema()
não está disponível no AsyncMilvusClient. Em vez disso, utilizaremos o MilvusClient para criar o esquema para a coleção.
schema = async_client.create_schema(
auto_id=False,
description="This is a sample schema",
)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)
O AsyncMilvusClient chama o método create_schema()
de forma síncrona; por conseguinte, não é necessário orquestrar a chamada utilizando o ciclo de eventos.
Criar coleção
Agora vamos utilizar o esquema para criar uma coleção. Tenha em atenção que tem de colocar a palavra-chave await
como prefixo em qualquer chamada aos métodos AsyncMilvusClient
e colocar a chamada dentro de uma função async
da seguinte forma.
async def create_my_collection(collection_name, schema):
if (client.has_collection(collection_name)):
await async_client.drop_collection(collection_name)
await async_client.create_collection(
collection_name=collection_name,
schema=schema
)
if (client.has_collection(collection_name)):
print("Collection created successfully")
else:
print("Failed to create collection")
# Call the above function asynchronously
loop.run_until_complete(create_my_collection("my_collection", schema))
# Output
#
# Collection created successfully
Criar índice
Também é necessário criar índices para todos os campos vectoriais e campos escalares opcionais. De acordo com o esquema definido acima, existem dois campos vectoriais na coleção, pelo que deve criar índices para eles da seguinte forma.
async def create_indexes(collection_name):
index_params = client.prepare_index_params()
index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
index_params.add_index(field_name="text", index_type="AUTOINDEX")
await async_client.create_index(collection_name, index_params)
# Call the above function asynchronously
loop.run_until_complete(create_indexes("my_collection"))
Carregar coleção
Uma coleção pode ser carregada depois de os campos necessários estarem indexados. O código seguinte demonstra como carregar a coleção de forma assíncrona.
async def load_my_collection(collection_name):
await async_client.load_collection(collection_name)
print(client.get_load_state(collection_name))
# Call the above function asynchronously
loop.run_until_complete(load_my_collection("my_collection"))
# Output
#
# {'state': <LoadState: Loaded>}
Inserir dados
Pode utilizar os modelos de incorporação disponíveis no pymilvus para gerar incorporação de vectores para os seus textos. Para obter detalhes, consulte Visão geral da incorporação. Nesta secção, iremos inserir dados gerados aleatoriamente na coleção.
async def insert_sample_data(collection_name):
# Randomly generated data will be used here
rng = np.random.default_rng(42)
def generate_random_text(length):
seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
words = seed.split()
return " ".join(rng.choice(words, length))
data = [{
'id': i,
'dense_vector': rng.random(5).tolist(),
'sparse_vector': csr_matrix(rng.random(5)),
'text': generate_random_text(10)
} for i in range(10000)]
res = await async_client.insert(collection_name, data)
print(res)
# Call the above function asynchronously
loop.run_until_complete(insert_sample_data("my_collection"))
# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}
Consulta
Depois de a coleção ser carregada e preenchida com dados, pode efetuar pesquisas e consultas na mesma. Nesta secção, vai encontrar o número de entidades no campo text
que começa com a palavra random
na coleção denominada my_collection
.
async def query_my_collection(collection_name):
# Find the number of entities with the `text` fields starting with the word "random" in the `my_collection` collection.
res = await async_client.query(
collection_name="my_collection",
filter='text like "%random%"',
output_fields=["count(*)"]
)
print(res)
# Call the above function asynchronously
loop.run_until_complete(query_my_collection("my_collection"))
# Output
#
# data: ["{'count(*)': 6802}"]
Pesquisa
Nesta secção, irá efetuar pesquisas vectoriais nos campos vectoriais densos e esparsos da coleção de destino.
async def conduct_vector_search(collection_name, type, field):
# Generate a set of three random query vectors
query_vectors = []
if type == "dense":
query_vectors = [ rng.random(5) for _ in range(3) ]
if type == "sparse":
query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]
print(query_vectors)
res = await async_client.search(
collection_name="my_collection",
data=query_vectors,
anns_field=field,
output_fields=["text", field]
)
print(res)
# To search against the dense vector field asynchronously
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))
# To search against the sparse vector field asynchronously
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))
A saída da pesquisa deve listar três conjuntos de resultados correspondentes aos vectores de consulta especificados.
Pesquisa híbrida
Uma pesquisa híbrida combina os resultados de várias pesquisas e reordena-os para obter uma melhor recuperação. Nesta secção, vai realizar uma pesquisa híbrida utilizando os campos de vectores densos e esparsos.
async def conduct_hybrid_search(collection_name):
req_dense = AnnSearchRequest(
data=[ rng.random(5) for _ in range(3) ],
anns_field="dense_vector",
param={"metric_type": "IP"},
limit=10
)
req_sparse = AnnSearchRequest(
data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
anns_field="sparse_vector",
param={"metric_type": "IP"},
limit=10
)
reqs = [req_dense, req_sparse]
ranker = RRFRanker()
res = await async_client.hybrid_search(
collection_name="my_collection",
reqs=reqs,
ranker=ranker,
output_fields=["text", "dense_vector", "sparse_vector"]
)
print(res)
# Call the above function asynchronously
loop.run_until_complete(conduct_hybrid_search("my_collection"))