多模式 RAG 简化版:RAG-Anything + Milvus,取代 20 种独立工具
过去,构建多模态 RAG 系统意味着要拼接十几个专用工具--一个用于 OCR,一个用于表格,一个用于数学公式,一个用于 Embeddings,一个用于搜索,等等。传统的 RAG 管道是专为文本设计的,一旦文档开始包含图像、表格、公式、图表和其他结构化内容,工具链很快就会变得杂乱无章,难以管理。
香港大学开发的RAG-Anything 改变了这一局面。它以 LightRAG 为基础,提供了一个一体化平台,可以并行解析不同的内容类型,并将它们映射到统一的知识图谱中。但统一管道只是成功的一半。要在这些不同的模式中检索证据,你仍然需要一个快速、可扩展的向量搜索,它可以同时处理许多 Embeddings 类型。这就是Milvus的用武之地。作为一个开源的高性能向量数据库,Milvus 无需使用多种存储和搜索解决方案。它支持大规模 ANN 搜索、混合向量-关键字检索、元数据过滤和灵活的嵌入管理--所有这些都集中在一个地方。
在这篇文章中,我们将详细介绍 RAG-Anything 和 Milvus 如何协同工作,以简洁、统一的堆栈取代零散的多模态工具链,并展示如何只需几个步骤就能构建实用的多模态 RAG 问答系统。
什么是 RAG-Anything 及其工作原理
RAG-Anything是一个 RAG 框架,旨在打破传统系统的纯文本障碍。它不依赖于多种专用工具,而是提供了一个单一、统一的环境,可以解析、处理和检索混合内容类型的信息。
该框架支持包含文本、图表、表格和数学表达式的文档,使用户能够通过一个统一的界面跨所有模式进行查询。这使得它在学术研究、财务报告和企业知识管理等领域特别有用,因为在这些领域,多模态材料很常见。
RAG-Anything 的核心是一个多阶段多模态管道:文档解析→内容分析→知识图谱→智能检索。这种架构实现了智能协调和跨模态理解,使系统能够在单一集成工作流程中无缝处理不同的内容模态。
1 + 3 + N "架构
在工程层面,RAG-Anything 的功能是通过其 "1 + 3 + N "架构实现的:
核心引擎
RAG-Anything 的核心是一个知识图引擎,其灵感来自LightRAG。这个核心单元负责多模态实体提取、跨模态关系映射和向量语义存储。与传统的纯文本 RAG 系统不同,该引擎能理解文本中的实体、图像中的视觉对象以及嵌入表格中的关系结构。
3 个模态处理器
RAG-Anything 集成了三个专门的模态处理器,用于深入理解特定模态。它们共同构成了系统的多模态分析层。
图像模态处理器(ImageModalProcessor)用于解释视觉内容及其上下文含义。
TableModalProcessor 可解析表格结构,解码数据中的逻辑和数字关系。
EquationModalProcessor可理解数学符号和公式背后的语义。
N 解析器
为了支持真实世界文档的各种结构,RAG-Anything 提供了一个基于多个提取引擎的可扩展解析层。目前,它集成了 MinerU 和 Docling,可根据文档类型和结构复杂性自动选择最佳解析器。
RAG-Anything 以 "1+3+N "架构为基础,通过改变不同内容类型的处理方式,改进了传统的 RAG 管道。系统不再逐一处理文本、图片和表格,而是一次性处理所有这些内容。
# The core configuration demonstrates the parallel processing design
config = RAGAnythingConfig(
working_dir="./rag_storage",
parser="mineru",
parse_method="auto", # Automatically selects the optimal parsing strategy
enable_image_processing=True,
enable_table_processing=True,
enable_equation_processing=True,
max_workers=8 # Supports multi-threaded parallel processing
)
这种设计大大加快了处理大型技术文档的速度。基准测试表明,当系统使用更多的 CPU 内核时,速度会明显加快,从而大幅缩短处理每份文档所需的时间。
分层存储和检索优化
除了多模式设计,RAG-Anything 还采用了分层存储和检索方法,使结果更加准确和高效。
文本存储在传统的向量数据库中。
图像存储在单独的可视化特征存储中。
表格保存在结构化数据存储中。
数学公式被转化为语义向量。
通过将每种内容类型存储在各自合适的格式中,系统可以为每种方式选择最佳检索方法,而不是依赖单一、通用的相似性搜索。这样,不同类型的内容都能获得更快、更可靠的结果。
Milvus 如何融入 RAG-Anything
RAG-Anything 提供了强大的多模态检索功能,但要做好这一点,需要在所有类型的 Embeddings 中进行快速、可扩展的向量搜索。Milvus完美地扮演了这一角色。
凭借其云原生架构和计算存储分离,Milvus 可同时提供高扩展性和成本效益。它支持读写分离和流批量统一,使系统能够处理高并发工作负载,同时保持实时查询性能--新数据插入后可立即搜索。
Milvus 还通过分布式容错设计确保企业级可靠性,即使单个节点出现故障,系统也能保持稳定。这使其非常适合生产级多模式 RAG 部署。
如何使用 RAG-Anything 和 Milvus 构建多模态问答系统
本演示展示了如何使用 RAG-Anything 框架、Milvus 向量数据库和同义嵌入模型构建多模态问答系统。(本示例侧重于核心实施代码,并非完整的生产设置)。
上机演示
前提条件: Python
Python3.10 或更高版本
向量数据库:Milvus 服务(Milvus Lite)
云服务:阿里云 API 密钥(用于 LLM 和 Embeddings 服务)
LLM 模型:
qwen-vl-max(支持视觉的模型)
Embedding 模型:tongyi-embedding-vision-plus
- python -m venv .venv && source .venv/bin/activate # For Windows users: .venvScriptsactivate
- pip install -r requirements-min.txt
- cp .env.example .env #add DASHSCOPE_API_KEY
执行最小工作示例:
python minimal_[main.py](<http://main.py>)
预期输出:
脚本成功运行后,终端应显示
由 LLM 生成的基于文本的问答结果。
检索到的与查询相对应的图像描述。
项目结构
.
├─ requirements-min.txt
├─ .env.example
├─ [config.py](<http://config.py>)
├─ milvus_[store.py](<http://store.py>)
├─ [adapters.py](<http://adapters.py>)
├─ minimal_[main.py](<http://main.py>)
└─ sample
├─ docs
│ └─ faq_milvus.txt
└─ images
└─ milvus_arch.png
项目依赖关系
raganything
lightrag
pymilvus[lite]>=2.3.0
aiohttp>=3.8.0
orjson>=3.8.0
python-dotenv>=1.0.0
Pillow>=9.0.0
numpy>=1.21.0,<2.0.0
rich>=12.0.0
环境变量
# Alibaba Cloud DashScope
DASHSCOPE_API_KEY=your_api_key_here
# If the endpoint changes in future releases, please update it accordingly.
ALIYUN_LLM_URL=https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions
ALIYUN_VLM_URL=https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions
ALIYUN_EMBED_URL=https://dashscope.aliyuncs.com/api/v1/services/embeddings/text-embedding
# Model names (configure all models here for consistency)
LLM_TEXT_MODEL=qwen-max
LLM_VLM_MODEL=qwen-vl-max
EMBED_MODEL=tongyi-embedding-vision-plus
# Milvus Lite
MILVUS_URI=milvus_lite.db
MILVUS_COLLECTION=rag_multimodal_collection
EMBED_DIM=1152
配置
import os
from dotenv import load_dotenv
load_dotenv()
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY", "")
LLM_TEXT_MODEL = os.getenv("LLM_TEXT_MODEL", "qwen-max")
LLM_VLM_MODEL = os.getenv("LLM_VLM_MODEL", "qwen-vl-max")
EMBED_MODEL = os.getenv("EMBED_MODEL", "tongyi-embedding-vision-plus")
ALIYUN_LLM_URL = os.getenv("ALIYUN_LLM_URL")
ALIYUN_VLM_URL = os.getenv("ALIYUN_VLM_URL")
ALIYUN_EMBED_URL = os.getenv("ALIYUN_EMBED_URL")
MILVUS_URI = os.getenv("MILVUS_URI", "milvus_lite.db")
MILVUS_COLLECTION = os.getenv("MILVUS_COLLECTION", "rag_multimodal_collection")
EMBED_DIM = int(os.getenv("EMBED_DIM", "1152"))
# Basic runtime parameters
TIMEOUT = 60
MAX_RETRIES = 2
模型调用
import os
import base64
import aiohttp
import asyncio
from typing import List, Dict, Any, Optional
from config import (
DASHSCOPE_API_KEY, LLM_TEXT_MODEL, LLM_VLM_MODEL, EMBED_MODEL,
ALIYUN_LLM_URL, ALIYUN_VLM_URL, ALIYUN_EMBED_URL, EMBED_DIM, TIMEOUT
)
HEADERS = {
"Authorization": f"Bearer {DASHSCOPE_API_KEY}",
"Content-Type": "application/json",
}
class AliyunLLMAdapter:
def __init__(self):
self.text_url = ALIYUN_LLM_URL
self.vlm_url = ALIYUN_VLM_URL
self.text_model = LLM_TEXT_MODEL
self.vlm_model = LLM_VLM_MODEL
async def chat(self, prompt: str) -> str:
payload = {
"model": self.text_model,
"input": {"messages": [{"role": "user", "content": prompt}]},
"parameters": {"max_tokens": 1024, "temperature": 0.5},
}
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s:
async with [s.post](<http://s.post>)(self.text_url, json=payload, headers=HEADERS) as r:
r.raise_for_status()
data = await r.json()
return data["output"]["choices"][0]["message"]["content"]
async def chat_vlm_with_image(self, prompt: str, image_path: str) -> str:
with open(image_path, "rb") as f:
image_b64 = base64.b64encode([f.read](<http://f.read>)()).decode("utf-8")
payload = {
"model": self.vlm_model,
"input": {"messages": [{"role": "user", "content": [
{"text": prompt},
{"image": f"data:image/png;base64,{image_b64}"}
]}]},
"parameters": {"max_tokens": 1024, "temperature": 0.2},
}
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s:
async with [s.post](<http://s.post>)(self.vlm_url, json=payload, headers=HEADERS) as r:
r.raise_for_status()
data = await r.json()
return data["output"]["choices"][0]["message"]["content"]
class AliyunEmbeddingAdapter:
def __init__(self):
self.url = ALIYUN_EMBED_URL
self.model = EMBED_MODEL
self.dim = EMBED_DIM
async def embed_text(self, text: str) -> List[float]:
payload = {
"model": self.model,
"input": {"texts": [text]},
"parameters": {"text_type": "query", "dimensions": self.dim},
}
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as s:
async with [s.post](<http://s.post>)(self.url, json=payload, headers=HEADERS) as r:
r.raise_for_status()
data = await r.json()
return data["output"]["embeddings"][0]["embedding"]
Milvus Lite 集成
import json
import time
from typing import List, Dict, Any, Optional
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility
from config import MILVUS_URI, MILVUS_COLLECTION, EMBED_DIM
class MilvusVectorStore:
def __init__(self, uri: str = MILVUS_URI, collection_name: str = MILVUS_COLLECTION, dim: int = EMBED_DIM):
self.uri = uri
self.collection_name = collection_name
self.dim = dim
self.collection: Optional[Collection] = None
self._connect_and_prepare()
def _connect_and_prepare(self):
connections.connect("default", uri=self.uri)
if utility.has_collection(self.collection_name):
self.collection = Collection(self.collection_name)
else:
fields = [
FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=512, is_primary=True),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=self.dim),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="content_type", dtype=DataType.VARCHAR, max_length=32),
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=1024),
FieldSchema(name="ts", dtype=[DataType.INT](<http://DataType.INT>)64),
]
schema = CollectionSchema(fields, "Minimal multimodal collection")
self.collection = Collection(self.collection_name, schema)
self.collection.create_index("vector", {
"metric_type": "COSINE",
"index_type": "IVF_FLAT",
"params": {"nlist": 1024}
})
self.collection.load()
def upsert(self, ids: List[str], vectors: List[List[float]], contents: List[str],
content_types: List[str], sources: List[str]) -> None:
data = [
ids,
vectors,
contents,
content_types,
sources,
[int(time.time() * 1000)] * len(ids)
]
self.collection.upsert(data)
self.collection.flush()
def search(self, query_vectors: List[List[float]], top_k: int = 5, content_type: Optional[str] = None):
expr = f'content_type == "{content_type}"' if content_type else None
params = {"metric_type": "COSINE", "params": {"nprobe": 16}}
results = [self.collection.search](<http://self.collection.search>)(
data=query_vectors,
anns_field="vector",
param=params,
limit=top_k,
expr=expr,
output_fields=["id", "content", "content_type", "source", "ts"]
)
out = []
for hits in results:
out.append([{
"id": h.entity.get("id"),
"content": h.entity.get("content"),
"content_type": h.entity.get("content_type"),
"source": h.entity.get("source"),
"score": h.score
} for h in hits])
return out
主入口点
"""
Minimal Working Example:
- Insert a short text FAQ into LightRAG (text retrieval context)
- Insert an image description vector into Milvus (image retrieval context)
- Execute two example queries: one text QA and one image-based QA
"""
import asyncio
import uuid
from pathlib import Path
from rich import print
from lightrag import LightRAG, QueryParam
from lightrag.utils import EmbeddingFunc
from adapters import AliyunLLMAdapter, AliyunEmbeddingAdapter
from milvus_store import MilvusVectorStore
from config import EMBED_DIM
SAMPLE_DOC = Path("sample/docs/faq_milvus.txt")
SAMPLE_IMG = Path("sample/images/milvus_arch.png")
async def main():
# 1) Initialize core components
llm = AliyunLLMAdapter()
emb = AliyunEmbeddingAdapter()
store = MilvusVectorStore()
# 2) Initialize LightRAG (for text-only retrieval)
async def llm_complete(prompt: str, max_tokens: int = 1024) -> str:
return await [llm.chat](<http://llm.chat>)(prompt)
async def embed_func(text: str) -> list:
return await emb.embed_text(text)
rag = LightRAG(
working_dir="rag_workdir_min",
llm_model_func=llm_complete,
embedding_func=EmbeddingFunc(
embedding_dim=EMBED_DIM,
max_token_size=8192,
func=embed_func
),
)
# 3) Insert text data
if SAMPLE_DOC.exists():
text = SAMPLE_[DOC.read](<http://DOC.read>)_text(encoding="utf-8")
await rag.ainsert(text)
print("[green]Inserted FAQ text into LightRAG[/green]")
else:
print("[yellow] sample/docs/faq_milvus.txt not found[/yellow]")
# 4) Insert image data (store description in Milvus)
if SAMPLE_IMG.exists():
# Use the VLM to generate a description as its semantic content
desc = await [llm.chat](<http://llm.chat>)_vlm_with_image("Please briefly describe the key components of the Milvus architecture shown in the image.", str(SAMPLE_IMG))
vec = await emb.embed_text(desc) # Use text embeddings to maintain a consistent vector dimension, simplifying reuse
store.upsert(
ids=[str(uuid.uuid4())],
vectors=[vec],
contents=[desc],
content_types=["image"],
sources=[str(SAMPLE_IMG)]
)
print("[green]Inserted image description into Milvus(content_type=image)[/green]")
else:
print("[yellow] sample/images/milvus_arch.png not found[/yellow]")
# 5) Query: Text-based QA (from LightRAG)
q1 = "Does Milvus support simultaneous insertion and search? Give a short answer."
ans1 = await rag.aquery(q1, param=QueryParam(mode="hybrid"))
print("\\n[bold]Text QA[/bold]")
print(ans1)
# 6) Query: Image-related QA (from Milvus)
q2 = "What are the key components of the Milvus architecture?"
q2_vec = await emb.embed_text(q2)
img_hits = [store.search](<http://store.search>)([q2_vec], top_k=3, content_type="image")
print("\\n[bold]Image Retrieval (returns semantic image descriptions)[/bold]")
print(img_hits[0] if img_hits else [])
if __name__ == "__main__":
[asyncio.run](<http://asyncio.run>)(main())
现在,您可以用自己的数据集测试多模态 RAG 系统了。
多模态 RAG 的未来
随着越来越多的现实世界数据超越纯文本,检索增强生成(RAG)系统开始向真正的多模态发展。RAG-Anything等解决方案已经展示了如何以统一的方式处理文本、图像、表格、公式和其他结构化内容。展望未来,我认为多模态 RAG 的下一阶段将呈现三大趋势:
扩展到更多模式
当前的框架(如 RAG-Anything)已经可以处理文本、图像、表格和数学表达式。下一个前沿领域是支持更丰富的内容类型,包括视频、音频、传感器数据和三维模型,使 RAG 系统能够理解和检索全部现代数据信息。
实时数据更新
目前,大多数 RAG 管道都依赖于相对静态的数据源。随着信息变化越来越快,未来的系统将需要实时文档更新、流式摄取和增量索引。这一转变将使 RAG 在动态环境中反应更迅速、更及时、更可靠。
将 RAG 移至边缘设备
有了Milvus Lite 等轻量级向量工具,多模式 RAG 不再局限于云端。在边缘设备和物联网系统上部署 RAG,可以在更接近数据生成的地方进行智能检索,从而改善延迟、隐私和整体效率。
准备探索多模态 RAG 吗?
尝试将您的多模态管道与Milvus配对,体验跨文本、图像等的快速、可扩展检索。
有问题或想深入了解任何功能?加入我们的 Discord 频道或在 GitHub 上提交问题。您还可以通过 Milvus Office Hours 预订 20 分钟的一对一课程,以获得见解、指导和问题解答。
Try Managed Milvus for Free
Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.
Get StartedLike the article? Spread the word



