RAG مع Milvus وLlamaIndex Async API

Open In Colab GitHub Repository

يوضّح هذا البرنامج التعليمي كيفية استخدام LlamaIndex مع Milvus لإنشاء خط أنابيب معالجة غير متزامن للمستندات ل RAG. يوفر LlamaIndex طريقة لمعالجة المستندات وتخزينها في قاعدة بيانات متجهة مثل Milvus. من خلال الاستفادة من واجهة برمجة التطبيقات غير المتزامنة ل LlamaIndex ومكتبة عميل Milvus Python، يمكننا زيادة إنتاجية خط الأنابيب لمعالجة وفهرسة كميات كبيرة من البيانات بكفاءة.

في هذا البرنامج التعليمي، سنقدم أولاً استخدام الأساليب غير المتزامنة لبناء RAG مع LlamaIndex و Milvus من مستوى عالٍ، ثم سنقدم استخدام الأساليب منخفضة المستوى ومقارنة الأداء بين المتزامن وغير المتزامن.

قبل أن تبدأ

تتطلب مقتطفات التعليمات البرمجية في هذه الصفحة تبعيات pymilvus وlamaindex. يمكنك تثبيتها باستخدام الأوامر التالية:

$ pip install -U pymilvus llama-index-vector-stores-milvus llama-index nest-asyncio

إذا كنت تستخدم Google Colab، لتمكين التبعيات المثبتة للتو، قد تحتاج إلى إعادة تشغيل وقت التشغيل (انقر على قائمة "وقت التشغيل" في أعلى الشاشة، وحدد "إعادة تشغيل الجلسة" من القائمة المنسدلة).

سنستخدم النماذج من OpenAI. يجب عليك إعداد مفتاح api OPENAI_API_KEY كمتغير بيئة.

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

إذا كنت تستخدم دفتر Jupyter Notebook، فستحتاج إلى تشغيل هذا السطر من التعليمات البرمجية قبل تشغيل التعليمات البرمجية غير المتزامنة.

import nest_asyncio

nest_asyncio.apply()

إعداد البيانات

يمكنك تنزيل عينة من البيانات باستخدام الأوامر التالية:

$ mkdir -p 'data/'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham_essay.txt'
$ wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/uber_2021.pdf'

بناء RAG مع المعالجة غير المتزامنة

يوضح هذا القسم كيفية بناء نظام RAG الذي يمكنه معالجة المستندات بطريقة غير متزامنة.

استورد المكتبات اللازمة وحدد Milvus URI وأبعاد التضمين.

import asyncio
import random
import time

from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.milvus import MilvusVectorStore

URI = "http://localhost:19530"
DIM = 768
  • إذا كان لديك حجم كبير من البيانات، يمكنك إعداد خادم Milvus فعال على docker أو kubernetes. في هذا الإعداد، يُرجى استخدام uri الخادم، على سبيل المثالhttp://localhost:19530 ، كـ uri.
  • إذا كنت ترغب في استخدام Zilliz Cloud، الخدمة السحابية المدارة بالكامل لـ Milvus، اضبط uri و token ، والتي تتوافق مع نقطة النهاية العامة ومفتاح Api في Zilliz Cloud.
  • في حالة الأنظمة المعقدة (مثل الاتصالات الشبكية)، يمكن أن تؤدي المعالجة غير المتزامنة إلى تحسين الأداء مقارنة بالمزامنة. لذلك نعتقد أن Milvus-Lite غير مناسب لاستخدام الواجهات غير المتزامنة لأن السيناريوهات المستخدمة غير مناسبة.

حدد دالة تهيئة يمكننا استخدامها مرة أخرى لإعادة بناء مجموعة Milvus.

def init_vector_store():
    return MilvusVectorStore(
        uri=URI,
        # token=TOKEN,
        dim=DIM,
        collection_name="test_collection",
        embedding_field="embedding",
        id_field="id",
        similarity_metric="COSINE",
        consistency_level="Bounded",  # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`). See https://milvus.io/docs/consistency.md#Consistency-Level for more details.
        overwrite=True,  # To overwrite the collection if it already exists
    )


vector_store = init_vector_store()
2025-01-24 20:04:39,414 [DEBUG][_create_connection]: Created new connection using: faa8be8753f74288bffc7e6d38942f8a (async_milvus_client.py:600)

استخدم SimpleDirectoryReader SimpleDirectoryReader لتغليف كائن مستند LlamaIndex من الملف paul_graham_essay.txt.

from llama_index.core import SimpleDirectoryReader

# load documents
documents = SimpleDirectoryReader(
    input_files=["./data/paul_graham_essay.txt"]
).load_data()

print("Document ID:", documents[0].doc_id)
Document ID: 41a6f99c-489f-49ff-9821-14e2561140eb

قم بتثبيت نموذج تضمين الوجه المعانق محليًا. يؤدي استخدام نموذج محلي إلى تجنب خطر الوصول إلى حدود معدل واجهة برمجة التطبيقات أثناء إدراج البيانات غير المتزامن، حيث يمكن أن تتراكم طلبات واجهة برمجة التطبيقات المتزامنة بسرعة وتستهلك ميزانيتك في واجهة برمجة التطبيقات العامة. ومع ذلك، إذا كان لديك حد معدل مرتفع، يمكنك اختيار استخدام خدمة نموذج بعيد بدلاً من ذلك.

from llama_index.embeddings.huggingface import HuggingFaceEmbedding


embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")

قم بإنشاء فهرس وإدراج المستند.

قمنا بتعيين use_async على True لتمكين وضع الإدراج غير المتزامن.

# Create an index over the documents
from llama_index.core import VectorStoreIndex, StorageContext

storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    embed_model=embed_model,
    use_async=True,
)

قم بتهيئة LLM.

from llama_index.llms.openai import OpenAI

llm = OpenAI(model="gpt-3.5-turbo")

عند إنشاء محرك الاستعلام، يمكنك أيضًا تعيين المعلمة use_async إلى True لتمكين البحث غير المتزامن.

query_engine = index.as_query_engine(use_async=True, llm=llm)
response = await query_engine.aquery("What did the author learn?")
print(response)
The author learned that the field of artificial intelligence, as practiced at the time, was not as promising as initially believed. The approach of using explicit data structures to represent concepts in AI was not effective in achieving true understanding of natural language. This realization led the author to shift his focus towards Lisp and eventually towards exploring the field of art.

استكشف واجهة برمجة التطبيقات غير المتزامنة

في هذا القسم، سنقدم في هذا القسم استخدام واجهة برمجة التطبيقات ذات المستوى الأدنى ومقارنة أداء عمليات التشغيل المتزامنة وغير المتزامنة.

إضافة غير متزامن

إعادة تهيئة مخزن المتجهات.

vector_store = init_vector_store()
2025-01-24 20:07:38,727 [DEBUG][_create_connection]: Created new connection using: 5e0d130f3b644555ad7ea6b8df5f1fc2 (async_milvus_client.py:600)

لنعرّف دالة إنتاج عقدة، والتي ستُستخدم لتوليد عدد كبير من عقد الاختبار للفهرس.

def random_id():
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def produce_nodes(num_adding):
    node_list = []
    for i in range(num_adding):
        node = TextNode(
            id_=random_id(),
            text=f"n{i}_text",
            embedding=[0.5] * (DIM - 1) + [random.random()],
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id=f"n{i+1}")},
        )
        node_list.append(node)
    return node_list

تحديد دالة aync لإضافة مستندات إلى مخزن المتجهات. نستخدم الدالة async_add() في مثيل مخزن متجه ميلفوس.

async def async_add(num_adding):
    node_list = produce_nodes(num_adding)
    start_time = time.time()
    tasks = []
    for i in range(num_adding):
        sub_nodes = node_list[i]
        task = vector_store.async_add([sub_nodes])  # use async_add()
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time
add_counts = [10, 100, 1000]

احصل على حلقة الحدث.

loop = asyncio.get_event_loop()

إضافة مستندات بشكل غير متزامن إلى مخزن المتجهات.

for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(count)
        print(f"Async add for {count} took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())
Async add for 10 took 0.19 seconds
Async add for 100 took 0.48 seconds
Async add for 1000 took 3.22 seconds
vector_store = init_vector_store()
2025-01-24 20:07:45,554 [DEBUG][_create_connection]: Created new connection using: b14dde8d6d24489bba26a907593f692d (async_milvus_client.py:600)

قارن مع الإضافة المتزامنة

حدد دالة إضافة متزامنة. ثم قم بقياس وقت التشغيل تحت نفس الشرط.

def sync_add(num_adding):
    node_list = produce_nodes(num_adding)
    start_time = time.time()
    for node in node_list:
        result = vector_store.add([node])
    end_time = time.time()
    return end_time - start_time
for count in add_counts:
    sync_time = sync_add(count)
    print(f"Sync add for {count} took {sync_time:.2f} seconds")
Sync add for 10 took 0.56 seconds
Sync add for 100 took 5.85 seconds
Sync add for 1000 took 62.91 seconds

تُظهر النتيجة أن عملية الإضافة المتزامنة أبطأ بكثير من عملية الإضافة غير المتزامنة.

أعد تهيئة مخزن المتجهات وأضف بعض المستندات قبل تشغيل البحث.

vector_store = init_vector_store()
node_list = produce_nodes(num_adding=1000)
inserted_ids = vector_store.add(node_list)
2025-01-24 20:08:57,982 [DEBUG][_create_connection]: Created new connection using: 351dc7ea4fb14d4386cfab02621ab7d1 (async_milvus_client.py:600)

حدد دالة بحث غير متزامن. نستخدم الدالة aquery() في مثيل مخزن متجهات Milvus.

async def async_search(num_queries):
    start_time = time.time()
    tasks = []
    for _ in range(num_queries):
        query = VectorStoreQuery(
            query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
        )
        task = vector_store.aquery(query=query)  # use aquery()
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time
query_counts = [10, 100, 1000]

البحث غير المتزامن من مخزن Milvus.

for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())
Async search for 10 queries took 0.55 seconds
Async search for 100 queries took 1.39 seconds
Async search for 1000 queries took 8.81 seconds

حدد دالة بحث متزامن. ثم قم بقياس وقت التشغيل تحت نفس الشرط.

def sync_search(num_queries):
    start_time = time.time()
    for _ in range(num_queries):
        query = VectorStoreQuery(
            query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
        )
        result = vector_store.query(query=query)
    end_time = time.time()
    return end_time - start_time
for count in query_counts:
    sync_time = sync_search(count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Sync search for 10 queries took 3.29 seconds
Sync search for 100 queries took 30.80 seconds
Sync search for 1000 queries took 308.80 seconds

تظهر النتيجة أن عملية البحث المتزامن أبطأ بكثير من البحث غير المتزامن.

جرب Managed Milvus مجاناً

Zilliz Cloud خالي من المتاعب، ويعمل بواسطة Milvus ويعمل بسرعة 10 أضعاف.

ابدأ
التعليقات

هل كانت هذه الصفحة مفيدة؟