RAG с Milvus и LlamaIndex Async API

Open In Colab GitHub Repository

В этом руководстве показано, как использовать LlamaIndex вместе с Milvus для построения асинхронного конвейера обработки документов для RAG. LlamaIndex предоставляет возможность обрабатывать документы и хранить их в векторной базе данных, как Milvus. Используя асинхронный API LlamaIndex и клиентскую библиотеку Milvus Python, мы можем увеличить пропускную способность конвейера для эффективной обработки и индексации больших объемов данных.

В этом руководстве мы сначала познакомимся с использованием асинхронных методов для построения RAG с LlamaIndex и Milvus на высоком уровне, а затем рассмотрим использование низкоуровневых методов и сравнение производительности синхронных и асинхронных.

Прежде чем начать

Для фрагментов кода на этой странице требуются зависимости pymilvus и llamaindex. Вы можете установить их с помощью следующих команд:

$ pip install -U pymilvus llama-index-vector-stores-milvus llama-index nest-asyncio

Если вы используете Google Colab, то для включения только что установленных зависимостей вам может потребоваться перезапустить среду выполнения (нажмите на меню "Runtime" в верхней части экрана и выберите "Restart session" из выпадающего меню).

Мы будем использовать модели из OpenAI. Вам необходимо подготовить api ключ OPENAI_API_KEY в качестве переменной окружения.

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

Если вы используете Jupyter Notebook, вам нужно запустить эту строку кода перед запуском асинхронного кода.

import nest_asyncio

nest_asyncio.apply()

Подготовьте данные

Вы можете загрузить примеры данных с помощью следующих команд:

$ mkdir -p 'data/'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham_essay.txt'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/uber_2021.pdf'

Сборка RAG с асинхронной обработкой

В этом разделе мы покажем, как построить систему RAG, которая может обрабатывать документы асинхронным способом.

Импортируйте необходимые библиотеки и определите Milvus URI и размер встраивания.

import asyncio
import random
import time

from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.milvus import MilvusVectorStore

URI = "http://localhost:19530"
DIM = 768
  • Если у вас большие объемы данных, вы можете установить производительный сервер Milvus на docker или kubernetes. В этом случае используйте ури сервера, напримерhttp://localhost:19530, в качестве uri.
  • Если вы хотите использовать Zilliz Cloud, полностью управляемый облачный сервис для Milvus, настройте uri и token, которые соответствуют публичной конечной точке и ключу Api в Zilliz Cloud.
  • В случае сложных систем (например, сетевых коммуникаций) асинхронная обработка может повысить производительность по сравнению с синхронизацией. Поэтому мы считаем, что Milvus-Lite не подходит для использования асинхронных интерфейсов, так как используемые сценарии не подходят.

Определите функцию инициализации, которую мы сможем использовать снова, чтобы перестроить коллекцию Milvus.

def init_vector_store():
    return MilvusVectorStore(
        uri=URI,
        # token=TOKEN,
        dim=DIM,
        collection_name="test_collection",
        embedding_field="embedding",
        id_field="id",
        similarity_metric="COSINE",
        consistency_level="Bounded",  # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`). See https://milvus.io/docs/tune_consistency.md#Consistency-Level for more details.
        overwrite=True,  # To overwrite the collection if it already exists
    )


vector_store = init_vector_store()
2025-01-24 20:04:39,414 [DEBUG][_create_connection]: Created new connection using: faa8be8753f74288bffc7e6d38942f8a (async_milvus_client.py:600)

Используйте SimpleDirectoryReader, чтобы обернуть объект документа LlamaIndex из файла paul_graham_essay.txt.

from llama_index.core import SimpleDirectoryReader

# load documents
documents = SimpleDirectoryReader(
    input_files=["./data/paul_graham_essay.txt"]
).load_data()

print("Document ID:", documents[0].doc_id)
Document ID: 41a6f99c-489f-49ff-9821-14e2561140eb

Локально создайте модель встраивания Hugging Face. Использование локальной модели позволяет избежать риска достижения ограничений скорости API при асинхронной вставке данных, так как одновременные запросы API могут быстро увеличиться и израсходовать ваш бюджет на публичный API. Однако если у вас высокий лимит скорости, вы можете предпочесть использовать удаленную службу модели.

from llama_index.embeddings.huggingface import HuggingFaceEmbedding


embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")

Создайте индекс и вставьте документ.

Мы установили use_async на True, чтобы включить режим асинхронной вставки.

# Create an index over the documents
from llama_index.core import VectorStoreIndex, StorageContext

storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    embed_model=embed_model,
    use_async=True,
)

Инициализируйте LLM.

from llama_index.llms.openai import OpenAI

llm = OpenAI(model="gpt-3.5-turbo")

При создании механизма запросов вы также можете установить параметр use_async в значение True, чтобы включить асинхронный поиск.

query_engine = index.as_query_engine(use_async=True, llm=llm)
response = await query_engine.aquery("What did the author learn?")
print(response)
The author learned that the field of artificial intelligence, as practiced at the time, was not as promising as initially believed. The approach of using explicit data structures to represent concepts in AI was not effective in achieving true understanding of natural language. This realization led the author to shift his focus towards Lisp and eventually towards exploring the field of art.

Знакомство с Async API

В этом разделе мы познакомимся с использованием API нижнего уровня и сравним производительность синхронного и асинхронного выполнения.

Асинхронное добавление

Переинициализируем векторное хранилище.

vector_store = init_vector_store()
2025-01-24 20:07:38,727 [DEBUG][_create_connection]: Created new connection using: 5e0d130f3b644555ad7ea6b8df5f1fc2 (async_milvus_client.py:600)

Определим функцию создания узлов, которая будет использоваться для генерации большого количества тестовых узлов для индекса.

def random_id():
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def produce_nodes(num_adding):
    node_list = []
    for i in range(num_adding):
        node = TextNode(
            id_=random_id(),
            text=f"n{i}_text",
            embedding=[0.5] * (DIM - 1) + [random.random()],
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id=f"n{i+1}")},
        )
        node_list.append(node)
    return node_list

Определим функцию aync для добавления документов в векторное хранилище. Мы используем функцию async_add() в экземпляре векторного хранилища Milvus.

async def async_add(num_adding):
    node_list = produce_nodes(num_adding)
    start_time = time.time()
    tasks = []
    for i in range(num_adding):
        sub_nodes = node_list[i]
        task = vector_store.async_add([sub_nodes])  # use async_add()
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time
add_counts = [10, 100, 1000]

Получите цикл событий.

loop = asyncio.get_event_loop()

Асинхронное добавление документов в векторное хранилище.

for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(count)
        print(f"Async add for {count} took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())
Async add for 10 took 0.19 seconds
Async add for 100 took 0.48 seconds
Async add for 1000 took 3.22 seconds
vector_store = init_vector_store()
2025-01-24 20:07:45,554 [DEBUG][_create_connection]: Created new connection using: b14dde8d6d24489bba26a907593f692d (async_milvus_client.py:600)

Сравнение с синхронным добавлением

Определите функцию синхронного добавления. Затем измеряем время работы при тех же условиях.

def sync_add(num_adding):
    node_list = produce_nodes(num_adding)
    start_time = time.time()
    for node in node_list:
        result = vector_store.add([node])
    end_time = time.time()
    return end_time - start_time
for count in add_counts:
    sync_time = sync_add(count)
    print(f"Sync add for {count} took {sync_time:.2f} seconds")
Sync add for 10 took 0.56 seconds
Sync add for 100 took 5.85 seconds
Sync add for 1000 took 62.91 seconds

Результат показывает, что процесс синхронного добавления намного медленнее, чем асинхронного.

Переинициализируйте векторное хранилище и добавьте несколько документов перед запуском поиска.

vector_store = init_vector_store()
node_list = produce_nodes(num_adding=1000)
inserted_ids = vector_store.add(node_list)
2025-01-24 20:08:57,982 [DEBUG][_create_connection]: Created new connection using: 351dc7ea4fb14d4386cfab02621ab7d1 (async_milvus_client.py:600)

Определите функцию асинхронного поиска. Мы используем функцию aquery() в экземпляре векторного хранилища Milvus.

async def async_search(num_queries):
    start_time = time.time()
    tasks = []
    for _ in range(num_queries):
        query = VectorStoreQuery(
            query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
        )
        task = vector_store.aquery(query=query)  # use aquery()
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time
query_counts = [10, 100, 1000]

Асинхронный поиск из хранилища Milvus.

for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())
Async search for 10 queries took 0.55 seconds
Async search for 100 queries took 1.39 seconds
Async search for 1000 queries took 8.81 seconds

Определите функцию синхронного поиска. Затем измерьте время работы при тех же условиях.

def sync_search(num_queries):
    start_time = time.time()
    for _ in range(num_queries):
        query = VectorStoreQuery(
            query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
        )
        result = vector_store.query(query=query)
    end_time = time.time()
    return end_time - start_time
for count in query_counts:
    sync_time = sync_search(count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Sync search for 10 queries took 3.29 seconds
Sync search for 100 queries took 30.80 seconds
Sync search for 1000 queries took 308.80 seconds

Результат показывает, что процесс синхронного поиска намного медленнее, чем асинхронного.

Попробуйте Managed Milvus бесплатно

Zilliz Cloud работает без проблем, поддерживается Milvus и в 10 раз быстрее.

Начать
Обратная связь

Была ли эта страница полезной?