Asynchronous Functions in the LangChain Milvus Integration


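This tutorial explores how to use the asynchronous functions in langchain-milvus to build high-performance applications. By using async methods, you can significantly improve your application's throughput and responsiveness, especially for large-scale retrieval. Whether you are building a real-time recommendation system, adding semantic search to your application, or building a RAG (Retrieval-Augmented Generation) pipeline, async operations help you handle concurrent requests more efficiently. Together, the high-performance vector database Milvus and LangChain's powerful LLM abstractions provide a solid foundation for building scalable AI applications.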

Async API Overview

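langchain-milvus provides comprehensive support for asynchronous operations, which can significantly improve performance in large-scale concurrent scenarios. The async API keeps the same interface design as the sync API.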

Core Async Functions

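To use async operations in langchain-milvus, call the version of a method whose name carries an a prefix (for example, asimilarity_search() instead of similarity_search()) and await it. This improves resource utilization and throughput when handling concurrent retrieval requests.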
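To illustrate the naming convention, here is a minimal sketch built around a hypothetical in-memory ToyStore class. It is not part of langchain-milvus and its "similarity" is just word overlap; it only mirrors the pattern: the async method takes the same arguments as its sync counterpart, but it is a coroutine that must be awaited.

```python
import asyncio


class ToyStore:
    """Hypothetical in-memory stand-in; NOT the langchain-milvus Milvus class."""

    def __init__(self, texts):
        self.texts = list(texts)

    def similarity_search(self, query, k=3):
        # Toy "similarity": rank texts by how many words they share with the query
        words = set(query.split())
        return sorted(self.texts, key=lambda t: -len(words & set(t.split())))[:k]

    async def asimilarity_search(self, query, k=3):
        # Same arguments, "a" prefix, but a coroutine that must be awaited.
        # A real client would await network I/O here; we just yield once.
        await asyncio.sleep(0)
        return self.similarity_search(query, k=k)


store = ToyStore(["milvus vector database", "langchain llm framework", "async io in python"])

sync_result = store.similarity_search("vector database", k=1)
# In a script, start the coroutine with asyncio.run(); in Jupyter, use `await` directly
async_result = asyncio.run(store.asimilarity_search("vector database", k=1))
print(sync_result)
print(async_result)
```

Both calls return the same result; the only differences are the a prefix and the need to run the async version inside an event loop.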

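| Operation | Sync method | Async method | Description |
|---|---|---|---|
| Add texts | add_texts() | aadd_texts() | Add texts to the vector store |
| Add documents | add_documents() | aadd_documents() | Add documents to the vector store |
| Add embeddings | add_embeddings() | aadd_embeddings() | Add embedding vectors |
| Similarity search | similarity_search() | asimilarity_search() | Semantic search by text |
| Search by vector | similarity_search_by_vector() | asimilarity_search_by_vector() | Semantic search by vector |
| Search with score | similarity_search_with_score() | asimilarity_search_with_score() | Semantic search by text, returning similarity scores |
| Search by vector with score | similarity_search_with_score_by_vector() | asimilarity_search_with_score_by_vector() | Semantic search by vector, returning similarity scores |
| Diversity search | max_marginal_relevance_search() | amax_marginal_relevance_search() | MMR search (returns similar results while optimizing for diversity) |
| Diversity search by vector | max_marginal_relevance_search_by_vector() | amax_marginal_relevance_search_by_vector() | MMR search by vector |
| Delete | delete() | adelete() | Delete documents |
| Upsert | upsert() | aupsert() | Upsert documents (update if they exist, otherwise insert) |
| Metadata search | search_by_metadata() | asearch_by_metadata() | Query with a metadata filter |
| Get primary keys | get_pks() | aget_pks() | Get primary keys by expression |
| Create from texts | from_texts() | afrom_texts() | Create a vector store from texts |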

For details on these functions, see the API Reference.

Performance Benefits

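Async operations can deliver significant performance improvements when handling large numbers of concurrent requests. They are particularly well suited to: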

  • Batch document processing
  • High-concurrency search
  • Production RAG applications
  • Large-scale data import/export
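Why does concurrency help here? Each call to Milvus or to the embedding API spends most of its time waiting on the network, and awaiting many calls together lets those waits overlap. The sketch below simulates this with a hypothetical fake_request coroutine that only sleeps (an assumption standing in for a real round trip), comparing one-at-a-time awaiting with asyncio.gather():

```python
import asyncio
import time


async def fake_request(delay):
    """Hypothetical stand-in for one network round trip (e.g. a single search)."""
    await asyncio.sleep(delay)
    return delay


async def run_sequential(n, delay):
    # Await each request before starting the next one
    start = time.perf_counter()
    for _ in range(n):
        await fake_request(delay)
    return time.perf_counter() - start


async def run_concurrent(n, delay):
    # Start all requests at once and wait for them together
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(delay) for _ in range(n)))
    return time.perf_counter() - start


sequential = asyncio.run(run_sequential(20, 0.05))
concurrent = asyncio.run(run_concurrent(20, 0.05))
print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential total grows with the number of requests, while the concurrent total stays close to a single round trip. Real services also add server-side work and rate limits, so actual gains are smaller than this idealized case.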

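In this tutorial, we demonstrate these benefits with a detailed comparison of sync and async operations, and show how to use the async API in your application for the best performance.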

Before You Begin

The code snippets on this page require the following dependencies:

! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio

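If you are using Google Colab, you may need to restart the runtime to enable the dependencies you just installed (click the "Runtime" menu at the top of the screen and select "Restart session" from the drop-down menu).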

We will use OpenAI models. Prepare your API key and set it as the OPENAI_API_KEY environment variable:

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

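If you are using Jupyter Notebook, run the following code before running any async code. Jupyter already runs its own event loop, and nest_asyncio patches asyncio so that calls such as loop.run_until_complete() can be nested inside it: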

import nest_asyncio

nest_asyncio.apply()
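nest_asyncio is only needed because a notebook already has a running event loop. In a plain Python script, the usual pattern is to put the async code in one top-level coroutine and start it once with asyncio.run(). A minimal sketch, where search_once is a hypothetical placeholder for an awaited call such as milvus_store.asimilarity_search(...):

```python
import asyncio


async def search_once(query):
    """Hypothetical placeholder for an awaited call like asimilarity_search()."""
    await asyncio.sleep(0.01)  # simulate network latency
    return f"results for {query!r}"


async def main():
    queries = ["test document 1", "test document 2", "test document 3"]
    # Run all searches concurrently inside a single event loop
    return await asyncio.gather(*(search_once(q) for q in queries))


# asyncio.run() creates the event loop, runs main() to completion, and closes the loop
results = asyncio.run(main())
for line in results:
    print(line)
```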

Exploring the Async API and Comparing Performance

Now let's take a closer look at how sync and async operations compare in performance with langchain-milvus.

First, import the necessary libraries:

import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus

# Define the Milvus URI
URI = "http://localhost:19530"

Setting Up Test Functions

Let's create helper functions to generate test data:

def random_id():
    """Generate a random string ID"""
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def generate_test_documents(num_docs):
    """Generate test documents for performance testing"""
    docs = []
    for i in range(num_docs):
        content = (
            f"This is test document {i} with some random content: {random.random()}"
        )
        metadata = {
            "id": f"doc_{i}",
            "score": random.random(),
            "category": f"cat_{i % 5}",
        }
        doc = Document(page_content=content, metadata=metadata)
        docs.append(doc)
    return docs

Initializing the Vector Store

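Before running the performance tests, we need a clean Milvus vector store. This function ensures that every test starts with a fresh collection (drop_old=True), eliminating any interference from previous data: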

def init_vector_store():
    """Initialize and return a fresh vector store for testing"""
    return Milvus(
        embedding_function=OpenAIEmbeddings(),
        collection_name="langchain_perf_test",
        connection_args={"uri": URI},
        auto_id=True,
        drop_old=True,  # Always start with a fresh collection
    )

Async vs Sync: Adding Documents

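Now let's compare the performance of adding documents synchronously and asynchronously. These functions measure how much faster async operations are when adding many documents to the vector store. The async version creates one aadd_documents() coroutine per document and runs them all concurrently with asyncio.gather(), while the sync version adds the documents one at a time: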

async def async_add(milvus_store, num_adding):
    """Add documents asynchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    tasks = []
    for doc in docs:
        # Create tasks for each document addition
        task = milvus_store.aadd_documents([doc])
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_add(milvus_store, num_adding):
    """Add documents synchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    for doc in docs:
        result = milvus_store.add_documents([doc])
    end_time = time.time()
    return end_time - start_time
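async_add() starts one coroutine per document with no upper bound. That is fine for 100 documents, but with thousands of documents it may run into rate limits on the embedding API or overload the server. One common way to cap concurrency is asyncio.Semaphore. A minimal sketch, where fake_add is a hypothetical stand-in for milvus_store.aadd_documents([doc]) and MAX_CONCURRENCY = 8 is an assumed value you would tune for your own deployment:

```python
import asyncio

MAX_CONCURRENCY = 8  # assumed limit; tune for your Milvus deployment and API quota

in_flight = 0       # coroutines currently inside fake_add
peak_in_flight = 0  # highest value of in_flight observed


async def fake_add(doc):
    """Hypothetical stand-in for `await milvus_store.aadd_documents([doc])`."""
    global in_flight, peak_in_flight
    in_flight += 1
    peak_in_flight = max(peak_in_flight, in_flight)
    await asyncio.sleep(0.01)  # simulate the network round trip
    in_flight -= 1
    return doc


async def bounded_add_all(docs, limit=MAX_CONCURRENCY):
    semaphore = asyncio.Semaphore(limit)

    async def add_one(doc):
        # At most `limit` coroutines can be inside this block at the same time
        async with semaphore:
            return await fake_add(doc)

    return await asyncio.gather(*(add_one(d) for d in docs))


added = asyncio.run(bounded_add_all([f"doc_{i}" for i in range(100)]))
print(f"added {len(added)} docs, peak concurrency {peak_in_flight}")
```

All 100 documents are still processed, but never more than MAX_CONCURRENCY at once.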

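Now let's run the performance tests with different numbers of documents to see the actual difference. Testing under different loads shows how async operations scale compared with sync operations. The tests measure the execution time of both approaches: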

add_counts = [10, 100]

# Get the event loop
loop = asyncio.get_event_loop()

# Create a new vector store for testing
milvus_store = init_vector_store()

# Test async document addition
for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(milvus_store, count)
        print(f"Async add for {count} documents took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())

# Reset vector store for sync tests
milvus_store = init_vector_store()

# Test sync document addition
for count in add_counts:
    sync_time = sync_add(milvus_store, count)
    print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
Async add for 10 documents took 1.74 seconds
Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds
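The measurements above use time.time(), which reads the wall clock and can be adjusted by the system while a test is running. For timing code, Python provides time.perf_counter(), a monotonic high-resolution clock. A sketch of two reusable timing helpers (time_sync and time_async are hypothetical names, not part of any library), exercised here with toy functions; in this tutorial you might call, for example, time_async(milvus_store.asimilarity_search, query="test document 1", k=3):

```python
import asyncio
import time


def time_sync(fn, *args, **kwargs):
    """Return (result, elapsed seconds) for a regular function call."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


async def time_async(coro_fn, *args, **kwargs):
    """Return (result, elapsed seconds) for an awaited coroutine function."""
    start = time.perf_counter()
    result = await coro_fn(*args, **kwargs)
    return result, time.perf_counter() - start


# Toy usage: sum() stands in for a sync call, asyncio.sleep() for an async one
result, elapsed = time_sync(sum, range(1000))
print(result, f"{elapsed:.6f}s")

async_result, async_elapsed = asyncio.run(time_async(asyncio.sleep, 0.01, result="done"))
print(async_result, f"{async_elapsed:.3f}s")
```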

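To compare search performance, we first need to populate the vector store. The following functions issue a series of search queries, all at once in the async version and one after another in the sync version, and measure the execution time of each approach: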

def populate_vector_store(milvus_store, num_docs=1000):
    """Populate the vector store with test documents"""
    docs = generate_test_documents(num_docs)
    milvus_store.add_documents(docs)
    return docs


async def async_search(milvus_store, num_queries):
    """Perform async searches and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_queries):
        query = f"test document {i % 50}"
        task = milvus_store.asimilarity_search(query=query, k=3)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_search(milvus_store, num_queries):
    """Perform sync searches and measure the time"""
    start_time = time.time()
    for i in range(num_queries):
        query = f"test document {i % 50}"
        result = milvus_store.similarity_search(query=query, k=3)
    end_time = time.time()
    return end_time - start_time
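async_search() discards the search results. If you need them, note that asyncio.gather() returns results in the order the awaitables were passed in, not the order in which they finish, so you can pair each query with its result using zip(). A sketch with a hypothetical fake_search coroutine standing in for milvus_store.asimilarity_search(query=query, k=k):

```python
import asyncio
import random


async def fake_search(query, k=3):
    """Hypothetical stand-in for `milvus_store.asimilarity_search(query=query, k=k)`."""
    # Random latency, so calls can finish in a different order than they started
    await asyncio.sleep(random.uniform(0, 0.02))
    return [f"{query} / hit {j}" for j in range(k)]


async def search_all(queries):
    results = await asyncio.gather(*(fake_search(q) for q in queries))
    # gather() preserves input order regardless of completion order,
    # so zip() pairs every query with its own results
    return dict(zip(queries, results))


queries = [f"test document {i}" for i in range(5)]
by_query = asyncio.run(search_all(queries))
for q, hits in by_query.items():
    print(q, "->", hits[0])
```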

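Now let's run a comprehensive search performance test to see how async operations scale compared with sync operations. We test with different query volumes to show the advantage of async operations, especially as the number of concurrent operations grows: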

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

query_counts = [10, 100]

# Test async search
for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(milvus_store, count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())

# Test sync search
for count in query_counts:
    sync_time = sync_search(milvus_store, count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds

Async vs Sync: Deleting Documents

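Deletion is another area where async operations can deliver significant performance improvements. Let's create functions to measure the performance difference between sync and async delete operations. Each call deletes one document using a filter expression on its id metadata field, and the tests show how async operations handle batches of deletions more efficiently: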

async def async_delete(milvus_store, num_deleting):
    """Delete documents asynchronously and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        task = milvus_store.adelete(expr=expr)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_delete(milvus_store, num_deleting):
    """Delete documents synchronously and measure the time"""
    start_time = time.time()
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        result = milvus_store.delete(expr=expr)
    end_time = time.time()
    return end_time - start_time
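With the default asyncio.gather(), the first exception raised by any delete propagates to the caller, and you lose the outcomes of the other calls in that gather. Passing return_exceptions=True collects exceptions as ordinary values in the result list instead. A sketch, where fake_delete is a hypothetical stand-in for milvus_store.adelete(expr=expr) that fails on purpose for doc_3:

```python
import asyncio


async def fake_delete(expr):
    """Hypothetical stand-in for `milvus_store.adelete(expr=expr)`."""
    await asyncio.sleep(0.01)
    if "doc_3" in expr:
        raise RuntimeError(f"simulated failure for {expr}")
    return True


async def delete_all(num_deleting):
    exprs = [f"id == 'doc_{i}'" for i in range(num_deleting)]
    # return_exceptions=True returns failures as values instead of
    # raising the first exception out of gather()
    outcomes = await asyncio.gather(
        *(fake_delete(e) for e in exprs), return_exceptions=True
    )
    failed = [e for e, o in zip(exprs, outcomes) if isinstance(o, Exception)]
    return len(exprs) - len(failed), failed


ok_count, failed = asyncio.run(delete_all(10))
print(f"{ok_count} succeeded, failed: {failed}")
```

This makes it straightforward to retry or log only the expressions that failed.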

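Now let's run the delete performance tests to quantify the difference. We start from a fresh vector store populated with test data and run the async deletes, then reset and repopulate the store before running the sync deletes: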

delete_counts = [10, 100]

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test async delete
for count in delete_counts:

    async def measure_async_delete():
        async_time = await async_delete(milvus_store, count)
        print(f"Async delete for {count} operations took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_delete())

# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test sync delete
for count in delete_counts:
    sync_time = sync_delete(milvus_store, count)
    print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
Async delete for 10 operations took 0.58 seconds
Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds

Conclusion

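This tutorial demonstrated the significant performance benefits of using async operations with LangChain and Milvus. We compared the sync and async versions of add, search, and delete operations and showed how async operations can greatly improve speed, especially for large batches of operations.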

Key takeaways:

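  1. Async operations deliver the greatest benefit when you perform many individual operations that can run concurrently.
  2. As the workload grows, the performance gap between sync and async operations widens: in every test above, the gap was much larger at 100 operations than at 10.
  3. Async operations make fuller use of the machine's resources, because the program does not sit idle while each request waits on the network.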
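The sample timings reported earlier on this page make the second takeaway concrete. Dividing each sync time by the matching async time gives the speedup; in these particular runs it grows from roughly 3x to roughly 24x for adding documents as the count goes from 10 to 100. A small calculation over those numbers (copied from the sample output; your own results will vary with hardware, network, and embedding API latency):

```python
# (sync seconds, async seconds) copied from the sample runs in this tutorial
timings = {
    "add": {10: (5.36, 1.74), 100: (65.60, 2.77)},
    "search": {10: (6.07, 2.31), 100: (54.22, 3.72)},
    "delete": {10: (2.82, 0.58), 100: (29.21, 0.61)},
}

speedups = {}
for op, by_count in timings.items():
    for count, (sync_s, async_s) in by_count.items():
        # Speedup = sync time / async time
        speedups[(op, count)] = sync_s / async_s
        print(f"{op:<6} n={count:<3} speedup {speedups[(op, count)]:.1f}x")
```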

When building production RAG applications with LangChain and Milvus, consider using the async API wherever performance matters, especially for concurrent operations.
