الدوال غير المتزامنة في تكامل لانغتشين ميلفوس

Open In Colab GitHub Repository

يستكشف هذا البرنامج التعليمي كيفية الاستفادة من الدوال غير المتزامنة في langchain-milvus لبناء تطبيقات عالية الأداء. باستخدام الدوال غير المتزامنة، يمكنك تحسين إنتاجية تطبيقك واستجابته بشكل كبير، خاصةً عند التعامل مع الاسترجاع على نطاق واسع. سواء كنت تقوم ببناء نظام توصية في الوقت الحقيقي، أو تنفيذ بحث دلالي في تطبيقك، أو إنشاء خط أنابيب RAG (استرجاع-جيل معزز)، يمكن أن تساعدك العمليات غير المتزامنة في التعامل مع الطلبات المتزامنة بكفاءة أكبر. يمكن أن توفر قاعدة البيانات المتجهة عالية الأداء Milvus جنبًا إلى جنب مع تجريدات LLM القوية من LangChain أساسًا قويًا لبناء تطبيقات ذكاء اصطناعي قابلة للتطوير.

نظرة عامة على واجهة برمجة التطبيقات غير المتزامنة

توفر langchain-milvus دعمًا شاملاً للعمليات غير المتزامنة، مما يحسن الأداء بشكل كبير في السيناريوهات المتزامنة واسعة النطاق. تحافظ واجهة برمجة التطبيقات غير المتزامنة على تصميم واجهة متناسقة مع واجهة برمجة التطبيقات المتزامنة.

الوظائف الأساسية غير المتزامنة

لاستخدام العمليات غير المتزامنة في langchain-milvus، ما عليك سوى إضافة بادئة a إلى أسماء الطرق. يسمح ذلك باستخدام أفضل للموارد وتحسين الإنتاجية عند التعامل مع طلبات الاسترجاع المتزامنة.

نوع العمليةطريقة المزامنةطريقة غير متزامنةالوصف
إضافة نصوصadd_texts()aadd_texts()إضافة نصوص إلى مخزن المتجهات
إضافة مستنداتadd_documents()aadd_documents()إضافة مستندات إلى مخزن المتجهات
إضافة تضميناتadd_embeddings()aadd_embeddings()إضافة متجهات التضمين
بحث التشابهsimilarity_search()asimilarity_search()البحث الدلالي بالنص
البحث عن المتجهاتsimilarity_search_by_vector()asimilarity_search_by_vector()البحث الدلالي بالمتجه
البحث بالنتيجةsimilarity_search_with_score()asimilarity_search_with_score()البحث الدلالي عن طريق النص وإرجاع درجات التشابه
البحث عن طريق المتجهات بالنتيجةsimilarity_search_with_score_by_vector()asimilarity_search_with_score_by_vector()البحث الدلالي حسب المتجه وإرجاع درجات التشابه
البحث بالتنوعmax_marginal_relevance_search()amax_marginal_relevance_search()بحث MMR (إرجاع المتشابهات مع تحسين التنوع أيضًا)
بحث التنوع المتجهيmax_marginal_relevance_search_by_vector()amax_marginal_relevance_search_by_vector()بحث MMR حسب المتجه
عملية الحذفdelete()adelete()حذف المستندات
عملية الإدراجupsert()aupsert()Upsert (تحديث إذا كانت موجودة، وإلا إدراج) المستندات
البحث في البيانات الوصفيةsearch_by_metadata()asearch_by_metadata()الاستعلام مع تصفية البيانات الوصفية
الحصول على المفاتيح الأساسيةget_pks()aget_pks()الحصول على المفاتيح الأساسية حسب التعبير
إنشاء من النصوصfrom_texts()afrom_texts()إنشاء مخزن متجه من النصوص

لمزيد من المعلومات التفصيلية حول هذه الدوال، يرجى الرجوع إلى مرجع واجهة برمجة التطبيقات.

مزايا الأداء

توفر العمليات غير المتزامنة تحسينات كبيرة في الأداء عند التعامل مع كميات كبيرة من الطلبات المتزامنة، وهي مناسبة بشكل خاص لـ

  • معالجة المستندات المجمعة
  • سيناريوهات البحث عالية التزامن
  • تطبيقات RAG للإنتاج
  • استيراد/تصدير البيانات على نطاق واسع

في هذا البرنامج التعليمي، سنوضح في هذا البرنامج التعليمي فوائد الأداء هذه من خلال مقارنات مفصلة للعمليات المتزامنة وغير المتزامنة، لنوضح لك كيفية الاستفادة من واجهات برمجة التطبيقات غير المتزامنة لتحقيق الأداء الأمثل في تطبيقاتك.

قبل أن تبدأ

تتطلب مقتطفات التعليمات البرمجية في هذه الصفحة التبعيات التالية:

! pip install -U pymilvus langchain-milvus langchain langchain-core langchain-openai langchain-text-splitters nest-asyncio

إذا كنت تستخدم Google Colab، لتمكين التبعيات المثبتة للتو، قد تحتاج إلى إعادة تشغيل وقت التشغيل (انقر على قائمة "وقت التشغيل" في أعلى الشاشة، وحدد "إعادة تشغيل الجلسة" من القائمة المنسدلة).

سنستخدم نماذج OpenAI. يجب عليك إعداد مفتاح api OPENAI_API_KEY كمتغير بيئة:

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

إذا كنت تستخدم دفتر Jupyter Notebook، فأنت بحاجة إلى تشغيل هذا السطر من التعليمات البرمجية قبل تشغيل التعليمات البرمجية غير المتزامنة:

import nest_asyncio

nest_asyncio.apply()

استكشاف واجهات برمجة التطبيقات غير المتزامنة ومقارنة الأداء

دعنا الآن نتعمق أكثر في مقارنة الأداء بين العمليات المتزامنة وغير المتزامنة باستخدام langchain-milvus.

أولًا، استورد المكتبات اللازمة:

import asyncio
import random
import time
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_milvus import Milvus

# Define the Milvus URI
URI = "http://localhost:19530"

إعداد دوال الاختبار

لننشئ دوال مساعدة لتوليد بيانات الاختبار:

def random_id():
    """Generate a random string ID"""
    random_num_str = ""
    for _ in range(16):
        random_digit = str(random.randint(0, 9))
        random_num_str += random_digit
    return random_num_str


def generate_test_documents(num_docs):
    """Generate test documents for performance testing"""
    docs = []
    for i in range(num_docs):
        content = (
            f"This is test document {i} with some random content: {random.random()}"
        )
        metadata = {
            "id": f"doc_{i}",
            "score": random.random(),
            "category": f"cat_{i % 5}",
        }
        doc = Document(page_content=content, metadata=metadata)
        docs.append(doc)
    return docs

تهيئة مخزن المتجهات

قبل أن نتمكن من إجراء اختبارات الأداء الخاصة بنا، نحتاج إلى إعداد مخزن متجه Milvus نظيف. تضمن هذه الدالة أن نبدأ بمجموعة جديدة لكل اختبار، مما يزيل أي تداخل من البيانات السابقة:

def init_vector_store():
    """Initialize and return a fresh vector store for testing"""
    return Milvus(
        embedding_function=OpenAIEmbeddings(),
        collection_name="langchain_perf_test",
        connection_args={"uri": URI},
        auto_id=True,
        drop_old=True,  # Always start with a fresh collection
    )

المزامنة مقابل المزامنة: إضافة مستندات

لنقارن الآن أداء إضافة المستندات المتزامنة مقابل إضافة المستندات غير المتزامنة. ستساعدنا هذه الوظائف في قياس مدى سرعة العمليات غير المتزامنة عند إضافة مستندات متعددة إلى مخزن المتجهات. يقوم الإصدار غير المتزامن بإنشاء مهام لكل عملية إضافة مستند وتشغيلها بشكل متزامن، بينما يقوم الإصدار المتزامن بمعالجة المستندات واحدًا تلو الآخر:

async def async_add(milvus_store, num_adding):
    """Add documents asynchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    tasks = []
    for doc in docs:
        # Create tasks for each document addition
        task = milvus_store.aadd_documents([doc])
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_add(milvus_store, num_adding):
    """Add documents synchronously and measure the time"""
    docs = generate_test_documents(num_adding)
    start_time = time.time()
    for doc in docs:
        result = milvus_store.add_documents([doc])
    end_time = time.time()
    return end_time - start_time

الآن دعنا ننفذ اختبارات الأداء الخاصة بنا مع أعداد مختلفة من المستندات لنرى الاختلافات في الأداء في العالم الحقيقي. سنختبر بأحمال متفاوتة لفهم كيفية قياس العمليات غير المتزامنة مقارنةً بنظيراتها المتزامنة. ستقيس الاختبارات وقت التنفيذ لكلتا الطريقتين وتساعد في توضيح فوائد أداء العمليات غير المتزامنة:

add_counts = [10, 100]

# Get the event loop
loop = asyncio.get_event_loop()

# Create a new vector store for testing
milvus_store = init_vector_store()

# Test async document addition
for count in add_counts:

    async def measure_async_add():
        async_time = await async_add(milvus_store, count)
        print(f"Async add for {count} documents took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_add())

# Reset vector store for sync tests
milvus_store = init_vector_store()

# Test sync document addition
for count in add_counts:
    sync_time = sync_add(milvus_store, count)
    print(f"Sync add for {count} documents took {sync_time:.2f} seconds")
2025-06-05 10:44:12,274 [DEBUG][_create_connection]: Created new connection using: dd5f77bb78964c079da42c2446b03bf6 (async_milvus_client.py:599)


Async add for 10 documents took 1.74 seconds


2025-06-05 10:44:16,940 [DEBUG][_create_connection]: Created new connection using: 8b13404a78654cdd9b790371eb44e427 (async_milvus_client.py:599)


Async add for 100 documents took 2.77 seconds
Sync add for 10 documents took 5.36 seconds
Sync add for 100 documents took 65.60 seconds

لمقارنة أداء البحث، سنحتاج إلى ملء مخزن المتجهات أولاً. ستساعدنا الدوال التالية في قياس أداء البحث من خلال إنشاء استعلامات بحث متزامنة متعددة ومقارنة وقت التنفيذ بين النهج المتزامن وغير المتزامن:

def populate_vector_store(milvus_store, num_docs=1000):
    """Populate the vector store with test documents"""
    docs = generate_test_documents(num_docs)
    milvus_store.add_documents(docs)
    return docs


async def async_search(milvus_store, num_queries):
    """Perform async searches and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_queries):
        query = f"test document {i % 50}"
        task = milvus_store.asimilarity_search(query=query, k=3)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_search(milvus_store, num_queries):
    """Perform sync searches and measure the time"""
    start_time = time.time()
    for i in range(num_queries):
        query = f"test document {i % 50}"
        result = milvus_store.similarity_search(query=query, k=3)
    end_time = time.time()
    return end_time - start_time

الآن دعنا نجري اختبارات أداء بحث شاملة لنرى كيف يمكن قياس أداء العمليات غير المتزامنة مقارنةً بالعمليات المتزامنة. سنختبر باستخدام أحجام استعلامات مختلفة لتوضيح فوائد أداء العمليات غير المتزامنة، خاصةً مع زيادة عدد العمليات المتزامنة:

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

query_counts = [10, 100]

# Test async search
for count in query_counts:

    async def measure_async_search():
        async_time = await async_search(milvus_store, count)
        print(f"Async search for {count} queries took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_search())

# Test sync search
for count in query_counts:
    sync_time = sync_search(milvus_store, count)
    print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
2025-06-05 10:45:28,131 [DEBUG][_create_connection]: Created new connection using: 851824591c64415baac843e676e78cdd (async_milvus_client.py:599)


Async search for 10 queries took 2.31 seconds
Async search for 100 queries took 3.72 seconds
Sync search for 10 queries took 6.07 seconds
Sync search for 100 queries took 54.22 seconds

غير المتزامنة مقابل المتزامنة: الحذف

عمليات الحذف هي جانب مهم آخر حيث يمكن أن توفر العمليات غير المتزامنة تحسينات كبيرة في الأداء. لنقم بإنشاء دوال لقياس فرق الأداء بين عمليات الحذف المتزامنة وغير المتزامنة. ستساعد هذه الاختبارات في توضيح كيف يمكن لعمليات الحذف غير المتزامنة التعامل مع عمليات الحذف المجمعة بكفاءة أكبر:

async def async_delete(milvus_store, num_deleting):
    """Delete documents asynchronously and measure the time"""
    start_time = time.time()
    tasks = []
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        task = milvus_store.adelete(expr=expr)
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    return end_time - start_time


def sync_delete(milvus_store, num_deleting):
    """Delete documents synchronously and measure the time"""
    start_time = time.time()
    for i in range(num_deleting):
        expr = f"id == 'doc_{i}'"
        result = milvus_store.delete(expr=expr)
    end_time = time.time()
    return end_time - start_time

الآن دعنا ننفذ اختبارات أداء الحذف لقياس فرق الأداء. سنبدأ بمخزن متجه جديد مملوء ببيانات الاختبار، ثم ننفذ عمليات الحذف باستخدام كل من النهجين المتزامن وغير المتزامن:

delete_counts = [10, 100]

# Initialize and populate the vector store
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test async delete
for count in delete_counts:

    async def measure_async_delete():
        async_time = await async_delete(milvus_store, count)
        print(f"Async delete for {count} operations took {async_time:.2f} seconds")
        return async_time

    loop.run_until_complete(measure_async_delete())

# Reset and repopulate the vector store for sync tests
milvus_store = init_vector_store()
populate_vector_store(milvus_store, 1000)

# Test sync delete
for count in delete_counts:
    sync_time = sync_delete(milvus_store, count)
    print(f"Sync delete for {count} operations took {sync_time:.2f} seconds")
2025-06-05 10:46:57,211 [DEBUG][_create_connection]: Created new connection using: 504e9ce3be92411e87077971c82baca2 (async_milvus_client.py:599)


Async delete for 10 operations took 0.58 seconds


2025-06-05 10:47:12,309 [DEBUG][_create_connection]: Created new connection using: 22c1513b444e4c40936e2176d7a1a154 (async_milvus_client.py:599)


Async delete for 100 operations took 0.61 seconds
Sync delete for 10 operations took 2.82 seconds
Sync delete for 100 operations took 29.21 seconds

الخاتمة

أظهر هذا البرنامج التعليمي مزايا الأداء الكبيرة لاستخدام العمليات غير المتزامنة مع LangChain و Milvus. لقد قارنا بين الإصدارات المتزامنة وغير المتزامنة لعمليات الإضافة والبحث والحذف، موضحين كيف يمكن للعمليات غير المتزامنة أن توفر تحسينات كبيرة في السرعة، خاصةً للعمليات المجمعة الكبيرة.

الوجبات الرئيسية:

  1. توفر العمليات غير المتزامنة أكبر فائدة عند تنفيذ العديد من العمليات الفردية التي يمكن تشغيلها بالتوازي
  2. بالنسبة لأعباء العمل التي تولد إنتاجية أعلى، تتسع فجوة الأداء بين العمليات المتزامنة وغير المتزامنة
  3. تستفيد العمليات غير المتزامنة استفادة كاملة من قوة الحوسبة للأجهزة

عند إنشاء تطبيقات RAG للإنتاج باستخدام LangChain وMilvus، ضع في اعتبارك استخدام واجهة برمجة التطبيقات غير المتزامنة عندما يكون الأداء مصدر قلق، خاصةً بالنسبة للعمليات المتزامنة.

جرب Managed Milvus مجاناً

Zilliz Cloud خالي من المتاعب، ويعمل بواسطة Milvus ويعمل بسرعة 10 أضعاف.

ابدأ
التعليقات

هل كانت هذه الصفحة مفيدة؟