Funções assíncronas na integração LangChain Milvus

Open In Colab GitHub Repository

Este tutorial explora como aproveitar as funções assíncronas no langchain-milvus para criar aplicativos de alto desempenho. Usando métodos assíncronos, você pode melhorar significativamente o rendimento e a capacidade de resposta de sua aplicação, especialmente ao lidar com recuperação em larga escala. Quer esteja a construir um sistema de recomendação em tempo real, a implementar a pesquisa semântica na sua aplicação ou a criar um pipeline RAG (Retrieval-Augmented Generation), as operações assíncronas podem ajudá-lo a lidar com pedidos concorrentes de forma mais eficiente. O banco de dados vetorial de alto desempenho Milvus, combinado com as poderosas abstrações LLM do LangChain, pode fornecer uma base robusta para a criação de aplicativos de IA escalonáveis.

Visão geral da API assíncrona

O langchain-milvus fornece suporte abrangente a operações assíncronas, melhorando significativamente o desempenho em cenários simultâneos de grande escala. A API assíncrona mantém um design de interface consistente com a API de sincronização.

Funções Assíncronas Principais

Para usar operações assíncronas em langchain-milvus, simplesmente adicione um prefixo a aos nomes dos métodos. Isso permite uma melhor utilização dos recursos e um melhor rendimento ao lidar com pedidos de recuperação simultâneos.

Tipo de operaçãoMétodo síncronoMétodo assíncronoDescrição
Adicionar textosadd_texts()aadd_texts()Adicionar textos ao armazenamento de vectores
Adicionar documentosadd_documents()aadd_documents()Adicionar documentos ao repositório de vectores
Adicionar incrustaçõesadd_embeddings()aadd_embeddings()Adicionar vectores de incorporação
Pesquisa de semelhançassimilarity_search()asimilarity_search()Pesquisa semântica por texto
Pesquisa vetorialsimilarity_search_by_vector()asimilarity_search_by_vector()Pesquisa semântica por vetor
Pesquisa com pontuaçãosimilarity_search_with_score()asimilarity_search_with_score()Pesquisa semântica por texto e retorno de pontuações de similaridade
Pesquisa vetorial com pontuaçãosimilarity_search_with_score_by_vector()asimilarity_search_with_score_by_vector()Pesquisa semântica por vetor e retorno de pontuações de similaridade
Pesquisa de diversidademax_marginal_relevance_search()amax_marginal_relevance_search()Pesquisa MMR (devolve os semelhantes ao mesmo tempo que optimiza a diversidade)
Pesquisa de diversidade vetorialmax_marginal_relevance_search_by_vector()amax_marginal_relevance_search_by_vector()Pesquisa MMR por vetor
Operação de eliminaçãodelete()adelete()Eliminar documentos
Operação Upsertupsert()aupsert()Upsert (atualizar se existente, caso contrário inserir) documentos
Pesquisa de metadadossearch_by_metadata()asearch_by_metadata()Consulta com filtragem de metadados
Obter chaves primáriasget_pks()aget_pks()Obter chaves primárias por expressão
Criar a partir de textosfrom_texts()afrom_texts()Criar armazenamento vetorial a partir de textos

Para obter informações mais detalhadas sobre estas funções, consulte a Referência da API.

Benefícios de desempenho

As operações assíncronas proporcionam melhorias significativas no desempenho ao lidar com grandes volumes de pedidos simultâneos, particularmente adequadas para:

  • Processamento de documentos em lote
  • Cenários de pesquisa de alta simultaneidade
  • Aplicações RAG de produção
  • Importação/exportação de dados em grande escala

Neste tutorial, demonstraremos esses benefícios de desempenho por meio de comparações detalhadas de operações síncronas e assíncronas, mostrando como aproveitar as APIs assíncronas para obter o melhor desempenho em seus aplicativos.

Antes de começar

Os snippets de código nesta página exigem as seguintes dependências:

! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio

Se estiver a utilizar o Google Colab, para ativar as dependências que acabou de instalar, poderá ter de reiniciar o tempo de execução (clique no menu "Tempo de execução" na parte superior do ecrã e selecione "Reiniciar sessão" no menu pendente).

Vamos utilizar modelos OpenAI. Deve preparar a chave api OPENAI_API_KEY como uma variável de ambiente:

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

Se estiver a utilizar o Jupyter Notebook, tem de executar esta linha de código antes de executar o código assíncrono:

import nest_asyncio

nest_asyncio.apply()

Explorando APIs assíncronas e comparação de desempenho

Agora vamos nos aprofundar na comparação de desempenho entre operações síncronas e assíncronas com langchain-milvus.

Primeiro, importe as bibliotecas necessárias:

import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus

# Define the Milvus URI
URI = "http://localhost:19530"

Configurando funções de teste

Vamos criar funções auxiliares para gerar dados de teste:

def random_id():
    """Generate a random string ID"""
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def generate_test_documents(num_docs):
    """Generate test documents for performance testing"""
    docs = []
    for i in range(num_docs):
        content = (
            f"This is test document {i} with some random content: {random.random()}"
        )
        metadata = {
            "id": f"doc_{i}",
            "score": random.random(),
            "category": f"cat_{i % 5}",
        }
        doc = Document(page_content=content, metadata=metadata)
        docs.append(doc)
    return docs

Inicializar o armazenamento de vetores

Antes de podermos executar nossos testes de desempenho, precisamos configurar um armazenamento de vetores Milvus limpo. Esta função garante que começamos com uma nova coleção para cada teste, eliminando qualquer interferência de dados anteriores:

def init_vector_store():
    """Initialize and return a fresh vector store for testing"""
    return Milvus(
        embedding_function=OpenAIEmbeddings(),
        collection_name="langchain_perf_test",
        connection_args={"uri": URI},
        auto_id=True,
        drop_old=True,  # Always start with a fresh collection
    )

Async vs Sync: Adicionar documentos

Agora vamos comparar o desempenho da adição síncrona e assíncrona de documentos. Essas funções nos ajudarão a medir o quanto as operações assíncronas podem ser mais rápidas ao adicionar vários documentos ao repositório de vetores. A versão assíncrona cria tarefas para cada adição de documento e as executa simultaneamente, enquanto a versão sincronizada processa os documentos um a um:

async def async_add(milvus_store, num_adding):
    """Add documents asynchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    tasks = []
    for doc in docs:
        # Create tasks for each document addition
        task = milvus_store.aadd_documents([doc])
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_add(milvus_store, num_adding):
    """Add documents synchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    for doc in docs:
        result = milvus_store.add_documents([doc])
    end_time = time.time()
    return end_time - start_time

Agora vamos executar nossos testes de desempenho com diferentes contagens de documentos para ver as diferenças de desempenho no mundo real. Vamos testar com cargas variadas para entender como as operações assíncronas são escalonadas em comparação com suas contrapartes síncronas. Os testes medirão o tempo de execução de ambas as abordagens e ajudarão a demonstrar os benefícios de desempenho das operações assíncronas:

add_counts = [10, 100]

# Get the event loop
loop = asyncio.get_event_loop()

# Create a new vector store for testing
milvus_store = init_vector_store()

# Test async document addition
for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(milvus_store, count)
        print(f"Async add for {count} documents took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())

# Reset vector store for sync tests
milvus_store = init_vector_store()

# Test sync document addition
for count in add_counts:
    sync_time = sync_add(milvus_store, count)
    print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)


Async add for 10 documents took 1.74 seconds


2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)


Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds

Para a comparação do desempenho da pesquisa, precisaremos primeiro preencher o armazenamento de vetores. As funções a seguir nos ajudarão a medir o desempenho da pesquisa, criando várias consultas de pesquisa simultâneas e comparando o tempo de execução entre as abordagens síncrona e assíncrona:

def populate_vector_store(milvus_store, num_docs=1000):
    """Populate the vector store with test documents"""
    docs = generate_test_documents(num_docs)
    milvus_store.add_documents(docs)
    return docs


async def async_search(milvus_store, num_queries):
    """Perform async searches and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_queries):
        query = f"test document {i % 50}"
        task = milvus_store.asimilarity_search(query=query, k=3)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_search(milvus_store, num_queries):
    """Perform sync searches and measure the time"""
    start_time = time.time()
    for i in range(num_queries):
        query = f"test document {i % 50}"
        result = milvus_store.similarity_search(query=query, k=3)
    end_time = time.time()
    return end_time - start_time

Agora vamos executar testes abrangentes de desempenho de pesquisa para ver como as operações assíncronas são escalonadas em comparação com as síncronas. Vamos testar com diferentes volumes de consulta para demonstrar os benefícios de desempenho das operações assíncronas, especialmente à medida que o número de operações simultâneas aumenta:

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

query_counts = [10, 100]

# Test async search
for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(milvus_store, count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())

# Test sync search
for count in query_counts:
    sync_time = sync_search(milvus_store, count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)


Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds

Assíncrono vs Sincrono: Apagar

As operações de eliminação são outro aspeto crítico em que as operações assíncronas podem proporcionar melhorias significativas no desempenho. Vamos criar funções para medir a diferença de desempenho entre operações de exclusão síncronas e assíncronas. Esses testes ajudarão a demonstrar como as operações assíncronas podem lidar com exclusões em lote de forma mais eficiente:

async def async_delete(milvus_store, num_deleting):
    """Delete documents asynchronously and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        task = milvus_store.adelete(expr=expr)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_delete(milvus_store, num_deleting):
    """Delete documents synchronously and measure the time"""
    start_time = time.time()
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        result = milvus_store.delete(expr=expr)
    end_time = time.time()
    return end_time - start_time

Agora vamos executar os testes de desempenho de exclusão para quantificar a diferença de desempenho. Começaremos com um novo armazenamento de vetores preenchido com dados de teste e, em seguida, executaremos operações de exclusão usando abordagens síncronas e assíncronas:

delete_counts = [10, 100]

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test async delete
for count in delete_counts:

    async def measure_async_delete():
        async_time = await async_delete(milvus_store, count)
        print(f"Async delete for {count} operations took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_delete())

# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test sync delete
for count in delete_counts:
    sync_time = sync_delete(milvus_store, count)
    print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)


Async delete for 10 operations took 0.58 seconds


2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)


Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds

Conclusão

Este tutorial demonstrou as vantagens significativas de desempenho do uso de operações assíncronas com LangChain e Milvus. Comparamos as versões síncronas e assíncronas das operações de adição, pesquisa e exclusão, mostrando como as operações assíncronas podem fornecer melhorias substanciais de velocidade, especialmente para operações em grandes lotes.

Principais conclusões:

  1. As operações assíncronas oferecem o maior benefício ao executar muitas operações individuais que podem ser executadas em paralelo
  2. Para cargas de trabalho que geram maior taxa de transferência, a diferença de desempenho entre as operações sincronizadas e assíncronas aumenta
  3. As operações assíncronas utilizam totalmente o poder de computação das máquinas

Ao criar aplicativos RAG de produção com LangChain e Milvus, considere o uso da API assíncrona quando o desempenho for uma preocupação, especialmente para operações simultâneas.

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

Esta página foi útil?