🚀 جرب Zilliz Cloud، الـ Milvus المدارة بالكامل، مجاناً — تجربة أداء أسرع بـ 10 أضعاف! جرب الآن>>

milvus-logo
LFAI
الصفحة الرئيسية
  • عمليات الدمج
  • Home
  • Docs
  • عمليات الدمج

  • مصادر البيانات

  • غير منظم

بناء RAG مع Milvus و Unstructured

Open In Colab GitHub Repository

توفرUnstructured منصة وأدوات لاستيعاب ومعالجة المستندات غير المهيكلة من أجل توليد الاسترجاع المعزز (RAG) وضبط النماذج. يوفر كلاً من منصة واجهة مستخدم بدون تعليمات برمجية وخدمات واجهة برمجة التطبيقات بدون خادم، مما يسمح للمستخدمين بمعالجة البيانات على موارد الحوسبة المستضافة من Unstructured.

في هذا البرنامج التعليمي، سوف نستخدم Unstructured لاستيعاب مستندات PDF ثم نستخدم Milvus لإنشاء خط أنابيب RAG.

التحضير

التبعيات والبيئة

$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai

إذا كنت تستخدم Google Colab، لتمكين التبعيات المثبتة للتو، فقد تحتاج إلى إعادة تشغيل وقت التشغيل (انقر على قائمة "وقت التشغيل" في أعلى الشاشة، وحدد "إعادة تشغيل الجلسة" من القائمة المنسدلة).

يمكنك الحصول على متغيرات البيئة UNSTRUCTURED_API_KEY و UNSTRUCTURED_URL من هنا.

سنستخدم OpenAI كـ LLM في هذا المثال. يجب عليك إعداد مفتاح api OPENAI_API_KEY كمتغير بيئة.

import os


os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"

os.environ["OPENAI_API_KEY"] = "***********"

إعداد عملاء Milvus و OpenAI

يمكنك استخدام عميل Milvus لإنشاء مجموعة Milvus وإدراج البيانات فيها.

from pymilvus import MilvusClient, DataType

# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")  # TODO

أما بالنسبة للوسيطة MilvusClient:

  • يعد تعيين uri كملف محلي، على سبيل المثال./milvus.db ، هو الطريقة الأكثر ملاءمة، حيث أنه يستخدم تلقائيًا ميلفوس لايت لتخزين جميع البيانات في هذا الملف.
  • إذا كان لديك حجم كبير من البيانات، على سبيل المثال أكثر من مليون ناقل، يمكنك إعداد خادم Milvus أكثر أداءً على Docker أو Kubernetes. في هذا الإعداد، يُرجى استخدام عنوان الخادم والمنفذ كـ uri، على سبيل المثالhttp://localhost:19530. إذا قمت بتمكين ميزة المصادقة على Milvus، فاستخدم "<your_username>: <your_password>" كرمز مميز، وإلا فلا تقم بتعيين الرمز المميز.
  • إذا كنت ترغب في استخدام Zilliz Cloud، الخدمة السحابية المدارة بالكامل لـ Milvus، اضبط uri و token ، والتي تتوافق مع نقطة النهاية العامة ومفتاح Api في Zilliz Cloud.

تحقق مما إذا كانت المجموعة موجودة بالفعل وأسقطها إذا كانت موجودة.

collection_name = "my_rag_collection"

if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

قم بإعداد عميل OpenAI لإنشاء تضمينات وإنشاء استجابات.

from openai import OpenAI

openai_client = OpenAI()


def emb_text(text):
    return (
        openai_client.embeddings.create(input=text, model="text-embedding-3-small")
        .data[0]
        .embedding
    )

إنشاء تضمين اختباري وطباعة بُعده والعناصر القليلة الأولى.

test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]

إنشاء مجموعة ميلفوس

سننشئ مجموعة بالمخطط التالي:

  • id: المفتاح الأساسي، وهو معرّف فريد لكل مستند.
  • vector: تضمين المستند.
  • text: المحتوى النصي للمستند.
  • metadata: البيانات الوصفية للمستند.

ثم نقوم ببناء فهرس AUTOINDEX على الحقل vector. ثم ننشئ المجموعة.

# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
    field_name="vector",
    metric_type="COSINE",
    index_type="AUTOINDEX",
)
milvus_client.create_collection(
    collection_name=collection_name,
    schema=schema,
    index_params=index_params,
    consistency_level="Strong",
)

milvus_client.load_collection(collection_name=collection_name)

تحميل البيانات من Unstructured

يوفر Unstructured خط أنابيب استيعاب مرن وقوي لمعالجة أنواع مختلفة من الملفات، بما في ذلك PDF و HTML وغيرها. سنستخدم وظيفة الاستيعاب لتقسيم ملفات PDF في دليل محلي. ومن ثم تحميل البيانات في Milvus.

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"

Pipeline.from_configs(
    context=ProcessorConfig(),
    indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
    downloader_config=LocalDownloaderConfig(),
    source_connection_config=LocalConnectionConfig(),
    partitioner_config=PartitionerConfig(
        partition_by_api=True,
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
        strategy="hi_res",
        additional_partition_args={
            "split_pdf_page": True,
            "split_pdf_concurrency_level": 15,
        },
    ),
    uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()


from unstructured.staging.base import elements_from_json


def load_processed_files(directory_path):
    elements = []
    for filename in os.listdir(directory_path):
        if filename.endswith(".json"):
            file_path = os.path.join(directory_path, filename)
            try:
                elements.extend(elements_from_json(filename=file_path))
            except IOError:
                print(f"Error: Could not read file {filename}.")

    return elements


elements = load_processed_files(directory_with_results)

إدراج البيانات في Milvus.

data = []
for i, element in enumerate(elements):
    data.append(
        {
            "id": i,
            "vector": emb_text(element.text),
            "text": element.text,
            "metadata": element.metadata.to_dict(),
        }
    )
milvus_client.insert(collection_name=collection_name, data=data)

استرجاع وتوليد الاستجابة

تحديد دالة لاسترداد المستندات ذات الصلة من ملفوس.

def retrieve_documents(question, top_k=3):
    search_res = milvus_client.search(
        collection_name=collection_name,
        data=[emb_text(question)],
        limit=top_k,
        # search_params={"metric_type": "IP", "params": {}},
        output_fields=["text"],
    )
    return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]

تحديد دالة لإنشاء استجابة باستخدام المستندات المسترجعة في خط أنابيب RAG.

def generate_rag_response(question):
    retrieved_docs = retrieve_documents(question)
    context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
    system_prompt = (
        "You are an AI assistant. Provide answers based on the given context."
    )
    user_prompt = f"""
    Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
    
    Context:
    {context}
    
    Question: {question}
    """
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return response.choices[0].message.content

لنختبر خط أنابيب RAG باستخدام نموذج سؤال.

question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.

جرب Managed Milvus مجاناً

Zilliz Cloud خالي من المتاعب، ويعمل بواسطة Milvus ويعمل بسرعة 10 أضعاف.

ابدأ
التعليقات

هل كانت هذه الصفحة مفيدة؟