Fungsi Asinkron dalam Integrasi LangChain Milvus

Open In Colab GitHub Repository

Tutorial ini membahas cara memanfaatkan fungsi asinkron di langchain-milvus untuk membangun aplikasi berkinerja tinggi. Dengan menggunakan metode asinkron, Anda dapat secara signifikan meningkatkan throughput dan daya tanggap aplikasi Anda, terutama ketika berhadapan dengan pengambilan data berskala besar. Baik Anda sedang membangun sistem rekomendasi waktu nyata, mengimplementasikan pencarian semantik dalam aplikasi Anda, atau membuat pipeline RAG (Retrieval-Augmented Generation), operasi asinkronisasi dapat membantu Anda menangani permintaan yang datang secara bersamaan dengan lebih efisien. Basis data vektor berkinerja tinggi Milvus yang dikombinasikan dengan abstraksi LLM yang kuat dari LangChain dapat memberikan fondasi yang kuat untuk membangun aplikasi AI yang dapat diskalakan.

Ikhtisar API Async

langchain-milvus menyediakan dukungan operasi asinkron yang komprehensif, yang secara signifikan meningkatkan kinerja dalam skenario konkuren berskala besar. API asinkronisasi mempertahankan desain antarmuka yang konsisten dengan API sinkronisasi.

Fungsi Asinkronisasi Inti

Untuk menggunakan operasi asinkronisasi di langchain-milvus, cukup tambahkan awalan a pada nama metode. Hal ini memungkinkan pemanfaatan sumber daya yang lebih baik dan peningkatan throughput saat menangani permintaan pengambilan secara bersamaan.

Jenis OperasiMetode SinkronisasiMetode AsinkronisasiDeskripsi
Menambahkan Teksadd_texts()aadd_texts()Menambahkan teks ke penyimpanan vektor
Tambahkan Dokumenadd_documents()aadd_documents()Menambahkan dokumen ke penyimpanan vektor
Tambahkan Penyematanadd_embeddings()aadd_embeddings()Menambahkan vektor penyematan
Pencarian Kemiripansimilarity_search()asimilarity_search()Pencarian semantik dengan teks
Pencarian Vektorsimilarity_search_by_vector()asimilarity_search_by_vector()Pencarian semantik dengan vektor
Pencarian dengan Skorsimilarity_search_with_score()asimilarity_search_with_score()Pencarian semantik dengan teks dan mengembalikan skor kemiripan
Pencarian Vektor dengan Skorsimilarity_search_with_score_by_vector()asimilarity_search_with_score_by_vector()Pencarian semantik dengan vektor dan mengembalikan skor kemiripan
Pencarian Keragamanmax_marginal_relevance_search()amax_marginal_relevance_search()Pencarian MMR (mengembalikan yang mirip sekaligus mengoptimalkan keragaman)
Pencarian Keragaman Vektormax_marginal_relevance_search_by_vector()amax_marginal_relevance_search_by_vector()Pencarian MMR dengan vektor
Operasi Penghapusandelete()adelete()Menghapus dokumen
Operasi Penambahanupsert()aupsert()Menambah (memperbarui jika sudah ada, jika tidak, menyisipkan) dokumen
Pencarian Metadatasearch_by_metadata()asearch_by_metadata()Kueri dengan pemfilteran metadata
Dapatkan Kunci Utamaget_pks()aget_pks()Dapatkan kunci utama dengan ekspresi
Membuat dari Teksfrom_texts()afrom_texts()Membuat penyimpanan vektor dari teks

Untuk informasi lebih rinci tentang fungsi-fungsi ini, silakan lihat Referensi API.

Manfaat Kinerja

Operasi asinkronisasi memberikan peningkatan kinerja yang signifikan ketika menangani volume besar permintaan bersamaan, terutama cocok untuk:

  • Pemrosesan dokumen batch
  • Skenario pencarian konkurensi tinggi
  • Aplikasi RAG produksi
  • Impor/ekspor data skala besar

Dalam tutorial ini, kami akan mendemonstrasikan manfaat kinerja ini melalui perbandingan terperinci antara operasi sinkron dan asinkron, yang menunjukkan kepada Anda cara memanfaatkan API asinkron untuk kinerja optimal dalam aplikasi Anda.

Sebelum Anda memulai

Cuplikan kode di halaman ini memerlukan dependensi berikut:

! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio

Jika Anda menggunakan Google Colab, untuk mengaktifkan dependensi yang baru saja terinstal, Anda mungkin perlu memulai ulang runtime (klik menu "Runtime" di bagian atas layar, dan pilih "Restart session" dari menu tarik-turun).

Kita akan menggunakan model OpenAI. Anda harus menyiapkan kunci api OPENAI_API_KEY sebagai variabel lingkungan:

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

Jika Anda menggunakan Jupyter Notebook, Anda harus menjalankan baris kode ini sebelum menjalankan kode asinkron:

import nest_asyncio

nest_asyncio.apply()

Menjelajahi API Asinkron dan Perbandingan Kinerja

Sekarang mari kita selami lebih dalam perbandingan performa antara operasi sinkron dan asinkron dengan langchain-milvus.

Pertama, impor library yang diperlukan:

import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus

# Define the Milvus URI
URI = "http://localhost:19530"

Menyiapkan Fungsi Uji Coba

Mari kita buat fungsi-fungsi pembantu untuk menghasilkan data uji:

def random_id():
    """Generate a random string ID"""
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def generate_test_documents(num_docs):
    """Generate test documents for performance testing"""
    docs = []
    for i in range(num_docs):
        content = (
            f"This is test document {i} with some random content: {random.random()}"
        )
        metadata = {
            "id": f"doc_{i}",
            "score": random.random(),
            "category": f"cat_{i % 5}",
        }
        doc = Document(page_content=content, metadata=metadata)
        docs.append(doc)
    return docs

Menginisialisasi Penyimpanan Vektor

Sebelum kita dapat menjalankan uji performa, kita perlu menyiapkan penyimpanan vektor Milvus yang bersih. Fungsi ini memastikan kita memulai dengan koleksi yang baru untuk setiap pengujian, menghilangkan gangguan dari data sebelumnya:

def init_vector_store():
    """Initialize and return a fresh vector store for testing"""
    return Milvus(
        embedding_function=OpenAIEmbeddings(),
        collection_name="langchain_perf_test",
        connection_args={"uri": URI},
        auto_id=True,
        drop_old=True,  # Always start with a fresh collection
    )

Asinkronisasi vs Sinkronisasi: Menambahkan Dokumen

Sekarang mari kita bandingkan kinerja penambahan dokumen sinkron vs asinkron. Fungsi-fungsi ini akan membantu kita mengukur seberapa cepat operasi asinkron ketika menambahkan beberapa dokumen ke penyimpanan vektor. Versi asinkron membuat tugas untuk setiap penambahan dokumen dan menjalankannya secara bersamaan, sedangkan versi sinkron memproses dokumen satu per satu:

async def async_add(milvus_store, num_adding):
    """Add documents asynchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    tasks = []
    for doc in docs:
        # Create tasks for each document addition
        task = milvus_store.aadd_documents([doc])
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_add(milvus_store, num_adding):
    """Add documents synchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    for doc in docs:
        result = milvus_store.add_documents([doc])
    end_time = time.time()
    return end_time - start_time

Sekarang mari kita jalankan pengujian performa dengan jumlah dokumen yang berbeda untuk melihat perbedaan performa di dunia nyata. Kami akan menguji dengan berbagai beban untuk memahami bagaimana skala operasi asinkron dibandingkan dengan operasi sinkron. Pengujian akan mengukur waktu eksekusi untuk kedua pendekatan tersebut dan membantu mendemonstrasikan manfaat kinerja dari operasi asinkron:

add_counts = [10, 100]

# Get the event loop
loop = asyncio.get_event_loop()

# Create a new vector store for testing
milvus_store = init_vector_store()

# Test async document addition
for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(milvus_store, count)
        print(f"Async add for {count} documents took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())

# Reset vector store for sync tests
milvus_store = init_vector_store()

# Test sync document addition
for count in add_counts:
    sync_time = sync_add(milvus_store, count)
    print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)


Async add for 10 documents took 1.74 seconds


2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)


Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds

Untuk perbandingan performa pencarian, kita perlu mengisi penyimpanan vektor terlebih dahulu. Fungsi berikut ini akan membantu kita mengukur performa pencarian dengan membuat beberapa kueri pencarian secara bersamaan dan membandingkan waktu eksekusi antara pendekatan sinkron dan asinkron:

def populate_vector_store(milvus_store, num_docs=1000):
    """Populate the vector store with test documents"""
    docs = generate_test_documents(num_docs)
    milvus_store.add_documents(docs)
    return docs


async def async_search(milvus_store, num_queries):
    """Perform async searches and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_queries):
        query = f"test document {i % 50}"
        task = milvus_store.asimilarity_search(query=query, k=3)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_search(milvus_store, num_queries):
    """Perform sync searches and measure the time"""
    start_time = time.time()
    for i in range(num_queries):
        query = f"test document {i % 50}"
        result = milvus_store.similarity_search(query=query, k=3)
    end_time = time.time()
    return end_time - start_time

Sekarang mari kita jalankan pengujian kinerja pencarian yang komprehensif untuk melihat bagaimana skala operasi asinkron dibandingkan dengan operasi sinkron. Kami akan menguji dengan volume kueri yang berbeda untuk menunjukkan manfaat kinerja operasi asinkron, terutama seiring dengan meningkatnya jumlah operasi bersamaan:

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

query_counts = [10, 100]

# Test async search
for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(milvus_store, count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())

# Test sync search
for count in query_counts:
    sync_time = sync_search(milvus_store, count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)


Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds

Asinkronisasi vs Sinkronisasi: Menghapus

Operasi hapus adalah aspek penting lainnya di mana operasi asinkronisasi dapat memberikan peningkatan kinerja yang signifikan. Mari kita buat fungsi untuk mengukur perbedaan kinerja antara operasi penghapusan sinkron dan asinkron. Tes ini akan membantu menunjukkan bagaimana operasi asinkronisasi dapat menangani penghapusan batch dengan lebih efisien:

async def async_delete(milvus_store, num_deleting):
    """Delete documents asynchronously and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        task = milvus_store.adelete(expr=expr)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_delete(milvus_store, num_deleting):
    """Delete documents synchronously and measure the time"""
    start_time = time.time()
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        result = milvus_store.delete(expr=expr)
    end_time = time.time()
    return end_time - start_time

Sekarang mari kita jalankan tes performa penghapusan untuk mengukur perbedaan performa. Kita akan mulai dengan penyimpanan vektor baru yang diisi dengan data uji, lalu melakukan operasi penghapusan menggunakan pendekatan sinkron dan asinkron:

delete_counts = [10, 100]

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test async delete
for count in delete_counts:

    async def measure_async_delete():
        async_time = await async_delete(milvus_store, count)
        print(f"Async delete for {count} operations took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_delete())

# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test sync delete
for count in delete_counts:
    sync_time = sync_delete(milvus_store, count)
    print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)


Async delete for 10 operations took 0.58 seconds


2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)


Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds

Kesimpulan

Tutorial ini mendemonstrasikan keuntungan performa yang signifikan dari penggunaan operasi asinkron dengan LangChain dan Milvus. Kami membandingkan versi sinkron dan asinkron dari operasi tambah, cari, dan hapus, yang menunjukkan bagaimana operasi asinkron dapat memberikan peningkatan kecepatan yang substansial, terutama untuk operasi batch yang besar.

Kesimpulan utama:

  1. Operasi asinkron memberikan manfaat paling besar ketika melakukan banyak operasi individual yang dapat berjalan secara paralel
  2. Untuk beban kerja yang menghasilkan throughput yang lebih tinggi, kesenjangan kinerja antara operasi sinkronisasi dan asinkronisasi melebar
  3. Operasi asinkronisasi sepenuhnya memanfaatkan daya komputasi mesin

Ketika membangun aplikasi RAG produksi dengan LangChain dan Milvus, pertimbangkan untuk menggunakan API asinkronisasi ketika kinerja menjadi perhatian, terutama untuk operasi yang dilakukan secara bersamaan.

Coba Milvus yang Dikelola secara Gratis

Zilliz Cloud bebas masalah, didukung oleh Milvus dan 10x lebih cepat.

Mulai
Umpan balik

Apakah halaman ini bermanfaat?