Milvus
Zilliz
  • Home
  • Blog
  • أصبح RAG متعدد الوسائط بسيطًا: RAG-Anything + Milvus بدلاً من 20 أداة منفصلة

أصبح RAG متعدد الوسائط بسيطًا: RAG-Anything + Milvus بدلاً من 20 أداة منفصلة

  • Tutorials
November 25, 2025
Min Yin

كان بناء نظام RAG متعدد الوسائط يعني تجميع عشرات الأدوات المتخصصة معًا - واحدة للتعرف الضوئي على الحروف وواحدة للجداول وواحدة للصيغ الرياضية وواحدة للتضمينات وواحدة للبحث وما إلى ذلك. صُممت خطوط أنابيب RAG التقليدية للنصوص، وبمجرد أن بدأت المستندات في تضمين الصور والجداول والمعادلات والمخططات والمحتويات المنظمة الأخرى، سرعان ما أصبحت سلسلة الأدوات فوضوية وغير قابلة للإدارة.

أماRAG-Anything، الذي طورته جامعة هونغ كونغ HKU، فقد غيّر ذلك. وهي مبنية على LightRAG، وتوفر منصة الكل في واحد التي يمكنها تحليل أنواع المحتوى المتنوعة بالتوازي وتعيينها في رسم بياني معرفي موحد. لكن توحيد خط الأنابيب هو نصف القصة فقط. لاسترداد الأدلة عبر هذه الطرائق المتنوعة، لا تزال بحاجة إلى بحث متجه سريع وقابل للتطوير يمكنه التعامل مع العديد من أنواع التضمين في وقت واحد. وهنا يأتي دور ميلفوس. كقاعدة بيانات متجهات مفتوحة المصدر وعالية الأداء، يلغي Milvus الحاجة إلى حلول تخزين وبحث متعددة. فهو يدعم البحث على نطاق واسع في الشبكة النانوية الوطنية واسترجاع الكلمات المتجهة الهجينة والكلمات المتجهة الهجينة وتصفية البيانات الوصفية وإدارة التضمين المرنة - كل ذلك في مكان واحد.

في هذا المنشور، سنقوم بتفصيل كيفية عمل RAG-Anything وMilvus معًا لاستبدال سلسلة أدوات متعددة الوسائط مجزأة بمجموعة أدوات نظيفة وموحدة - وسنوضح كيف يمكنك بناء نظام RAG Q&A عملي متعدد الوسائط بخطوات قليلة.

ما هو RAG-Anything وكيف يعمل؟

RAG-Anything هو إطار عمل RAG مصمم لكسر حاجز النص فقط للأنظمة التقليدية. فبدلاً من الاعتماد على أدوات متخصصة متعددة، يوفر بيئة واحدة وموحدة يمكنها تحليل المعلومات ومعالجتها واسترجاعها عبر أنواع المحتوى المختلطة.

يدعم إطار العمل المستندات التي تحتوي على نصوص ومخططات وجداول وتعبيرات رياضية، مما يتيح للمستخدمين الاستعلام عبر جميع الطرائق من خلال واجهة واحدة متماسكة. وهذا يجعلها مفيدة بشكل خاص في مجالات مثل البحث الأكاديمي وإعداد التقارير المالية وإدارة المعرفة المؤسسية، حيث تكون المواد متعددة الوسائط شائعة.

يعتمد RAG-Anything في جوهره على خط أنابيب متعدد الوسائط متعدد المراحل: تحليل المستند ← تحليل المحتوى ← الرسم البياني المعرفي ← الاسترجاع الذكي. تتيح هذه البنية التنسيق الذكي والفهم متعدد الوسائط، مما يسمح للنظام بالتعامل بسلاسة مع طرائق المحتوى المتنوعة ضمن سير عمل واحد متكامل.

بنية "1 + 3 + N"

على المستوى الهندسي، تتحقق إمكانيات RAG-Anything من خلال بنية "1 + 3 + N":

المحرك الأساسي

يوجد في مركز RAG-Anything محرك رسم بياني معرفي مستوحى من LightRAG. هذه الوحدة الأساسية هي المسؤولة عن استخراج الكيانات متعددة الوسائط، وتخطيط العلاقات عبر الوسائط، والتخزين الدلالي المتجه. وعلى عكس أنظمة RAG التقليدية التي تعتمد على النصوص فقط، يفهم المحرك الكيانات من النصوص، والكائنات المرئية داخل الصور، والهياكل العلائقية المضمنة في الجداول.

3 معالجات مشروطية

يدمج نظام RAG-Anything ثلاثة معالجات طرائق متخصصة مصممة لفهم عميق خاص بالطرائق. وهي تشكل معًا طبقة التحليل متعدد الوسائط في النظام.

  • يفسرImageModalProcessor المحتوى المرئي ومعناه السياقي.

  • يقومTableModalProcessor بتحليل هياكل الجداول وفك تشفير العلاقات المنطقية والعددية داخل البيانات.

  • يقومEquationModalProcessor بفهم الدلالات الكامنة وراء الرموز والصيغ الرياضية.

المحللون

لدعم البنية المتنوعة للمستندات في العالم الحقيقي، يوفر RAG-Anything طبقة تحليل قابلة للتوسيع مبنية على محركات استخراج متعددة. وهي تدمج حاليًا كلاً من MinerU وDocling، وتختار تلقائيًا المُحلل التحليلي الأمثل بناءً على نوع المستند وتعقيده الهيكلي.

استنادًا إلى بنية "1 + 3 + N"، تعمل RAG-Anything على تحسين خط أنابيب RAG التقليدي من خلال تغيير كيفية التعامل مع أنواع المحتوى المختلفة. فبدلًا من معالجة النصوص والصور والجداول واحدًا تلو الآخر، يقوم النظام بمعالجتها جميعًا مرة واحدة.

# The core configuration demonstrates the parallel processing design
config = RAGAnythingConfig(
    working_dir="./rag_storage",
    parser="mineru",
    parse_method="auto",  # Automatically selects the optimal parsing strategy
    enable_image_processing=True,
    enable_table_processing=True, 
    enable_equation_processing=True,
    max_workers=8  # Supports multi-threaded parallel processing
)

يعمل هذا التصميم على تسريع معالجة المستندات التقنية الكبيرة بشكل كبير. تُظهر الاختبارات المعيارية أنه عندما يستخدم النظام المزيد من أنوية وحدة المعالجة المركزية، يصبح أسرع بشكل ملحوظ، مما يقلل بشكل كبير من الوقت اللازم لمعالجة كل مستند.

تحسين التخزين والاسترجاع متعدد الطبقات

علاوةً على تصميمه متعدد الوسائط، يستخدم RAG-Anything أيضًا نهج التخزين والاسترجاع متعدد الطبقات لجعل النتائج أكثر دقة وكفاءة.

  • يتم تخزينالنص في قاعدة بيانات متجهة تقليدية.

  • تتم إدارةالصور في مخزن ميزات مرئية منفصل.

  • يتم الاحتفاظبالجداول في مخزن بيانات منظم.

  • يتم تحويلالصيغ الرياضية إلى متجهات دلالية.

من خلال تخزين كل نوع من أنواع المحتوى في تنسيق مناسب خاص به، يمكن للنظام اختيار أفضل طريقة استرجاع لكل طريقة بدلاً من الاعتماد على بحث تشابه عام واحد. وهذا يؤدي إلى نتائج أسرع وأكثر موثوقية عبر أنواع مختلفة من المحتوى.

كيف يتناسب ميلفوس مع RAG-Anything

يوفر RAG-Anything استرجاعًا قويًا متعدد الوسائط، ولكن القيام بذلك بشكل جيد يتطلب بحثًا متجهًا سريعًا وقابلًا للتطوير عبر جميع أنواع التضمينات. يملأ Milvus هذا الدور بشكل مثالي.

بفضل بنيته السحابية الأصلية والفصل بين الحوسبة والتخزين، يوفر Milvus قابلية عالية للتوسع وكفاءة من حيث التكلفة. وهو يدعم الفصل بين القراءة والكتابة وتوحيد الدُفعات المتدفقة، مما يسمح للنظام بالتعامل مع أعباء العمل عالية التضمين مع الحفاظ على أداء الاستعلام في الوقت الفعلي - تصبح البيانات الجديدة قابلة للبحث فور إدراجها.

كما يضمن Milvus أيضًا موثوقية على مستوى المؤسسات من خلال تصميمه الموزع والمتحمل للأخطاء، والذي يحافظ على استقرار النظام حتى في حالة فشل العقد الفردية. وهذا يجعله مناسبًا بقوة لعمليات نشر RAG متعددة الوسائط على مستوى الإنتاج.

كيفية بناء نظام أسئلة وأجوبة متعدد الوسائط باستخدام RAG-Anything و Milvus

يوضح هذا العرض التوضيحي كيفية بناء نظام متعدد الوسائط للأسئلة والأجوبة باستخدام إطار عمل RAG-Anything، وقاعدة بيانات Milvus المتجهة، ونموذج تضمين TongYi. (يركز هذا المثال على كود التنفيذ الأساسي وليس إعداد إنتاج كامل).

عرض توضيحي عملي

المتطلبات الأساسية :

  • بايثون: 3.10 أو أعلى

  • قاعدة بيانات المتجهات: خدمة ميلفوس (ميلفوس لايت)

  • الخدمة السحابية: مفتاح Alibaba Cloud API (لخدمة LLM وخدمات التضمين)

  • نموذج LLM: qwen-vl-max (نموذج ممكّن للرؤية)

نموذج التضمين: tongyi-embedding-vision-plus

- python -m venv .venv && source .venv/bin/activate  # For Windows users:  .venvScriptsactivate
- pip install -r requirements-min.txt
- cp .env.example .env #add DASHSCOPE_API_KEY

تنفيذ الحد الأدنى من مثال العمل:

python minimal_[main.py](<http://main.py>)

المخرجات المتوقعة:

بمجرد تشغيل البرنامج النصي بنجاح، يجب أن تعرض المحطة الطرفية:

  • نتيجة الأسئلة والأجوبة النصية التي تم إنشاؤها بواسطة LLM.

  • وصف الصورة المسترجعة المطابقة للاستعلام.

هيكل المشروع

.
├─ requirements-min.txt
├─ .env.example
├─ [config.py](<http://config.py>)
├─ milvus_[store.py](<http://store.py>)
├─ [adapters.py](<http://adapters.py>)
├─ minimal_[main.py](<http://main.py>)
└─ sample
   ├─ docs
   │  └─ faq_milvus.txt
   └─ images
      └─ milvus_arch.png

تبعيات المشروع

raganything
lightrag
pymilvus[lite]>=2.3.0
aiohttp>=3.8.0
orjson>=3.8.0
python-dotenv>=1.0.0
Pillow>=9.0.0
numpy>=1.21.0,<2.0.0
rich>=12.0.0

متغيرات البيئة

# Alibaba Cloud DashScope
DASHSCOPE_API_KEY=your_api_key_here
# If the endpoint changes in future releases, please update it accordingly.
ALIYUN_LLM_URL=https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions
ALIYUN_VLM_URL=https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions
ALIYUN_EMBED_URL=https://dashscope.aliyuncs.com/api/v1/services/embeddings/text-embedding
# Model names (configure all models here for consistency)
LLM_TEXT_MODEL=qwen-max
LLM_VLM_MODEL=qwen-vl-max
EMBED_MODEL=tongyi-embedding-vision-plus
# Milvus Lite
MILVUS_URI=milvus_lite.db
MILVUS_COLLECTION=rag_multimodal_collection
EMBED_DIM=1152

التكوين

import os
from dotenv import load_dotenv
load_dotenv()
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY", "")
LLM_TEXT_MODEL = os.getenv("LLM_TEXT_MODEL", "qwen-max")
LLM_VLM_MODEL = os.getenv("LLM_VLM_MODEL", "qwen-vl-max")
EMBED_MODEL = os.getenv("EMBED_MODEL", "tongyi-embedding-vision-plus")
ALIYUN_LLM_URL = os.getenv("ALIYUN_LLM_URL")
ALIYUN_VLM_URL = os.getenv("ALIYUN_VLM_URL")
ALIYUN_EMBED_URL = os.getenv("ALIYUN_EMBED_URL")
MILVUS_URI = os.getenv("MILVUS_URI", "milvus_lite.db")
MILVUS_COLLECTION = os.getenv("MILVUS_COLLECTION", "rag_multimodal_collection")
EMBED_DIM = int(os.getenv("EMBED_DIM", "1152"))
# Basic runtime parameters
TIMEOUT = 60
MAX_RETRIES = 2

استدعاء النموذج

import os
import base64
import aiohttp
import asyncio
from typing import List, Dict, Any, Optional
from config import (
    DASHSCOPE_API_KEY, LLM_TEXT_MODEL, LLM_VLM_MODEL, EMBED_MODEL,
    ALIYUN_LLM_URL, ALIYUN_VLM_URL, ALIYUN_EMBED_URL, EMBED_DIM, TIMEOUT
)
HEADERS = {
    "Authorization": f"Bearer {DASHSCOPE_API_KEY}",
    "Content-Type": "application/json",
}
class AliyunLLMAdapter:
    def __init__(self):
        self.text_url = ALIYUN_LLM_URL
        self.vlm_url = ALIYUN_VLM_URL
        self.text_model = LLM_TEXT_MODEL
        self.vlm_model = LLM_VLM_MODEL
    async def chat(self, prompt: str) -> str:
        payload = {
            "model": self.text_model,
            "input": {"messages": [{"role": "user", "content": prompt}]},
            "parameters": {"max_tokens": 1024, "temperature": 0.5},
        }
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s:
            async with [s.post](<http://s.post>)(self.text_url, json=payload, headers=HEADERS) as r:
                r.raise_for_status()
                data = await r.json()
                return data["output"]["choices"][0]["message"]["content"]
    async def chat_vlm_with_image(self, prompt: str, image_path: str) -> str:
        with open(image_path, "rb") as f:
            image_b64 = base64.b64encode([f.read](<http://f.read>)()).decode("utf-8")
        payload = {
            "model": self.vlm_model,
            "input": {"messages": [{"role": "user", "content": [
                {"text": prompt},
                {"image": f"data:image/png;base64,{image_b64}"}
            ]}]},
            "parameters": {"max_tokens": 1024, "temperature": 0.2},
        }
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s:
            async with [s.post](<http://s.post>)(self.vlm_url, json=payload, headers=HEADERS) as r:
                r.raise_for_status()
                data = await r.json()
                return data["output"]["choices"][0]["message"]["content"]
class AliyunEmbeddingAdapter:
    def __init__(self):
        self.url = ALIYUN_EMBED_URL
        self.model = EMBED_MODEL
        self.dim = EMBED_DIM
    async def embed_text(self, text: str) -> List[float]:
        payload = {
            "model": self.model,
            "input": {"texts": [text]},
            "parameters": {"text_type": "query", "dimensions": self.dim},
        }
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s:
            async with [s.post](<http://s.post>)(self.url, json=payload, headers=HEADERS) as r:
                r.raise_for_status()
                data = await r.json()
                return data["output"]["embeddings"][0]["embedding"]

تكامل ميلفوس لايت

import json
import time
from typing import List, Dict, Any, Optional
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility
from config import MILVUS_URI, MILVUS_COLLECTION, EMBED_DIM
class MilvusVectorStore:
    def __init__(self, uri: str = MILVUS_URI, collection_name: str = MILVUS_COLLECTION, dim: int = EMBED_DIM):
        self.uri = uri
        self.collection_name = collection_name
        self.dim = dim
        self.collection: Optional[Collection] = None
        self._connect_and_prepare()
    def _connect_and_prepare(self):
        connections.connect("default", uri=self.uri)
        if utility.has_collection(self.collection_name):
            self.collection = Collection(self.collection_name)
        else:
            fields = [
                FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=512, is_primary=True),
                FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=self.dim),
                FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
                FieldSchema(name="content_type", dtype=DataType.VARCHAR, max_length=32),
                FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=1024),
                FieldSchema(name="ts", dtype=[DataType.INT](<http://DataType.INT>)64),
            ]
            schema = CollectionSchema(fields, "Minimal multimodal collection")
            self.collection = Collection(self.collection_name, schema)
            self.collection.create_index("vector", {
                "metric_type": "COSINE",
                "index_type": "IVF_FLAT",
                "params": {"nlist": 1024}
            })
        self.collection.load()
    def upsert(self, ids: List[str], vectors: List[List[float]], contents: List[str],
               content_types: List[str], sources: List[str]) -> None:
        data = [
            ids,
            vectors,
            contents,
            content_types,
            sources,
            [int(time.time() * 1000)] * len(ids)
        ]
        self.collection.upsert(data)
        self.collection.flush()
    def search(self, query_vectors: List[List[float]], top_k: int = 5, content_type: Optional[str] = None):
        expr = f'content_type == "{content_type}"' if content_type else None
        params = {"metric_type": "COSINE", "params": {"nprobe": 16}}
        results = [self.collection.search](<http://self.collection.search>)(
            data=query_vectors,
            anns_field="vector",
            param=params,
            limit=top_k,
            expr=expr,
            output_fields=["id", "content", "content_type", "source", "ts"]
        )
        out = []
        for hits in results:
            out.append([{
                "id": h.entity.get("id"),
                "content": h.entity.get("content"),
                "content_type": h.entity.get("content_type"),
                "source": h.entity.get("source"),
                "score": h.score
            } for h in hits])
        return out

نقطة الدخول الرئيسية

"""
Minimal Working Example:
- Insert a short text FAQ into LightRAG (text retrieval context)
- Insert an image description vector into Milvus (image retrieval context)
- Execute two example queries: one text QA and one image-based QA
"""
import asyncio
import uuid
from pathlib import Path
from rich import print
from lightrag import LightRAG, QueryParam
from lightrag.utils import EmbeddingFunc
from adapters import AliyunLLMAdapter, AliyunEmbeddingAdapter
from milvus_store import MilvusVectorStore
from config import EMBED_DIM
SAMPLE_DOC = Path("sample/docs/faq_milvus.txt")
SAMPLE_IMG = Path("sample/images/milvus_arch.png")
async def main():
    # 1) Initialize core components
    llm = AliyunLLMAdapter()
    emb = AliyunEmbeddingAdapter()
    store = MilvusVectorStore()
    # 2) Initialize LightRAG (for text-only retrieval)
    async def llm_complete(prompt: str, max_tokens: int = 1024) -> str:
        return await [llm.chat](<http://llm.chat>)(prompt)
    async def embed_func(text: str) -> list:
        return await emb.embed_text(text)
    rag = LightRAG(
        working_dir="rag_workdir_min",
        llm_model_func=llm_complete,
        embedding_func=EmbeddingFunc(
            embedding_dim=EMBED_DIM,
            max_token_size=8192,
            func=embed_func
        ),
    )
    # 3) Insert text data
    if SAMPLE_DOC.exists():
        text = SAMPLE_[DOC.read](<http://DOC.read>)_text(encoding="utf-8")
        await rag.ainsert(text)
        print("[green]Inserted FAQ text into LightRAG[/green]")
    else:
        print("[yellow] sample/docs/faq_milvus.txt not found[/yellow]")
    # 4) Insert image data (store description in Milvus)
    if SAMPLE_IMG.exists():
        # Use the VLM to generate a description as its semantic content
        desc = await [llm.chat](<http://llm.chat>)_vlm_with_image("Please briefly describe the key components of the Milvus architecture shown in the image.", str(SAMPLE_IMG))
        vec = await emb.embed_text(desc)  # Use text embeddings to maintain a consistent vector dimension, simplifying reuse
        store.upsert(
            ids=[str(uuid.uuid4())],
            vectors=[vec],
            contents=[desc],
            content_types=["image"],
            sources=[str(SAMPLE_IMG)]
        )
        print("[green]Inserted image description into Milvus(content_type=image)[/green]")
    else:
        print("[yellow] sample/images/milvus_arch.png not found[/yellow]")
    # 5) Query: Text-based QA (from LightRAG)
    q1 = "Does Milvus support simultaneous insertion and search? Give a short answer."
    ans1 = await rag.aquery(q1, param=QueryParam(mode="hybrid"))
    print("\\n[bold]Text QA[/bold]")
    print(ans1)
    # 6) Query: Image-related QA (from Milvus)
    q2 = "What are the key components of the Milvus architecture?"
    q2_vec = await emb.embed_text(q2)
    img_hits = [store.search](<http://store.search>)([q2_vec], top_k=3, content_type="image")
    print("\\n[bold]Image Retrieval (returns semantic image descriptions)[/bold]")
    print(img_hits[0] if img_hits else [])
if __name__ == "__main__":
    [asyncio.run](<http://asyncio.run>)(main())

يمكنك الآن اختبار نظام RAG متعدد الوسائط باستخدام مجموعة البيانات الخاصة بك.

مستقبل نظام RAG متعدد الوسائط

مع انتقال المزيد من بيانات العالم الواقعي إلى ما هو أبعد من النص العادي، بدأت أنظمة الاسترجاع المعزز (RAG) في التطور نحو تعدد الوسائط الحقيقية. وتوضح حلول مثل RAG-Anything بالفعل كيف يمكن معالجة النصوص والصور والجداول والصيغ والمحتويات المنظمة الأخرى بطريقة موحدة. وبالنظر إلى المستقبل، أعتقد أن هناك ثلاثة اتجاهات رئيسية ستشكل المرحلة التالية من RAG متعدد الوسائط:

التوسع إلى المزيد من الطرائق

الأطر الحالية - مثل RAG-Anything - يمكنها بالفعل التعامل مع النصوص والصور والجداول والتعبيرات الرياضية. تتمثل الحدود التالية في دعم أنواع محتوى أكثر ثراءً، بما في ذلك الفيديو والصوت وبيانات الاستشعار والنماذج ثلاثية الأبعاد، مما يمكّن أنظمة RAG من فهم واسترجاع المعلومات من مجموعة كاملة من البيانات الحديثة.

تحديثات البيانات في الوقت الحقيقي

تعتمد معظم خطوط أنابيب RAG اليوم على مصادر بيانات ثابتة نسبيًا. وبما أن المعلومات تتغير بسرعة أكبر، ستتطلب الأنظمة المستقبلية تحديثات المستندات في الوقت الحقيقي، واستيعاب التدفق، والفهرسة التدريجية. هذا التحول سيجعل RAG أكثر استجابة وفي الوقت المناسب وأكثر موثوقية في البيئات الديناميكية.

نقل RAG إلى أجهزة الحافة

مع أدوات المتجهات خفيفة الوزن مثل Milvus Lite، لم يعد RAG متعدد الوسائط محصورًا في السحابة. يتيح نشر RAG على الأجهزة الطرفية وأنظمة إنترنت الأشياء إمكانية الاسترجاع الذكي بالقرب من مكان توليد البيانات - مما يحسن من زمن الاستجابة والخصوصية والكفاءة الإجمالية.

👉 هل أنت مستعد لاستكشاف RAG متعدد الوسائط؟

جرّب إقران خط أنابيب متعدد الوسائط مع Milvus وجرّب الاسترجاع السريع والقابل للتطوير عبر النصوص والصور وغيرها.

هل لديك أسئلة أو تريد التعمق في أي ميزة؟ انضم إلى قناة Discord الخاصة بنا أو قم بتسجيل المشكلات على GitHub. يمكنك أيضًا حجز جلسة فردية مدتها 20 دقيقة للحصول على رؤى وإرشادات وإجابات لأسئلتك من خلال ساعات عمل Milvus المكتبية.

    Try Managed Milvus for Free

    Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

    Get Started

    Like the article? Spread the word

    استمر في القراءة