使用 CRAG、LangGraph 和 Milvus 修正 RAG 檢索錯誤
隨著 LLM 應用程式投入生產,團隊越來越需要他們的模型來回答以私人資料或即時資訊為基礎的問題。檢索增量生成 (Retrieval-augmented generation, RAG) - 即模型在查詢時從外部知識庫中提取 - 是標準的方法。它可以減少幻覺,並保持答案的即時性。
但在實際應用中,有個問題很快就浮現出來:文件的相似度可能很高,但對於問題來說卻是完全錯誤的。傳統的 RAG 管道將相似度等同於相關性。在生產中,這個假設就會被打破。排名最前的結果可能已經過時、只與問題有切身關係,或是完全遺漏了使用者所需的細節。
CRAG (Corrective Retrieval-Augmented Generation) 透過在擷取與產生之間加入評估與修正來解決這個問題。系統不會盲目相信相似度評分,而是會檢查擷取的內容是否真的回答了問題,如果沒有回答問題,系統就會修正。
這篇文章將介紹如何使用 LangChain、LangGraph 和Milvus 來建立一個生產就緒的 CRAG 系統。
傳統 RAG 無法解決的三個檢索問題
大多數 RAG 在生產上的失敗都可追溯到以下三個問題之一:
檢索不匹配。該文件在主題上類似,但實際上沒有回答問題。如果詢問如何在 Nginx 中設定 HTTPS 證書,系統可能會回傳 Apache 設定指南、2019 解說或 TLS 工作原理的一般說明。語義上接近,實際上無用。
陳舊的內容。 矢量搜尋沒有陳舊的概念。查詢「Python async 最佳實務」,您會得到 2018 年模式和 2024 年模式的混合,純粹以嵌入距離排序。系統無法區分使用者真正需要的是哪一個。
記憶體污染。這個問題會隨著時間複雜化,通常也是最難解決的問題。假設系統擷取過期的 API 參考資料,並產生不正確的程式碼。該錯誤的輸出會儲存在記憶體中。在下一次類似的查詢中,系統會再次擷取它,強化錯誤。陳舊的資訊和新鮮的資訊逐漸混合,系統的可靠性隨著每個週期而降低。
這些都不是小問題。一旦 RAG 系統處理真正的流量,它們就會經常出現。這就是為什麼檢索品質檢查是一種需求,而不是一種可有可无的東西。
什麼是 CRAG?先評估後產生
糾正檢索-增強生成 (CRAG)是一種在 RAG 管道中的檢索和生成之間增加評估和糾正步驟的方法。它是在論文Corrective Retrieval Augmented Generation(Yan 等人,2024 年)中提出的。傳統的 RAG 會做出二元判斷 - 使用該文件或捨棄該文件 - 與此不同的是,RAG 會對每個檢索結果的相關性進行評分,並在它到達語言模型之前,先經過三個修正路徑之一。
當檢索結果處於灰色地帶時,傳統的 RAG 就會陷入困境:部分相關、有些過時或缺少關鍵部分。簡單的 yes/no 閘門不是捨棄有用的部分資訊,就是讓嘈雜的內容通過。CRAG將檢索 → 產生的流程重組為檢索 → 評估 → 修正 → 產生,讓系統有機會在開始產生之前修正檢索品質。
CRAG 四步工作流程:檢索 → 評估 → 修正 → 產生,顯示文件如何被評分和路由
擷取的結果可分為三類:
- 正確:直接回答查詢;輕微精煉後可用
- 模糊:部分相關;需要補充資訊
- 不正確:不相關;捨棄並回到其他來源
| 決定 | 信心 | 行動 |
|---|---|---|
| 正確 | > 0.9 | 完善文件內容 |
| 含糊不清 | 0.5-0.9 | 完善文件內容 + 網路搜尋補充 |
| 不正確 | < 0.5 | 捨棄檢索結果;完全使用網路搜尋 |
內容精煉
CRAG 也解決了標準 RAG 的一個更微妙的問題:大多數系統都會將擷取到的完整文件餵給模型。這會浪費字元並稀釋信號 - 模型必須繞過不相干的段落,才能找到真正重要的一句話。CRAG 會先精煉擷取的內容,擷取相關的部分並刪除其他部分。
原論文使用知識帶和啟發式規則來達到此目的。實際上,關鍵字比對對許多使用個案都有效,而生產系統可以分層進行以 LLM 為基礎的摘要或結構化抽取,以獲得更高的品質。
精煉過程包含三個部分:
- 文件分解:從較長的文件中抽取關鍵段落
- 查詢重寫:將模糊或含糊的查詢轉換為更有針對性的查詢
- 知識選擇:重複、排序並只保留最有用的內容
三步式文件精煉流程:文件分解(2000 → 500 個字元)、查詢重寫(提高搜尋精確度)和知識選擇(過濾、排序和修剪)
評估器
評估器是 CRAG 的核心。它不是用來進行深入推理的,而是一個快速的分流閘門。給定一個查詢和一組擷取的文件,它會決定這些內容是否足以使用。
原始論文選擇了微調的 T5-Large 模型,而非一般用途的 LLM。理由是:對於這項特殊任務而言,速度與精確度比靈活性更重要。
| 屬性 | 微調 T5-Large | GPT-4 |
|---|---|---|
| 延遲 | 10-20 毫秒 | 200 毫秒以上 |
| 精確度 | 92% (紙本實驗) | 待定 |
| 任務契合度 | 高 - 單一任務微調,精確度較高 | 中 - 通用、更靈活但專業性較低 |
網路搜尋後備
當內部檢索被標記為不正確或含糊不清時,CRAG 可以觸發網路搜尋,以取得較新或補充的資訊。對於時間敏感的查詢和內部知識庫有缺口的主題,這可發揮安全網的作用。
為什麼 Milvus 非常適合 CRAG 的生產?
CRAG 的有效性取決於它下面的東西。向量資料庫需要做的不僅僅是基本的相似性搜尋,還需要支援生產 CRAG 系統所要求的多租戶隔離、混合檢索和模式彈性。
在評估了多個選項之後,我們選擇了Milvus,原因有三。
多租客隔離
在以代理為基礎的系統中,每個使用者或會話都需要自己的記憶體空間。天真的做法--每個租戶一個集合--很快就會成為一個令人頭痛的作業問題,特別是在規模較大的情況下。
Milvus 使用Partition Key 來處理這個問題。在agent_id 欄位上設定is_partition_key=True ,Milvus 就會自動將查詢路由到正確的分割區。沒有收集擴張,也沒有手動路由程式碼。
在我們跨 100 個租戶、擁有 1 千萬向量的基準測試中,Milvus 搭配 Clustering Compaction 的QPS較未最佳化的基準高出 3 至 5 倍。
混合式擷取
純向量搜尋在精確匹配的內容-產品 SKU(如SKU-2024-X5 、版本字串或特定術語)方面有不足之處。
Milvus 2.5 原生支援混合式擷取:稠密向量用於語意相似性、稀疏向量用於 BM25 類型的關鍵字比對,以及標量元資料篩選 - 全部都在一次查詢中完成。結果使用 Reciprocal Rank Fusion (RRF) 融合,因此您不需要建立和合併獨立的檢索管道。
在 100 萬向量的資料集上,Milvus Sparse-BM25 的擷取延遲時間為6 毫秒,對端對端 CRAG 效能的影響微乎其微。
適用於不斷演進記憶體的彈性模式
隨著 CRAG 管道的成熟,資料模型也隨之演化。我們需要在迭代評估邏輯的同時,新增confidence 、verified 和source 等欄位。在大多數資料庫中,這意味著遷移腳本和停機時間。
Milvus 支援動態 JSON 欄位,因此可以在不中斷服務的情況下隨時擴充元資料。
以下是一個典型的模式:
fields = [
FieldSchema(name="agent_id", dtype=DataType.VARCHAR, is_partition_key=True), # multi-tenancy
FieldSchema(name="dense_embedding", dtype=DataType.FLOAT_VECTOR, dim=1536), # semantic retrieval
FieldSchema(name="sparse_embedding", dtype=DataType.SPARSE_FLOAT_VECTOR),# BM25
FieldSchema(name="metadata", dtype=DataType.JSON),# dynamic schema
]
# hybrid retrieval + metadata filtering
results = collection.hybrid_search(
reqs=[
AnnSearchRequest(data=[dense_vec], anns_field=“dense_embedding”, limit=20),
AnnSearchRequest(data=[sparse_vec], anns_field=“sparse_embedding”, limit=20)
],
rerank=RRFRanker(),
output_fields=[“metadata”],
expr=‘metadata[“confidence”] > 0.9’,# CRAG confidence filtering
limit=5
)
Milvus 也簡化了部署擴充。它提供Lite、Standalone 和 Distributed 模式,這些模式與程式碼相容,從本機開發轉換到生產叢集只需變更連線字串。
上手操作:使用 LangGraph 中介軟體和 Milvus 建立 CRAG 系統
為什麼使用中間件?
使用 LangGraph 建構 CRAG 的常見方式,是將控制每一步驟的節點和邊緣連接成一個狀態圖。這種方法可行,但隨著複雜度的增加,圖形會變得糾結,調試也變得很頭痛。
我們決定採用 LangGraph 1.0 中的Middleware 模式。它在模型呼叫之前攔截請求,因此擷取、評估和修正都在同一個地方處理。與狀態圖方式相比
- 程式碼更少:邏輯是集中的,而不是分散在圖表節點上
- 更容易遵循:控制流程以線性方式讀取
- 更容易除錯:故障指向單一位置,而非圖形遍歷
核心工作流程
管道分四個步驟執行
- 擷取:從 Milvus擷取前 3 個相關的文件,範圍涵蓋目前的租戶
- 評估:使用輕量級模型評估文件品質
- 修正:根據判定結果進行改進、使用網路搜尋進行補充或完全回退
- 注入:透過動態系統提示將最終完成的上下文傳送至模型
環境設定與資料準備
環境變數
export OPENAI_API_KEY="your-api-key"
export TAVILY_API_KEY="your-tavily-key"
建立 Milvus 套件
在執行程式碼之前,請在 Milvus 中建立一個具有符合擷取邏輯的模式的集合。
# filename: crag_agent.py
# ============ Import dependencies ============
from typing import Literal, List
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware, before_model, dynamic_prompt
from langchain.chat_models import init_chat_model
from langchain_milvus import Milvus
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_community.tools.tavily_search import TavilySearchResults
# ============ CRAG Middleware (minimal-change version) ============
class CRAGMiddleware(AgentMiddleware):
"""CRAG evaluation and correction middleware (uses official decorator-based hooks to avoid permanently polluting the message stack)“"”
<span class="hljs-keyword">def</span> <span class="hljs-title function_">__init__</span>(<span class="hljs-params">self, vector_store: Milvus, agent_id: <span class="hljs-built_in">str</span></span>):
<span class="hljs-built_in">super</span>().__init__()
<span class="hljs-variable language_">self</span>.vector_store = vector_store
<span class="hljs-variable language_">self</span>.agent_id = agent_id <span class="hljs-comment"># multi-tenant isolation</span>
<span class="hljs-comment"># Lightweight evaluator: used for relevance judgment (can be replaced with the structured version introduced later)</span>
<span class="hljs-variable language_">self</span>.evaluator = init_chat_model(<span class="hljs-string">"openai:gpt-4o-mini"</span>, temperature=<span class="hljs-number">0</span>)
<span class="hljs-comment"># Web search fallback</span>
<span class="hljs-variable language_">self</span>.web_search = TavilySearchResults(max_results=<span class="hljs-number">3</span>)
@before_model
def run_crag(self, state):
“""Run retrieval -> evaluation -> correction before model invocation and prepare the final context""”
# Get the last user message
last_msg = state[“messages”][-1]
query = getattr(last_msg, “content”, “”) if hasattr(last_msg, “content”) else last_msg.get(“content”, “”)
<span class="hljs-comment"># 1. Retrieval: get documents from Milvus (PartitionKey + confidence filtering)</span>
docs = <span class="hljs-variable language_">self</span>._retrieve_from_milvus(query)
<span class="hljs-comment"># 2. Evaluation: three-way decision</span>
verdict = <span class="hljs-variable language_">self</span>._evaluate_relevance(query, docs)
<span class="hljs-comment"># 3. Correction: choose the handling strategy based on the verdict</span>
<span class="hljs-keyword">if</span> verdict == <span class="hljs-string">"incorrect"</span>:
<span class="hljs-comment"># Retrieval failed, rely entirely on Web search</span>
web_results = <span class="hljs-variable language_">self</span>._web_search_fallback(query)
final_context = <span class="hljs-variable language_">self</span>._format_web_results(web_results)
<span class="hljs-keyword">elif</span> verdict == <span class="hljs-string">"ambiguous"</span>:
<span class="hljs-comment"># Retrieval is ambiguous, refine documents + supplement with Web search</span>
refined_docs = <span class="hljs-variable language_">self</span>._refine_documents(docs, query)
web_results = <span class="hljs-variable language_">self</span>._web_search_fallback(query)
final_context = <span class="hljs-variable language_">self</span>._merge_context(refined_docs, web_results)
<span class="hljs-keyword">else</span>:
<span class="hljs-comment"># Retrieval quality is good, only refine the documents</span>
refined_docs = <span class="hljs-variable language_">self</span>._refine_documents(docs, query)
final_context = <span class="hljs-variable language_">self</span>._format_internal_docs(refined_docs)
<span class="hljs-comment"># 4. Put the context into a temporary key, used only for dynamic prompt assembly in the current model call</span>
state[<span class="hljs-string">"_crag_context"</span>] = final_context
<span class="hljs-keyword">return</span> state
@dynamic_prompt
def attach_context(self, state, prompt_messages: List):
“""Inject the CRAG-generated context as a SystemMessage before the prompt for the current model call""”
final_context = state.get(“_crag_context”)
if final_context:
sys_msg = SystemMessage(
content=f"Here is some relevant background information. Please answer the user’s question based on this information:\n\n{final_context}"
)
# Applies only to the current call and is not permanently written into state[“messages”]
prompt_messages = [sys_msg] + prompt_messages
return prompt_messages
<span class="hljs-comment"># ======== Internal methods: retrieval / evaluation / refinement / formatting ========</span>
<span class="hljs-keyword">def</span> <span class="hljs-title function_">_retrieve_from_milvus</span>(<span class="hljs-params">self, query: <span class="hljs-built_in">str</span></span>) -> <span class="hljs-built_in">list</span>:
<span class="hljs-string">"""Retrieve documents from Milvus (Partition Key + confidence filtering)"""</span>
<span class="hljs-keyword">try</span>:
<span class="hljs-comment"># Note: different adapter versions may place filter parameters differently; here expr is passed through search_kwargs</span>
docs = <span class="hljs-variable language_">self</span>.vector_store.similarity_search(
query,
k=<span class="hljs-number">3</span>,
search_kwargs={<span class="hljs-string">"expr"</span>: <span class="hljs-string">f'agent_id == "<span class="hljs-subst">{self.agent_id}</span>"'</span>}
)
<span class="hljs-comment"># Confidence filtering (to avoid low-quality memory contamination)</span>
filtered_docs = [
doc <span class="hljs-keyword">for</span> doc <span class="hljs-keyword">in</span> docs
<span class="hljs-keyword">if</span> (doc.metadata <span class="hljs-keyword">or</span> {}).get(<span class="hljs-string">"confidence"</span>, <span class="hljs-number">0.0</span>) > <span class="hljs-number">0.7</span>
]
<span class="hljs-keyword">return</span> filtered_docs <span class="hljs-keyword">or</span> docs <span class="hljs-comment"># If there are no high-confidence results, fall back to the original results for evaluator judgment</span>
<span class="hljs-keyword">except</span> Exception <span class="hljs-keyword">as</span> e:
<span class="hljs-built_in">print</span>(<span class="hljs-string">f"[CRAG] Retrieval failed: <span class="hljs-subst">{e}</span>"</span>)
<span class="hljs-keyword">return</span> []
<span class="hljs-keyword">def</span> <span class="hljs-title function_">_evaluate_relevance</span>(<span class="hljs-params">self, query: <span class="hljs-built_in">str</span>, docs: <span class="hljs-built_in">list</span></span>) -> <span class="hljs-type">Literal</span>[<span class="hljs-string">"relevant"</span>, <span class="hljs-string">"ambiguous"</span>, <span class="hljs-string">"incorrect"</span>]:
<span class="hljs-string">"""Evaluate document relevance (three-way decision), simplified version: the LLM returns the verdict directly"""</span>
<span class="hljs-keyword">if</span> <span class="hljs-keyword">not</span> docs:
<span class="hljs-keyword">return</span> <span class="hljs-string">"incorrect"</span>
<span class="hljs-comment"># Evaluate only the Top-3 documents, taking the first 500 characters of each</span>
doc_content = <span class="hljs-string">"\n\n"</span>.join([
<span class="hljs-string">f"[Document <span class="hljs-subst">{i+<span class="hljs-number">1</span>}</span>] <span class="hljs-subst">{(doc.page_content <span class="hljs-keyword">or</span> <span class="hljs-string">''</span>)[:<span class="hljs-number">500</span>]}</span>..."</span>
<span class="hljs-keyword">for</span> i, doc <span class="hljs-keyword">in</span> <span class="hljs-built_in">enumerate</span>(docs[:<span class="hljs-number">3</span>])
])
prompt = <span class="hljs-string">f"""You are an expert in document relevance evaluation. Assess whether the following documents can answer the query.
Query: {query}
Document content:
{doc_content}
Evaluation criteria:
- relevant: the document directly contains the answer and is highly relevant
- ambiguous: the document is partially relevant and needs external knowledge
- incorrect: the document is irrelevant and cannot answer the query
Return only one word: relevant or ambiguous or incorrect
“"”
try:
result = self.evaluator.invoke(prompt)
verdict = (getattr(result, “content”, “”) or “”).strip().lower()
if verdict not in {“relevant”, “ambiguous”, “incorrect”}:
verdict = “ambiguous”
return verdict
except Exception as e:
print(f"[CRAG] Evaluation failed: {e}")
return “ambiguous”
<span class="hljs-keyword">def</span> <span class="hljs-title function_">_refine_documents</span>(<span class="hljs-params">self, docs: <span class="hljs-built_in">list</span>, query: <span class="hljs-built_in">str</span></span>) -> <span class="hljs-built_in">list</span>:
<span class="hljs-string">"""Refine documents (simplified strips: sentence filtering based on keywords)"""</span>
refined = []
<span class="hljs-comment"># Simple Chinese-period replacement + rough English sentence splitting</span>
keywords = [kw.strip() <span class="hljs-keyword">for</span> kw <span class="hljs-keyword">in</span> query.split() <span class="hljs-keyword">if</span> kw.strip()]
<span class="hljs-keyword">for</span> doc <span class="hljs-keyword">in</span> docs:
text = doc.page_content <span class="hljs-keyword">or</span> <span class="hljs-string">""</span>
sentences = (
text.replace(<span class="hljs-string">"。"</span>, <span class="hljs-string">"。\n"</span>)
.replace(<span class="hljs-string">". "</span>, <span class="hljs-string">".\n"</span>)
.replace(<span class="hljs-string">"! "</span>, <span class="hljs-string">"!\n"</span>)
.replace(<span class="hljs-string">"? "</span>, <span class="hljs-string">"?\n"</span>)
.split(<span class="hljs-string">"\n"</span>)
)
sentences = [s.strip() <span class="hljs-keyword">for</span> s <span class="hljs-keyword">in</span> sentences <span class="hljs-keyword">if</span> s.strip()]
<span class="hljs-comment"># Match any keyword</span>
relevant_sentences = [
s <span class="hljs-keyword">for</span> s <span class="hljs-keyword">in</span> sentences
<span class="hljs-keyword">if</span> <span class="hljs-built_in">any</span>(keyword <span class="hljs-keyword">in</span> s <span class="hljs-keyword">for</span> keyword <span class="hljs-keyword">in</span> keywords)
]
<span class="hljs-keyword">if</span> relevant_sentences:
refined_text = <span class="hljs-string">"。"</span>.join(relevant_sentences[:<span class="hljs-number">3</span>])
refined.append(Document(page_content=refined_text, metadata=doc.metadata <span class="hljs-keyword">or</span> {}))
<span class="hljs-keyword">return</span> refined <span class="hljs-keyword">if</span> refined <span class="hljs-keyword">else</span> docs <span class="hljs-comment"># If nothing is extracted, fall back to the original documents</span>
<span class="hljs-keyword">def</span> <span class="hljs-title function_">_web_search_fallback</span>(<span class="hljs-params">self, query: <span class="hljs-built_in">str</span></span>) -> <span class="hljs-built_in">list</span>:
<span class="hljs-string">"""Web search fallback"""</span>
<span class="hljs-keyword">try</span>:
<span class="hljs-keyword">return</span> <span class="hljs-variable language_">self</span>.web_search.invoke(query) <span class="hljs-keyword">or</span> []
<span class="hljs-keyword">except</span> Exception <span class="hljs-keyword">as</span> e:
<span class="hljs-built_in">print</span>(<span class="hljs-string">f"[CRAG] Web search failed: <span class="hljs-subst">{e}</span>"</span>)
<span class="hljs-keyword">return</span> []
<span class="hljs-keyword">def</span> <span class="hljs-title function_">_merge_context</span>(<span class="hljs-params">self, internal_docs: <span class="hljs-built_in">list</span>, web_results: <span class="hljs-built_in">list</span></span>) -> <span class="hljs-built_in">str</span>:
<span class="hljs-string">"""Merge internal memory and external knowledge into the final context"""</span>
parts = []
<span class="hljs-keyword">if</span> internal_docs:
parts.append(<span class="hljs-string">"[Internal Memory]"</span>)
<span class="hljs-keyword">for</span> i, doc <span class="hljs-keyword">in</span> <span class="hljs-built_in">enumerate</span>(internal_docs, <span class="hljs-number">1</span>):
parts.append(<span class="hljs-string">f"<span class="hljs-subst">{i}</span>. <span class="hljs-subst">{doc.page_content}</span>"</span>)
<span class="hljs-keyword">if</span> web_results:
parts.append(<span class="hljs-string">"[External Knowledge]"</span>)
<span class="hljs-keyword">for</span> i, result <span class="hljs-keyword">in</span> <span class="hljs-built_in">enumerate</span>(web_results, <span class="hljs-number">1</span>):
content = (result <span class="hljs-keyword">or</span> {}).get(<span class="hljs-string">"content"</span>, <span class="hljs-string">""</span>)
url = (result <span class="hljs-keyword">or</span> {}).get(<span class="hljs-string">"url"</span>, <span class="hljs-string">""</span>)
parts.append(<span class="hljs-string">f"<span class="hljs-subst">{i}</span>. <span class="hljs-subst">{content}</span>\n Source: <span class="hljs-subst">{url}</span>"</span>)
<span class="hljs-keyword">return</span> <span class="hljs-string">"\n\n"</span>.join(parts) <span class="hljs-keyword">if</span> parts <span class="hljs-keyword">else</span> <span class="hljs-string">"No relevant information found"</span>
<span class="hljs-keyword">def</span> <span class="hljs-title function_">_format_internal_docs</span>(<span class="hljs-params">self, docs: <span class="hljs-built_in">list</span></span>) -> <span class="hljs-built_in">str</span>:
<span class="hljs-string">"""Format internal documents"""</span>
<span class="hljs-keyword">if</span> <span class="hljs-keyword">not</span> docs:
<span class="hljs-keyword">return</span> <span class="hljs-string">"No relevant information found"</span>
parts = [<span class="hljs-string">"[Internal Memory]"</span>]
<span class="hljs-keyword">for</span> i, doc <span class="hljs-keyword">in</span> <span class="hljs-built_in">enumerate</span>(docs, <span class="hljs-number">1</span>):
parts.append(<span class="hljs-string">f"<span class="hljs-subst">{i}</span>. <span class="hljs-subst">{doc.page_content}</span>"</span>)
<span class="hljs-keyword">return</span> <span class="hljs-string">"\n\n"</span>.join(parts)
<span class="hljs-keyword">def</span> <span class="hljs-title function_">_format_web_results</span>(<span class="hljs-params">self, results: <span class="hljs-built_in">list</span></span>) -> <span class="hljs-built_in">str</span>:
<span class="hljs-string">"""Format Web search results"""</span>
<span class="hljs-keyword">if</span> <span class="hljs-keyword">not</span> results:
<span class="hljs-keyword">return</span> <span class="hljs-string">"No relevant information found"</span>
parts = [<span class="hljs-string">"[External Knowledge]"</span>]
<span class="hljs-keyword">for</span> i, result <span class="hljs-keyword">in</span> <span class="hljs-built_in">enumerate</span>(results, <span class="hljs-number">1</span>):
content = (result <span class="hljs-keyword">or</span> {}).get(<span class="hljs-string">"content"</span>, <span class="hljs-string">""</span>)
url = (result <span class="hljs-keyword">or</span> {}).get(<span class="hljs-string">"url"</span>, <span class="hljs-string">""</span>)
parts.append(<span class="hljs-string">f"<span class="hljs-subst">{i}</span>. <span class="hljs-subst">{content}</span>\n Source: <span class="hljs-subst">{url}</span>"</span>)
<span class="hljs-keyword">return</span> <span class="hljs-string">"\n\n"</span>.join(parts)
# ============ Initialize the Milvus vector database ============
vector_store = Milvus(
embedding_function=OpenAIEmbeddings(),
connection_args={“host”: “localhost”, “port”: “19530”},
collection_name=“agent_memory”
)
# ============ Create Agent ============
agent = create_agent(
model=“openai:gpt-4o”,
tools=[TavilySearchResults(max_results=3)], # Web search tool
middleware=[
CRAGMiddleware(
vector_store=vector_store,
agent_id=“user_123_session_456” # multi-tenant isolation: each Agent instance uses its own ID
)
]
)
# ============ Example run ============
if name == "main":
# Example query: use HumanMessage to ensure compatibility
response = agent.invoke({
“messages”: [
HumanMessage(content=“What were the operating expenses in Nike’s latest quarterly earnings report?”)
]
})
print(response[“messages”][-1].content)
版本注意事項:本程式碼使用 LangGraph 與 LangChain 中最新的中介軟體功能。這些 API 可能會隨著框架的演進而改變-請檢查LangGraph 文件以取得最新的用法。
關鍵模組
1.生產級評估器設計
上面程式碼中的_evaluate_relevance() 方法是為了快速測試而刻意簡化的。對於生產級,您會想要具有置信度評分和可解釋性的結構化輸出:
from pydantic import BaseModel
from langchain.prompts import PromptTemplate
class RelevanceVerdict(BaseModel):
“""Structured output for the evaluation result""”
verdict: Literal[“relevant”, “ambiguous”, “incorrect”]
confidence: float # confidence score (used for memory quality monitoring)
reasoning: str # reason for the judgment (used for debugging and review)
# Note: the CRAG paper uses a fine-tuned T5-Large evaluator (10-20 ms latency)
# Here, gpt-4o-mini is used as the engineering implementation option (easier to deploy, but with slightly higher latency)
grader_llm = ChatOpenAI(model=“gpt-4o-mini”, temperature=0)
grader_prompt = PromptTemplate(
template="""You are an expert in document relevance evaluation. Assess whether the following documents can answer the query.
Query: {query}
Document content:
{document}
Evaluation criteria:
- relevant: the document directly contains the answer, confidence > 0.9
- ambiguous: the document is partially relevant, confidence 0.5-0.9
- incorrect: the document is irrelevant, confidence < 0.5
Return in JSON format: {{"verdict": "…", "confidence": 0.xx, "reasoning": "…"}}
“"”,
input_variables=[“query”, “document”]
)
grader_chain = grader_prompt | grader_llm.with_structured_output(RelevanceVerdict)
# Replace the _evaluate_relevance() method in CRAGMiddleware
def _evaluate_relevance(self, query: str, docs: list) -> Literal[“relevant”, “ambiguous”, “incorrect”]:
"""Evaluate document relevance (returns structured result)“"”
if not docs:
return “incorrect”
<span class="hljs-comment"># Evaluate only the Top-3 documents, taking the first 500 characters of each</span>
doc_content = <span class="hljs-string">"\n\n"</span>.join([
<span class="hljs-string">f"[Document <span class="hljs-subst">{i+<span class="hljs-number">1</span>}</span>] <span class="hljs-subst">{doc.page_content[:<span class="hljs-number">500</span>]}</span>..."</span>
<span class="hljs-keyword">for</span> i, doc <span class="hljs-keyword">in</span> <span class="hljs-built_in">enumerate</span>(docs[:<span class="hljs-number">3</span>])
])
result = grader_chain.invoke({
<span class="hljs-string">"query"</span>: query,
<span class="hljs-string">"document"</span>: doc_content
})
<span class="hljs-comment"># Store the confidence score in logs or a monitoring system</span>
<span class="hljs-built_in">print</span>(<span class="hljs-string">f"[CRAG Evaluation] verdict=<span class="hljs-subst">{result.verdict}</span>, confidence=<span class="hljs-subst">{result.confidence:<span class="hljs-number">.2</span>f}</span>"</span>)
<span class="hljs-built_in">print</span>(<span class="hljs-string">f"[CRAG Reasoning] <span class="hljs-subst">{result.reasoning}</span>"</span>)
<span class="hljs-comment"># Optional: store the evaluation result in Milvus for memory quality analysis</span>
<span class="hljs-variable language_">self</span>._store_evaluation_metrics(query, result)
<span class="hljs-keyword">return</span> result.verdict
def _store_evaluation_metrics(self, query: str, verdict_result: RelevanceVerdict):
"""Store evaluation metrics in Milvus (for memory quality monitoring)“"”
# Example: store the evaluation result in a separate Collection for analysis
# In actual use, you need to create the evaluation_metrics Collection
pass
2.知識精煉和回退
三種機制共同作用以保持模型上下文的高品質:
- 知識精煉會擷取與查詢最相關的句子,並去除雜訊。
- 當本地檢索不足時,會啟動後備搜尋,透過 Tavily 擷取外部知識。
- 上下文合併將內部記憶體與外部結果合併為單一的、重複的上下文區塊,然後再傳送至模型。
在生產中運行 CRAG 的提示
一旦超越原型設計,有三個方面最重要。
1.成本:選擇正確的評估器
評估器會在每個查詢上執行,因此是延遲和成本的最大槓桿。
- 高併發工作負載:像 T5-Large 這種經過微調的輕量級模型,可以將延遲維持在 10 到 20 毫秒,而且成本可以預測。
- 低流量或原型:
gpt-4o-mini之類的託管機型建置速度較快,需要的作業也較少,但延遲和每次呼叫成本較高。
2.可觀察性:從第一天開始儀器
最難處理的生產問題是您在答覆品質下降前無法察覺的問題。
- 基礎結構監控:Milvus 與Prometheus 整合。從三個指標開始:
milvus_query_latency_seconds,milvus_search_qps, 和milvus_insert_throughput。 - 應用程式監控:追蹤 CRAG 判決分佈、網路搜尋觸發率和信心分數分佈。如果沒有這些訊號,您就無法判斷品質下降是由於不良的檢索或評估人員的錯誤判斷所造成。
3.長期維護:防止記憶體污染
代理程式執行的時間越長,記憶體中累積的陳舊和低品質資料就越多。及早設定防護措施:
- 預先過濾:只浮現
confidence > 0.7的記憶體,讓低品質的內容在到達評估器前就被攔截。 - 時間衰減:逐漸降低較舊記憶體的權重。30 天是一個合理的預設起始時間,可依使用情況調整。
- 排程清理:每週執行一次工作,清除舊的、低可信度、未經驗證的記憶。這可防止陳舊資料被擷取、使用、再儲存的回饋循環。
總結 - 以及一些常見問題
CRAG 可解決生產 RAG 最常見的問題之一:檢索結果看起來相關,但實際上卻不相關。透過在擷取與產生之間插入評估與修正步驟,它可以篩選出不良的結果、填補外部搜尋的缺口,並提供模型更乾淨的上下文。
要讓 CRAG 在生產中可靠地運作,需要的不只是良好的擷取邏輯。它需要一個向量資料庫,能夠處理多租戶隔離、混合搜尋,以及不斷演進的模式 - 這正是Milvus的用武之地。在應用程式方面,選擇正確的評估器、及早檢測可觀察性,以及主動管理記憶體品質,才能將演示與值得信賴的系統區別開來。
如果您正在建立 RAG 或代理系統,並遇到檢索品質問題,我們很樂意提供協助:
- 加入Milvus Slack 社群,提出問題、分享您的架構,並向其他正在處理類似問題的開發人員取經。
- 預約 20 分鐘的 Milvus Office Hours 免費會議,與團隊一起討論您的使用個案 - 無論是 CRAG 設計、混合檢索或多租戶擴充。
- 如果您想跳過基礎架構的設定,直接開始建置,Zilliz Cloud(Milvus管理) 提供免費的層級讓您開始使用。
當團隊開始實施 CRAG 時,經常會遇到的幾個問題:
CRAG 與只在 RAG 中加入 reranker 有何不同?
重排器會根據相關性對結果重新排序,但仍假設擷取的文件是可用的。CRAG 則更進一步--它會評估擷取的內容是否真的完全符合查詢的需求,並在不符合時採取矯正措施:精煉部分符合的內容、以網路搜尋作為補充,或是完全捨棄結果。這是一個品質控制循環,而不只是一個更好的排序。
為什麼高相似度得分有時候會傳回錯誤的文件?
嵌入相似性測量向量空間中的語意接近度,但這與回答問題不同。關於在 Apache 上設定 HTTPS 的文件,在語義上與關於在 Nginx 上設定 HTTPS 的問題很接近,但卻沒有幫助。CRAG 藉由評估與實際查詢的相關性,而不只是向量距離,來捕捉這一點。
CRAG 的向量資料庫應該注意什麼?
最重要的有三件事:混合檢索 (因此您可以結合語意搜尋與關鍵字比對,以取得精確的詞彙)、多租戶隔離 (因此每個使用者或代理程式會話都有自己的記憶體空間),以及彈性的模式 (因此您可以新增confidence 或verified 等欄位,而不會因為管道演進而停機)。
如果擷取的文件都不相關,該怎麼辦?
CRAG 不會就此放棄。當置信度低於 0.5 時,它會回到網頁搜尋。當結果模棱兩可 (0.5-0.9) 時,它會將精煉的內部文件與外部搜尋結果合併。即使在知識庫有缺口的情況下,模型也總是能得到一些背景資料。
Try Managed Milvus for Free
Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.
Get StartedLike the article? Spread the word



