الاسترجاع السياقي مع ميلفوس
الاسترجاع السياقي للصور هو أسلوب استرجاع متقدم اقترحته أنثروبيك لمعالجة مشكلة العزل الدلالي للقطع، والتي تنشأ في حلول الاسترجاع المعزز الحالية (RAG). في نموذج RAG العملي الحالي، يتم تقسيم المستندات في نموذج RAG العملي الحالي إلى عدة أجزاء، ويتم استخدام قاعدة بيانات متجهة للبحث عن الاستعلام، واسترجاع الأجزاء الأكثر صلة. ثم يستجيب LLM للاستعلام باستخدام هذه الأجزاء المسترجعة. ومع ذلك، يمكن أن تؤدي عملية التقطيع هذه إلى فقدان المعلومات السياقية، مما يجعل من الصعب على المسترجع تحديد مدى الصلة.
يعمل الاسترجاع السياقي على تحسين أنظمة الاسترجاع التقليدية من خلال إضافة السياق ذي الصلة إلى كل جزء من أجزاء المستند قبل التضمين أو الفهرسة، مما يعزز الدقة ويقلل من أخطاء الاسترجاع. وبالاقتران مع تقنيات مثل الاسترجاع الهجين وإعادة الترتيب الهجين، فإنه يعزز أنظمة الاسترجاع المعزز (RAG)، خاصةً بالنسبة لقواعد المعرفة الكبيرة. بالإضافة إلى ذلك، فإنه يوفر حلاً فعالاً من حيث التكلفة عند إقرانه بالتخزين المؤقت السريع، مما يقلل بشكل كبير من زمن الاستجابة والتكاليف التشغيلية، حيث تبلغ تكلفة القطع السياقية حوالي 1.02 دولار لكل مليون رمز مستند. وهذا يجعله نهجاً قابلاً للتطوير وفعالاً للتعامل مع قواعد المعرفة الكبيرة. يُظهر حل أنثروبيك جانبين ثاقبين:
Document Enhancement
: إعادة كتابة الاستعلام هي تقنية حاسمة في استرجاع المعلومات الحديثة، وغالبًا ما تستخدم المعلومات المساعدة لجعل الاستعلام أكثر إفادة. وبالمثل، ولتحقيق أداء أفضل في استرجاع المعلومات من خلال الفهرسة RAG، فإن المعالجة المسبقة للمستندات باستخدام الفهرسة (على سبيل المثال، تنظيف مصدر البيانات، واستكمال المعلومات المفقودة، والتلخيص، وما إلى ذلك) قبل الفهرسة يمكن أن يحسن بشكل كبير من فرص استرجاع المستندات ذات الصلة. وبعبارة أخرى، تساعد خطوة المعالجة المسبقة هذه في تقريب المستندات من الاستعلامات من حيث الصلة بالموضوع.Low-Cost Processing by Caching Long Context
: أحد الشواغل الشائعة عند استخدام الآلات ذات المسؤولية المحدودة لمعالجة المستندات هو التكلفة. تعتبر ذاكرة التخزين المؤقت KVC حلاً شائعًا يسمح بإعادة استخدام النتائج الوسيطة لنفس السياق السابق. في حين أن معظم بائعي LLM المستضافين يجعلون هذه الميزة شفافة للمستخدم، فإن أنثروبيك يمنح المستخدمين التحكم في عملية التخزين المؤقت. عند حدوث ضرب في ذاكرة التخزين المؤقت، يمكن حفظ معظم العمليات الحسابية (وهذا أمر شائع عندما يبقى السياق الطويل كما هو، ولكن التعليمات لكل استعلام تتغير). لمزيد من التفاصيل، انقر هنا.
سنوضح في هذا الدفتر كيفية إجراء الاسترجاع السياقي باستخدام Milvus مع LLM، والجمع بين الاسترجاع الهجين الكثيف والمتناثر وإعادة الترتيب لإنشاء نظام استرجاع أكثر قوة بشكل تدريجي. تعتمد البيانات والإعداد التجريبي على الاسترجاع السياقي.
الإعداد
تثبيت التبعيات
$ pip install "pymilvus[model]"
$ pip install tqdm
$ pip install anthropic
إذا كنت تستخدم Google Colab، لتمكين التبعيات المثبتة للتو، قد تحتاج إلى إعادة تشغيل وقت التشغيل (انقر على قائمة "وقت التشغيل" في أعلى الشاشة، وحدد "إعادة تشغيل الجلسة" من القائمة المنسدلة).
ستحتاج إلى مفاتيح API من Cohere و Voyage و Anthropic لتشغيل الكود.
تنزيل البيانات
سيقوم الأمر التالي بتنزيل مثال البيانات المستخدمة في العرض التوضيحي الأصلي لأنثروبيك.
$ wget https://raw.githubusercontent.com/anthropics/anthropic-cookbook/refs/heads/main/skills/contextual-embeddings/data/codebase_chunks.json
$ wget https://raw.githubusercontent.com/anthropics/anthropic-cookbook/refs/heads/main/skills/contextual-embeddings/data/evaluation_set.jsonl
تعريف المسترد
صُممت هذه الفئة لتكون مرنة، مما يسمح لك بالاختيار بين أوضاع الاسترجاع المختلفة بناءً على احتياجاتك. من خلال تحديد الخيارات في طريقة التهيئة، يمكنك تحديد ما إذا كنت تريد استخدام الاسترجاع السياقي، أو البحث الهجين (الجمع بين طرق الاسترجاع الكثيفة والمتناثرة)، أو إعادة ترتيب النتائج المحسنة.
from pymilvus.model.dense import VoyageEmbeddingFunction
from pymilvus.model.hybrid import BGEM3EmbeddingFunction
from pymilvus.model.reranker import CohereRerankFunction
from typing import List, Dict, Any
from typing import Callable
from pymilvus import (
MilvusClient,
DataType,
AnnSearchRequest,
RRFRanker,
)
from tqdm import tqdm
import json
import anthropic
class MilvusContextualRetriever:
def __init__(
self,
uri="milvus.db",
collection_name="contexual_bgem3",
dense_embedding_function=None,
use_sparse=False,
sparse_embedding_function=None,
use_contextualize_embedding=False,
anthropic_client=None,
use_reranker=False,
rerank_function=None,
):
self.collection_name = collection_name
# For Milvus-lite, uri is a local path like "./milvus.db"
# For Milvus standalone service, uri is like "http://localhost:19530"
# For Zilliz Clond, please set `uri` and `token`, which correspond to the [Public Endpoint and API key](https://docs.zilliz.com/docs/on-zilliz-cloud-console#cluster-details) in Zilliz Cloud.
self.client = MilvusClient(uri)
self.embedding_function = dense_embedding_function
self.use_sparse = use_sparse
self.sparse_embedding_function = None
self.use_contextualize_embedding = use_contextualize_embedding
self.anthropic_client = anthropic_client
self.use_reranker = use_reranker
self.rerank_function = rerank_function
if use_sparse is True and sparse_embedding_function:
self.sparse_embedding_function = sparse_embedding_function
elif sparse_embedding_function is False:
raise ValueError(
"Sparse embedding function cannot be None if use_sparse is False"
)
else:
pass
def build_collection(self):
schema = self.client.create_schema(
auto_id=True,
enable_dynamic_field=True,
)
schema.add_field(field_name="pk", datatype=DataType.INT64, is_primary=True)
schema.add_field(
field_name="dense_vector",
datatype=DataType.FLOAT_VECTOR,
dim=self.embedding_function.dim,
)
if self.use_sparse is True:
schema.add_field(
field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR
)
index_params = self.client.prepare_index_params()
index_params.add_index(
field_name="dense_vector", index_type="FLAT", metric_type="IP"
)
if self.use_sparse is True:
index_params.add_index(
field_name="sparse_vector",
index_type="SPARSE_INVERTED_INDEX",
metric_type="IP",
)
self.client.create_collection(
collection_name=self.collection_name,
schema=schema,
index_params=index_params,
enable_dynamic_field=True,
)
def insert_data(self, chunk, metadata):
dense_vec = self.embedding_function([chunk])[0]
if self.use_sparse is True:
sparse_result = self.sparse_embedding_function.encode_documents([chunk])
if type(sparse_result) == dict:
sparse_vec = sparse_result["sparse"][[0]]
else:
sparse_vec = sparse_result[[0]]
self.client.insert(
collection_name=self.collection_name,
data={
"dense_vector": dense_vec,
"sparse_vector": sparse_vec,
**metadata,
},
)
else:
self.client.insert(
collection_name=self.collection_name,
data={"dense_vector": dense_vec, **metadata},
)
def insert_contextualized_data(self, doc, chunk, metadata):
contextualized_text, usage = self.situate_context(doc, chunk)
metadata["context"] = contextualized_text
text_to_embed = f"{chunk}\n\n{contextualized_text}"
dense_vec = self.embedding_function([text_to_embed])[0]
if self.use_sparse is True:
sparse_vec = self.sparse_embedding_function.encode_documents(
[text_to_embed]
)["sparse"][[0]]
self.client.insert(
collection_name=self.collection_name,
data={
"dense_vector": dense_vec,
"sparse_vector": sparse_vec,
**metadata,
},
)
else:
self.client.insert(
collection_name=self.collection_name,
data={"dense_vector": dense_vec, **metadata},
)
def situate_context(self, doc: str, chunk: str):
DOCUMENT_CONTEXT_PROMPT = """
<document>
{doc_content}
</document>
"""
CHUNK_CONTEXT_PROMPT = """
Here is the chunk we want to situate within the whole document
<chunk>
{chunk_content}
</chunk>
Please give a short succinct context to situate this chunk within the overall document for the purposes of improving search retrieval of the chunk.
Answer only with the succinct context and nothing else.
"""
response = self.anthropic_client.beta.prompt_caching.messages.create(
model="claude-3-haiku-20240307",
max_tokens=1000,
temperature=0.0,
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": DOCUMENT_CONTEXT_PROMPT.format(doc_content=doc),
"cache_control": {
"type": "ephemeral"
}, # we will make use of prompt caching for the full documents
},
{
"type": "text",
"text": CHUNK_CONTEXT_PROMPT.format(chunk_content=chunk),
},
],
},
],
extra_headers={"anthropic-beta": "prompt-caching-2024-07-31"},
)
return response.content[0].text, response.usage
def search(self, query: str, k: int = 20) -> List[Dict[str, Any]]:
dense_vec = self.embedding_function([query])[0]
if self.use_sparse is True:
sparse_vec = self.sparse_embedding_function.encode_queries([query])[
"sparse"
][[0]]
req_list = []
if self.use_reranker:
k = k * 10
if self.use_sparse is True:
req_list = []
dense_search_param = {
"data": [dense_vec],
"anns_field": "dense_vector",
"param": {"metric_type": "IP"},
"limit": k * 2,
}
dense_req = AnnSearchRequest(**dense_search_param)
req_list.append(dense_req)
sparse_search_param = {
"data": [sparse_vec],
"anns_field": "sparse_vector",
"param": {"metric_type": "IP"},
"limit": k * 2,
}
sparse_req = AnnSearchRequest(**sparse_search_param)
req_list.append(sparse_req)
docs = self.client.hybrid_search(
self.collection_name,
req_list,
RRFRanker(),
k,
output_fields=[
"content",
"original_uuid",
"doc_id",
"chunk_id",
"original_index",
"context",
],
)
else:
docs = self.client.search(
self.collection_name,
data=[dense_vec],
anns_field="dense_vector",
limit=k,
output_fields=[
"content",
"original_uuid",
"doc_id",
"chunk_id",
"original_index",
"context",
],
)
if self.use_reranker and self.use_contextualize_embedding:
reranked_texts = []
reranked_docs = []
for i in range(k):
if self.use_contextualize_embedding:
reranked_texts.append(
f"{docs[0][i]['entity']['content']}\n\n{docs[0][i]['entity']['context']}"
)
else:
reranked_texts.append(f"{docs[0][i]['entity']['content']}")
results = self.rerank_function(query, reranked_texts)
for result in results:
reranked_docs.append(docs[0][result.index])
docs[0] = reranked_docs
return docs
def evaluate_retrieval(
queries: List[Dict[str, Any]], retrieval_function: Callable, db, k: int = 20
) -> Dict[str, float]:
total_score = 0
total_queries = len(queries)
for query_item in tqdm(queries, desc="Evaluating retrieval"):
query = query_item["query"]
golden_chunk_uuids = query_item["golden_chunk_uuids"]
# Find all golden chunk contents
golden_contents = []
for doc_uuid, chunk_index in golden_chunk_uuids:
golden_doc = next(
(
doc
for doc in query_item["golden_documents"]
if doc["uuid"] == doc_uuid
),
None,
)
if not golden_doc:
print(f"Warning: Golden document not found for UUID {doc_uuid}")
continue
golden_chunk = next(
(
chunk
for chunk in golden_doc["chunks"]
if chunk["index"] == chunk_index
),
None,
)
if not golden_chunk:
print(
f"Warning: Golden chunk not found for index {chunk_index} in document {doc_uuid}"
)
continue
golden_contents.append(golden_chunk["content"].strip())
if not golden_contents:
print(f"Warning: No golden contents found for query: {query}")
continue
retrieved_docs = retrieval_function(query, db, k=k)
# Count how many golden chunks are in the top k retrieved documents
chunks_found = 0
for golden_content in golden_contents:
for doc in retrieved_docs[0][:k]:
retrieved_content = doc["entity"]["content"].strip()
if retrieved_content == golden_content:
chunks_found += 1
break
query_score = chunks_found / len(golden_contents)
total_score += query_score
average_score = total_score / total_queries
pass_at_n = average_score * 100
return {
"pass_at_n": pass_at_n,
"average_score": average_score,
"total_queries": total_queries,
}
def retrieve_base(query: str, db, k: int = 20) -> List[Dict[str, Any]]:
return db.search(query, k=k)
def load_jsonl(file_path: str) -> List[Dict[str, Any]]:
"""Load JSONL file and return a list of dictionaries."""
with open(file_path, "r") as file:
return [json.loads(line) for line in file]
def evaluate_db(db, original_jsonl_path: str, k):
# Load the original JSONL data for queries and ground truth
original_data = load_jsonl(original_jsonl_path)
# Evaluate retrieval
results = evaluate_retrieval(original_data, retrieve_base, db, k)
print(f"Pass@{k}: {results['pass_at_n']:.2f}%")
print(f"Total Score: {results['average_score']}")
print(f"Total queries: {results['total_queries']}")
تحتاج الآن إلى تهيئة هذه النماذج للتجارب التالية. يمكنك التبديل بسهولة إلى نماذج أخرى باستخدام مكتبة نماذج PyMilvus.
dense_ef = VoyageEmbeddingFunction(api_key="your-voyage-api-key", model_name="voyage-2")
sparse_ef = BGEM3EmbeddingFunction()
cohere_rf = CohereRerankFunction(api_key="your-cohere-api-key")
Fetching 30 files: 0%| | 0/30 [00:00<?, ?it/s]
path = "codebase_chunks.json"
with open(path, "r") as f:
dataset = json.load(f)
التجربة الأولى: الاسترجاع القياسي
يستخدم الاسترجاع القياسي التضمينات الكثيفة فقط لاسترجاع المستندات ذات الصلة. في هذه التجربة، سوف نستخدم Pass@5 لإعادة إنتاج النتائج من الريبو الأصلي.
standard_retriever = MilvusContextualRetriever(
uri="standard.db", collection_name="standard", dense_embedding_function=dense_ef
)
standard_retriever.build_collection()
for doc in dataset:
doc_content = doc["content"]
for chunk in doc["chunks"]:
metadata = {
"doc_id": doc["doc_id"],
"original_uuid": doc["original_uuid"],
"chunk_id": chunk["chunk_id"],
"original_index": chunk["original_index"],
"content": chunk["content"],
}
chunk_content = chunk["content"]
standard_retriever.insert_data(chunk_content, metadata)
evaluate_db(standard_retriever, "evaluation_set.jsonl", 5)
Evaluating retrieval: 100%|██████████| 248/248 [01:29<00:00, 2.77it/s]
Pass@5: 80.92%
Total Score: 0.8091877880184332
Total queries: 248
التجربة الثانية: الاسترجاع الهجين
والآن بعد أن حصلنا على نتائج واعدة باستخدام تضمين Voyage، سننتقل إلى إجراء الاسترجاع الهجين باستخدام نموذج BGE-M3 الذي يولد تضمينات متناثرة قوية. سيتم دمج النتائج من الاسترجاع الكثيف والاسترجاع المتناثر باستخدام طريقة دمج الرتب المتبادلة (RRF) للحصول على نتيجة هجينة.
hybrid_retriever = MilvusContextualRetriever(
uri="hybrid.db",
collection_name="hybrid",
dense_embedding_function=dense_ef,
use_sparse=True,
sparse_embedding_function=sparse_ef,
)
hybrid_retriever.build_collection()
for doc in dataset:
doc_content = doc["content"]
for chunk in doc["chunks"]:
metadata = {
"doc_id": doc["doc_id"],
"original_uuid": doc["original_uuid"],
"chunk_id": chunk["chunk_id"],
"original_index": chunk["original_index"],
"content": chunk["content"],
}
chunk_content = chunk["content"]
hybrid_retriever.insert_data(chunk_content, metadata)
evaluate_db(hybrid_retriever, "evaluation_set.jsonl", 5)
Evaluating retrieval: 100%|██████████| 248/248 [02:09<00:00, 1.92it/s]
Pass@5: 84.69%
Total Score: 0.8469182027649771
Total queries: 248
التجربة الثالثة: الاسترجاع السياقي
يُظهر الاسترجاع الهجين تحسناً، ولكن يمكن تحسين النتائج بشكل أكبر من خلال تطبيق طريقة الاسترجاع السياقي. ولتحقيق ذلك، سنستخدم نموذج لغة أنثروبيك لإضافة السياق من المستند بأكمله لكل جزء.
anthropic_client = anthropic.Anthropic(
api_key="your-anthropic-api-key",
)
contextual_retriever = MilvusContextualRetriever(
uri="contextual.db",
collection_name="contextual",
dense_embedding_function=dense_ef,
use_sparse=True,
sparse_embedding_function=sparse_ef,
use_contextualize_embedding=True,
anthropic_client=anthropic_client,
)
contextual_retriever.build_collection()
for doc in dataset:
doc_content = doc["content"]
for chunk in doc["chunks"]:
metadata = {
"doc_id": doc["doc_id"],
"original_uuid": doc["original_uuid"],
"chunk_id": chunk["chunk_id"],
"original_index": chunk["original_index"],
"content": chunk["content"],
}
chunk_content = chunk["content"]
contextual_retriever.insert_contextualized_data(
doc_content, chunk_content, metadata
)
evaluate_db(contextual_retriever, "evaluation_set.jsonl", 5)
Evaluating retrieval: 100%|██████████| 248/248 [01:55<00:00, 2.15it/s]
Pass@5: 87.14%
Total Score: 0.8713517665130568
Total queries: 248
التجربة الرابعة: الاسترجاع السياقي باستخدام أداة إعادة التصنيف
يمكن تحسين النتائج بشكل أكبر من خلال إضافة معيد تصنيف Cohere. بدون تهيئة مسترجِع جديد مع أداة إعادة الترتيب بشكل منفصل، يمكننا ببساطة تهيئة المسترجِع الحالي لاستخدام أداة إعادة الترتيب لتحسين الأداء.
contextual_retriever.use_reranker = True
contextual_retriever.rerank_function = cohere_rf
evaluate_db(contextual_retriever, "evaluation_set.jsonl", 5)
Evaluating retrieval: 100%|██████████| 248/248 [02:02<00:00, 2.00it/s]
Pass@5: 90.91%
Total Score: 0.9090821812596005
Total queries: 248
لقد أظهرنا عدة طرق لتحسين أداء الاسترجاع. مع المزيد من التصميم المخصص المصمم خصيصًا للسيناريو، يُظهر الاسترجاع السياقي إمكانات كبيرة لمعالجة المستندات مسبقًا بتكلفة منخفضة، مما يؤدي إلى نظام استرجاع أفضل.