랭체인 밀버스 통합의 비동기 함수

Open In Colab GitHub Repository

이 튜토리얼에서는 랭체인 밀 버스에서 비동기 함수를 활용하여 고성능 애플리케이션을 구축하는 방법을 살펴봅니다. 비동기 방식을 사용하면 특히 대규모 검색을 처리할 때 애플리케이션의 처리량과 응답성을 크게 향상시킬 수 있습니다. 실시간 추천 시스템을 구축하든, 애플리케이션에서 시맨틱 검색을 구현하든, RAG(검색 증강 생성) 파이프라인을 만들든, 비동기 작업을 사용하면 동시 요청을 보다 효율적으로 처리하는 데 도움이 될 수 있습니다. 고성능 벡터 데이터베이스 Milvus와 LangChain의 강력한 LLM 추상화를 결합하면 확장 가능한 AI 애플리케이션을 구축하기 위한 강력한 기반을 제공할 수 있습니다.

비동기 API 개요

랭체인-밀버스는 포괄적인 비동기 작업 지원을 제공하여 대규모 동시 시나리오에서 성능을 크게 향상시킵니다. 비동기 API는 동기 API와 일관된 인터페이스 디자인을 유지합니다.

핵심 비동기 기능

langchain-milvus에서 비동기 연산을 사용하려면 메서드 이름에 a 접두사를 추가하기만 하면 됩니다. 이렇게 하면 동시 검색 요청을 처리할 때 리소스 활용도를 높이고 처리량을 개선할 수 있습니다.

작업 유형동기화 메서드비동기 메서드설명
텍스트 추가add_texts()aadd_texts()벡터 스토어에 텍스트 추가
문서 추가add_documents()aadd_documents()벡터 스토어에 문서 추가
임베딩 추가add_embeddings()aadd_embeddings()임베딩 벡터 추가하기
유사성 검색similarity_search()asimilarity_search()텍스트로 시맨틱 검색
벡터 검색similarity_search_by_vector()asimilarity_search_by_vector()벡터로 시맨틱 검색
점수로 검색similarity_search_with_score()asimilarity_search_with_score()텍스트로 시맨틱 검색 및 유사도 점수 반환
점수로 벡터 검색similarity_search_with_score_by_vector()asimilarity_search_with_score_by_vector()벡터로 시맨틱 검색하고 유사도 점수를 반환합니다.
다양성 검색max_marginal_relevance_search()amax_marginal_relevance_search()MMR 검색(다양성을 최적화하면서 유사한 것을 반환)
벡터 다양성 검색max_marginal_relevance_search_by_vector()amax_marginal_relevance_search_by_vector()벡터를 통한 MMR 검색
삭제 작업delete()adelete()문서 삭제
업서트 작업upsert()aupsert()문서 업서트(기존 문서가 있으면 업데이트, 없으면 삽입)
메타데이터 검색search_by_metadata()asearch_by_metadata()메타데이터 필터링으로 쿼리
기본 키 가져오기get_pks()aget_pks()표현식으로 기본 키 가져오기
텍스트에서 생성from_texts()afrom_texts()텍스트에서 벡터 저장소 만들기

이러한 함수에 대한 자세한 내용은 API 참조를 참조하세요.

성능 이점

비동기 작업은 대량의 동시 요청을 처리할 때 상당한 성능 향상을 제공하며, 특히 다음과 같은 경우에 적합합니다:

  • 일괄 문서 처리
  • 동시 요청이 많은 검색 시나리오
  • 프로덕션 RAG 애플리케이션
  • 대규모 데이터 가져오기/내보내기

이 튜토리얼에서는 동기식 작업과 비동기식 작업의 자세한 비교를 통해 이러한 성능 이점을 시연하고, 애플리케이션에서 최적의 성능을 위해 비동기 API를 활용하는 방법을 보여드립니다.

시작하기 전에

이 페이지의 코드 스니펫에는 다음과 같은 종속성이 필요합니다:

! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio

Google Colab을 사용하는 경우 방금 설치한 종속성을 사용하려면 런타임을 다시 시작해야 할 수 있습니다(화면 상단의 '런타임' 메뉴를 클릭하고 드롭다운 메뉴에서 '세션 다시 시작'을 선택).

OpenAI 모델을 사용합니다. 환경 변수로 OPENAI_API_KEY API 키를 준비해야 합니다:

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

비동기 코드를 실행하기 전에 이 코드 줄을 실행해야 합니다:

import nest_asyncio

nest_asyncio.apply()

비동기 API와 성능 비교 살펴보기

이제 langchain-milvus를 사용해 동기식과 비동기식 작업의 성능 비교에 대해 자세히 알아보겠습니다.

먼저 필요한 라이브러리를 가져옵니다:

import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus

# Define the Milvus URI
URI = "http://localhost:19530"

테스트 함수 설정하기

테스트 데이터를 생성하는 헬퍼 함수를 만들어 보겠습니다:

def random_id():
    """Generate a random string ID"""
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def generate_test_documents(num_docs):
    """Generate test documents for performance testing"""
    docs = []
    for i in range(num_docs):
        content = (
            f"This is test document {i} with some random content: {random.random()}"
        )
        metadata = {
            "id": f"doc_{i}",
            "score": random.random(),
            "category": f"cat_{i % 5}",
        }
        doc = Document(page_content=content, metadata=metadata)
        docs.append(doc)
    return docs

벡터 저장소 초기화

성능 테스트를 실행하기 전에 먼저 깨끗한 Milvus 벡터 저장소를 설정해야 합니다. 이 함수를 사용하면 각 테스트에 대해 새로운 컬렉션으로 시작하여 이전 데이터의 간섭을 제거할 수 있습니다:

def init_vector_store():
    """Initialize and return a fresh vector store for testing"""
    return Milvus(
        embedding_function=OpenAIEmbeddings(),
        collection_name="langchain_perf_test",
        connection_args={"uri": URI},
        auto_id=True,
        drop_old=True,  # Always start with a fresh collection
    )

비동기 대 동기화: 문서 추가

이제 동기식 문서 추가와 비동기식 문서 추가의 성능을 비교해 보겠습니다. 이 함수는 벡터 스토어에 여러 문서를 추가할 때 비동기 작업이 얼마나 더 빠른지 측정하는 데 도움이 됩니다. 비동기 버전은 각 문서 추가에 대한 작업을 생성하고 동시에 실행하는 반면, 동기 버전은 문서를 하나씩 처리합니다:

async def async_add(milvus_store, num_adding):
    """Add documents asynchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    tasks = []
    for doc in docs:
        # Create tasks for each document addition
        task = milvus_store.aadd_documents([doc])
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_add(milvus_store, num_adding):
    """Add documents synchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    for doc in docs:
        result = milvus_store.add_documents([doc])
    end_time = time.time()
    return end_time - start_time

이제 다양한 문서 수로 성능 테스트를 실행하여 실제 성능 차이를 확인해 보겠습니다. 다양한 부하로 테스트하여 비동기 작업이 동기 작업과 비교하여 어떻게 확장되는지 이해해 보겠습니다. 이 테스트는 두 가지 접근 방식에 대한 실행 시간을 측정하고 비동기 작업의 성능 이점을 입증하는 데 도움이 됩니다:

add_counts = [10, 100]

# Get the event loop
loop = asyncio.get_event_loop()

# Create a new vector store for testing
milvus_store = init_vector_store()

# Test async document addition
for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(milvus_store, count)
        print(f"Async add for {count} documents took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())

# Reset vector store for sync tests
milvus_store = init_vector_store()

# Test sync document addition
for count in add_counts:
    sync_time = sync_add(milvus_store, count)
    print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)


Async add for 10 documents took 1.74 seconds


2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)


Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds

검색 성능 비교를 위해 먼저 벡터 저장소를 채워야 합니다. 다음 함수는 여러 개의 동시 검색 쿼리를 생성하고 동기 방식과 비동기 방식 간의 실행 시간을 비교하여 검색 성능을 측정하는 데 도움이 됩니다:

def populate_vector_store(milvus_store, num_docs=1000):
    """Populate the vector store with test documents"""
    docs = generate_test_documents(num_docs)
    milvus_store.add_documents(docs)
    return docs


async def async_search(milvus_store, num_queries):
    """Perform async searches and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_queries):
        query = f"test document {i % 50}"
        task = milvus_store.asimilarity_search(query=query, k=3)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_search(milvus_store, num_queries):
    """Perform sync searches and measure the time"""
    start_time = time.time()
    for i in range(num_queries):
        query = f"test document {i % 50}"
        result = milvus_store.similarity_search(query=query, k=3)
    end_time = time.time()
    return end_time - start_time

이제 포괄적인 검색 성능 테스트를 실행하여 비동기 작업이 동기 작업과 비교하여 어떻게 확장되는지 살펴보겠습니다. 특히 동시 작업의 수가 증가할 때 비동기 작업의 성능 이점을 입증하기 위해 다양한 쿼리 볼륨으로 테스트해 보겠습니다:

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

query_counts = [10, 100]

# Test async search
for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(milvus_store, count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())

# Test sync search
for count in query_counts:
    sync_time = sync_search(milvus_store, count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)


Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds

비동기 대 동기: 삭제

삭제 작업은 비동기 작업이 상당한 성능 향상을 제공할 수 있는 또 다른 중요한 측면입니다. 동기식 삭제 작업과 비동기식 삭제 작업의 성능 차이를 측정하는 함수를 만들어 보겠습니다. 이 테스트는 비동기 작업이 일괄 삭제를 더 효율적으로 처리하는 방법을 보여주는 데 도움이 될 것입니다:

async def async_delete(milvus_store, num_deleting):
    """Delete documents asynchronously and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        task = milvus_store.adelete(expr=expr)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_delete(milvus_store, num_deleting):
    """Delete documents synchronously and measure the time"""
    start_time = time.time()
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        result = milvus_store.delete(expr=expr)
    end_time = time.time()
    return end_time - start_time

이제 삭제 성능 테스트를 실행하여 성능 차이를 정량화해 보겠습니다. 테스트 데이터로 채워진 새 벡터 저장소로 시작한 다음 동기식 및 비동기식 접근 방식을 모두 사용하여 삭제 작업을 수행해 보겠습니다:

delete_counts = [10, 100]

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test async delete
for count in delete_counts:

    async def measure_async_delete():
        async_time = await async_delete(milvus_store, count)
        print(f"Async delete for {count} operations took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_delete())

# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test sync delete
for count in delete_counts:
    sync_time = sync_delete(milvus_store, count)
    print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)


Async delete for 10 operations took 0.58 seconds


2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)


Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds

결론

이 튜토리얼에서는 LangChain과 Milvus에서 비동기 연산을 사용할 때 상당한 성능 이점이 있다는 것을 보여주었습니다. 추가, 검색, 삭제 작업의 동기식 버전과 비동기식 버전을 비교하여 비동기식 작업이 특히 대규모 배치 작업에서 어떻게 상당한 속도 향상을 가져올 수 있는지 보여주었습니다.

주요 요점

  1. 비동기 작업은 병렬로 실행할 수 있는 많은 개별 작업을 수행할 때 가장 큰 이점을 제공합니다.
  2. 처리량이 많은 워크로드의 경우 동기화 작업과 비동기 작업 간의 성능 격차가 더 커집니다.
  3. 비동기 작업은 머신의 컴퓨팅 성능을 최대한 활용합니다.

LangChain 및 Milvus로 프로덕션 RAG 애플리케이션을 구축할 때, 특히 동시 작업의 경우 성능이 우려되는 경우 비동기 API 사용을 고려하세요.

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
피드백

이 페이지가 도움이 되었나요?