Funções assíncronas na integração LangChain Milvus
Este tutorial explora como aproveitar as funções assíncronas no langchain-milvus para criar aplicativos de alto desempenho. Usando métodos assíncronos, você pode melhorar significativamente o rendimento e a capacidade de resposta de sua aplicação, especialmente ao lidar com recuperação em larga escala. Quer esteja a construir um sistema de recomendação em tempo real, a implementar a pesquisa semântica na sua aplicação ou a criar um pipeline RAG (Retrieval-Augmented Generation), as operações assíncronas podem ajudá-lo a lidar com pedidos concorrentes de forma mais eficiente. O banco de dados vetorial de alto desempenho Milvus, combinado com as poderosas abstrações LLM do LangChain, pode fornecer uma base robusta para a criação de aplicativos de IA escalonáveis.
Visão geral da API assíncrona
O langchain-milvus fornece suporte abrangente a operações assíncronas, melhorando significativamente o desempenho em cenários simultâneos de grande escala. A API assíncrona mantém um design de interface consistente com a API de sincronização.
Funções Assíncronas Principais
Para usar operações assíncronas em langchain-milvus, simplesmente adicione um prefixo a aos nomes dos métodos. Isso permite uma melhor utilização dos recursos e um melhor rendimento ao lidar com pedidos de recuperação simultâneos.
| Tipo de operação | Método síncrono | Método assíncrono | Descrição |
|---|---|---|---|
| Adicionar textos | add_texts() | aadd_texts() | Adicionar textos ao armazenamento de vectores |
| Adicionar documentos | add_documents() | aadd_documents() | Adicionar documentos ao repositório de vectores |
| Adicionar incrustações | add_embeddings() | aadd_embeddings() | Adicionar vectores de incorporação |
| Pesquisa de semelhanças | similarity_search() | asimilarity_search() | Pesquisa semântica por texto |
| Pesquisa vetorial | similarity_search_by_vector() | asimilarity_search_by_vector() | Pesquisa semântica por vetor |
| Pesquisa com pontuação | similarity_search_with_score() | asimilarity_search_with_score() | Pesquisa semântica por texto e retorno de pontuações de similaridade |
| Pesquisa vetorial com pontuação | similarity_search_with_score_by_vector() | asimilarity_search_with_score_by_vector() | Pesquisa semântica por vetor e retorno de pontuações de similaridade |
| Pesquisa de diversidade | max_marginal_relevance_search() | amax_marginal_relevance_search() | Pesquisa MMR (devolve os semelhantes ao mesmo tempo que optimiza a diversidade) |
| Pesquisa de diversidade vetorial | max_marginal_relevance_search_by_vector() | amax_marginal_relevance_search_by_vector() | Pesquisa MMR por vetor |
| Operação de eliminação | delete() | adelete() | Eliminar documentos |
| Operação Upsert | upsert() | aupsert() | Upsert (atualizar se existente, caso contrário inserir) documentos |
| Pesquisa de metadados | search_by_metadata() | asearch_by_metadata() | Consulta com filtragem de metadados |
| Obter chaves primárias | get_pks() | aget_pks() | Obter chaves primárias por expressão |
| Criar a partir de textos | from_texts() | afrom_texts() | Criar armazenamento vetorial a partir de textos |
Para obter informações mais detalhadas sobre estas funções, consulte a Referência da API.
Benefícios de desempenho
As operações assíncronas proporcionam melhorias significativas no desempenho ao lidar com grandes volumes de pedidos simultâneos, particularmente adequadas para:
- Processamento de documentos em lote
- Cenários de pesquisa de alta simultaneidade
- Aplicações RAG de produção
- Importação/exportação de dados em grande escala
Neste tutorial, demonstraremos esses benefícios de desempenho por meio de comparações detalhadas de operações síncronas e assíncronas, mostrando como aproveitar as APIs assíncronas para obter o melhor desempenho em seus aplicativos.
Antes de começar
Os snippets de código nesta página exigem as seguintes dependências:
! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio
Se estiver a utilizar o Google Colab, para ativar as dependências que acabou de instalar, poderá ter de reiniciar o tempo de execução (clique no menu "Tempo de execução" na parte superior do ecrã e selecione "Reiniciar sessão" no menu pendente).
Vamos utilizar modelos OpenAI. Deve preparar a chave api OPENAI_API_KEY como uma variável de ambiente:
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Se estiver a utilizar o Jupyter Notebook, tem de executar esta linha de código antes de executar o código assíncrono:
import nest_asyncio
nest_asyncio.apply()
Explorando APIs assíncronas e comparação de desempenho
Agora vamos nos aprofundar na comparação de desempenho entre operações síncronas e assíncronas com langchain-milvus.
Primeiro, importe as bibliotecas necessárias:
import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus
# Define the Milvus URI
URI = "http://localhost:19530"
Configurando funções de teste
Vamos criar funções auxiliares para gerar dados de teste:
def random_id():
"""Generate a random string ID"""
random_num_str = ""
for _ in range(16):
random_digit = str(random.randint(0, 9))
random_num_str += random_digit
return random_num_str
def generate_test_documents(num_docs):
"""Generate test documents for performance testing"""
docs = []
for i in range(num_docs):
content = (
f"This is test document {i} with some random content: {random.random()}"
)
metadata = {
"id": f"doc_{i}",
"score": random.random(),
"category": f"cat_{i % 5}",
}
doc = Document(page_content=content, metadata=metadata)
docs.append(doc)
return docs
Inicializar o armazenamento de vetores
Antes de podermos executar nossos testes de desempenho, precisamos configurar um armazenamento de vetores Milvus limpo. Esta função garante que começamos com uma nova coleção para cada teste, eliminando qualquer interferência de dados anteriores:
def init_vector_store():
"""Initialize and return a fresh vector store for testing"""
return Milvus(
embedding_function=OpenAIEmbeddings(),
collection_name="langchain_perf_test",
connection_args={"uri": URI},
auto_id=True,
drop_old=True, # Always start with a fresh collection
)
Async vs Sync: Adicionar documentos
Agora vamos comparar o desempenho da adição síncrona e assíncrona de documentos. Essas funções nos ajudarão a medir o quanto as operações assíncronas podem ser mais rápidas ao adicionar vários documentos ao repositório de vetores. A versão assíncrona cria tarefas para cada adição de documento e as executa simultaneamente, enquanto a versão sincronizada processa os documentos um a um:
async def async_add(milvus_store, num_adding):
"""Add documents asynchronously and measure the time"""
docs = generate_test_documents(num_adding)
start_time = time.time()
tasks = []
for doc in docs:
# Create tasks for each document addition
task = milvus_store.aadd_documents([doc])
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_add(milvus_store, num_adding):
"""Add documents synchronously and measure the time"""
docs = generate_test_documents(num_adding)
start_time = time.time()
for doc in docs:
result = milvus_store.add_documents([doc])
end_time = time.time()
return end_time - start_time
Agora vamos executar nossos testes de desempenho com diferentes contagens de documentos para ver as diferenças de desempenho no mundo real. Vamos testar com cargas variadas para entender como as operações assíncronas são escalonadas em comparação com suas contrapartes síncronas. Os testes medirão o tempo de execução de ambas as abordagens e ajudarão a demonstrar os benefícios de desempenho das operações assíncronas:
add_counts = [10, 100]
# Get the event loop
loop = asyncio.get_event_loop()
# Create a new vector store for testing
milvus_store = init_vector_store()
# Test async document addition
for count in add_counts:
async def measure_async_add():
async_time = await async_add(milvus_store, count)
print(f"Async add for {count} documents took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_add())
# Reset vector store for sync tests
milvus_store = init_vector_store()
# Test sync document addition
for count in add_counts:
sync_time = sync_add(milvus_store, count)
print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)
Async add for 10 documents took 1.74 seconds
2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)
Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds
Assíncrono vs Sincrono: Pesquisa
Para a comparação do desempenho da pesquisa, precisaremos primeiro preencher o armazenamento de vetores. As funções a seguir nos ajudarão a medir o desempenho da pesquisa, criando várias consultas de pesquisa simultâneas e comparando o tempo de execução entre as abordagens síncrona e assíncrona:
def populate_vector_store(milvus_store, num_docs=1000):
"""Populate the vector store with test documents"""
docs = generate_test_documents(num_docs)
milvus_store.add_documents(docs)
return docs
async def async_search(milvus_store, num_queries):
"""Perform async searches and measure the time"""
start_time = time.time()
tasks = []
for i in range(num_queries):
query = f"test document {i % 50}"
task = milvus_store.asimilarity_search(query=query, k=3)
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_search(milvus_store, num_queries):
"""Perform sync searches and measure the time"""
start_time = time.time()
for i in range(num_queries):
query = f"test document {i % 50}"
result = milvus_store.similarity_search(query=query, k=3)
end_time = time.time()
return end_time - start_time
Agora vamos executar testes abrangentes de desempenho de pesquisa para ver como as operações assíncronas são escalonadas em comparação com as síncronas. Vamos testar com diferentes volumes de consulta para demonstrar os benefícios de desempenho das operações assíncronas, especialmente à medida que o número de operações simultâneas aumenta:
# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
query_counts = [10, 100]
# Test async search
for count in query_counts:
async def measure_async_search():
async_time = await async_search(milvus_store, count)
print(f"Async search for {count} queries took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_search())
# Test sync search
for count in query_counts:
sync_time = sync_search(milvus_store, count)
print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)
Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds
Assíncrono vs Sincrono: Apagar
As operações de eliminação são outro aspeto crítico em que as operações assíncronas podem proporcionar melhorias significativas no desempenho. Vamos criar funções para medir a diferença de desempenho entre operações de exclusão síncronas e assíncronas. Esses testes ajudarão a demonstrar como as operações assíncronas podem lidar com exclusões em lote de forma mais eficiente:
async def async_delete(milvus_store, num_deleting):
"""Delete documents asynchronously and measure the time"""
start_time = time.time()
tasks = []
for i in range(num_deleting):
expr = f"id == 'doc_{i}'"
task = milvus_store.adelete(expr=expr)
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_delete(milvus_store, num_deleting):
"""Delete documents synchronously and measure the time"""
start_time = time.time()
for i in range(num_deleting):
expr = f"id == 'doc_{i}'"
result = milvus_store.delete(expr=expr)
end_time = time.time()
return end_time - start_time
Agora vamos executar os testes de desempenho de exclusão para quantificar a diferença de desempenho. Começaremos com um novo armazenamento de vetores preenchido com dados de teste e, em seguida, executaremos operações de exclusão usando abordagens síncronas e assíncronas:
delete_counts = [10, 100]
# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
# Test async delete
for count in delete_counts:
async def measure_async_delete():
async_time = await async_delete(milvus_store, count)
print(f"Async delete for {count} operations took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_delete())
# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
# Test sync delete
for count in delete_counts:
sync_time = sync_delete(milvus_store, count)
print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)
Async delete for 10 operations took 0.58 seconds
2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)
Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds
Conclusão
Este tutorial demonstrou as vantagens significativas de desempenho do uso de operações assíncronas com LangChain e Milvus. Comparamos as versões síncronas e assíncronas das operações de adição, pesquisa e exclusão, mostrando como as operações assíncronas podem fornecer melhorias substanciais de velocidade, especialmente para operações em grandes lotes.
Principais conclusões:
- As operações assíncronas oferecem o maior benefício ao executar muitas operações individuais que podem ser executadas em paralelo
- Para cargas de trabalho que geram maior taxa de transferência, a diferença de desempenho entre as operações sincronizadas e assíncronas aumenta
- As operações assíncronas utilizam totalmente o poder de computação das máquinas
Ao criar aplicativos RAG de produção com LangChain e Milvus, considere o uso da API assíncrona quando o desempenho for uma preocupação, especialmente para operações simultâneas.