Funzioni asincrone nell'integrazione di LangChain Milvus

Open In Colab GitHub Repository

Questo tutorial spiega come sfruttare le funzioni asincrone in langchain-milvus per costruire applicazioni ad alte prestazioni. Utilizzando i metodi asincroni, è possibile migliorare in modo significativo il throughput e la reattività delle applicazioni, soprattutto quando si ha a che fare con il reperimento di dati su larga scala. Sia che stiate costruendo un sistema di raccomandazione in tempo reale, sia che stiate implementando una ricerca semantica nella vostra applicazione, sia che stiate creando una pipeline RAG (Retrieval-Augmented Generation), le operazioni async possono aiutarvi a gestire in modo più efficiente le richieste simultanee. Il database vettoriale ad alte prestazioni Milvus, combinato con le potenti astrazioni LLM di LangChain, può fornire una solida base per la costruzione di applicazioni AI scalabili.

Panoramica dell'API asincrona

langchain-milvus fornisce un supporto completo per le operazioni asincrone, migliorando significativamente le prestazioni in scenari concomitanti su larga scala. L'API asincrona mantiene un design di interfaccia coerente con l'API di sincronizzazione.

Funzioni asincrone principali

Per utilizzare le operazioni asincrone in langchain-milvus, è sufficiente aggiungere il prefisso a ai nomi dei metodi. Ciò consente un migliore utilizzo delle risorse e un migliore throughput quando si gestiscono richieste di recupero simultanee.

Tipo di operazioneMetodo SyncMetodo asincronoDescrizione del metodo
Aggiungi testiadd_texts()aadd_texts()Aggiungere testi all'archivio vettoriale
Aggiungi documentiadd_documents()aadd_documents()Aggiunge documenti all'archivio vettoriale
Aggiungere incorporazioniadd_embeddings()aadd_embeddings()Aggiungi vettori di incorporamento
Ricerca per similaritàsimilarity_search()asimilarity_search()Ricerca semantica per testo
Ricerca vettorialesimilarity_search_by_vector()asimilarity_search_by_vector()Ricerca semantica per vettore
Ricerca con punteggiosimilarity_search_with_score()asimilarity_search_with_score()Ricerca semantica per testo e restituzione dei punteggi di somiglianza
Ricerca vettoriale con punteggiosimilarity_search_with_score_by_vector()asimilarity_search_with_score_by_vector()Ricerca semantica per vettore e restituzione dei punteggi di similarità
Ricerca per diversitàmax_marginal_relevance_search()amax_marginal_relevance_search()Ricerca MMR (restituisce i risultati simili ottimizzando la diversità)
Ricerca vettoriale di diversitàmax_marginal_relevance_search_by_vector()amax_marginal_relevance_search_by_vector()Ricerca MMR per vettore
Operazione di cancellazionedelete()adelete()Eliminazione di documenti
Operazione Upsertupsert()aupsert()Inserimento (aggiornamento se esistente, altrimenti inserimento) di documenti
Ricerca per metadatisearch_by_metadata()asearch_by_metadata()Query con filtraggio dei metadati
Ottenere le chiavi primarieget_pks()aget_pks()Ottenere le chiavi primarie in base a un'espressione
Creare da testifrom_texts()afrom_texts()Crea un archivio vettoriale da testi

Per informazioni più dettagliate su queste funzioni, consultare la API Reference.

Vantaggi in termini di prestazioni

Le operazioni asincrone offrono miglioramenti significativi delle prestazioni quando si gestiscono grandi volumi di richieste simultanee, in particolare per:

  • Elaborazione di documenti in batch
  • Scenari di ricerca ad alta frequenza
  • Applicazioni RAG di produzione
  • Importazione/esportazione di dati su larga scala

In questa esercitazione dimostreremo questi vantaggi in termini di prestazioni attraverso un confronto dettagliato tra operazioni sincrone e asincrone, mostrandovi come sfruttare le API asincrone per ottenere prestazioni ottimali nelle vostre applicazioni.

Prima di iniziare

Gli snippet di codice presenti in questa pagina richiedono le seguenti dipendenze:

! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio

Se si utilizza Google Colab, per abilitare le dipendenze appena installate potrebbe essere necessario riavviare il runtime (fare clic sul menu "Runtime" nella parte superiore dello schermo e selezionare "Restart session" dal menu a discesa).

Utilizzeremo i modelli OpenAI. È necessario preparare la chiave api OPENAI_API_KEY come variabile d'ambiente:

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

Se si utilizza Jupyter Notebook, è necessario eseguire questa riga di codice prima di eseguire il codice asincrono:

import nest_asyncio

nest_asyncio.apply()

Esplorazione delle API asincrone e confronto delle prestazioni

Ora approfondiamo il confronto delle prestazioni tra operazioni sincrone e asincrone con langchain-milvus.

Per prima cosa, importare le librerie necessarie:

import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus

# Define the Milvus URI
URI = "http://localhost:19530"

Impostazione delle funzioni di test

Creiamo delle funzioni ausiliarie per generare i dati di prova:

def random_id():
    """Generate a random string ID"""
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def generate_test_documents(num_docs):
    """Generate test documents for performance testing"""
    docs = []
    for i in range(num_docs):
        content = (
            f"This is test document {i} with some random content: {random.random()}"
        )
        metadata = {
            "id": f"doc_{i}",
            "score": random.random(),
            "category": f"cat_{i % 5}",
        }
        doc = Document(page_content=content, metadata=metadata)
        docs.append(doc)
    return docs

Inizializzare l'archivio vettoriale

Prima di poter eseguire i nostri test di prestazione, dobbiamo impostare un archivio vettoriale Milvus pulito. Questa funzione ci assicura di iniziare con una collezione nuova per ogni test, eliminando qualsiasi interferenza dai dati precedenti:

def init_vector_store():
    """Initialize and return a fresh vector store for testing"""
    return Milvus(
        embedding_function=OpenAIEmbeddings(),
        collection_name="langchain_perf_test",
        connection_args={"uri": URI},
        auto_id=True,
        drop_old=True,  # Always start with a fresh collection
    )

Async vs Sync: Aggiungere documenti

Ora confrontiamo le prestazioni dell'aggiunta di documenti sincrona e asincrona. Queste funzioni ci aiuteranno a misurare quanto possano essere più veloci le operazioni asincrone quando si aggiungono più documenti all'archivio vettoriale. La versione asincrona crea task per ogni aggiunta di documenti e li esegue simultaneamente, mentre la versione sincrona elabora i documenti uno alla volta:

async def async_add(milvus_store, num_adding):
    """Add documents asynchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    tasks = []
    for doc in docs:
        # Create tasks for each document addition
        task = milvus_store.aadd_documents([doc])
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_add(milvus_store, num_adding):
    """Add documents synchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    for doc in docs:
        result = milvus_store.add_documents([doc])
    end_time = time.time()
    return end_time - start_time

Ora eseguiamo i nostri test sulle prestazioni con diversi numeri di documenti per vedere le differenze di prestazioni nel mondo reale. Eseguiremo i test con carichi diversi per capire come le operazioni asincrone scalano rispetto alle loro controparti sincrone. I test misureranno il tempo di esecuzione per entrambi gli approcci e contribuiranno a dimostrare i vantaggi in termini di prestazioni delle operazioni asincrone:

add_counts = [10, 100]

# Get the event loop
loop = asyncio.get_event_loop()

# Create a new vector store for testing
milvus_store = init_vector_store()

# Test async document addition
for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(milvus_store, count)
        print(f"Async add for {count} documents took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())

# Reset vector store for sync tests
milvus_store = init_vector_store()

# Test sync document addition
for count in add_counts:
    sync_time = sync_add(milvus_store, count)
    print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)


Async add for 10 documents took 1.74 seconds


2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)


Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds

Per il confronto delle prestazioni di ricerca, dobbiamo prima popolare l'archivio vettoriale. Le funzioni seguenti ci aiuteranno a misurare le prestazioni della ricerca, creando più query di ricerca simultanee e confrontando il tempo di esecuzione tra l'approccio sincrono e quello asincrono:

def populate_vector_store(milvus_store, num_docs=1000):
    """Populate the vector store with test documents"""
    docs = generate_test_documents(num_docs)
    milvus_store.add_documents(docs)
    return docs


async def async_search(milvus_store, num_queries):
    """Perform async searches and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_queries):
        query = f"test document {i % 50}"
        task = milvus_store.asimilarity_search(query=query, k=3)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_search(milvus_store, num_queries):
    """Perform sync searches and measure the time"""
    start_time = time.time()
    for i in range(num_queries):
        query = f"test document {i % 50}"
        result = milvus_store.similarity_search(query=query, k=3)
    end_time = time.time()
    return end_time - start_time

Eseguiamo ora dei test completi sulle prestazioni di ricerca per vedere come le operazioni asincrone scalano rispetto a quelle sincrone. Eseguiremo test con diversi volumi di query per dimostrare i vantaggi in termini di prestazioni delle operazioni asincrone, soprattutto con l'aumento del numero di operazioni simultanee:

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

query_counts = [10, 100]

# Test async search
for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(milvus_store, count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())

# Test sync search
for count in query_counts:
    sync_time = sync_search(milvus_store, count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)


Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds

Async vs Sync: Eliminazione

Le operazioni di cancellazione sono un altro aspetto critico in cui le operazioni asincrone possono fornire miglioramenti significativi delle prestazioni. Creiamo delle funzioni per misurare la differenza di prestazioni tra operazioni di cancellazione sincrone e asincrone. Questi test dimostreranno come le operazioni asincrone possano gestire in modo più efficiente le cancellazioni in batch:

async def async_delete(milvus_store, num_deleting):
    """Delete documents asynchronously and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        task = milvus_store.adelete(expr=expr)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_delete(milvus_store, num_deleting):
    """Delete documents synchronously and measure the time"""
    start_time = time.time()
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        result = milvus_store.delete(expr=expr)
    end_time = time.time()
    return end_time - start_time

Ora eseguiamo i test sulle prestazioni di cancellazione per quantificare la differenza di prestazioni. Inizieremo con un nuovo archivio vettoriale popolato con i dati di prova, quindi eseguiremo le operazioni di cancellazione utilizzando sia l'approccio sincrono che quello asincrono:

delete_counts = [10, 100]

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test async delete
for count in delete_counts:

    async def measure_async_delete():
        async_time = await async_delete(milvus_store, count)
        print(f"Async delete for {count} operations took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_delete())

# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test sync delete
for count in delete_counts:
    sync_time = sync_delete(milvus_store, count)
    print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)


Async delete for 10 operations took 0.58 seconds


2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)


Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds

Conclusione

Questo tutorial ha dimostrato i significativi vantaggi in termini di prestazioni dell'uso di operazioni asincrone con LangChain e Milvus. Abbiamo confrontato le versioni sincrone e asincrone delle operazioni di aggiunta, ricerca e cancellazione, mostrando come le operazioni asincrone possano fornire miglioramenti sostanziali in termini di velocità, soprattutto per le operazioni batch di grandi dimensioni.

Principali risultati:

  1. Le operazioni asincrone offrono i maggiori vantaggi quando si eseguono molte singole operazioni che possono essere eseguite in parallelo.
  2. Per i carichi di lavoro che generano un throughput più elevato, il divario di prestazioni tra operazioni sync e async aumenta
  3. Le operazioni asincrone sfruttano appieno la potenza di calcolo delle macchine.

Quando si realizzano applicazioni RAG di produzione con LangChain e Milvus, si consiglia di utilizzare l'API async quando le prestazioni sono un problema, soprattutto per le operazioni simultanee.

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

Questa pagina è stata utile?