RAG dengan Milvus dan LlamaIndex Async API
Tutorial ini mendemonstrasikan cara menggunakan LlamaIndex dengan Milvus untuk membuat pipeline pemrosesan dokumen asinkron untuk RAG. LlamaIndex menyediakan cara untuk memproses dokumen dan menyimpannya dalam db vektor seperti Milvus. Dengan memanfaatkan API asinkron dari LlamaIndex dan pustaka klien Milvus Python, kita dapat meningkatkan throughput pipeline untuk memproses dan mengindeks data dalam jumlah besar secara efisien.
Dalam tutorial ini, pertama-tama kami akan memperkenalkan penggunaan metode asinkron untuk membangun RAG dengan LlamaIndex dan Milvus dari tingkat tinggi, dan kemudian memperkenalkan penggunaan metode tingkat rendah dan perbandingan kinerja sinkron dan asinkron.
Sebelum Anda mulai
Cuplikan kode di halaman ini membutuhkan dependensi pymilvus dan llamaindex. Anda dapat menginstalnya dengan menggunakan perintah berikut:
$ pip install -U pymilvus llama-index-vector-stores-milvus llama-index nest-asyncio
Jika Anda menggunakan Google Colab, untuk mengaktifkan dependensi yang baru saja terinstal, Anda mungkin perlu memulai ulang runtime (klik menu "Runtime" di bagian atas layar, dan pilih "Restart session" dari menu tarik-turun).
Kita akan menggunakan model dari OpenAI. Anda harus menyiapkan kunci api OPENAI_API_KEY sebagai variabel lingkungan.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Jika Anda menggunakan Jupyter Notebook, Anda harus menjalankan baris kode ini sebelum menjalankan kode asinkron.
import nest_asyncio
nest_asyncio.apply()
Menyiapkan data
Anda dapat mengunduh data sampel dengan perintah berikut:
$ mkdir -p 'data/'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham_essay.txt'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/uber_2021.pdf'
Membangun RAG dengan Pemrosesan Asinkron
Bagian ini menunjukkan cara membangun sistem RAG yang dapat memproses dokumen secara asinkron.
Impor pustaka yang diperlukan dan tentukan Milvus URI dan dimensi penyematan.
import asyncio
import random
import time
from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.milvus import MilvusVectorStore
URI = "http://localhost:19530"
DIM = 768
- Jika Anda memiliki data dalam skala besar, Anda dapat menyiapkan server Milvus yang berkinerja baik pada docker atau kubernetes. Dalam pengaturan ini, silakan gunakan uri server, misalnya
http://localhost:19530, sebagaiuri. - Jika Anda ingin menggunakan Zilliz Cloud, layanan cloud yang dikelola sepenuhnya untuk Milvus, sesuaikan
uridantoken, yang sesuai dengan kunci Public Endpoint dan Api di Zilliz Cloud. - Dalam kasus sistem yang kompleks (seperti komunikasi jaringan), pemrosesan asinkron dapat membawa peningkatan kinerja dibandingkan dengan sinkronisasi. Jadi menurut kami Milvus-Lite tidak cocok untuk menggunakan antarmuka asinkron karena skenario yang digunakan tidak sesuai.
Tentukan fungsi inisialisasi yang dapat kita gunakan lagi untuk membangun kembali koleksi Milvus.
def init_vector_store():
return MilvusVectorStore(
uri=URI,
# token=TOKEN,
dim=DIM,
collection_name="test_collection",
embedding_field="embedding",
id_field="id",
similarity_metric="COSINE",
consistency_level="Bounded", # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`). See https://milvus.io/docs/tune_consistency.md#Consistency-Level for more details.
overwrite=True, # To overwrite the collection if it already exists
)
vector_store = init_vector_store()
2025-01-24 20:04:39,414 [DEBUG][_create_connection]: Created new connection using: faa8be8753f74288bffc7e6d38942f8a (async_milvus_client.py:600)
Gunakan SimpleDirectoryReader untuk membungkus objek dokumen LlamaIndex dari file paul_graham_essay.txt.
from llama_index.core import SimpleDirectoryReader
# load documents
documents = SimpleDirectoryReader(
input_files=["./data/paul_graham_essay.txt"]
).load_data()
print("Document ID:", documents[0].doc_id)
Document ID: 41a6f99c-489f-49ff-9821-14e2561140eb
Instal model penyematan Hugging Face secara lokal. Menggunakan model lokal menghindari risiko mencapai batas tarif API selama penyisipan data asinkron, karena permintaan API secara bersamaan dapat dengan cepat bertambah dan menghabiskan anggaran Anda di API publik. Namun, jika Anda memiliki batas tarif yang tinggi, Anda dapat memilih untuk menggunakan layanan model jarak jauh.
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")
Buat indeks dan masukkan dokumen.
Kami mengatur use_async ke True untuk mengaktifkan mode penyisipan asinkronisasi.
# Create an index over the documents
from llama_index.core import VectorStoreIndex, StorageContext
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context,
embed_model=embed_model,
use_async=True,
)
Inisialisasi LLM.
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-3.5-turbo")
Ketika membangun mesin kueri, Anda juga dapat mengatur parameter use_async ke True untuk mengaktifkan pencarian asinkron.
query_engine = index.as_query_engine(use_async=True, llm=llm)
response = await query_engine.aquery("What did the author learn?")
print(response)
The author learned that the field of artificial intelligence, as practiced at the time, was not as promising as initially believed. The approach of using explicit data structures to represent concepts in AI was not effective in achieving true understanding of natural language. This realization led the author to shift his focus towards Lisp and eventually towards exploring the field of art.
Menjelajahi API Asinkron
Pada bagian ini, kami akan memperkenalkan penggunaan API tingkat yang lebih rendah dan membandingkan kinerja sinkron dan asinkron.
Penambahan Asinkronisasi
Inisialisasi ulang penyimpanan vektor.
vector_store = init_vector_store()
2025-01-24 20:07:38,727 [DEBUG][_create_connection]: Created new connection using: 5e0d130f3b644555ad7ea6b8df5f1fc2 (async_milvus_client.py:600)
Mari kita definisikan fungsi penghasil node, yang akan digunakan untuk menghasilkan sejumlah besar node uji untuk indeks.
def random_id():
random_num_str = ""
for _ in range(16):
random_digit = str(random.randint(0, 9))
random_num_str += random_digit
return random_num_str
def produce_nodes(num_adding):
node_list = []
for i in range(num_adding):
node = TextNode(
id_=random_id(),
text=f"n{i}_text",
embedding=[0.5] * (DIM - 1) + [random.random()],
relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id=f"n{i+1}")},
)
node_list.append(node)
return node_list
Tentukan fungsi aync untuk menambahkan dokumen ke penyimpanan vektor. Kami menggunakan fungsi async_add() di dalam instance penyimpanan vektor Milvus.
async def async_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
tasks = []
for i in range(num_adding):
sub_nodes = node_list[i]
task = vector_store.async_add([sub_nodes]) # use async_add()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
add_counts = [10, 100, 1000]
Dapatkan perulangan peristiwa.
loop = asyncio.get_event_loop()
Menambahkan dokumen secara asinkron ke penyimpanan vektor.
for count in add_counts:
async def measure_async_add():
async_time = await async_add(count)
print(f"Async add for {count} took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_add())
Async add for 10 took 0.19 seconds
Async add for 100 took 0.48 seconds
Async add for 1000 took 3.22 seconds
vector_store = init_vector_store()
2025-01-24 20:07:45,554 [DEBUG][_create_connection]: Created new connection using: b14dde8d6d24489bba26a907593f692d (async_milvus_client.py:600)
Bandingkan dengan penambahan sinkron
Tentukan fungsi penambahan sinkron. Kemudian ukur waktu berjalan dalam kondisi yang sama.
def sync_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
for node in node_list:
result = vector_store.add([node])
end_time = time.time()
return end_time - start_time
for count in add_counts:
sync_time = sync_add(count)
print(f"Sync add for {count} took {sync_time:.2f} seconds")
Sync add for 10 took 0.56 seconds
Sync add for 100 took 5.85 seconds
Sync add for 1000 took 62.91 seconds
Hasilnya menunjukkan bahwa proses penambahan sinkron jauh lebih lambat daripada proses asinkron.
Pencarian asinkron
Inisialisasi ulang penyimpanan vektor dan tambahkan beberapa dokumen sebelum menjalankan pencarian.
vector_store = init_vector_store()
node_list = produce_nodes(num_adding=1000)
inserted_ids = vector_store.add(node_list)
2025-01-24 20:08:57,982 [DEBUG][_create_connection]: Created new connection using: 351dc7ea4fb14d4386cfab02621ab7d1 (async_milvus_client.py:600)
Tentukan fungsi pencarian asinkron. Kami menggunakan fungsi aquery() dalam contoh penyimpanan vektor Milvus.
async def async_search(num_queries):
start_time = time.time()
tasks = []
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
task = vector_store.aquery(query=query) # use aquery()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
query_counts = [10, 100, 1000]
Pencarian asinkron dari penyimpanan Milvus.
for count in query_counts:
async def measure_async_search():
async_time = await async_search(count)
print(f"Async search for {count} queries took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_search())
Async search for 10 queries took 0.55 seconds
Async search for 100 queries took 1.39 seconds
Async search for 1000 queries took 8.81 seconds
Bandingkan dengan pencarian sinkron
Tentukan fungsi pencarian sinkron. Kemudian ukur waktu yang berjalan dalam kondisi yang sama.
def sync_search(num_queries):
start_time = time.time()
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
result = vector_store.query(query=query)
end_time = time.time()
return end_time - start_time
for count in query_counts:
sync_time = sync_search(count)
print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Sync search for 10 queries took 3.29 seconds
Sync search for 100 queries took 30.80 seconds
Sync search for 1000 queries took 308.80 seconds
Hasilnya menunjukkan bahwa proses pencarian sinkron jauh lebih lambat dibandingkan dengan pencarian asinkron.