Milvus
Zilliz
Home
  • Integrationen
  • Home
  • Docs
  • Integrationen

  • Orchestrierung

  • LangChain

  • Asynchrone Suche

Asynchrone Funktionen in der LangChain-Milvus-Integration

Open In Colab GitHub Repository

In diesem Tutorial erfahren Sie, wie Sie asynchrone Funktionen in Langchain-Milvus nutzen können, um hochperformante Anwendungen zu erstellen. Durch den Einsatz von asynchronen Methoden können Sie den Durchsatz und die Reaktionsfähigkeit Ihrer Anwendung erheblich verbessern, insbesondere wenn Sie mit umfangreichen Abfragen arbeiten. Ganz gleich, ob Sie ein Echtzeit-Empfehlungssystem aufbauen, eine semantische Suche in Ihre Anwendung implementieren oder eine RAG-Pipeline (Retrieval-Augmented Generation) erstellen, mit asynchronen Operationen können Sie gleichzeitige Anfragen effizienter bearbeiten. Die leistungsstarke Vektordatenbank Milvus in Kombination mit den leistungsstarken LLM-Abstraktionen von LangChain bietet eine robuste Grundlage für die Entwicklung skalierbarer KI-Anwendungen.

Async-API-Übersicht

langchain-milvus bietet umfassende Unterstützung für asynchrone Operationen, was die Leistung in großen, gleichzeitigen Szenarien erheblich verbessert. Die async-API bietet ein konsistentes Interface-Design mit der sync-API.

Async-Kernfunktionen

Um asynchrone Operationen in langchain-milvus zu verwenden, fügen Sie einfach ein a Präfix zu den Methodennamen hinzu. Dies ermöglicht eine bessere Ressourcennutzung und einen höheren Durchsatz bei der Bearbeitung gleichzeitiger Abrufanforderungen.

OperationstypSync-MethodeAsync-MethodeBeschreibung
Texte hinzufügenadd_texts()aadd_texts()Hinzufügen von Texten zum Vektorspeicher
Dokumente hinzufügenadd_documents()aadd_documents()Hinzufügen von Dokumenten zum Vektorspeicher
Einbettungen hinzufügenadd_embeddings()aadd_embeddings()Vektoren zur Einbettung hinzufügen
Ähnlichkeitssuchesimilarity_search()asimilarity_search()Semantische Suche nach Text
Vektor-Suchesimilarity_search_by_vector()asimilarity_search_by_vector()Semantische Suche nach Vektoren
Suche mit Scoresimilarity_search_with_score()asimilarity_search_with_score()Semantische Suche nach Text und Rückgabe von Ähnlichkeitswerten
Vektorsuche mit Scoresimilarity_search_with_score_by_vector()asimilarity_search_with_score_by_vector()Semantische Suche nach Vektoren und Rückgabe von Ähnlichkeitswerten
Diversity-Suchemax_marginal_relevance_search()amax_marginal_relevance_search()MMR-Suche (liefert Ähnliches zurück und optimiert gleichzeitig die Vielfalt)
Vektor-Diversity-Suchemax_marginal_relevance_search_by_vector()amax_marginal_relevance_search_by_vector()MMR-Suche nach Vektoren
Löschvorgangdelete()adelete()Dokumente löschen
Upsert-Vorgangupsert()aupsert()Upsert (aktualisieren, wenn vorhanden, sonst einfügen) von Dokumenten
Metadaten-Suchesearch_by_metadata()asearch_by_metadata()Abfrage mit Metadatenfilterung
Primärschlüssel abrufenget_pks()aget_pks()Abrufen von Primärschlüsseln nach Ausdruck
Aus Texten erstellenfrom_texts()afrom_texts()Erstellen eines Vektorspeichers aus Texten

Ausführlichere Informationen zu diesen Funktionen finden Sie in der API-Referenz.

Leistungsvorteile

Asynchrone Operationen bieten erhebliche Leistungsverbesserungen bei der Verarbeitung großer Mengen gleichzeitiger Anfragen, besonders geeignet für:

  • Stapelverarbeitung von Dokumenten
  • Such-Szenarien mit hoher Gleichzeitigkeit
  • Produktions-RAG-Anwendungen
  • Import/Export großer Datenmengen

In diesem Tutorial werden wir diese Leistungsvorteile durch detaillierte Vergleiche von synchronen und asynchronen Operationen demonstrieren und Ihnen zeigen, wie Sie asynchrone APIs für eine optimale Leistung in Ihren Anwendungen nutzen können.

Bevor Sie beginnen

Für die Codeschnipsel auf dieser Seite sind die folgenden Abhängigkeiten erforderlich:

! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio

Wenn Sie Google Colab verwenden, müssen Sie möglicherweise die Runtime neu starten, um die soeben installierten Abhängigkeiten zu aktivieren (klicken Sie auf das Menü "Runtime" am oberen Rand des Bildschirms und wählen Sie "Sitzung neu starten" aus dem Dropdown-Menü).

Wir werden OpenAI-Modelle verwenden. Sie sollten den Api-Schlüssel OPENAI_API_KEY als Umgebungsvariable vorbereiten:

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

Wenn Sie Jupyter Notebook verwenden, müssen Sie diese Codezeile ausführen, bevor Sie den asynchronen Code starten:

import nest_asyncio

nest_asyncio.apply()

Erkundung der asynchronen APIs und Leistungsvergleich

Lassen Sie uns nun tiefer in den Leistungsvergleich zwischen synchronen und asynchronen Operationen mit langchain-milvus eintauchen.

Importieren Sie zunächst die notwendigen Bibliotheken:

import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus

# Define the Milvus URI
URI = "http://localhost:19530"

Einrichten von Testfunktionen

Erstellen wir Hilfsfunktionen, um Testdaten zu erzeugen:

def random_id():
    """Generate a random string ID"""
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def generate_test_documents(num_docs):
    """Generate test documents for performance testing"""
    docs = []
    for i in range(num_docs):
        content = (
            f"This is test document {i} with some random content: {random.random()}"
        )
        metadata = {
            "id": f"doc_{i}",
            "score": random.random(),
            "category": f"cat_{i % 5}",
        }
        doc = Document(page_content=content, metadata=metadata)
        docs.append(doc)
    return docs

Initialisieren des Vektorspeichers

Bevor wir unsere Leistungstests durchführen können, müssen wir einen sauberen Milvus-Vektorspeicher einrichten. Diese Funktion stellt sicher, dass wir für jeden Test mit einer neuen Sammlung beginnen, um Störungen durch frühere Daten zu vermeiden:

def init_vector_store():
    """Initialize and return a fresh vector store for testing"""
    return Milvus(
        embedding_function=OpenAIEmbeddings(),
        collection_name="langchain_perf_test",
        connection_args={"uri": URI},
        auto_id=True,
        drop_old=True,  # Always start with a fresh collection
    )

Async vs. Sync: Dokumente hinzufügen

Vergleichen wir nun die Leistung des synchronen und des asynchronen Hinzufügens von Dokumenten. Mit diesen Funktionen können wir messen, wie viel schneller asynchrone Operationen beim Hinzufügen mehrerer Dokumente zum Vektorspeicher sein können. Die asynchrone Version erstellt Aufgaben für jedes hinzugefügte Dokument und führt sie gleichzeitig aus, während die synchrone Version die Dokumente nacheinander verarbeitet:

async def async_add(milvus_store, num_adding):
    """Add documents asynchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    tasks = []
    for doc in docs:
        # Create tasks for each document addition
        task = milvus_store.aadd_documents([doc])
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_add(milvus_store, num_adding):
    """Add documents synchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    for doc in docs:
        result = milvus_store.add_documents([doc])
    end_time = time.time()
    return end_time - start_time

Führen wir nun unsere Leistungstests mit verschiedenen Dokumentenzahlen durch, um die Leistungsunterschiede in der Praxis zu sehen. Wir testen mit unterschiedlichen Lasten, um zu verstehen, wie asynchrone Operationen im Vergleich zu ihren synchronen Pendants skalieren. Die Tests messen die Ausführungszeit für beide Ansätze und helfen, die Leistungsvorteile asynchroner Operationen zu demonstrieren:

add_counts = [10, 100]

# Get the event loop
loop = asyncio.get_event_loop()

# Create a new vector store for testing
milvus_store = init_vector_store()

# Test async document addition
for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(milvus_store, count)
        print(f"Async add for {count} documents took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())

# Reset vector store for sync tests
milvus_store = init_vector_store()

# Test sync document addition
for count in add_counts:
    sync_time = sync_add(milvus_store, count)
    print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)


Async add for 10 documents took 1.74 seconds


2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)


Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds

Für den Vergleich der Suchleistung müssen wir zunächst den Vektorspeicher auffüllen. Mit den folgenden Funktionen können wir die Suchleistung messen, indem wir mehrere gleichzeitige Suchanfragen erstellen und die Ausführungszeit zwischen synchronen und asynchronen Ansätzen vergleichen:

def populate_vector_store(milvus_store, num_docs=1000):
    """Populate the vector store with test documents"""
    docs = generate_test_documents(num_docs)
    milvus_store.add_documents(docs)
    return docs


async def async_search(milvus_store, num_queries):
    """Perform async searches and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_queries):
        query = f"test document {i % 50}"
        task = milvus_store.asimilarity_search(query=query, k=3)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_search(milvus_store, num_queries):
    """Perform sync searches and measure the time"""
    start_time = time.time()
    for i in range(num_queries):
        query = f"test document {i % 50}"
        result = milvus_store.similarity_search(query=query, k=3)
    end_time = time.time()
    return end_time - start_time

Lassen Sie uns nun umfassende Tests der Suchleistung durchführen, um zu sehen, wie asynchrone Operationen im Vergleich zu synchronen skalieren. Wir testen mit verschiedenen Abfragevolumina, um die Leistungsvorteile asynchroner Operationen zu demonstrieren, insbesondere wenn die Anzahl der gleichzeitigen Operationen steigt:

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

query_counts = [10, 100]

# Test async search
for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(milvus_store, count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())

# Test sync search
for count in query_counts:
    sync_time = sync_search(milvus_store, count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)


Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds

Async vs. Sync: Löschen

Löschvorgänge sind ein weiterer kritischer Aspekt, bei dem asynchrone Vorgänge erhebliche Leistungsverbesserungen bewirken können. Lassen Sie uns Funktionen erstellen, um den Leistungsunterschied zwischen synchronen und asynchronen Löschvorgängen zu messen. Diese Tests werden zeigen, wie asynchrone Operationen Batch-Löschungen effizienter handhaben können:

async def async_delete(milvus_store, num_deleting):
    """Delete documents asynchronously and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        task = milvus_store.adelete(expr=expr)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_delete(milvus_store, num_deleting):
    """Delete documents synchronously and measure the time"""
    start_time = time.time()
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        result = milvus_store.delete(expr=expr)
    end_time = time.time()
    return end_time - start_time

Führen wir nun die Leistungstests für das Löschen aus, um den Leistungsunterschied zu quantifizieren. Wir beginnen mit einem frischen Vektorspeicher, der mit Testdaten gefüllt ist, und führen dann Löschvorgänge sowohl mit synchronen als auch mit asynchronen Ansätzen durch:

delete_counts = [10, 100]

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test async delete
for count in delete_counts:

    async def measure_async_delete():
        async_time = await async_delete(milvus_store, count)
        print(f"Async delete for {count} operations took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_delete())

# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test sync delete
for count in delete_counts:
    sync_time = sync_delete(milvus_store, count)
    print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)


Async delete for 10 operations took 0.58 seconds


2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)


Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds

Schlussfolgerung

Dieses Tutorial hat die signifikanten Leistungsvorteile von asynchronen Operationen mit LangChain und Milvus gezeigt. Wir haben die synchronen und asynchronen Versionen von Add-, Such- und Löschoperationen verglichen und gezeigt, wie asynchrone Operationen erhebliche Geschwindigkeitsverbesserungen bieten können, insbesondere bei großen Batch-Operationen.

Die wichtigsten Erkenntnisse:

  1. Asynchrone Vorgänge bieten den größten Nutzen bei der Durchführung vieler einzelner Vorgänge, die parallel ausgeführt werden können
  2. Bei Arbeitslasten, die einen höheren Durchsatz generieren, vergrößert sich der Leistungsunterschied zwischen synchronen und asynchronen Operationen
  3. Asynchrone Operationen nutzen die Rechenleistung der Maschinen voll aus

Wenn Sie produktive RAG-Anwendungen mit LangChain und Milvus erstellen, sollten Sie die Verwendung der asynchronen API in Betracht ziehen, wenn die Leistung eine Rolle spielt, insbesondere bei gleichzeitigen Operationen.

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

War diese Seite hilfreich?