Fungsi Asinkron dalam Integrasi LangChain Milvus
Tutorial ini membahas cara memanfaatkan fungsi asinkron di langchain-milvus untuk membangun aplikasi berkinerja tinggi. Dengan menggunakan metode asinkron, Anda dapat secara signifikan meningkatkan throughput dan daya tanggap aplikasi Anda, terutama ketika berhadapan dengan pengambilan data berskala besar. Baik Anda sedang membangun sistem rekomendasi waktu nyata, mengimplementasikan pencarian semantik dalam aplikasi Anda, atau membuat pipeline RAG (Retrieval-Augmented Generation), operasi asinkronisasi dapat membantu Anda menangani permintaan yang datang secara bersamaan dengan lebih efisien. Basis data vektor berkinerja tinggi Milvus yang dikombinasikan dengan abstraksi LLM yang kuat dari LangChain dapat memberikan fondasi yang kuat untuk membangun aplikasi AI yang dapat diskalakan.
Ikhtisar API Async
langchain-milvus menyediakan dukungan operasi asinkron yang komprehensif, yang secara signifikan meningkatkan kinerja dalam skenario konkuren berskala besar. API asinkronisasi mempertahankan desain antarmuka yang konsisten dengan API sinkronisasi.
Fungsi Asinkronisasi Inti
Untuk menggunakan operasi asinkronisasi di langchain-milvus, cukup tambahkan awalan a pada nama metode. Hal ini memungkinkan pemanfaatan sumber daya yang lebih baik dan peningkatan throughput saat menangani permintaan pengambilan secara bersamaan.
| Jenis Operasi | Metode Sinkronisasi | Metode Asinkronisasi | Deskripsi |
|---|---|---|---|
| Menambahkan Teks | add_texts() | aadd_texts() | Menambahkan teks ke penyimpanan vektor |
| Tambahkan Dokumen | add_documents() | aadd_documents() | Menambahkan dokumen ke penyimpanan vektor |
| Tambahkan Penyematan | add_embeddings() | aadd_embeddings() | Menambahkan vektor penyematan |
| Pencarian Kemiripan | similarity_search() | asimilarity_search() | Pencarian semantik dengan teks |
| Pencarian Vektor | similarity_search_by_vector() | asimilarity_search_by_vector() | Pencarian semantik dengan vektor |
| Pencarian dengan Skor | similarity_search_with_score() | asimilarity_search_with_score() | Pencarian semantik dengan teks dan mengembalikan skor kemiripan |
| Pencarian Vektor dengan Skor | similarity_search_with_score_by_vector() | asimilarity_search_with_score_by_vector() | Pencarian semantik dengan vektor dan mengembalikan skor kemiripan |
| Pencarian Keragaman | max_marginal_relevance_search() | amax_marginal_relevance_search() | Pencarian MMR (mengembalikan yang mirip sekaligus mengoptimalkan keragaman) |
| Pencarian Keragaman Vektor | max_marginal_relevance_search_by_vector() | amax_marginal_relevance_search_by_vector() | Pencarian MMR dengan vektor |
| Operasi Penghapusan | delete() | adelete() | Menghapus dokumen |
| Operasi Penambahan | upsert() | aupsert() | Menambah (memperbarui jika sudah ada, jika tidak, menyisipkan) dokumen |
| Pencarian Metadata | search_by_metadata() | asearch_by_metadata() | Kueri dengan pemfilteran metadata |
| Dapatkan Kunci Utama | get_pks() | aget_pks() | Dapatkan kunci utama dengan ekspresi |
| Membuat dari Teks | from_texts() | afrom_texts() | Membuat penyimpanan vektor dari teks |
Untuk informasi lebih rinci tentang fungsi-fungsi ini, silakan lihat Referensi API.
Manfaat Kinerja
Operasi asinkronisasi memberikan peningkatan kinerja yang signifikan ketika menangani volume besar permintaan bersamaan, terutama cocok untuk:
- Pemrosesan dokumen batch
- Skenario pencarian konkurensi tinggi
- Aplikasi RAG produksi
- Impor/ekspor data skala besar
Dalam tutorial ini, kami akan mendemonstrasikan manfaat kinerja ini melalui perbandingan terperinci antara operasi sinkron dan asinkron, yang menunjukkan kepada Anda cara memanfaatkan API asinkron untuk kinerja optimal dalam aplikasi Anda.
Sebelum Anda memulai
Cuplikan kode di halaman ini memerlukan dependensi berikut:
! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio
Jika Anda menggunakan Google Colab, untuk mengaktifkan dependensi yang baru saja terinstal, Anda mungkin perlu memulai ulang runtime (klik menu "Runtime" di bagian atas layar, dan pilih "Restart session" dari menu tarik-turun).
Kita akan menggunakan model OpenAI. Anda harus menyiapkan kunci api OPENAI_API_KEY sebagai variabel lingkungan:
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Jika Anda menggunakan Jupyter Notebook, Anda harus menjalankan baris kode ini sebelum menjalankan kode asinkron:
import nest_asyncio
nest_asyncio.apply()
Menjelajahi API Asinkron dan Perbandingan Kinerja
Sekarang mari kita selami lebih dalam perbandingan performa antara operasi sinkron dan asinkron dengan langchain-milvus.
Pertama, impor library yang diperlukan:
import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus
# Define the Milvus URI
URI = "http://localhost:19530"
Menyiapkan Fungsi Uji Coba
Mari kita buat fungsi-fungsi pembantu untuk menghasilkan data uji:
def random_id():
"""Generate a random string ID"""
random_num_str = ""
for _ in range(16):
random_digit = str(random.randint(0, 9))
random_num_str += random_digit
return random_num_str
def generate_test_documents(num_docs):
"""Generate test documents for performance testing"""
docs = []
for i in range(num_docs):
content = (
f"This is test document {i} with some random content: {random.random()}"
)
metadata = {
"id": f"doc_{i}",
"score": random.random(),
"category": f"cat_{i % 5}",
}
doc = Document(page_content=content, metadata=metadata)
docs.append(doc)
return docs
Menginisialisasi Penyimpanan Vektor
Sebelum kita dapat menjalankan uji performa, kita perlu menyiapkan penyimpanan vektor Milvus yang bersih. Fungsi ini memastikan kita memulai dengan koleksi yang baru untuk setiap pengujian, menghilangkan gangguan dari data sebelumnya:
def init_vector_store():
"""Initialize and return a fresh vector store for testing"""
return Milvus(
embedding_function=OpenAIEmbeddings(),
collection_name="langchain_perf_test",
connection_args={"uri": URI},
auto_id=True,
drop_old=True, # Always start with a fresh collection
)
Asinkronisasi vs Sinkronisasi: Menambahkan Dokumen
Sekarang mari kita bandingkan kinerja penambahan dokumen sinkron vs asinkron. Fungsi-fungsi ini akan membantu kita mengukur seberapa cepat operasi asinkron ketika menambahkan beberapa dokumen ke penyimpanan vektor. Versi asinkron membuat tugas untuk setiap penambahan dokumen dan menjalankannya secara bersamaan, sedangkan versi sinkron memproses dokumen satu per satu:
async def async_add(milvus_store, num_adding):
"""Add documents asynchronously and measure the time"""
docs = generate_test_documents(num_adding)
start_time = time.time()
tasks = []
for doc in docs:
# Create tasks for each document addition
task = milvus_store.aadd_documents([doc])
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_add(milvus_store, num_adding):
"""Add documents synchronously and measure the time"""
docs = generate_test_documents(num_adding)
start_time = time.time()
for doc in docs:
result = milvus_store.add_documents([doc])
end_time = time.time()
return end_time - start_time
Sekarang mari kita jalankan pengujian performa dengan jumlah dokumen yang berbeda untuk melihat perbedaan performa di dunia nyata. Kami akan menguji dengan berbagai beban untuk memahami bagaimana skala operasi asinkron dibandingkan dengan operasi sinkron. Pengujian akan mengukur waktu eksekusi untuk kedua pendekatan tersebut dan membantu mendemonstrasikan manfaat kinerja dari operasi asinkron:
add_counts = [10, 100]
# Get the event loop
loop = asyncio.get_event_loop()
# Create a new vector store for testing
milvus_store = init_vector_store()
# Test async document addition
for count in add_counts:
async def measure_async_add():
async_time = await async_add(milvus_store, count)
print(f"Async add for {count} documents took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_add())
# Reset vector store for sync tests
milvus_store = init_vector_store()
# Test sync document addition
for count in add_counts:
sync_time = sync_add(milvus_store, count)
print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)
Async add for 10 documents took 1.74 seconds
2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)
Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds
Asinkronisasi vs Sinkronisasi: Pencarian
Untuk perbandingan performa pencarian, kita perlu mengisi penyimpanan vektor terlebih dahulu. Fungsi berikut ini akan membantu kita mengukur performa pencarian dengan membuat beberapa kueri pencarian secara bersamaan dan membandingkan waktu eksekusi antara pendekatan sinkron dan asinkron:
def populate_vector_store(milvus_store, num_docs=1000):
"""Populate the vector store with test documents"""
docs = generate_test_documents(num_docs)
milvus_store.add_documents(docs)
return docs
async def async_search(milvus_store, num_queries):
"""Perform async searches and measure the time"""
start_time = time.time()
tasks = []
for i in range(num_queries):
query = f"test document {i % 50}"
task = milvus_store.asimilarity_search(query=query, k=3)
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_search(milvus_store, num_queries):
"""Perform sync searches and measure the time"""
start_time = time.time()
for i in range(num_queries):
query = f"test document {i % 50}"
result = milvus_store.similarity_search(query=query, k=3)
end_time = time.time()
return end_time - start_time
Sekarang mari kita jalankan pengujian kinerja pencarian yang komprehensif untuk melihat bagaimana skala operasi asinkron dibandingkan dengan operasi sinkron. Kami akan menguji dengan volume kueri yang berbeda untuk menunjukkan manfaat kinerja operasi asinkron, terutama seiring dengan meningkatnya jumlah operasi bersamaan:
# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
query_counts = [10, 100]
# Test async search
for count in query_counts:
async def measure_async_search():
async_time = await async_search(milvus_store, count)
print(f"Async search for {count} queries took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_search())
# Test sync search
for count in query_counts:
sync_time = sync_search(milvus_store, count)
print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)
Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds
Asinkronisasi vs Sinkronisasi: Menghapus
Operasi hapus adalah aspek penting lainnya di mana operasi asinkronisasi dapat memberikan peningkatan kinerja yang signifikan. Mari kita buat fungsi untuk mengukur perbedaan kinerja antara operasi penghapusan sinkron dan asinkron. Tes ini akan membantu menunjukkan bagaimana operasi asinkronisasi dapat menangani penghapusan batch dengan lebih efisien:
async def async_delete(milvus_store, num_deleting):
"""Delete documents asynchronously and measure the time"""
start_time = time.time()
tasks = []
for i in range(num_deleting):
expr = f"id == 'doc_{i}'"
task = milvus_store.adelete(expr=expr)
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
def sync_delete(milvus_store, num_deleting):
"""Delete documents synchronously and measure the time"""
start_time = time.time()
for i in range(num_deleting):
expr = f"id == 'doc_{i}'"
result = milvus_store.delete(expr=expr)
end_time = time.time()
return end_time - start_time
Sekarang mari kita jalankan tes performa penghapusan untuk mengukur perbedaan performa. Kita akan mulai dengan penyimpanan vektor baru yang diisi dengan data uji, lalu melakukan operasi penghapusan menggunakan pendekatan sinkron dan asinkron:
delete_counts = [10, 100]
# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
# Test async delete
for count in delete_counts:
async def measure_async_delete():
async_time = await async_delete(milvus_store, count)
print(f"Async delete for {count} operations took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_delete())
# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)
# Test sync delete
for count in delete_counts:
sync_time = sync_delete(milvus_store, count)
print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)
Async delete for 10 operations took 0.58 seconds
2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)
Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds
Kesimpulan
Tutorial ini mendemonstrasikan keuntungan performa yang signifikan dari penggunaan operasi asinkron dengan LangChain dan Milvus. Kami membandingkan versi sinkron dan asinkron dari operasi tambah, cari, dan hapus, yang menunjukkan bagaimana operasi asinkron dapat memberikan peningkatan kecepatan yang substansial, terutama untuk operasi batch yang besar.
Kesimpulan utama:
- Operasi asinkron memberikan manfaat paling besar ketika melakukan banyak operasi individual yang dapat berjalan secara paralel
- Untuk beban kerja yang menghasilkan throughput yang lebih tinggi, kesenjangan kinerja antara operasi sinkronisasi dan asinkronisasi melebar
- Operasi asinkronisasi sepenuhnya memanfaatkan daya komputasi mesin
Ketika membangun aplikasi RAG produksi dengan LangChain dan Milvus, pertimbangkan untuk menggunakan API asinkronisasi ketika kinerja menjadi perhatian, terutama untuk operasi yang dilakukan secara bersamaan.