Funzioni asincrone nell'integrazione di LangChain Milvus
Questo tutorial spiega come sfruttare le funzioni asincrone in langchain-milvus per costruire applicazioni ad alte prestazioni. Utilizzando i metodi asincroni, è possibile migliorare in modo significativo il throughput e la reattività delle applicazioni, soprattutto quando si ha a che fare con il reperimento di dati su larga scala. Sia che stiate costruendo un sistema di raccomandazione in tempo reale, sia che stiate implementando una ricerca semantica nella vostra applicazione, sia che stiate creando una pipeline RAG (Retrieval-Augmented Generation), le operazioni async possono aiutarvi a gestire in modo più efficiente le richieste simultanee. Il database vettoriale ad alte prestazioni Milvus, combinato con le potenti astrazioni LLM di LangChain, può fornire una solida base per la costruzione di applicazioni AI scalabili.
Panoramica dell'API asincrona
langchain-milvus fornisce un supporto completo per le operazioni asincrone, migliorando significativamente le prestazioni in scenari concomitanti su larga scala. L'API asincrona mantiene un design di interfaccia coerente con l'API di sincronizzazione.
Funzioni asincrone principali
Per utilizzare le operazioni asincrone in langchain-milvus, è sufficiente aggiungere il prefisso a ai nomi dei metodi. Ciò consente un migliore utilizzo delle risorse e un migliore throughput quando si gestiscono richieste di recupero simultanee.
| Tipo di operazione | Metodo Sync | Metodo asincrono | Descrizione del metodo |
|---|---|---|---|
| Aggiungi testi | add_texts() | aadd_texts() | Aggiungere testi all'archivio vettoriale |
| Aggiungi documenti | add_documents() | aadd_documents() | Aggiunge documenti all'archivio vettoriale |
| Aggiungere incorporazioni | add_embeddings() | aadd_embeddings() | Aggiungi vettori di incorporamento |
| Ricerca per similarità | similarity_search() | asimilarity_search() | Ricerca semantica per testo |
| Ricerca vettoriale | similarity_search_by_vector() | asimilarity_search_by_vector() | Ricerca semantica per vettore |
| Ricerca con punteggio | similarity_search_with_score() | asimilarity_search_with_score() | Ricerca semantica per testo e restituzione dei punteggi di somiglianza |
| Ricerca vettoriale con punteggio | similarity_search_with_score_by_vector() | asimilarity_search_with_score_by_vector() | Ricerca semantica per vettore e restituzione dei punteggi di similarità |
| Ricerca per diversità | max_marginal_relevance_search() | amax_marginal_relevance_search() | Ricerca MMR (restituisce i risultati simili ottimizzando la diversità) |
| Ricerca vettoriale di diversità | max_marginal_relevance_search_by_vector() | amax_marginal_relevance_search_by_vector() | Ricerca MMR per vettore |
| Operazione di cancellazione | delete() | adelete() | Eliminazione di documenti |
| Operazione Upsert | upsert() | aupsert() | Inserimento (aggiornamento se esistente, altrimenti inserimento) di documenti |
| Ricerca per metadati | search_by_metadata() | asearch_by_metadata() | Query con filtraggio dei metadati |
| Ottenere le chiavi primarie | get_pks() | aget_pks() | Ottenere le chiavi primarie in base a un'espressione |
| Creare da testi | from_texts() | afrom_texts() | Crea un archivio vettoriale da testi |
Per informazioni più dettagliate su queste funzioni, consultare la API Reference.
Vantaggi in termini di prestazioni
Le operazioni asincrone offrono miglioramenti significativi delle prestazioni quando si gestiscono grandi volumi di richieste simultanee, in particolare per:
- Elaborazione di documenti in batch
- Scenari di ricerca ad alta frequenza
- Applicazioni RAG di produzione
- Importazione/esportazione di dati su larga scala
In questa esercitazione dimostreremo questi vantaggi in termini di prestazioni attraverso un confronto dettagliato tra operazioni sincrone e asincrone, mostrandovi come sfruttare le API asincrone per ottenere prestazioni ottimali nelle vostre applicazioni.
Prima di iniziare
Gli snippet di codice presenti in questa pagina richiedono le seguenti dipendenze:
! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio
Se si utilizza Google Colab, per abilitare le dipendenze appena installate potrebbe essere necessario riavviare il runtime (fare clic sul menu "Runtime" nella parte superiore dello schermo e selezionare "Restart session" dal menu a discesa).
Utilizzeremo i modelli OpenAI. È necessario preparare la chiave api OPENAI_API_KEY come variabile d'ambiente:
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Se si utilizza Jupyter Notebook, è necessario eseguire questa riga di codice prima di eseguire il codice asincrono:
import nest_asyncio
nest_asyncio.apply()
Esplorazione delle API asincrone e confronto delle prestazioni
Ora approfondiamo il confronto delle prestazioni tra operazioni sincrone e asincrone con langchain-milvus.
Per prima cosa, importare le librerie necessarie:
import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus
# Define the Milvus URI
URI = "http://localhost:19530"
Impostazione delle funzioni di test
Creiamo delle funzioni ausiliarie per generare i dati di prova:
def random_id():
"""Generate a random string ID"""
random_num_str = ""
for _ in range(16):
random_digit = str(random.randint(0, 9))
random_num_str += random_digit
return random_num_str
def generate_test_documents(num_docs):
"""Generate test documents for performance testing"""
docs = []
for i in range(num_docs):
content = (
f"This is test document {i} with some random content: {random.random()}"
)
metadata = {
"id": f"doc_{i}",
"score": random.random(),
"category": f"cat_{i % 5}",
}
doc = Document(page_content=content, metadata=metadata)
docs.append(doc)
return docs
Inizializzare l'archivio vettoriale
Prima di poter eseguire i nostri test di prestazione, dobbiamo impostare un archivio vettoriale Milvus pulito. Questa funzione ci assicura di iniziare con una collezione nuova per ogni test, eliminando qualsiasi interferenza dai dati precedenti:
def init_vector_store():
"""Initialize and return a fresh vector store for testing"""
return Milvus(
embedding_function=OpenAIEmbeddings(),
collection_name="langchain_perf_test",
connection_args={"uri": URI},
auto_id=True,
drop_old=True, # Always start with a fresh collection
)
Async vs Sync: Aggiungere documenti
Ora confrontiamo le prestazioni dell'aggiunta di documenti sincrona e asincrona. Queste funzioni ci aiuteranno a misurare quanto possano essere più veloci le operazioni asincrone quando si aggiungono più documenti all'archivio vettoriale. La versione asincrona crea task per ogni aggiunta di documenti e li esegue simultaneamente, mentre la versione sincrona elabora i documenti uno alla volta:
async def async_add(milvus_store, num_adding):
"""Add documents asynchronously and measure the time"""
docs = generate_test_documents(num_adding)
start_time = time.time()
tasks = []
for doc in docs:
# Create tasks for each document addition
task = milvus_store.aadd_documents([doc])
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_add(milvus_store, num_adding):
"""Add documents synchronously and measure the time"""
docs = generate_test_documents(num_adding)
start_time = time.time()
for doc in docs:
result = milvus_store.add_documents([doc])
end_time = time.time()
return end_time - start_time
Ora eseguiamo i nostri test sulle prestazioni con diversi numeri di documenti per vedere le differenze di prestazioni nel mondo reale. Eseguiremo i test con carichi diversi per capire come le operazioni asincrone scalano rispetto alle loro controparti sincrone. I test misureranno il tempo di esecuzione per entrambi gli approcci e contribuiranno a dimostrare i vantaggi in termini di prestazioni delle operazioni asincrone:
add_counts = [10, 100]
# Get the event loop
loop = asyncio.get_event_loop()
# Create a new vector store for testing
milvus_store = init_vector_store()
# Test async document addition
for count in add_counts:
async def measure_async_add():
async_time = await async_add(milvus_store, count)
print(f"Async add for {count} documents took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_add())
# Reset vector store for sync tests
milvus_store = init_vector_store()
# Test sync document addition
for count in add_counts:
sync_time = sync_add(milvus_store, count)
print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)
Async add for 10 documents took 1.74 seconds
2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)
Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds
Async vs Sync: Ricerca
Per il confronto delle prestazioni di ricerca, dobbiamo prima popolare l'archivio vettoriale. Le funzioni seguenti ci aiuteranno a misurare le prestazioni della ricerca, creando più query di ricerca simultanee e confrontando il tempo di esecuzione tra l'approccio sincrono e quello asincrono:
def populate_vector_store(milvus_store, num_docs=1000):
"""Populate the vector store with test documents"""
docs = generate_test_documents(num_docs)
milvus_store.add_documents(docs)
return docs
async def async_search(milvus_store, num_queries):
"""Perform async searches and measure the time"""
start_time = time.time()
tasks = []
for i in range(num_queries):
query = f"test document {i % 50}"
task = milvus_store.asimilarity_search(query=query, k=3)
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_search(milvus_store, num_queries):
"""Perform sync searches and measure the time"""
start_time = time.time()
for i in range(num_queries):
query = f"test document {i % 50}"
result = milvus_store.similarity_search(query=query, k=3)
end_time = time.time()
return end_time - start_time
Eseguiamo ora dei test completi sulle prestazioni di ricerca per vedere come le operazioni asincrone scalano rispetto a quelle sincrone. Eseguiremo test con diversi volumi di query per dimostrare i vantaggi in termini di prestazioni delle operazioni asincrone, soprattutto con l'aumento del numero di operazioni simultanee:
# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
query_counts = [10, 100]
# Test async search
for count in query_counts:
async def measure_async_search():
async_time = await async_search(milvus_store, count)
print(f"Async search for {count} queries took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_search())
# Test sync search
for count in query_counts:
sync_time = sync_search(milvus_store, count)
print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)
Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds
Async vs Sync: Eliminazione
Le operazioni di cancellazione sono un altro aspetto critico in cui le operazioni asincrone possono fornire miglioramenti significativi delle prestazioni. Creiamo delle funzioni per misurare la differenza di prestazioni tra operazioni di cancellazione sincrone e asincrone. Questi test dimostreranno come le operazioni asincrone possano gestire in modo più efficiente le cancellazioni in batch:
async def async_delete(milvus_store, num_deleting):
"""Delete documents asynchronously and measure the time"""
start_time = time.time()
tasks = []
for i in range(num_deleting):
expr = f"id == 'doc_{i}'"
task = milvus_store.adelete(expr=expr)
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_delete(milvus_store, num_deleting):
"""Delete documents synchronously and measure the time"""
start_time = time.time()
for i in range(num_deleting):
expr = f"id == 'doc_{i}'"
result = milvus_store.delete(expr=expr)
end_time = time.time()
return end_time - start_time
Ora eseguiamo i test sulle prestazioni di cancellazione per quantificare la differenza di prestazioni. Inizieremo con un nuovo archivio vettoriale popolato con i dati di prova, quindi eseguiremo le operazioni di cancellazione utilizzando sia l'approccio sincrono che quello asincrono:
delete_counts = [10, 100]
# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
# Test async delete
for count in delete_counts:
async def measure_async_delete():
async_time = await async_delete(milvus_store, count)
print(f"Async delete for {count} operations took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_delete())
# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
# Test sync delete
for count in delete_counts:
sync_time = sync_delete(milvus_store, count)
print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)
Async delete for 10 operations took 0.58 seconds
2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)
Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds
Conclusione
Questo tutorial ha dimostrato i significativi vantaggi in termini di prestazioni dell'uso di operazioni asincrone con LangChain e Milvus. Abbiamo confrontato le versioni sincrone e asincrone delle operazioni di aggiunta, ricerca e cancellazione, mostrando come le operazioni asincrone possano fornire miglioramenti sostanziali in termini di velocità, soprattutto per le operazioni batch di grandi dimensioni.
Principali risultati:
- Le operazioni asincrone offrono i maggiori vantaggi quando si eseguono molte singole operazioni che possono essere eseguite in parallelo.
- Per i carichi di lavoro che generano un throughput più elevato, il divario di prestazioni tra operazioni sync e async aumenta
- Le operazioni asincrone sfruttano appieno la potenza di calcolo delle macchine.
Quando si realizzano applicazioni RAG di produzione con LangChain e Milvus, si consiglia di utilizzare l'API async quando le prestazioni sono un problema, soprattutto per le operazioni simultanee.