Milvus
Zilliz
  • Home
  • Blog
  • マルチモーダルRAGをシンプルに:RAG-Anything+milvus、20の別々のツールに代わる

マルチモーダルRAGをシンプルに:RAG-Anything+milvus、20の別々のツールに代わる

  • Tutorials
November 25, 2025
Min Yin

マルチモーダルRAGシステムの構築は、OCR用、表用、数式用、埋め込み用、検索用など、12種類の専用ツールをつなぎ合わせることを意味していました。従来のRAGパイプラインはテキスト用に設計されており、文書に画像、表、数式、チャート、その他の構造化されたコンテンツが含まれるようになると、ツールチェーンはすぐに乱雑になり、管理できなくなりました。

HKUによって開発されたRAG-Anythingは、それを変えます。LightRAG上に構築されたRAG-Anythingは、多様なタイプのコンテンツを並行して解析し、統一されたナレッジグラフにマッピングできるAll-in-Oneプラットフォームを提供する。しかし、パイプラインの統一は物語の半分に過ぎない。様々なモダリティの証拠を検索するには、一度に多くの埋め込みタイプを処理できる、高速でスケーラブルなベクトル検索が必要です。そこでMilvusの出番だ。オープンソースの高性能ベクトルデータベースであるMilvusは、複数のストレージや検索ソリューションを必要としません。Milvusは、大規模なANN検索、ハイブリッドベクターキーワード検索、メタデータフィルタリング、柔軟なエンベッディング管理をすべて一箇所でサポートします。

この記事では、RAG-AnythingとMilvusがどのように連携し、断片化されたマルチモーダルツールチェーンをクリーンで統一されたスタックに置き換えるのか、また、わずかなステップで実用的なマルチモーダルRAG Q&Aシステムを構築する方法をご紹介します。

RAG-Anythingとは何か?

RAG-Anythingは、従来のシステムのテキストのみの壁を破るために設計されたRAGフレームワークです。複数の専門的なツールに依存する代わりに、様々な種類のコンテンツを解析し、処理し、情報を取得できる単一の統一された環境を提供します。

このフレームワークは、テキスト、ダイアグラム、表、数式を含むドキュメントをサポートしており、ユーザーは単一のまとまったインターフェイスを通じて、あらゆるモダリティにまたがるクエリーを行うことができる。このため、学術研究、財務報告、企業の知識管理など、マルチモーダルな資料が一般的な分野で特に有用です。

その中核となるRAG-Anythingは、文書解析→コンテンツ解析→ナレッジグラフ→インテリジェント検索という多段階のマルチモーダルパイプライン上に構築されています。このアーキテクチャは、インテリジェントなオーケストレーションとクロスモーダルな理解を可能にし、システムが単一の統合されたワークフロー内で多様なコンテンツモダリティをシームレスに扱うことを可能にする。

1 + 3 + N」アーキテクチャ

エンジニアリングレベルでは、RAG-Anythingの機能は「1 + 3 + N」アーキテクチャによって実現されています:

コアエンジン

RAG-Anythingの中心には、LightRAGにインスパイアされたナレッジグラフエンジンがあります。このコアユニットは、マルチモーダルなエンティティ抽出、クロスモーダルな関係マッピング、ベクトル化されたセマンティックストレージを担当する。従来のテキストのみのRAGシステムとは異なり、このエンジンはテキスト、画像内の視覚的オブジェクト、テーブルに埋め込まれた関係構造から実体を理解する。

3つのモーダルプロセッサ

RAG-Anythingは、モダリティに特化した深い理解のために設計された3つの特殊なモダリティプロセッサを統合している。これらは一緒になって、システムのマルチモーダル分析レイヤーを形成する。

  • ImageModalProcessorは視覚的コンテンツとその文脈的意味を解釈します。

  • TableModalProcessor は表構造を解析し、データ内の論理的および数値的関係を解読します。

  • EquationModalProcessorは、数学記号や数式の背後にあるセマンティクスを理解します。

Nパーサー

実世界のドキュメントの多様な構造をサポートするために、RAG-Anythingは複数の抽出エンジン上に構築された拡張可能な構文解析レイヤーを提供します。現在、MinerUとDoclingの両方を統合し、文書の種類と構造の複雑さに基づいて最適なパーサーを自動的に選択します。

1 + 3 + N "アーキテクチャに基づき、RAG-Anythingは、異なるコンテンツタイプを処理する方法を変更することで、従来のRAGパイプラインを改善します。テキスト、画像、表を1つずつ処理する代わりに、システムはそれらをすべて一度に処理する。

# The core configuration demonstrates the parallel processing design
config = RAGAnythingConfig(
    working_dir="./rag_storage",
    parser="mineru",
    parse_method="auto",  # Automatically selects the optimal parsing strategy
    enable_image_processing=True,
    enable_table_processing=True, 
    enable_equation_processing=True,
    max_workers=8  # Supports multi-threaded parallel processing
)

この設計により、大規模な技術文書の処理が大幅にスピードアップします。ベンチマークテストによると、システムがより多くのCPUコアを使用すると、処理速度が著しく向上し、各文書の処理に必要な時間が大幅に短縮されます。

レイヤー化された保存と検索の最適化

マルチモーダル設計に加え、RAG-Anythingは、結果をより正確かつ効率的にするために、階層的な保存と検索のアプローチも採用しています。

  • テキストは従来のベクターデータベースに保存されます。

  • 画像は別の視覚的特徴ストアで管理される。

  • テーブルは構造化データ・ストレージに保管される。

  • 数式は意味ベクトルに変換される。

各コンテンツタイプをそれぞれに適したフォーマットで保存することで、システムは単一の一般的な類似検索に頼るのではなく、各モダリティに最適な検索方法を選択することができる。これにより、様々な種類のコンテンツにおいて、より高速で信頼性の高い検索結果を得ることができる。

MilvusのRAG-Anythingへの適合性

RAG-Anythingは強力なマルチモーダル検索を提供しますが、これをうまく行うには、あらゆる種類の埋め込みを横断する迅速でスケーラブルなベクトル検索が必要です。Milvusはこの役割を完璧に果たす。

クラウドネイティブなアーキテクチャと計算機とストレージの分離により、Milvusは高いスケーラビリティとコスト効率の両方を実現する。Milvusは、読み書き分離とストリーム・バッチの統一をサポートしているため、新しいデータが挿入されるとすぐに検索可能になるというリアルタイムのクエリ性能を維持しながら、高同時処理のワークロードを処理することができます。

Milvusはまた、分散型のフォールトトレラント設計により、個々のノードに障害が発生した場合でもシステムを安定させ、エンタープライズグレードの信頼性を確保します。このため、本番レベルのマルチモーダルRAGの導入に適している。

RAG-AnythingとmilvusによるマルチモーダルQ&Aシステムの構築方法

このデモは、RAG-Anythingフレームワーク、Milvusベクトルデータベース、TongYiエンベッディングモデルを使用して、マルチモーダルQ&Aシステムを構築する方法を示しています。(この例はコア実装コードに焦点を当てており、完全な本番セットアップではありません)

ハンズオンデモ

前提条件

  • Python:3.10以上

  • ベクターデータベースMilvusサービス(Milvus Lite)

  • クラウドサービスAlibaba Cloud APIキー(LLMおよびエンベッディングサービス用)

  • LLMモデル: qwen-vl-max (ビジョン対応モデル)

エンベッディングモデルtongyi-embedding-vision-plus

- python -m venv .venv && source .venv/bin/activate  # For Windows users:  .venvScriptsactivate
- pip install -r requirements-min.txt
- cp .env.example .env #add DASHSCOPE_API_KEY

最小限の動作例を実行します:

python minimal_[main.py](<http://main.py>)

期待される出力:

スクリプトが正常に実行されると、ターミナルに表示されます:

  • LLMによって生成されたテキストベースのQ&A結果。

  • クエリに対応する検索された画像の説明。

プロジェクトの構造

.
├─ requirements-min.txt
├─ .env.example
├─ [config.py](<http://config.py>)
├─ milvus_[store.py](<http://store.py>)
├─ [adapters.py](<http://adapters.py>)
├─ minimal_[main.py](<http://main.py>)
└─ sample
   ├─ docs
   │  └─ faq_milvus.txt
   └─ images
      └─ milvus_arch.png

プロジェクトの依存関係

raganything
lightrag
pymilvus[lite]>=2.3.0
aiohttp>=3.8.0
orjson>=3.8.0
python-dotenv>=1.0.0
Pillow>=9.0.0
numpy>=1.21.0,<2.0.0
rich>=12.0.0

環境変数

# Alibaba Cloud DashScope
DASHSCOPE_API_KEY=your_api_key_here
# If the endpoint changes in future releases, please update it accordingly.
ALIYUN_LLM_URL=https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions
ALIYUN_VLM_URL=https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions
ALIYUN_EMBED_URL=https://dashscope.aliyuncs.com/api/v1/services/embeddings/text-embedding
# Model names (configure all models here for consistency)
LLM_TEXT_MODEL=qwen-max
LLM_VLM_MODEL=qwen-vl-max
EMBED_MODEL=tongyi-embedding-vision-plus
# Milvus Lite
MILVUS_URI=milvus_lite.db
MILVUS_COLLECTION=rag_multimodal_collection
EMBED_DIM=1152

構成

import os
from dotenv import load_dotenv
load_dotenv()
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY", "")
LLM_TEXT_MODEL = os.getenv("LLM_TEXT_MODEL", "qwen-max")
LLM_VLM_MODEL = os.getenv("LLM_VLM_MODEL", "qwen-vl-max")
EMBED_MODEL = os.getenv("EMBED_MODEL", "tongyi-embedding-vision-plus")
ALIYUN_LLM_URL = os.getenv("ALIYUN_LLM_URL")
ALIYUN_VLM_URL = os.getenv("ALIYUN_VLM_URL")
ALIYUN_EMBED_URL = os.getenv("ALIYUN_EMBED_URL")
MILVUS_URI = os.getenv("MILVUS_URI", "milvus_lite.db")
MILVUS_COLLECTION = os.getenv("MILVUS_COLLECTION", "rag_multimodal_collection")
EMBED_DIM = int(os.getenv("EMBED_DIM", "1152"))
# Basic runtime parameters
TIMEOUT = 60
MAX_RETRIES = 2

モデル呼び出し

import os
import base64
import aiohttp
import asyncio
from typing import List, Dict, Any, Optional
from config import (
    DASHSCOPE_API_KEY, LLM_TEXT_MODEL, LLM_VLM_MODEL, EMBED_MODEL,
    ALIYUN_LLM_URL, ALIYUN_VLM_URL, ALIYUN_EMBED_URL, EMBED_DIM, TIMEOUT
)
HEADERS = {
    "Authorization": f"Bearer {DASHSCOPE_API_KEY}",
    "Content-Type": "application/json",
}
class AliyunLLMAdapter:
    def __init__(self):
        self.text_url = ALIYUN_LLM_URL
        self.vlm_url = ALIYUN_VLM_URL
        self.text_model = LLM_TEXT_MODEL
        self.vlm_model = LLM_VLM_MODEL
    async def chat(self, prompt: str) -> str:
        payload = {
            "model": self.text_model,
            "input": {"messages": [{"role": "user", "content": prompt}]},
            "parameters": {"max_tokens": 1024, "temperature": 0.5},
        }
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s:
            async with [s.post](<http://s.post>)(self.text_url, json=payload, headers=HEADERS) as r:
                r.raise_for_status()
                data = await r.json()
                return data["output"]["choices"][0]["message"]["content"]
    async def chat_vlm_with_image(self, prompt: str, image_path: str) -> str:
        with open(image_path, "rb") as f:
            image_b64 = base64.b64encode([f.read](<http://f.read>)()).decode("utf-8")
        payload = {
            "model": self.vlm_model,
            "input": {"messages": [{"role": "user", "content": [
                {"text": prompt},
                {"image": f"data:image/png;base64,{image_b64}"}
            ]}]},
            "parameters": {"max_tokens": 1024, "temperature": 0.2},
        }
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s:
            async with [s.post](<http://s.post>)(self.vlm_url, json=payload, headers=HEADERS) as r:
                r.raise_for_status()
                data = await r.json()
                return data["output"]["choices"][0]["message"]["content"]
class AliyunEmbeddingAdapter:
    def __init__(self):
        self.url = ALIYUN_EMBED_URL
        self.model = EMBED_MODEL
        self.dim = EMBED_DIM
    async def embed_text(self, text: str) -> List[float]:
        payload = {
            "model": self.model,
            "input": {"texts": [text]},
            "parameters": {"text_type": "query", "dimensions": self.dim},
        }
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s:
            async with [s.post](<http://s.post>)(self.url, json=payload, headers=HEADERS) as r:
                r.raise_for_status()
                data = await r.json()
                return data["output"]["embeddings"][0]["embedding"]

Milvus Liteとの統合

import json
import time
from typing import List, Dict, Any, Optional
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility
from config import MILVUS_URI, MILVUS_COLLECTION, EMBED_DIM
class MilvusVectorStore:
    def __init__(self, uri: str = MILVUS_URI, collection_name: str = MILVUS_COLLECTION, dim: int = EMBED_DIM):
        self.uri = uri
        self.collection_name = collection_name
        self.dim = dim
        self.collection: Optional[Collection] = None
        self._connect_and_prepare()
    def _connect_and_prepare(self):
        connections.connect("default", uri=self.uri)
        if utility.has_collection(self.collection_name):
            self.collection = Collection(self.collection_name)
        else:
            fields = [
                FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=512, is_primary=True),
                FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=self.dim),
                FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
                FieldSchema(name="content_type", dtype=DataType.VARCHAR, max_length=32),
                FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=1024),
                FieldSchema(name="ts", dtype=[DataType.INT](<http://DataType.INT>)64),
            ]
            schema = CollectionSchema(fields, "Minimal multimodal collection")
            self.collection = Collection(self.collection_name, schema)
            self.collection.create_index("vector", {
                "metric_type": "COSINE",
                "index_type": "IVF_FLAT",
                "params": {"nlist": 1024}
            })
        self.collection.load()
    def upsert(self, ids: List[str], vectors: List[List[float]], contents: List[str],
               content_types: List[str], sources: List[str]) -> None:
        data = [
            ids,
            vectors,
            contents,
            content_types,
            sources,
            [int(time.time() * 1000)] * len(ids)
        ]
        self.collection.upsert(data)
        self.collection.flush()
    def search(self, query_vectors: List[List[float]], top_k: int = 5, content_type: Optional[str] = None):
        expr = f'content_type == "{content_type}"' if content_type else None
        params = {"metric_type": "COSINE", "params": {"nprobe": 16}}
        results = [self.collection.search](<http://self.collection.search>)(
            data=query_vectors,
            anns_field="vector",
            param=params,
            limit=top_k,
            expr=expr,
            output_fields=["id", "content", "content_type", "source", "ts"]
        )
        out = []
        for hits in results:
            out.append([{
                "id": h.entity.get("id"),
                "content": h.entity.get("content"),
                "content_type": h.entity.get("content_type"),
                "source": h.entity.get("source"),
                "score": h.score
            } for h in hits])
        return out

メインエントリーポイント

"""
Minimal Working Example:
- Insert a short text FAQ into LightRAG (text retrieval context)
- Insert an image description vector into Milvus (image retrieval context)
- Execute two example queries: one text QA and one image-based QA
"""
import asyncio
import uuid
from pathlib import Path
from rich import print
from lightrag import LightRAG, QueryParam
from lightrag.utils import EmbeddingFunc
from adapters import AliyunLLMAdapter, AliyunEmbeddingAdapter
from milvus_store import MilvusVectorStore
from config import EMBED_DIM
SAMPLE_DOC = Path("sample/docs/faq_milvus.txt")
SAMPLE_IMG = Path("sample/images/milvus_arch.png")
async def main():
    # 1) Initialize core components
    llm = AliyunLLMAdapter()
    emb = AliyunEmbeddingAdapter()
    store = MilvusVectorStore()
    # 2) Initialize LightRAG (for text-only retrieval)
    async def llm_complete(prompt: str, max_tokens: int = 1024) -> str:
        return await [llm.chat](<http://llm.chat>)(prompt)
    async def embed_func(text: str) -> list:
        return await emb.embed_text(text)
    rag = LightRAG(
        working_dir="rag_workdir_min",
        llm_model_func=llm_complete,
        embedding_func=EmbeddingFunc(
            embedding_dim=EMBED_DIM,
            max_token_size=8192,
            func=embed_func
        ),
    )
    # 3) Insert text data
    if SAMPLE_DOC.exists():
        text = SAMPLE_[DOC.read](<http://DOC.read>)_text(encoding="utf-8")
        await rag.ainsert(text)
        print("[green]Inserted FAQ text into LightRAG[/green]")
    else:
        print("[yellow] sample/docs/faq_milvus.txt not found[/yellow]")
    # 4) Insert image data (store description in Milvus)
    if SAMPLE_IMG.exists():
        # Use the VLM to generate a description as its semantic content
        desc = await [llm.chat](<http://llm.chat>)_vlm_with_image("Please briefly describe the key components of the Milvus architecture shown in the image.", str(SAMPLE_IMG))
        vec = await emb.embed_text(desc)  # Use text embeddings to maintain a consistent vector dimension, simplifying reuse
        store.upsert(
            ids=[str(uuid.uuid4())],
            vectors=[vec],
            contents=[desc],
            content_types=["image"],
            sources=[str(SAMPLE_IMG)]
        )
        print("[green]Inserted image description into Milvus(content_type=image)[/green]")
    else:
        print("[yellow] sample/images/milvus_arch.png not found[/yellow]")
    # 5) Query: Text-based QA (from LightRAG)
    q1 = "Does Milvus support simultaneous insertion and search? Give a short answer."
    ans1 = await rag.aquery(q1, param=QueryParam(mode="hybrid"))
    print("\\n[bold]Text QA[/bold]")
    print(ans1)
    # 6) Query: Image-related QA (from Milvus)
    q2 = "What are the key components of the Milvus architecture?"
    q2_vec = await emb.embed_text(q2)
    img_hits = [store.search](<http://store.search>)([q2_vec], top_k=3, content_type="image")
    print("\\n[bold]Image Retrieval (returns semantic image descriptions)[/bold]")
    print(img_hits[0] if img_hits else [])
if __name__ == "__main__":
    [asyncio.run](<http://asyncio.run>)(main())

これで、マルチモーダルRAGシステムを独自のデータセットでテストすることができます。

マルチモーダルRAGの未来

より多くの実世界のデータがプレーンテキストを超えるにつれて、RAG(Retrieval-Augmented Generation)システムは真のマルチモダリティに向けて進化し始めています。RAG-Anythingのようなソリューションはすでに、テキスト、画像、表、数式、その他の構造化コンテンツを統一的な方法で処理できることを示している。今後、3つの主要なトレンドが、マルチモーダルRAGの次の段階を形成すると思います:

より多くのモダリティへの拡張

RAG-Anythingのような現在のフレームワークは、すでにテキスト、画像、表、数式を扱うことができる。次のフロンティアは、ビデオ、オーディオ、センサーデータ、3Dモデルなど、さらにリッチなコンテンツタイプをサポートすることで、RAGシステムが現代のあらゆるデータから情報を理解し、取得できるようにすることです。

リアルタイムのデータ更新

今日、ほとんどのRAGパイプラインは、比較的静的なデータソースに依存しています。情報がより急速に変化するにつれて、将来のシステムでは、リアルタイムの文書更新、ストリーミング取り込み、インクリメンタルなインデックス作成が必要になります。このシフトにより、RAGはダイナミックな環境において、より応答性が高く、タイムリーで、信頼性の高いものになります。

RAGのエッジデバイスへの移行

Milvus Liteのような軽量ベクターツールにより、マルチモーダルRAGはもはやクラウドに限定されるものではない。エッジデバイスやIoTシステムにRAGを導入することで、インテリジェントな検索をデータが生成される場所で行うことができ、レイテンシ、プライバシー、全体的な効率が向上します。

👉 マルチモーダルRAGを探求する準備はできましたか?

お客様のマルチモーダルパイプラインとMilvusを組み合わせて、テキスト、画像、その他にまたがる高速でスケーラブルな検索をお試しください。

ご質問がある場合、または機能の詳細を知りたい場合は、Discordチャンネルにご参加ください。私たちの Discordチャンネルに参加するか、 GitHubに課題を提出してください。また、 Milvusオフィスアワーを通して、20分間の1対1のセッションを予約し、洞察、ガイダンス、質問への回答を得ることもできます。

    Try Managed Milvus for Free

    Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

    Get Started

    Like the article? Spread the word

    続けて読む