Funciones asíncronas en la integración de LangChain Milvus
Este tutorial explora cómo aprovechar las funciones asíncronas en langchain-milvus para construir aplicaciones de alto rendimiento. Mediante el uso de métodos asíncronos, puede mejorar significativamente el rendimiento y la capacidad de respuesta de su aplicación, especialmente cuando se trata de recuperación a gran escala. Tanto si está creando un sistema de recomendación en tiempo real, implementando la búsqueda semántica en su aplicación o creando una canalización RAG (Retrieval-Augmented Generation), las operaciones asíncronas pueden ayudarle a gestionar las solicitudes concurrentes de forma más eficiente. La base de datos vectorial de alto rendimiento Milvus combinada con las potentes abstracciones LLM de LangChain pueden proporcionar una base sólida para crear aplicaciones de IA escalables.
Visión general de la API asíncrona
langchain-milvus proporciona un completo soporte de operaciones asíncronas, mejorando significativamente el rendimiento en escenarios concurrentes a gran escala. La API asíncrona mantiene un diseño de interfaz coherente con la API síncrona.
Funciones básicas asíncronas
Para utilizar operaciones asíncronas en langchain-milvus, basta con añadir un prefijo a a los nombres de los métodos. Esto permite una mejor utilización de los recursos y un mayor rendimiento cuando se gestionan solicitudes de recuperación simultáneas.
| Tipo de operación | Método Sync | Método asíncrono | Descripción |
|---|---|---|---|
| Añadir textos | add_texts() | aadd_texts() | Añadir textos al almacén vectorial |
| Añadir documentos | add_documents() | aadd_documents() | Añadir documentos al almacén vectorial |
| Añadir incrustaciones | add_embeddings() | aadd_embeddings() | Añadir vectores de incrustación |
| Búsqueda por similitud | similarity_search() | asimilarity_search() | Búsqueda semántica por texto |
| Búsqueda vectorial | similarity_search_by_vector() | asimilarity_search_by_vector() | Búsqueda semántica por vector |
| Búsqueda con puntuación | similarity_search_with_score() | asimilarity_search_with_score() | Búsqueda semántica por texto y devolución de puntuaciones de similitud |
| Búsqueda vectorial con puntuación | similarity_search_with_score_by_vector() | asimilarity_search_with_score_by_vector() | Búsqueda semántica por vector y devolución de puntuaciones de similitud |
| Búsqueda por diversidad | max_marginal_relevance_search() | amax_marginal_relevance_search() | Búsqueda MMR (devuelve similares a la vez que optimiza la diversidad) |
| Búsqueda por diversidad vectorial | max_marginal_relevance_search_by_vector() | amax_marginal_relevance_search_by_vector() | Búsqueda MMR por vector |
| Operación de supresión | delete() | adelete() | Borrar documentos |
| Operación de inserción | upsert() | aupsert() | Reinsertar (actualizar si existe, insertar en caso contrario) documentos |
| Búsqueda de metadatos | search_by_metadata() | asearch_by_metadata() | Consulta con filtrado de metadatos |
| Obtener claves primarias | get_pks() | aget_pks() | Obtención de claves primarias por expresión |
| Crear a partir de textos | from_texts() | afrom_texts() | Crear almacén vectorial a partir de textos |
Para obtener información más detallada sobre estas funciones, consulte la Referencia de la API.
Ventajas de rendimiento
Las operaciones asíncronas proporcionan importantes mejoras de rendimiento cuando se manejan grandes volúmenes de solicitudes concurrentes, especialmente adecuadas para:
- Procesamiento de documentos por lotes
- Escenarios de búsqueda de alta concurrencia
- Aplicaciones RAG de producción
- Importación y exportación de datos a gran escala
En este tutorial, demostraremos estas ventajas de rendimiento mediante comparaciones detalladas de operaciones síncronas y asíncronas, mostrándole cómo aprovechar las API asíncronas para obtener un rendimiento óptimo en sus aplicaciones.
Antes de empezar
Los fragmentos de código de esta página requieren las siguientes dependencias:
! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio
Si utilizas Google Colab, para habilitar las dependencias que acabas de instalar, es posible que tengas que reiniciar el tiempo de ejecución (haz clic en el menú "Tiempo de ejecución" en la parte superior de la pantalla y selecciona "Reiniciar sesión" en el menú desplegable).
Utilizaremos modelos OpenAI. Debes preparar la clave api OPENAI_API_KEY como variable de entorno:
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Si utilizas Jupyter Notebook, deberás ejecutar esta línea de código antes de ejecutar el código asíncrono:
import nest_asyncio
nest_asyncio.apply()
Exploración de las API asíncronas y comparación del rendimiento
Ahora vamos a profundizar en la comparación de rendimiento entre operaciones síncronas y asíncronas con langchain-milvus.
En primer lugar, importa las bibliotecas necesarias:
import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus
# Define the Milvus URI
URI = "http://localhost:19530"
Configuración de funciones de prueba
Vamos a crear funciones de ayuda para generar datos de prueba:
def random_id():
"""Generate a random string ID"""
random_num_str = ""
for _ in range(16):
random_digit = str(random.randint(0, 9))
random_num_str += random_digit
return random_num_str
def generate_test_documents(num_docs):
"""Generate test documents for performance testing"""
docs = []
for i in range(num_docs):
content = (
f"This is test document {i} with some random content: {random.random()}"
)
metadata = {
"id": f"doc_{i}",
"score": random.random(),
"category": f"cat_{i % 5}",
}
doc = Document(page_content=content, metadata=metadata)
docs.append(doc)
return docs
Inicializar el almacén de vectores
Antes de que podamos ejecutar nuestras pruebas de rendimiento, tenemos que configurar un almacén vectorial Milvus limpio. Esta función asegura que comencemos con una colección fresca para cada prueba, eliminando cualquier interferencia de datos anteriores:
def init_vector_store():
"""Initialize and return a fresh vector store for testing"""
return Milvus(
embedding_function=OpenAIEmbeddings(),
collection_name="langchain_perf_test",
connection_args={"uri": URI},
auto_id=True,
drop_old=True, # Always start with a fresh collection
)
Async vs Sync: Añadir documentos
Ahora vamos a comparar el rendimiento de la adición de documentos síncrona frente a la asíncrona. Estas funciones nos ayudarán a medir cuánto más rápidas pueden ser las operaciones asíncronas al añadir múltiples documentos al almacén vectorial. La versión asíncrona crea tareas para cada adición de documento y las ejecuta concurrentemente, mientras que la versión síncrona procesa los documentos uno a uno:
async def async_add(milvus_store, num_adding):
"""Add documents asynchronously and measure the time"""
docs = generate_test_documents(num_adding)
start_time = time.time()
tasks = []
for doc in docs:
# Create tasks for each document addition
task = milvus_store.aadd_documents([doc])
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_add(milvus_store, num_adding):
"""Add documents synchronously and measure the time"""
docs = generate_test_documents(num_adding)
start_time = time.time()
for doc in docs:
result = milvus_store.add_documents([doc])
end_time = time.time()
return end_time - start_time
Ahora vamos a ejecutar nuestras pruebas de rendimiento con diferentes recuentos de documentos para ver las diferencias de rendimiento en el mundo real. Probaremos con distintas cargas para comprender cómo se escalan las operaciones asíncronas en comparación con sus homólogas síncronas. Las pruebas medirán el tiempo de ejecución de ambos enfoques y ayudarán a demostrar las ventajas de rendimiento de las operaciones asíncronas:
add_counts = [10, 100]
# Get the event loop
loop = asyncio.get_event_loop()
# Create a new vector store for testing
milvus_store = init_vector_store()
# Test async document addition
for count in add_counts:
async def measure_async_add():
async_time = await async_add(milvus_store, count)
print(f"Async add for {count} documents took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_add())
# Reset vector store for sync tests
milvus_store = init_vector_store()
# Test sync document addition
for count in add_counts:
sync_time = sync_add(milvus_store, count)
print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)
Async add for 10 documents took 1.74 seconds
2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)
Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds
Async vs Sync: Búsqueda
Para comparar el rendimiento de la búsqueda, primero tendremos que rellenar el almacén de vectores. Las siguientes funciones nos ayudarán a medir el rendimiento de la búsqueda creando múltiples consultas de búsqueda concurrentes y comparando el tiempo de ejecución entre los enfoques síncrono y asíncrono:
def populate_vector_store(milvus_store, num_docs=1000):
"""Populate the vector store with test documents"""
docs = generate_test_documents(num_docs)
milvus_store.add_documents(docs)
return docs
async def async_search(milvus_store, num_queries):
"""Perform async searches and measure the time"""
start_time = time.time()
tasks = []
for i in range(num_queries):
query = f"test document {i % 50}"
task = milvus_store.asimilarity_search(query=query, k=3)
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_search(milvus_store, num_queries):
"""Perform sync searches and measure the time"""
start_time = time.time()
for i in range(num_queries):
query = f"test document {i % 50}"
result = milvus_store.similarity_search(query=query, k=3)
end_time = time.time()
return end_time - start_time
Ahora vamos a ejecutar pruebas exhaustivas de rendimiento de búsqueda para ver cómo se escalan las operaciones asíncronas en comparación con las síncronas. Realizaremos pruebas con distintos volúmenes de consultas para demostrar las ventajas de rendimiento de las operaciones asíncronas, especialmente a medida que aumenta el número de operaciones simultáneas:
# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
query_counts = [10, 100]
# Test async search
for count in query_counts:
async def measure_async_search():
async_time = await async_search(milvus_store, count)
print(f"Async search for {count} queries took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_search())
# Test sync search
for count in query_counts:
sync_time = sync_search(milvus_store, count)
print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)
Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds
Async vs Sync: Borrar
Las operaciones de borrado son otro aspecto crítico en el que las operaciones asíncronas pueden proporcionar importantes mejoras de rendimiento. Vamos a crear funciones para medir la diferencia de rendimiento entre las operaciones de borrado síncronas y asíncronas. Estas pruebas ayudarán a demostrar cómo las operaciones asíncronas pueden manejar los borrados por lotes de forma más eficiente:
async def async_delete(milvus_store, num_deleting):
"""Delete documents asynchronously and measure the time"""
start_time = time.time()
tasks = []
for i in range(num_deleting):
expr = f"id == 'doc_{i}'"
task = milvus_store.adelete(expr=expr)
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_delete(milvus_store, num_deleting):
"""Delete documents synchronously and measure the time"""
start_time = time.time()
for i in range(num_deleting):
expr = f"id == 'doc_{i}'"
result = milvus_store.delete(expr=expr)
end_time = time.time()
return end_time - start_time
Ahora vamos a ejecutar las pruebas de rendimiento de borrado para cuantificar la diferencia de rendimiento. Comenzaremos con un almacén vectorial nuevo con datos de prueba y, a continuación, realizaremos operaciones de borrado utilizando enfoques síncronos y asíncronos:
delete_counts = [10, 100]
# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
# Test async delete
for count in delete_counts:
async def measure_async_delete():
async_time = await async_delete(milvus_store, count)
print(f"Async delete for {count} operations took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_delete())
# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
# Test sync delete
for count in delete_counts:
sync_time = sync_delete(milvus_store, count)
print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)
Async delete for 10 operations took 0.58 seconds
2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)
Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds
Conclusión
Este tutorial demostró las significativas ventajas de rendimiento de utilizar operaciones asíncronas con LangChain y Milvus. Hemos comparado las versiones síncronas y asíncronas de las operaciones de adición, búsqueda y eliminación, mostrando cómo las operaciones asíncronas pueden proporcionar mejoras sustanciales de velocidad, especialmente para grandes operaciones por lotes.
Principales conclusiones:
- Las operaciones asíncronas ofrecen las mayores ventajas cuando se realizan muchas operaciones individuales que pueden ejecutarse en paralelo.
- Para cargas de trabajo que generan un mayor rendimiento, la diferencia de rendimiento entre las operaciones sync y async se amplía.
- Las operaciones asíncronas aprovechan al máximo la potencia de cálculo de las máquinas.
Cuando cree aplicaciones RAG de producción con LangChain y Milvus, considere el uso de la API async cuando el rendimiento sea una preocupación, especialmente para operaciones concurrentes.