Fonctions asynchrones dans l'intégration LangChain-Milvus

Open In Colab GitHub Repository

Ce tutoriel explore comment exploiter les fonctions asynchrones dans langchain-milvus pour construire des applications de haute performance. En utilisant des méthodes asynchrones, vous pouvez améliorer de manière significative le débit et la réactivité de votre application, en particulier lorsqu'il s'agit de récupérer des données à grande échelle. Que vous construisiez un système de recommandation en temps réel, que vous mettiez en œuvre une recherche sémantique dans votre application ou que vous créiez un pipeline RAG (Retrieval-Augmented Generation), les opérations asynchrones peuvent vous aider à traiter plus efficacement les requêtes simultanées. La base de données vectorielle haute performance Milvus, associée aux puissantes abstractions LLM de LangChain, peut constituer une base solide pour la création d'applications d'IA évolutives.

Présentation de l'API Async

langchain-milvus fournit un support complet des opérations asynchrones, améliorant de manière significative les performances dans les scénarios concurrents à grande échelle. L'API asynchrone maintient une interface cohérente avec l'API synchrone.

Fonctions asynchrones de base

Pour utiliser les opérations asynchrones dans langchain-milvus, il suffit d'ajouter un préfixe a aux noms des méthodes. Cela permet une meilleure utilisation des ressources et un débit amélioré lors de la gestion des demandes de récupération concurrentes.

Type d'opérationMéthode synchroneMéthode asynchroneDescription de l'opération
Ajouter des textesadd_texts()aadd_texts()Ajouter des textes à la base de données vectorielles
Ajouter des documentsadd_documents()aadd_documents()Ajouter des documents à la base de données vectorielles
Ajout de vecteurs d'intégrationadd_embeddings()aadd_embeddings()Ajouter des vecteurs d'intégration
Recherche de similaritésimilarity_search()asimilarity_search()Recherche sémantique par texte
Recherche vectoriellesimilarity_search_by_vector()asimilarity_search_by_vector()Recherche sémantique par vecteur
Recherche avec scoresimilarity_search_with_score()asimilarity_search_with_score()Recherche sémantique par texte et retour des scores de similarité
Recherche vectorielle avec scoresimilarity_search_with_score_by_vector()asimilarity_search_with_score_by_vector()Recherche sémantique par vecteur et retour des scores de similarité
Recherche de diversitémax_marginal_relevance_search()amax_marginal_relevance_search()Recherche MMR (renvoie les résultats similaires tout en optimisant la diversité)
Recherche vectorielle de diversitémax_marginal_relevance_search_by_vector()amax_marginal_relevance_search_by_vector()Recherche MMR par vecteur
Opération de suppressiondelete()adelete()Suppression de documents
Opération d'insertionupsert()aupsert()Insérer (mettre à jour si existant, sinon insérer) des documents
Recherche de métadonnéessearch_by_metadata()asearch_by_metadata()Requête avec filtrage des métadonnées
Obtenir des clés primairesget_pks()aget_pks()Obtenir les clés primaires par expression
Créer à partir de textesfrom_texts()afrom_texts()Création d'une base de données vectorielles à partir de textes

Pour plus d'informations sur ces fonctions, veuillez vous référer à la référence API.

Avantages en termes de performances

Les opérations asynchrones permettent d'améliorer considérablement les performances lors du traitement d'un grand nombre de demandes simultanées :

  • le traitement de documents par lots
  • Scénarios de recherche à haute fréquence
  • les applications RAG de production
  • Importation/exportation de données à grande échelle

Dans ce tutoriel, nous allons démontrer ces avantages en termes de performances en comparant en détail les opérations synchrones et asynchrones, et en vous montrant comment exploiter les API asynchrones pour optimiser les performances de vos applications.

Avant de commencer

Les extraits de code de cette page nécessitent les dépendances suivantes :

! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio

Si vous utilisez Google Colab, pour activer les dépendances qui viennent d'être installées, vous devrez peut-être redémarrer le runtime (cliquez sur le menu "Runtime" en haut de l'écran, et sélectionnez "Restart session" dans le menu déroulant).

Nous utiliserons les modèles OpenAI. Vous devez préparer la clé api OPENAI_API_KEY en tant que variable d'environnement :

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

Si vous utilisez Jupyter Notebook, vous devez exécuter cette ligne de code avant d'exécuter le code asynchrone :

import nest_asyncio

nest_asyncio.apply()

Exploration des API asynchrones et comparaison des performances

Maintenant, plongeons plus profondément dans la comparaison des performances entre les opérations synchrones et asynchrones avec langchain-milvus.

Tout d'abord, importez les bibliothèques nécessaires :

import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus

# Define the Milvus URI
URI = "http://localhost:19530"

Mise en place des fonctions de test

Créons des fonctions d'aide pour générer des données de test :

def random_id():
    """Generate a random string ID"""
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def generate_test_documents(num_docs):
    """Generate test documents for performance testing"""
    docs = []
    for i in range(num_docs):
        content = (
            f"This is test document {i} with some random content: {random.random()}"
        )
        metadata = {
            "id": f"doc_{i}",
            "score": random.random(),
            "category": f"cat_{i % 5}",
        }
        doc = Document(page_content=content, metadata=metadata)
        docs.append(doc)
    return docs

Initialiser le magasin de vecteurs

Avant de pouvoir exécuter nos tests de performance, nous devons configurer un magasin de vecteurs Milvus propre. Cette fonction garantit que nous commençons avec une nouvelle collection pour chaque test, en éliminant toute interférence des données précédentes :

def init_vector_store():
    """Initialize and return a fresh vector store for testing"""
    return Milvus(
        embedding_function=OpenAIEmbeddings(),
        collection_name="langchain_perf_test",
        connection_args={"uri": URI},
        auto_id=True,
        drop_old=True,  # Always start with a fresh collection
    )

Async vs Sync : Ajouter des documents

Comparons maintenant les performances de l'ajout de documents synchrone et asynchrone. Ces fonctions nous aideront à mesurer la rapidité des opérations asynchrones lors de l'ajout de plusieurs documents au magasin vectoriel. La version asynchrone crée des tâches pour chaque ajout de document et les exécute simultanément, tandis que la version synchrone traite les documents un par un :

async def async_add(milvus_store, num_adding):
    """Add documents asynchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    tasks = []
    for doc in docs:
        # Create tasks for each document addition
        task = milvus_store.aadd_documents([doc])
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_add(milvus_store, num_adding):
    """Add documents synchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    for doc in docs:
        result = milvus_store.add_documents([doc])
    end_time = time.time()
    return end_time - start_time

Exécutons maintenant nos tests de performance avec différents nombres de documents pour voir les différences de performance dans le monde réel. Nous allons effectuer des tests avec différentes charges pour comprendre comment les opérations asynchrones évoluent par rapport à leurs homologues synchrones. Les tests mesureront le temps d'exécution pour les deux approches et aideront à démontrer les avantages des opérations asynchrones en termes de performances :

add_counts = [10, 100]

# Get the event loop
loop = asyncio.get_event_loop()

# Create a new vector store for testing
milvus_store = init_vector_store()

# Test async document addition
for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(milvus_store, count)
        print(f"Async add for {count} documents took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())

# Reset vector store for sync tests
milvus_store = init_vector_store()

# Test sync document addition
for count in add_counts:
    sync_time = sync_add(milvus_store, count)
    print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)


Async add for 10 documents took 1.74 seconds


2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)


Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds

Pour la comparaison des performances de recherche, nous devons d'abord remplir le magasin de vecteurs. Les fonctions suivantes nous aideront à mesurer les performances de recherche en créant plusieurs requêtes de recherche simultanées et en comparant le temps d'exécution entre les approches synchrones et asynchrones :

def populate_vector_store(milvus_store, num_docs=1000):
    """Populate the vector store with test documents"""
    docs = generate_test_documents(num_docs)
    milvus_store.add_documents(docs)
    return docs


async def async_search(milvus_store, num_queries):
    """Perform async searches and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_queries):
        query = f"test document {i % 50}"
        task = milvus_store.asimilarity_search(query=query, k=3)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_search(milvus_store, num_queries):
    """Perform sync searches and measure the time"""
    start_time = time.time()
    for i in range(num_queries):
        query = f"test document {i % 50}"
        result = milvus_store.similarity_search(query=query, k=3)
    end_time = time.time()
    return end_time - start_time

Exécutons maintenant des tests complets de performance de recherche pour voir comment les opérations asynchrones évoluent par rapport aux opérations synchrones. Nous testerons différents volumes de requêtes pour démontrer les avantages des opérations asynchrones en termes de performances, en particulier lorsque le nombre d'opérations simultanées augmente :

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

query_counts = [10, 100]

# Test async search
for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(milvus_store, count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())

# Test sync search
for count in query_counts:
    sync_time = sync_search(milvus_store, count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)


Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds

Async vs Sync : Suppression

Les opérations de suppression sont un autre aspect critique pour lequel les opérations asynchrones peuvent apporter des améliorations significatives en termes de performances. Créons des fonctions pour mesurer la différence de performance entre les opérations de suppression synchrones et asynchrones. Ces tests permettront de démontrer comment les opérations asynchrones peuvent gérer les suppressions par lots de manière plus efficace :

async def async_delete(milvus_store, num_deleting):
    """Delete documents asynchronously and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        task = milvus_store.adelete(expr=expr)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_delete(milvus_store, num_deleting):
    """Delete documents synchronously and measure the time"""
    start_time = time.time()
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        result = milvus_store.delete(expr=expr)
    end_time = time.time()
    return end_time - start_time

Exécutons maintenant les tests de performance de suppression pour quantifier la différence de performance. Nous commencerons par un magasin vectoriel frais rempli de données de test, puis nous effectuerons des opérations de suppression en utilisant les approches synchrone et asynchrone :

delete_counts = [10, 100]

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test async delete
for count in delete_counts:

    async def measure_async_delete():
        async_time = await async_delete(milvus_store, count)
        print(f"Async delete for {count} operations took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_delete())

# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test sync delete
for count in delete_counts:
    sync_time = sync_delete(milvus_store, count)
    print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)


Async delete for 10 operations took 0.58 seconds


2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)


Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds

Conclusion

Ce tutoriel a démontré les avantages significatifs en termes de performances de l'utilisation d'opérations asynchrones avec LangChain et Milvus. Nous avons comparé les versions synchrones et asynchrones des opérations d'ajout, de recherche et de suppression, en montrant comment les opérations asynchrones peuvent apporter des améliorations substantielles en termes de vitesse, en particulier pour les opérations par lots importantes.

Principaux enseignements :

  1. Les opérations asynchrones offrent le plus d'avantages lorsqu'il s'agit d'effectuer de nombreuses opérations individuelles qui peuvent être exécutées en parallèle
  2. Pour les charges de travail qui génèrent un débit plus élevé, l'écart de performance entre les opérations synchrone et asynchrone se creuse.
  3. Les opérations asynchrones utilisent pleinement la puissance de calcul des machines.

Lorsque vous créez des applications RAG de production avec LangChain et Milvus, envisagez d'utiliser l'API asynchrone lorsque les performances sont un problème, en particulier pour les opérations simultanées.

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

Cette page a-t - elle été utile ?