بناء RAG مع Milvus و Unstructured
توفرUnstructured منصة وأدوات لاستيعاب ومعالجة المستندات غير المهيكلة من أجل توليد الاسترجاع المعزز (RAG) وضبط النماذج. يوفر كلاً من منصة واجهة مستخدم بدون تعليمات برمجية وخدمات واجهة برمجة التطبيقات بدون خادم، مما يسمح للمستخدمين بمعالجة البيانات على موارد الحوسبة المستضافة من Unstructured.
في هذا البرنامج التعليمي، سوف نستخدم Unstructured لاستيعاب مستندات PDF ثم نستخدم Milvus لإنشاء خط أنابيب RAG.
التحضير
التبعيات والبيئة
$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai
إذا كنت تستخدم Google Colab، لتمكين التبعيات المثبتة للتو، فقد تحتاج إلى إعادة تشغيل وقت التشغيل (انقر على قائمة "وقت التشغيل" في أعلى الشاشة، وحدد "إعادة تشغيل الجلسة" من القائمة المنسدلة).
يمكنك الحصول على متغيرات البيئة UNSTRUCTURED_API_KEY
و UNSTRUCTURED_URL
من هنا.
سنستخدم OpenAI كـ LLM في هذا المثال. يجب عليك إعداد مفتاح api OPENAI_API_KEY
كمتغير بيئة.
import os
os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"
os.environ["OPENAI_API_KEY"] = "***********"
إعداد عملاء Milvus و OpenAI
يمكنك استخدام عميل Milvus لإنشاء مجموعة Milvus وإدراج البيانات فيها.
from pymilvus import MilvusClient, DataType
# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db") # TODO
أما بالنسبة للوسيطة MilvusClient
:
- يعد تعيين
uri
كملف محلي، على سبيل المثال./milvus.db
، هو الطريقة الأكثر ملاءمة، حيث أنه يستخدم تلقائيًا ميلفوس لايت لتخزين جميع البيانات في هذا الملف. - إذا كان لديك حجم كبير من البيانات، على سبيل المثال أكثر من مليون ناقل، يمكنك إعداد خادم Milvus أكثر أداءً على Docker أو Kubernetes. في هذا الإعداد، يُرجى استخدام عنوان الخادم والمنفذ كـ uri، على سبيل المثال
http://localhost:19530
. إذا قمت بتمكين ميزة المصادقة على Milvus، فاستخدم "<your_username>: <your_password>" كرمز مميز، وإلا فلا تقم بتعيين الرمز المميز. - إذا كنت ترغب في استخدام Zilliz Cloud، الخدمة السحابية المدارة بالكامل لـ Milvus، اضبط
uri
وtoken
، والتي تتوافق مع نقطة النهاية العامة ومفتاح Api في Zilliz Cloud.
تحقق مما إذا كانت المجموعة موجودة بالفعل وأسقطها إذا كانت موجودة.
collection_name = "my_rag_collection"
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
قم بإعداد عميل OpenAI لإنشاء تضمينات وإنشاء استجابات.
from openai import OpenAI
openai_client = OpenAI()
def emb_text(text):
return (
openai_client.embeddings.create(input=text, model="text-embedding-3-small")
.data[0]
.embedding
)
إنشاء تضمين اختباري وطباعة بُعده والعناصر القليلة الأولى.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]
إنشاء مجموعة ميلفوس
سننشئ مجموعة بالمخطط التالي:
id
: المفتاح الأساسي، وهو معرّف فريد لكل مستند.vector
: تضمين المستند.text
: المحتوى النصي للمستند.metadata
: البيانات الوصفية للمستند.
ثم نقوم ببناء فهرس AUTOINDEX
على الحقل vector
. ثم ننشئ المجموعة.
# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="vector",
metric_type="COSINE",
index_type="AUTOINDEX",
)
milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params,
consistency_level="Strong",
)
milvus_client.load_collection(collection_name=collection_name)
تحميل البيانات من Unstructured
يوفر Unstructured خط أنابيب استيعاب مرن وقوي لمعالجة أنواع مختلفة من الملفات، بما في ذلك PDF و HTML وغيرها. سنستخدم وظيفة الاستيعاب لتقسيم ملفات PDF في دليل محلي. ومن ثم تحميل البيانات في Milvus.
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
LocalIndexerConfig,
LocalDownloaderConfig,
LocalConnectionConfig,
LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"
Pipeline.from_configs(
context=ProcessorConfig(),
indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
downloader_config=LocalDownloaderConfig(),
source_connection_config=LocalConnectionConfig(),
partitioner_config=PartitionerConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
strategy="hi_res",
additional_partition_args={
"split_pdf_page": True,
"split_pdf_concurrency_level": 15,
},
),
uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()
from unstructured.staging.base import elements_from_json
def load_processed_files(directory_path):
elements = []
for filename in os.listdir(directory_path):
if filename.endswith(".json"):
file_path = os.path.join(directory_path, filename)
try:
elements.extend(elements_from_json(filename=file_path))
except IOError:
print(f"Error: Could not read file {filename}.")
return elements
elements = load_processed_files(directory_with_results)
إدراج البيانات في Milvus.
data = []
for i, element in enumerate(elements):
data.append(
{
"id": i,
"vector": emb_text(element.text),
"text": element.text,
"metadata": element.metadata.to_dict(),
}
)
milvus_client.insert(collection_name=collection_name, data=data)
استرجاع وتوليد الاستجابة
تحديد دالة لاسترداد المستندات ذات الصلة من ملفوس.
def retrieve_documents(question, top_k=3):
search_res = milvus_client.search(
collection_name=collection_name,
data=[emb_text(question)],
limit=top_k,
# search_params={"metric_type": "IP", "params": {}},
output_fields=["text"],
)
return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]
تحديد دالة لإنشاء استجابة باستخدام المستندات المسترجعة في خط أنابيب RAG.
def generate_rag_response(question):
retrieved_docs = retrieve_documents(question)
context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
system_prompt = (
"You are an AI assistant. Provide answers based on the given context."
)
user_prompt = f"""
Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
Context:
{context}
Question: {question}
"""
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
return response.choices[0].message.content
لنختبر خط أنابيب RAG باستخدام نموذج سؤال.
question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.