Асинхронные функции в интеграции с LangChain Milvus

Open In Colab GitHub Repository

В этом уроке рассматривается использование асинхронных функций в langchain-milvus для создания высокопроизводительных приложений. Используя асинхронные методы, вы можете значительно повысить пропускную способность и скорость реакции вашего приложения, особенно при работе с крупномасштабным поиском. Создаете ли вы систему рекомендаций в реальном времени, внедряете семантический поиск в приложение или создаете конвейер RAG (Retrieval-Augmented Generation), операции async помогут вам более эффективно обрабатывать одновременные запросы. Высокопроизводительная векторная база данных Milvus в сочетании с мощными LLM-абстракциями LangChain может стать надежной основой для создания масштабируемых приложений ИИ.

Обзор Async API

langchain-milvus обеспечивает всестороннюю поддержку асинхронных операций, значительно повышая производительность в масштабных параллельных сценариях. Асинхронный API поддерживает согласованный дизайн интерфейса с синхронным API.

Основные асинхронные функции

Чтобы использовать асинхронные операции в langchain-milvus, просто добавьте префикс a к именам методов. Это позволяет лучше использовать ресурсы и повышает пропускную способность при обработке одновременных запросов на получение информации.

Тип операцииМетод синхронизацииАсинхронный методОписание
Добавить текстыadd_texts()aadd_texts()Добавление текстов в векторное хранилище
Добавить документыadd_documents()aadd_documents()Добавление документов в векторное хранилище
Добавление вкрапленийadd_embeddings()aadd_embeddings()Добавить векторы вкраплений
Поиск по сходствуsimilarity_search()asimilarity_search()Семантический поиск по тексту
Векторный поискsimilarity_search_by_vector()asimilarity_search_by_vector()Семантический поиск по вектору
Поиск с оценкойsimilarity_search_with_score()asimilarity_search_with_score()Семантический поиск по тексту и возврат оценок сходства
Векторный поиск с оценкойsimilarity_search_with_score_by_vector()asimilarity_search_with_score_by_vector()Семантический поиск по вектору и возврат оценок сходства
Поиск по разнообразиюmax_marginal_relevance_search()amax_marginal_relevance_search()MMR-поиск (возвращает похожие, но при этом оптимизирует разнообразие)
Векторный поиск по разнообразиюmax_marginal_relevance_search_by_vector()amax_marginal_relevance_search_by_vector()MMR-поиск по вектору
Операция удаленияdelete()adelete()Удаление документов
Операция Upsertupsert()aupsert()Обновить (обновить, если существует, в противном случае вставить) документы
Поиск по метаданнымsearch_by_metadata()asearch_by_metadata()Запрос с фильтрацией метаданных
Получить первичные ключиget_pks()aget_pks()Получение первичных ключей по выражению
Создать из текстовfrom_texts()afrom_texts()Создание векторного хранилища из текстов

Для получения более подробной информации об этих функциях обратитесь к справочнику API.

Преимущества производительности

Асинхронные операции обеспечивают значительное повышение производительности при обработке больших объемов одновременных запросов, что особенно удобно для:

  • Пакетной обработки документов
  • Сценарии поиска с высокой интенсивностью запросов
  • Производственные приложения RAG
  • Крупномасштабный импорт/экспорт данных

В этом руководстве мы продемонстрируем эти преимущества производительности на примере подробного сравнения синхронных и асинхронных операций и покажем, как использовать асинхронные API для достижения оптимальной производительности в ваших приложениях.

Прежде чем начать

Сниппеты кода на этой странице требуют следующих зависимостей:

! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio

Если вы используете Google Colab, для включения только что установленных зависимостей вам, возможно, потребуется перезапустить среду выполнения (нажмите на меню "Runtime" в верхней части экрана и выберите "Restart session" из выпадающего меню).

Мы будем использовать модели OpenAI. Вам необходимо подготовить api ключ OPENAI_API_KEY в качестве переменной окружения:

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

Если вы используете Jupyter Notebook, вам нужно запустить эту строку кода перед запуском асинхронного кода:

import nest_asyncio

nest_asyncio.apply()

Изучение асинхронных API и сравнение производительности

Теперь давайте углубимся в сравнение производительности между синхронными и асинхронными операциями с помощью langchain-milvus.

Сначала импортируйте необходимые библиотеки:

import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus

# Define the Milvus URI
URI = "http://localhost:19530"

Настройка тестовых функций

Создадим вспомогательные функции для генерации тестовых данных:

def random_id():
    """Generate a random string ID"""
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def generate_test_documents(num_docs):
    """Generate test documents for performance testing"""
    docs = []
    for i in range(num_docs):
        content = (
            f"This is test document {i} with some random content: {random.random()}"
        )
        metadata = {
            "id": f"doc_{i}",
            "score": random.random(),
            "category": f"cat_{i % 5}",
        }
        doc = Document(page_content=content, metadata=metadata)
        docs.append(doc)
    return docs

Инициализация хранилища векторов

Прежде чем запускать наши тесты производительности, нам нужно создать чистое векторное хранилище Milvus. Эта функция гарантирует, что мы начнем со свежей коллекции для каждого теста, устраняя любые помехи от предыдущих данных:

def init_vector_store():
    """Initialize and return a fresh vector store for testing"""
    return Milvus(
        embedding_function=OpenAIEmbeddings(),
        collection_name="langchain_perf_test",
        connection_args={"uri": URI},
        auto_id=True,
        drop_old=True,  # Always start with a fresh collection
    )

Async vs Sync: Добавление документов

Теперь давайте сравним производительность синхронного и асинхронного добавления документов. Эти функции помогут нам определить, насколько быстрее могут быть асинхронные операции при добавлении нескольких документов в векторное хранилище. Асинхронная версия создает задачи для каждого добавления документа и выполняет их параллельно, в то время как синхронная версия обрабатывает документы по одному:

async def async_add(milvus_store, num_adding):
    """Add documents asynchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    tasks = []
    for doc in docs:
        # Create tasks for each document addition
        task = milvus_store.aadd_documents([doc])
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_add(milvus_store, num_adding):
    """Add documents synchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    for doc in docs:
        result = milvus_store.add_documents([doc])
    end_time = time.time()
    return end_time - start_time

Теперь давайте выполним наши тесты производительности с разным количеством документов, чтобы увидеть реальную разницу в производительности. Мы проведем тесты с различной нагрузкой, чтобы понять, как масштабируются асинхронные операции по сравнению с их синхронными аналогами. Тесты измерят время выполнения для обоих подходов и помогут продемонстрировать преимущества производительности асинхронных операций:

add_counts = [10, 100]

# Get the event loop
loop = asyncio.get_event_loop()

# Create a new vector store for testing
milvus_store = init_vector_store()

# Test async document addition
for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(milvus_store, count)
        print(f"Async add for {count} documents took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())

# Reset vector store for sync tests
milvus_store = init_vector_store()

# Test sync document addition
for count in add_counts:
    sync_time = sync_add(milvus_store, count)
    print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)


Async add for 10 documents took 1.74 seconds


2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)


Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds

Для сравнения производительности поиска нам нужно сначала заполнить векторное хранилище. Следующие функции помогут нам измерить производительность поиска, создав несколько одновременных поисковых запросов и сравнив время их выполнения при синхронном и асинхронном подходах:

def populate_vector_store(milvus_store, num_docs=1000):
    """Populate the vector store with test documents"""
    docs = generate_test_documents(num_docs)
    milvus_store.add_documents(docs)
    return docs


async def async_search(milvus_store, num_queries):
    """Perform async searches and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_queries):
        query = f"test document {i % 50}"
        task = milvus_store.asimilarity_search(query=query, k=3)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_search(milvus_store, num_queries):
    """Perform sync searches and measure the time"""
    start_time = time.time()
    for i in range(num_queries):
        query = f"test document {i % 50}"
        result = milvus_store.similarity_search(query=query, k=3)
    end_time = time.time()
    return end_time - start_time

Теперь давайте проведем комплексные тесты производительности поиска, чтобы увидеть, как асинхронные операции масштабируются по сравнению с синхронными. Мы протестируем различные объемы запросов, чтобы продемонстрировать преимущества производительности асинхронных операций, особенно при увеличении количества одновременных операций:

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

query_counts = [10, 100]

# Test async search
for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(milvus_store, count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())

# Test sync search
for count in query_counts:
    sync_time = sync_search(milvus_store, count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)


Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds

Async vs Sync: Удаление

Операции удаления - еще один важный аспект, в котором асинхронные операции могут обеспечить значительное повышение производительности. Давайте создадим функции для измерения разницы в производительности между синхронными и асинхронными операциями удаления. Эти тесты помогут продемонстрировать, как асинхронные операции могут более эффективно обрабатывать пакетные удаления:

async def async_delete(milvus_store, num_deleting):
    """Delete documents asynchronously and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        task = milvus_store.adelete(expr=expr)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_delete(milvus_store, num_deleting):
    """Delete documents synchronously and measure the time"""
    start_time = time.time()
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        result = milvus_store.delete(expr=expr)
    end_time = time.time()
    return end_time - start_time

Теперь давайте выполним тесты производительности удаления, чтобы оценить разницу в производительности. Мы начнем со свежего векторного хранилища, заполненного тестовыми данными, а затем выполним операции удаления, используя синхронный и асинхронный подходы:

delete_counts = [10, 100]

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test async delete
for count in delete_counts:

    async def measure_async_delete():
        async_time = await async_delete(milvus_store, count)
        print(f"Async delete for {count} operations took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_delete())

# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test sync delete
for count in delete_counts:
    sync_time = sync_delete(milvus_store, count)
    print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)


Async delete for 10 operations took 0.58 seconds


2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)


Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds

Заключение

Это руководство продемонстрировало значительные преимущества в производительности при использовании асинхронных операций с LangChain и Milvus. Мы сравнили синхронные и асинхронные версии операций добавления, поиска и удаления, показав, как асинхронные операции могут обеспечить существенный прирост скорости, особенно для больших пакетных операций.

Основные выводы:

  1. Асинхронные операции приносят наибольшую пользу при выполнении множества отдельных операций, которые могут выполняться параллельно.
  2. При выполнении больших объемов работы разрыв в производительности между синхронными и асинхронными операциями увеличивается
  3. Асинхронные операции полностью используют вычислительную мощность машин

При создании производственных RAG-приложений с помощью LangChain и Milvus следует использовать API async, если производительность вызывает озабоченность, особенно при выполнении параллельных операций.

Попробуйте Managed Milvus бесплатно

Zilliz Cloud работает без проблем, поддерживается Milvus и в 10 раз быстрее.

Начать
Обратная связь

Была ли эта страница полезной?