Funciones asíncronas en la integración de LangChain Milvus

Open In Colab GitHub Repository

Este tutorial explora cómo aprovechar las funciones asíncronas en langchain-milvus para construir aplicaciones de alto rendimiento. Mediante el uso de métodos asíncronos, puede mejorar significativamente el rendimiento y la capacidad de respuesta de su aplicación, especialmente cuando se trata de recuperación a gran escala. Tanto si está creando un sistema de recomendación en tiempo real, implementando la búsqueda semántica en su aplicación o creando una canalización RAG (Retrieval-Augmented Generation), las operaciones asíncronas pueden ayudarle a gestionar las solicitudes concurrentes de forma más eficiente. La base de datos vectorial de alto rendimiento Milvus combinada con las potentes abstracciones LLM de LangChain pueden proporcionar una base sólida para crear aplicaciones de IA escalables.

Visión general de la API asíncrona

langchain-milvus proporciona un completo soporte de operaciones asíncronas, mejorando significativamente el rendimiento en escenarios concurrentes a gran escala. La API asíncrona mantiene un diseño de interfaz coherente con la API síncrona.

Funciones básicas asíncronas

Para utilizar operaciones asíncronas en langchain-milvus, basta con añadir un prefijo a a los nombres de los métodos. Esto permite una mejor utilización de los recursos y un mayor rendimiento cuando se gestionan solicitudes de recuperación simultáneas.

Tipo de operaciónMétodo SyncMétodo asíncronoDescripción
Añadir textosadd_texts()aadd_texts()Añadir textos al almacén vectorial
Añadir documentosadd_documents()aadd_documents()Añadir documentos al almacén vectorial
Añadir incrustacionesadd_embeddings()aadd_embeddings()Añadir vectores de incrustación
Búsqueda por similitudsimilarity_search()asimilarity_search()Búsqueda semántica por texto
Búsqueda vectorialsimilarity_search_by_vector()asimilarity_search_by_vector()Búsqueda semántica por vector
Búsqueda con puntuaciónsimilarity_search_with_score()asimilarity_search_with_score()Búsqueda semántica por texto y devolución de puntuaciones de similitud
Búsqueda vectorial con puntuaciónsimilarity_search_with_score_by_vector()asimilarity_search_with_score_by_vector()Búsqueda semántica por vector y devolución de puntuaciones de similitud
Búsqueda por diversidadmax_marginal_relevance_search()amax_marginal_relevance_search()Búsqueda MMR (devuelve similares a la vez que optimiza la diversidad)
Búsqueda por diversidad vectorialmax_marginal_relevance_search_by_vector()amax_marginal_relevance_search_by_vector()Búsqueda MMR por vector
Operación de supresióndelete()adelete()Borrar documentos
Operación de inserciónupsert()aupsert()Reinsertar (actualizar si existe, insertar en caso contrario) documentos
Búsqueda de metadatossearch_by_metadata()asearch_by_metadata()Consulta con filtrado de metadatos
Obtener claves primariasget_pks()aget_pks()Obtención de claves primarias por expresión
Crear a partir de textosfrom_texts()afrom_texts()Crear almacén vectorial a partir de textos

Para obtener información más detallada sobre estas funciones, consulte la Referencia de la API.

Ventajas de rendimiento

Las operaciones asíncronas proporcionan importantes mejoras de rendimiento cuando se manejan grandes volúmenes de solicitudes concurrentes, especialmente adecuadas para:

  • Procesamiento de documentos por lotes
  • Escenarios de búsqueda de alta concurrencia
  • Aplicaciones RAG de producción
  • Importación y exportación de datos a gran escala

En este tutorial, demostraremos estas ventajas de rendimiento mediante comparaciones detalladas de operaciones síncronas y asíncronas, mostrándole cómo aprovechar las API asíncronas para obtener un rendimiento óptimo en sus aplicaciones.

Antes de empezar

Los fragmentos de código de esta página requieren las siguientes dependencias:

! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio

Si utilizas Google Colab, para habilitar las dependencias que acabas de instalar, es posible que tengas que reiniciar el tiempo de ejecución (haz clic en el menú "Tiempo de ejecución" en la parte superior de la pantalla y selecciona "Reiniciar sesión" en el menú desplegable).

Utilizaremos modelos OpenAI. Debes preparar la clave api OPENAI_API_KEY como variable de entorno:

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

Si utilizas Jupyter Notebook, deberás ejecutar esta línea de código antes de ejecutar el código asíncrono:

import nest_asyncio

nest_asyncio.apply()

Exploración de las API asíncronas y comparación del rendimiento

Ahora vamos a profundizar en la comparación de rendimiento entre operaciones síncronas y asíncronas con langchain-milvus.

En primer lugar, importa las bibliotecas necesarias:

import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus

# Define the Milvus URI
URI = "http://localhost:19530"

Configuración de funciones de prueba

Vamos a crear funciones de ayuda para generar datos de prueba:

def random_id():
    """Generate a random string ID"""
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def generate_test_documents(num_docs):
    """Generate test documents for performance testing"""
    docs = []
    for i in range(num_docs):
        content = (
            f"This is test document {i} with some random content: {random.random()}"
        )
        metadata = {
            "id": f"doc_{i}",
            "score": random.random(),
            "category": f"cat_{i % 5}",
        }
        doc = Document(page_content=content, metadata=metadata)
        docs.append(doc)
    return docs

Inicializar el almacén de vectores

Antes de que podamos ejecutar nuestras pruebas de rendimiento, tenemos que configurar un almacén vectorial Milvus limpio. Esta función asegura que comencemos con una colección fresca para cada prueba, eliminando cualquier interferencia de datos anteriores:

def init_vector_store():
    """Initialize and return a fresh vector store for testing"""
    return Milvus(
        embedding_function=OpenAIEmbeddings(),
        collection_name="langchain_perf_test",
        connection_args={"uri": URI},
        auto_id=True,
        drop_old=True,  # Always start with a fresh collection
    )

Async vs Sync: Añadir documentos

Ahora vamos a comparar el rendimiento de la adición de documentos síncrona frente a la asíncrona. Estas funciones nos ayudarán a medir cuánto más rápidas pueden ser las operaciones asíncronas al añadir múltiples documentos al almacén vectorial. La versión asíncrona crea tareas para cada adición de documento y las ejecuta concurrentemente, mientras que la versión síncrona procesa los documentos uno a uno:

async def async_add(milvus_store, num_adding):
    """Add documents asynchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    tasks = []
    for doc in docs:
        # Create tasks for each document addition
        task = milvus_store.aadd_documents([doc])
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_add(milvus_store, num_adding):
    """Add documents synchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    for doc in docs:
        result = milvus_store.add_documents([doc])
    end_time = time.time()
    return end_time - start_time

Ahora vamos a ejecutar nuestras pruebas de rendimiento con diferentes recuentos de documentos para ver las diferencias de rendimiento en el mundo real. Probaremos con distintas cargas para comprender cómo se escalan las operaciones asíncronas en comparación con sus homólogas síncronas. Las pruebas medirán el tiempo de ejecución de ambos enfoques y ayudarán a demostrar las ventajas de rendimiento de las operaciones asíncronas:

add_counts = [10, 100]

# Get the event loop
loop = asyncio.get_event_loop()

# Create a new vector store for testing
milvus_store = init_vector_store()

# Test async document addition
for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(milvus_store, count)
        print(f"Async add for {count} documents took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())

# Reset vector store for sync tests
milvus_store = init_vector_store()

# Test sync document addition
for count in add_counts:
    sync_time = sync_add(milvus_store, count)
    print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)


Async add for 10 documents took 1.74 seconds


2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)


Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds

Para comparar el rendimiento de la búsqueda, primero tendremos que rellenar el almacén de vectores. Las siguientes funciones nos ayudarán a medir el rendimiento de la búsqueda creando múltiples consultas de búsqueda concurrentes y comparando el tiempo de ejecución entre los enfoques síncrono y asíncrono:

def populate_vector_store(milvus_store, num_docs=1000):
    """Populate the vector store with test documents"""
    docs = generate_test_documents(num_docs)
    milvus_store.add_documents(docs)
    return docs


async def async_search(milvus_store, num_queries):
    """Perform async searches and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_queries):
        query = f"test document {i % 50}"
        task = milvus_store.asimilarity_search(query=query, k=3)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_search(milvus_store, num_queries):
    """Perform sync searches and measure the time"""
    start_time = time.time()
    for i in range(num_queries):
        query = f"test document {i % 50}"
        result = milvus_store.similarity_search(query=query, k=3)
    end_time = time.time()
    return end_time - start_time

Ahora vamos a ejecutar pruebas exhaustivas de rendimiento de búsqueda para ver cómo se escalan las operaciones asíncronas en comparación con las síncronas. Realizaremos pruebas con distintos volúmenes de consultas para demostrar las ventajas de rendimiento de las operaciones asíncronas, especialmente a medida que aumenta el número de operaciones simultáneas:

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

query_counts = [10, 100]

# Test async search
for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(milvus_store, count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())

# Test sync search
for count in query_counts:
    sync_time = sync_search(milvus_store, count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)


Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds

Async vs Sync: Borrar

Las operaciones de borrado son otro aspecto crítico en el que las operaciones asíncronas pueden proporcionar importantes mejoras de rendimiento. Vamos a crear funciones para medir la diferencia de rendimiento entre las operaciones de borrado síncronas y asíncronas. Estas pruebas ayudarán a demostrar cómo las operaciones asíncronas pueden manejar los borrados por lotes de forma más eficiente:

async def async_delete(milvus_store, num_deleting):
    """Delete documents asynchronously and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        task = milvus_store.adelete(expr=expr)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_delete(milvus_store, num_deleting):
    """Delete documents synchronously and measure the time"""
    start_time = time.time()
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        result = milvus_store.delete(expr=expr)
    end_time = time.time()
    return end_time - start_time

Ahora vamos a ejecutar las pruebas de rendimiento de borrado para cuantificar la diferencia de rendimiento. Comenzaremos con un almacén vectorial nuevo con datos de prueba y, a continuación, realizaremos operaciones de borrado utilizando enfoques síncronos y asíncronos:

delete_counts = [10, 100]

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test async delete
for count in delete_counts:

    async def measure_async_delete():
        async_time = await async_delete(milvus_store, count)
        print(f"Async delete for {count} operations took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_delete())

# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test sync delete
for count in delete_counts:
    sync_time = sync_delete(milvus_store, count)
    print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)


Async delete for 10 operations took 0.58 seconds


2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)


Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds

Conclusión

Este tutorial demostró las significativas ventajas de rendimiento de utilizar operaciones asíncronas con LangChain y Milvus. Hemos comparado las versiones síncronas y asíncronas de las operaciones de adición, búsqueda y eliminación, mostrando cómo las operaciones asíncronas pueden proporcionar mejoras sustanciales de velocidad, especialmente para grandes operaciones por lotes.

Principales conclusiones:

  1. Las operaciones asíncronas ofrecen las mayores ventajas cuando se realizan muchas operaciones individuales que pueden ejecutarse en paralelo.
  2. Para cargas de trabajo que generan un mayor rendimiento, la diferencia de rendimiento entre las operaciones sync y async se amplía.
  3. Las operaciones asíncronas aprovechan al máximo la potencia de cálculo de las máquinas.

Cuando cree aplicaciones RAG de producción con LangChain y Milvus, considere el uso de la API async cuando el rendimiento sea una preocupación, especialmente para operaciones concurrentes.

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

¿Fue útil esta página?