Pengambilan Kontekstual dengan Milvus
Pengambilan Kontekstual gambar adalah metode pengambilan lanjutan yang diusulkan oleh Anthropic untuk mengatasi masalah isolasi semantik dari potongan-potongan, yang muncul dalam solusi Retrieval-Augmented Generation (RAG) saat ini. Dalam paradigma RAG praktis saat ini, dokumen dibagi menjadi beberapa bagian, dan basis data vektor digunakan untuk mencari kueri, mengambil bagian yang paling relevan. LLM kemudian merespons kueri menggunakan potongan-potongan yang diambil ini. Namun, proses pemotongan ini dapat mengakibatkan hilangnya informasi kontekstual, sehingga menyulitkan retriever untuk menentukan relevansi.
Contextual Retrieval meningkatkan sistem pencarian tradisional dengan menambahkan konteks yang relevan pada setiap potongan dokumen sebelum disematkan atau diindeks, sehingga meningkatkan akurasi dan mengurangi kesalahan pencarian. Dikombinasikan dengan teknik seperti pengambilan hibrida dan pemeringkatan ulang, hal ini meningkatkan sistem Retrieval-Augmented Generation (RAG), terutama untuk basis pengetahuan yang besar. Selain itu, ia menawarkan solusi yang hemat biaya jika dipasangkan dengan cache yang cepat, yang secara signifikan mengurangi latensi dan biaya operasional, dengan potongan yang dikontekstualisasikan dengan biaya sekitar $ 1,02 per juta token dokumen. Hal ini menjadikannya pendekatan yang terukur dan efisien untuk menangani basis pengetahuan yang besar. Solusi Anthropic menunjukkan dua aspek yang mendalam:
Document Enhancement
: Penulisan ulang kueri adalah teknik penting dalam pencarian informasi modern, yang sering kali menggunakan informasi tambahan untuk membuat kueri menjadi lebih informatif. Demikian pula, untuk mencapai kinerja yang lebih baik dalam RAG, prapemrosesan dokumen dengan LLM (misalnya, membersihkan sumber data, melengkapi informasi yang hilang, meringkas, dll.) sebelum pengindeksan dapat secara signifikan meningkatkan peluang untuk mengambil dokumen yang relevan. Dengan kata lain, langkah prapemrosesan ini membantu mendekatkan dokumen ke kueri dalam hal relevansi.Low-Cost Processing by Caching Long Context
: Salah satu kekhawatiran umum ketika menggunakan LLM untuk memproses dokumen adalah biaya. KVCache adalah solusi populer yang memungkinkan penggunaan kembali hasil antara untuk konteks sebelumnya yang sama. Meskipun sebagian besar vendor LLM yang dihosting membuat fitur ini transparan bagi pengguna, Anthropic memberikan kontrol kepada pengguna atas proses caching. Ketika terjadi pemanggilan cache, sebagian besar komputasi dapat disimpan (hal ini biasa terjadi ketika konteks yang panjang tetap sama, tetapi instruksi untuk setiap kueri berubah). Untuk lebih jelasnya, klik di sini.
Dalam buku catatan ini, kami akan mendemonstrasikan bagaimana melakukan pengambilan kontekstual menggunakan Milvus dengan LLM, menggabungkan pengambilan hibrida padat-jarang dan perangking ulang untuk menciptakan sistem pengambilan yang semakin kuat. Data dan pengaturan eksperimental didasarkan pada pengambilan kontekstual.
Persiapan
Instal Ketergantungan
$ pip install "pymilvus[model]"
$ pip install tqdm
$ pip install anthropic
Jika Anda menggunakan Google Colab, untuk mengaktifkan dependensi yang baru saja diinstal, Anda mungkin perlu memulai ulang runtime (klik menu "Runtime" di bagian atas layar, dan pilih "Restart session" dari menu tarik-turun).
Anda akan membutuhkan kunci API dari Cohere, Voyage, dan Anthropic untuk menjalankan kode.
Mengunduh Data
Perintah berikut ini akan mengunduh contoh data yang digunakan dalam demo Anthropic asli.
$ wget https://raw.githubusercontent.com/anthropics/anthropic-cookbook/refs/heads/main/skills/contextual-embeddings/data/codebase_chunks.json
$ wget https://raw.githubusercontent.com/anthropics/anthropic-cookbook/refs/heads/main/skills/contextual-embeddings/data/evaluation_set.jsonl
Mendefinisikan Retriever
Kelas ini didesain agar fleksibel, sehingga Anda dapat memilih mode pengambilan data yang berbeda sesuai dengan kebutuhan Anda. Dengan menentukan opsi dalam metode inisialisasi, Anda dapat menentukan apakah akan menggunakan pencarian kontekstual, pencarian hibrida (menggabungkan metode pencarian padat dan jarang), atau perangking untuk hasil yang lebih baik.
from pymilvus.model.dense import VoyageEmbeddingFunction
from pymilvus.model.hybrid import BGEM3EmbeddingFunction
from pymilvus.model.reranker import CohereRerankFunction
from typing import List, Dict, Any
from typing import Callable
from pymilvus import (
MilvusClient,
DataType,
AnnSearchRequest,
RRFRanker,
)
from tqdm import tqdm
import json
import anthropic
class MilvusContextualRetriever:
def __init__(
self,
uri="milvus.db",
collection_name="contexual_bgem3",
dense_embedding_function=None,
use_sparse=False,
sparse_embedding_function=None,
use_contextualize_embedding=False,
anthropic_client=None,
use_reranker=False,
rerank_function=None,
):
self.collection_name = collection_name
# For Milvus-lite, uri is a local path like "./milvus.db"
# For Milvus standalone service, uri is like "http://localhost:19530"
# For Zilliz Clond, please set `uri` and `token`, which correspond to the [Public Endpoint and API key](https://docs.zilliz.com/docs/on-zilliz-cloud-console#cluster-details) in Zilliz Cloud.
self.client = MilvusClient(uri)
self.embedding_function = dense_embedding_function
self.use_sparse = use_sparse
self.sparse_embedding_function = None
self.use_contextualize_embedding = use_contextualize_embedding
self.anthropic_client = anthropic_client
self.use_reranker = use_reranker
self.rerank_function = rerank_function
if use_sparse is True and sparse_embedding_function:
self.sparse_embedding_function = sparse_embedding_function
elif sparse_embedding_function is False:
raise ValueError(
"Sparse embedding function cannot be None if use_sparse is False"
)
else:
pass
def build_collection(self):
schema = self.client.create_schema(
auto_id=True,
enable_dynamic_field=True,
)
schema.add_field(field_name="pk", datatype=DataType.INT64, is_primary=True)
schema.add_field(
field_name="dense_vector",
datatype=DataType.FLOAT_VECTOR,
dim=self.embedding_function.dim,
)
if self.use_sparse is True:
schema.add_field(
field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR
)
index_params = self.client.prepare_index_params()
index_params.add_index(
field_name="dense_vector", index_type="FLAT", metric_type="IP"
)
if self.use_sparse is True:
index_params.add_index(
field_name="sparse_vector",
index_type="SPARSE_INVERTED_INDEX",
metric_type="IP",
)
self.client.create_collection(
collection_name=self.collection_name,
schema=schema,
index_params=index_params,
enable_dynamic_field=True,
)
def insert_data(self, chunk, metadata):
dense_vec = self.embedding_function([chunk])[0]
if self.use_sparse is True:
sparse_result = self.sparse_embedding_function.encode_documents([chunk])
if type(sparse_result) == dict:
sparse_vec = sparse_result["sparse"][[0]]
else:
sparse_vec = sparse_result[[0]]
self.client.insert(
collection_name=self.collection_name,
data={
"dense_vector": dense_vec,
"sparse_vector": sparse_vec,
**metadata,
},
)
else:
self.client.insert(
collection_name=self.collection_name,
data={"dense_vector": dense_vec, **metadata},
)
def insert_contextualized_data(self, doc, chunk, metadata):
contextualized_text, usage = self.situate_context(doc, chunk)
metadata["context"] = contextualized_text
text_to_embed = f"{chunk}\n\n{contextualized_text}"
dense_vec = self.embedding_function([text_to_embed])[0]
if self.use_sparse is True:
sparse_vec = self.sparse_embedding_function.encode_documents(
[text_to_embed]
)["sparse"][[0]]
self.client.insert(
collection_name=self.collection_name,
data={
"dense_vector": dense_vec,
"sparse_vector": sparse_vec,
**metadata,
},
)
else:
self.client.insert(
collection_name=self.collection_name,
data={"dense_vector": dense_vec, **metadata},
)
def situate_context(self, doc: str, chunk: str):
DOCUMENT_CONTEXT_PROMPT = """
<document>
{doc_content}
</document>
"""
CHUNK_CONTEXT_PROMPT = """
Here is the chunk we want to situate within the whole document
<chunk>
{chunk_content}
</chunk>
Please give a short succinct context to situate this chunk within the overall document for the purposes of improving search retrieval of the chunk.
Answer only with the succinct context and nothing else.
"""
response = self.anthropic_client.beta.prompt_caching.messages.create(
model="claude-3-haiku-20240307",
max_tokens=1000,
temperature=0.0,
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": DOCUMENT_CONTEXT_PROMPT.format(doc_content=doc),
"cache_control": {
"type": "ephemeral"
}, # we will make use of prompt caching for the full documents
},
{
"type": "text",
"text": CHUNK_CONTEXT_PROMPT.format(chunk_content=chunk),
},
],
},
],
extra_headers={"anthropic-beta": "prompt-caching-2024-07-31"},
)
return response.content[0].text, response.usage
def search(self, query: str, k: int = 20) -> List[Dict[str, Any]]:
dense_vec = self.embedding_function([query])[0]
if self.use_sparse is True:
sparse_vec = self.sparse_embedding_function.encode_queries([query])[
"sparse"
][[0]]
req_list = []
if self.use_reranker:
k = k * 10
if self.use_sparse is True:
req_list = []
dense_search_param = {
"data": [dense_vec],
"anns_field": "dense_vector",
"param": {"metric_type": "IP"},
"limit": k * 2,
}
dense_req = AnnSearchRequest(**dense_search_param)
req_list.append(dense_req)
sparse_search_param = {
"data": [sparse_vec],
"anns_field": "sparse_vector",
"param": {"metric_type": "IP"},
"limit": k * 2,
}
sparse_req = AnnSearchRequest(**sparse_search_param)
req_list.append(sparse_req)
docs = self.client.hybrid_search(
self.collection_name,
req_list,
RRFRanker(),
k,
output_fields=[
"content",
"original_uuid",
"doc_id",
"chunk_id",
"original_index",
"context",
],
)
else:
docs = self.client.search(
self.collection_name,
data=[dense_vec],
anns_field="dense_vector",
limit=k,
output_fields=[
"content",
"original_uuid",
"doc_id",
"chunk_id",
"original_index",
"context",
],
)
if self.use_reranker and self.use_contextualize_embedding:
reranked_texts = []
reranked_docs = []
for i in range(k):
if self.use_contextualize_embedding:
reranked_texts.append(
f"{docs[0][i]['entity']['content']}\n\n{docs[0][i]['entity']['context']}"
)
else:
reranked_texts.append(f"{docs[0][i]['entity']['content']}")
results = self.rerank_function(query, reranked_texts)
for result in results:
reranked_docs.append(docs[0][result.index])
docs[0] = reranked_docs
return docs
def evaluate_retrieval(
queries: List[Dict[str, Any]], retrieval_function: Callable, db, k: int = 20
) -> Dict[str, float]:
total_score = 0
total_queries = len(queries)
for query_item in tqdm(queries, desc="Evaluating retrieval"):
query = query_item["query"]
golden_chunk_uuids = query_item["golden_chunk_uuids"]
# Find all golden chunk contents
golden_contents = []
for doc_uuid, chunk_index in golden_chunk_uuids:
golden_doc = next(
(
doc
for doc in query_item["golden_documents"]
if doc["uuid"] == doc_uuid
),
None,
)
if not golden_doc:
print(f"Warning: Golden document not found for UUID {doc_uuid}")
continue
golden_chunk = next(
(
chunk
for chunk in golden_doc["chunks"]
if chunk["index"] == chunk_index
),
None,
)
if not golden_chunk:
print(
f"Warning: Golden chunk not found for index {chunk_index} in document {doc_uuid}"
)
continue
golden_contents.append(golden_chunk["content"].strip())
if not golden_contents:
print(f"Warning: No golden contents found for query: {query}")
continue
retrieved_docs = retrieval_function(query, db, k=k)
# Count how many golden chunks are in the top k retrieved documents
chunks_found = 0
for golden_content in golden_contents:
for doc in retrieved_docs[0][:k]:
retrieved_content = doc["entity"]["content"].strip()
if retrieved_content == golden_content:
chunks_found += 1
break
query_score = chunks_found / len(golden_contents)
total_score += query_score
average_score = total_score / total_queries
pass_at_n = average_score * 100
return {
"pass_at_n": pass_at_n,
"average_score": average_score,
"total_queries": total_queries,
}
def retrieve_base(query: str, db, k: int = 20) -> List[Dict[str, Any]]:
return db.search(query, k=k)
def load_jsonl(file_path: str) -> List[Dict[str, Any]]:
"""Load JSONL file and return a list of dictionaries."""
with open(file_path, "r") as file:
return [json.loads(line) for line in file]
def evaluate_db(db, original_jsonl_path: str, k):
# Load the original JSONL data for queries and ground truth
original_data = load_jsonl(original_jsonl_path)
# Evaluate retrieval
results = evaluate_retrieval(original_data, retrieve_base, db, k)
print(f"Pass@{k}: {results['pass_at_n']:.2f}%")
print(f"Total Score: {results['average_score']}")
print(f"Total queries: {results['total_queries']}")
Sekarang Anda perlu menginisialisasi model-model ini untuk eksperimen berikut. Anda dapat dengan mudah beralih ke model lain menggunakan pustaka model PyMilvus.
dense_ef = VoyageEmbeddingFunction(api_key="your-voyage-api-key", model_name="voyage-2")
sparse_ef = BGEM3EmbeddingFunction()
cohere_rf = CohereRerankFunction(api_key="your-cohere-api-key")
Fetching 30 files: 0%| | 0/30 [00:00<?, ?it/s]
path = "codebase_chunks.json"
with open(path, "r") as f:
dataset = json.load(f)
Eksperimen I: Pengambilan Standar
Pengambilan standar hanya menggunakan sematan padat untuk mengambil dokumen terkait. Dalam percobaan ini, kita akan menggunakan Pass@5 untuk mereproduksi hasil dari repo asli.
standard_retriever = MilvusContextualRetriever(
uri="standard.db", collection_name="standard", dense_embedding_function=dense_ef
)
standard_retriever.build_collection()
for doc in dataset:
doc_content = doc["content"]
for chunk in doc["chunks"]:
metadata = {
"doc_id": doc["doc_id"],
"original_uuid": doc["original_uuid"],
"chunk_id": chunk["chunk_id"],
"original_index": chunk["original_index"],
"content": chunk["content"],
}
chunk_content = chunk["content"]
standard_retriever.insert_data(chunk_content, metadata)
evaluate_db(standard_retriever, "evaluation_set.jsonl", 5)
Evaluating retrieval: 100%|██████████| 248/248 [01:29<00:00, 2.77it/s]
Pass@5: 80.92%
Total Score: 0.8091877880184332
Total queries: 248
Eksperimen II: Pengambilan Hibrida
Setelah kita mendapatkan hasil yang menjanjikan dengan penyematan Voyage, kita akan beralih ke pengambilan hibrida dengan menggunakan model BGE-M3 yang menghasilkan penyematan jarang yang kuat. Hasil dari dense retrieval dan sparse retrieval akan digabungkan menggunakan metode Reciprocal Rank Fusion (RRF) untuk menghasilkan hasil hybrid.
hybrid_retriever = MilvusContextualRetriever(
uri="hybrid.db",
collection_name="hybrid",
dense_embedding_function=dense_ef,
use_sparse=True,
sparse_embedding_function=sparse_ef,
)
hybrid_retriever.build_collection()
for doc in dataset:
doc_content = doc["content"]
for chunk in doc["chunks"]:
metadata = {
"doc_id": doc["doc_id"],
"original_uuid": doc["original_uuid"],
"chunk_id": chunk["chunk_id"],
"original_index": chunk["original_index"],
"content": chunk["content"],
}
chunk_content = chunk["content"]
hybrid_retriever.insert_data(chunk_content, metadata)
evaluate_db(hybrid_retriever, "evaluation_set.jsonl", 5)
Evaluating retrieval: 100%|██████████| 248/248 [02:09<00:00, 1.92it/s]
Pass@5: 84.69%
Total Score: 0.8469182027649771
Total queries: 248
Eksperimen III: Temu Kembali Kontekstual
Temu kembali hibrida menunjukkan peningkatan, tetapi hasilnya dapat lebih ditingkatkan dengan menerapkan metode temu kembali kontekstual. Untuk mencapai hal ini, kami akan menggunakan model bahasa Anthropic untuk mempersiapkan konteks dari keseluruhan dokumen untuk setiap potongan.
anthropic_client = anthropic.Anthropic(
api_key="your-anthropic-api-key",
)
contextual_retriever = MilvusContextualRetriever(
uri="contextual.db",
collection_name="contextual",
dense_embedding_function=dense_ef,
use_sparse=True,
sparse_embedding_function=sparse_ef,
use_contextualize_embedding=True,
anthropic_client=anthropic_client,
)
contextual_retriever.build_collection()
for doc in dataset:
doc_content = doc["content"]
for chunk in doc["chunks"]:
metadata = {
"doc_id": doc["doc_id"],
"original_uuid": doc["original_uuid"],
"chunk_id": chunk["chunk_id"],
"original_index": chunk["original_index"],
"content": chunk["content"],
}
chunk_content = chunk["content"]
contextual_retriever.insert_contextualized_data(
doc_content, chunk_content, metadata
)
evaluate_db(contextual_retriever, "evaluation_set.jsonl", 5)
Evaluating retrieval: 100%|██████████| 248/248 [01:55<00:00, 2.15it/s]
Pass@5: 87.14%
Total Score: 0.8713517665130568
Total queries: 248
Eksperimen IV: Pengambilan Kontekstual dengan Reranker
Hasilnya dapat ditingkatkan lebih lanjut dengan menambahkan perangking ulang Cohere. Tanpa menginisialisasi retriever baru dengan reranker secara terpisah, kita cukup mengonfigurasi retriever yang sudah ada untuk menggunakan reranker untuk meningkatkan kinerja.
contextual_retriever.use_reranker = True
contextual_retriever.rerank_function = cohere_rf
evaluate_db(contextual_retriever, "evaluation_set.jsonl", 5)
Evaluating retrieval: 100%|██████████| 248/248 [02:02<00:00, 2.00it/s]
Pass@5: 90.91%
Total Score: 0.9090821812596005
Total queries: 248
Kami telah mendemonstrasikan beberapa metode untuk meningkatkan kinerja pengambilan. Dengan desain yang lebih ad-hoc yang disesuaikan dengan skenario, pengambilan kontekstual menunjukkan potensi yang signifikan untuk melakukan prapemrosesan dokumen dengan biaya rendah, sehingga menghasilkan sistem RAG yang lebih baik.