使用 Milvus 和 LlamaIndex Async API 的 RAG

Open In Colab GitHub Repository

本教學示範如何使用LlamaIndexMilvus為 RAG 建立異步文件處理管道。LlamaIndex 提供了一種處理文件並儲存於向量資料庫的方式,就像 Milvus 一樣。透過利用 LlamaIndex 的 async API 和 Milvus Python 客戶端函式庫,我們可以提高管道的吞吐量,以有效率地處理大量資料並編製索引。

在本教程中,我們將先從高層次介紹如何使用異步方法來利用 LlamaIndex 和 Milvus 建立 RAG,然後再介紹低層次方法的使用,以及同步和異步的效能比較。

在您開始之前

本頁面的程式碼片段需要 pymilvus 和 llamaindex 的相依性。您可以使用下列指令安裝它們:

$ pip install -U pymilvus llama-index-vector-stores-milvus llama-index nest-asyncio

如果您使用的是 Google Colab,為了啟用剛安裝的相依性,您可能需要重新啟動執行時(點選畫面頂端的「Runtime」功能表,並從下拉式功能表中選擇「Restart session」)。

我們將使用 OpenAI 的模型。您應該準備api key OPENAI_API_KEY 作為環境變數。

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

如果您使用的是 Jupyter Notebook,您需要在執行異步程式碼之前先執行這一行程式碼。

import nest_asyncio

nest_asyncio.apply()

準備資料

您可以使用下列指令下載範例資料:

$ mkdir -p 'data/'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham_essay.txt'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/uber_2021.pdf'

使用異步處理建立 RAG

本節將介紹如何建立一個可以以異步方式處理文件的 RAG 系統。

導入必要的函式庫,並定義 Milvus URI 和嵌入的尺寸。

import asyncio
import random
import time

from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.milvus import MilvusVectorStore

URI = "http://localhost:19530"
DIM = 768
  • 如果您有大規模的資料,您可以在docker 或 kubernetes 上架設效能優異的 Milvus 伺服器。在此設定中,請使用伺服器的 URI,例如http://localhost:19530 ,作為您的uri
  • 如果您想使用Zilliz Cloud(Milvus 的完全管理雲端服務),請調整uritoken ,與 Zilliz Cloud 中的Public Endpoint 和 Api key對應。
  • 在複雜的系統中 (例如網路通訊),相較於同步處理,非同步處理可以帶來效能的提升。因此我們認為 Milvus-Lite 不適合使用異步介面,因為使用的情境並不適合。

定義一個初始化函式,我們可以再次使用它來重建 Milvus 套件。

def init_vector_store():
    return MilvusVectorStore(
        uri=URI,
        # token=TOKEN,
        dim=DIM,
        collection_name="test_collection",
        embedding_field="embedding",
        id_field="id",
        similarity_metric="COSINE",
        consistency_level="Bounded",  # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`). See https://milvus.io/docs/tune_consistency.md#Consistency-Level for more details.
        overwrite=True,  # To overwrite the collection if it already exists
    )


vector_store = init_vector_store()
2025-01-24 20:04:39,414 [DEBUG][_create_connection]: Created new connection using: faa8be8753f74288bffc7e6d38942f8a (async_milvus_client.py:600)

使用 SimpleDirectoryReader 從檔案paul_graham_essay.txt 包裝一個 LlamaIndex 文件物件。

from llama_index.core import SimpleDirectoryReader

# load documents
documents = SimpleDirectoryReader(
    input_files=["./data/paul_graham_essay.txt"]
).load_data()

print("Document ID:", documents[0].doc_id)
Document ID: 41a6f99c-489f-49ff-9821-14e2561140eb

在本機實作一個 Hugging Face 嵌入模型。使用本機模型可避免在非同步資料插入時達到 API 速率限制的風險,因為並發的 API 請求很快就會累積起來,耗盡您在公共 API 的預算。但是,如果您有很高的速率限制,您可以選擇使用遠端模型服務來代替。

from llama_index.embeddings.huggingface import HuggingFaceEmbedding


embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")

建立索引並插入文件。

我們將use_async 設為True ,以啟用同步插入模式。

# Create an index over the documents
from llama_index.core import VectorStoreIndex, StorageContext

storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    embed_model=embed_model,
    use_async=True,
)

初始化 LLM。

from llama_index.llms.openai import OpenAI

llm = OpenAI(model="gpt-3.5-turbo")

建立查詢引擎時,您也可以將use_async 參數設定為True ,以啟用異步搜尋。

query_engine = index.as_query_engine(use_async=True, llm=llm)
response = await query_engine.aquery("What did the author learn?")
print(response)
The author learned that the field of artificial intelligence, as practiced at the time, was not as promising as initially believed. The approach of using explicit data structures to represent concepts in AI was not effective in achieving true understanding of natural language. This realization led the author to shift his focus towards Lisp and eventually towards exploring the field of art.

探索異步 API

在本節中,我們將介紹較低階的 API 使用方式,並比較同步與非同步執行的效能。

異步新增

重新初始化向量儲存。

vector_store = init_vector_store()
2025-01-24 20:07:38,727 [DEBUG][_create_connection]: Created new connection using: 5e0d130f3b644555ad7ea6b8df5f1fc2 (async_milvus_client.py:600)

讓我們定義一個產生節點的函式,用來產生大量的索引測試節點。

def random_id():
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def produce_nodes(num_adding):
    node_list = []
    for i in range(num_adding):
        node = TextNode(
            id_=random_id(),
            text=f"n{i}_text",
            embedding=[0.5] * (DIM - 1) + [random.random()],
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id=f"n{i+1}")},
        )
        node_list.append(node)
    return node_list

定義一個 aync 函式來新增文件到向量儲存空間。我們在 Milvus 向量存儲實例中使用async_add() 函式。

async def async_add(num_adding):
    node_list = produce_nodes(num_adding)
    start_time = time.time()
    tasks = []
    for i in range(num_adding):
        sub_nodes = node_list[i]
        task = vector_store.async_add([sub_nodes])  # use async_add()
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time
add_counts = [10, 100, 1000]

取得事件迴圈。

loop = asyncio.get_event_loop()

以異步方式新增文件到向量儲存空間。

for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(count)
        print(f"Async add for {count} took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())
Async add for 10 took 0.19 seconds
Async add for 100 took 0.48 seconds
Async add for 1000 took 3.22 seconds
vector_store = init_vector_store()
2025-01-24 20:07:45,554 [DEBUG][_create_connection]: Created new connection using: b14dde8d6d24489bba26a907593f692d (async_milvus_client.py:600)

與同步新增比較

定義同步新增函式。然後測量相同條件下的執行時間。

def sync_add(num_adding):
    node_list = produce_nodes(num_adding)
    start_time = time.time()
    for node in node_list:
        result = vector_store.add([node])
    end_time = time.time()
    return end_time - start_time
for count in add_counts:
    sync_time = sync_add(count)
    print(f"Sync add for {count} took {sync_time:.2f} seconds")
Sync add for 10 took 0.56 seconds
Sync add for 100 took 5.85 seconds
Sync add for 1000 took 62.91 seconds

結果顯示同步新增的過程比異步的慢很多。

在執行搜尋之前,重新初始化向量儲存並新增一些文件。

vector_store = init_vector_store()
node_list = produce_nodes(num_adding=1000)
inserted_ids = vector_store.add(node_list)
2025-01-24 20:08:57,982 [DEBUG][_create_connection]: Created new connection using: 351dc7ea4fb14d4386cfab02621ab7d1 (async_milvus_client.py:600)

定義異步搜尋函式。我們使用 Milvus 向量儲存實例中的aquery() 函式。

async def async_search(num_queries):
    start_time = time.time()
    tasks = []
    for _ in range(num_queries):
        query = VectorStoreQuery(
            query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
        )
        task = vector_store.aquery(query=query)  # use aquery()
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time
query_counts = [10, 100, 1000]

從 Milvus 商店進行非同步搜尋。

for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())
Async search for 10 queries took 0.55 seconds
Async search for 100 queries took 1.39 seconds
Async search for 1000 queries took 8.81 seconds

定義同步搜尋函式。然後測量相同條件下的執行時間。

def sync_search(num_queries):
    start_time = time.time()
    for _ in range(num_queries):
        query = VectorStoreQuery(
            query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
        )
        result = vector_store.query(query=query)
    end_time = time.time()
    return end_time - start_time
for count in query_counts:
    sync_time = sync_search(count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Sync search for 10 queries took 3.29 seconds
Sync search for 100 queries took 30.80 seconds
Sync search for 1000 queries took 308.80 seconds

結果顯示同步搜尋的過程比異步搜尋慢很多。

免費嘗試托管的 Milvus

Zilliz Cloud 無縫接入,由 Milvus 提供動力,速度提升 10 倍。

開始使用
反饋

這個頁面有幫助嗎?