استخدام كولبالي للاسترجاع متعدد الوسائط مع ميلفوس
تستخدم نماذج الاسترجاع الحديثة عادةً تضمينًا واحدًا لتمثيل النص أو الصور. ومع ذلك، فإن ColBERT هو نموذج عصبي يستخدم قائمة من التضمينات لكل مثيل بيانات ويستخدم عملية "MaxSim" لحساب التشابه بين نصين. بالإضافة إلى البيانات النصية، تحتوي الأشكال والجداول والرسوم البيانية أيضًا على معلومات غنية، والتي غالبًا ما يتم تجاهلها في استرجاع المعلومات المستندة إلى النصوص.
تقارن دالة MaxSim استعلامًا مع مستند (ما تبحث فيه) من خلال النظر في تضمينات الرموز المميزة الخاصة بهما. لكل كلمة في الاستعلام، تقوم باختيار الكلمة الأكثر تشابهًا من المستند (باستخدام تشابه جيب التمام أو مسافة L2 المربعة) وتجمع أوجه التشابه القصوى هذه عبر جميع الكلمات في الاستعلام
ColPali هي طريقة تجمع بين تمثيل ColBERT متعدد المتجهات مع PaliGemma (نموذج لغوي كبير متعدد الوسائط) للاستفادة من قدرات الفهم القوية. يتيح هذا النهج تمثيل صفحة تحتوي على كل من النصوص والصور باستخدام تضمين موحد متعدد المتجهات. يمكن للتضمينات داخل هذا التمثيل متعدد النواقل أن تلتقط معلومات مفصلة، مما يحسّن أداء التوليد المعزز للاسترجاع (RAG) للبيانات متعددة الوسائط.
في هذا الدفتر، نشير في هذا الدفتر إلى هذا النوع من التمثيل متعدد النواقل باسم "تضمينات كولبيرت" من أجل العمومية. ومع ذلك، فإن النموذج الفعلي المستخدم هو نموذج ColPali. سنوضح كيفية استخدام ميلفوس لاسترجاع متعدد المتجهات. بناءً على ذلك، سنقدم كيفية استخدام ColPali لاسترجاع الصفحات بناءً على استعلام معين.
التحضير
$ pip install pdf2image
$ pip pymilvus
$ pip install colpali_engine
$ pip install tqdm
$ pip instal pillow
إعداد البيانات
سنستخدم ملف PDF RAG كمثال لنا. يمكنك تنزيل ورقة ColBERT ووضعها في ./pdf
. لا يقوم ColPali بمعالجة النص مباشرةً؛ بدلاً من ذلك، يتم تنقيط الصفحة بأكملها في صورة. يتفوق نموذج ColPali في فهم المعلومات النصية الموجودة في هذه الصور. لذلك، سنقوم بتحويل كل صفحة PDF إلى صورة للمعالجة.
from pdf2image import convert_from_path
pdf_path = "pdfs/2004.12832v2.pdf"
images = convert_from_path(pdf_path)
for i, image in enumerate(images):
image.save(f"pages/page_{i + 1}.png", "PNG")
بعد ذلك، سنقوم بتهيئة قاعدة بيانات باستخدام Milvus Lite. يمكنك التبديل بسهولة إلى مثيل Milvus الكامل عن طريق تعيين uri إلى العنوان المناسب حيث تتم استضافة خدمة Milvus الخاصة بك.
from pymilvus import MilvusClient, DataType
import numpy as np
import concurrent.futures
client = MilvusClient(uri="milvus.db")
- إذا كنت تحتاج فقط إلى قاعدة بيانات متجهة محلية للبيانات الصغيرة الحجم أو النماذج الأولية، فإن تعيين uri كملف محلي، على سبيل المثال
./milvus.db
، هو الطريقة الأكثر ملاءمة، حيث يستخدم تلقائيًا Milvus Lite لتخزين جميع البيانات في هذا الملف. - إذا كان لديك حجم كبير من البيانات، على سبيل المثال أكثر من مليون ناقل، يمكنك إعداد خادم Milvus أكثر أداءً على Docker أو Kubernetes. في هذا الإعداد، يُرجى استخدام عنوان الخادم والمنفذ كـ uri، على سبيل المثال
http://localhost:19530
. إذا قمت بتمكين خاصية المصادقة على Milvus، استخدم "<your_username>: <your_password>" كرمز مميز، وإلا فلا تقم بتعيين الرمز المميز. - إذا كنت تستخدم Zilliz Cloud، الخدمة السحابية المُدارة بالكامل لـ Milvus، اضبط
uri
وtoken
، والتي تتوافق مع نقطة النهاية العامة ومفتاح واجهة برمجة التطبيقات في Zilliz Cloud.
سنقوم بتعريف فئة MilvusColbertRetriever للالتفاف حول عميل Milvus لاسترجاع البيانات متعددة النواقل. يعمل التطبيق على تسطيح تضمينات ColBERT وإدراجها في مجموعة، حيث يمثل كل صف تضمينًا فرديًا من قائمة تضمين ColBERT. كما يسجل أيضًا doc_id و seq_id لتتبع أصل كل تضمين.
عند البحث باستخدام قائمة تضمين ColBERT، سيتم إجراء عمليات بحث متعددة - واحدة لكل تضمين ColBERT. سيتم بعد ذلك إلغاء تكرار المستندات_المعرّفات المسترجعة. سيتم إجراء عملية إعادة ترتيب، حيث يتم جلب التضمينات الكاملة لكل doc_id، ويتم حساب درجة MaxSim للحصول على النتائج النهائية المصنفة.
class MilvusColbertRetriever:
def __init__(self, milvus_client, collection_name, dim=128):
# Initialize the retriever with a Milvus client, collection name, and dimensionality of the vector embeddings.
# If the collection exists, load it.
self.collection_name = collection_name
self.client = milvus_client
if self.client.has_collection(collection_name=self.collection_name):
self.client.load_collection(collection_name)
self.dim = dim
def create_collection(self):
# Create a new collection in Milvus for storing embeddings.
# Drop the existing collection if it already exists and define the schema for the collection.
if self.client.has_collection(collection_name=self.collection_name):
self.client.drop_collection(collection_name=self.collection_name)
schema = self.client.create_schema(
auto_id=True,
enable_dynamic_fields=True,
)
schema.add_field(field_name="pk", datatype=DataType.INT64, is_primary=True)
schema.add_field(
field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=self.dim
)
schema.add_field(field_name="seq_id", datatype=DataType.INT16)
schema.add_field(field_name="doc_id", datatype=DataType.INT64)
schema.add_field(field_name="doc", datatype=DataType.VARCHAR, max_length=65535)
self.client.create_collection(
collection_name=self.collection_name, schema=schema
)
def create_index(self):
# Create an index on the vector field to enable fast similarity search.
# Releases and drops any existing index before creating a new one with specified parameters.
self.client.release_collection(collection_name=self.collection_name)
self.client.drop_index(
collection_name=self.collection_name, index_name="vector"
)
index_params = self.client.prepare_index_params()
index_params.add_index(
field_name="vector",
index_name="vector_index",
index_type="HNSW", # or any other index type you want
metric_type="IP", # or the appropriate metric type
params={
"M": 16,
"efConstruction": 500,
}, # adjust these parameters as needed
)
self.client.create_index(
collection_name=self.collection_name, index_params=index_params, sync=True
)
def create_scalar_index(self):
# Create a scalar index for the "doc_id" field to enable fast lookups by document ID.
self.client.release_collection(collection_name=self.collection_name)
index_params = self.client.prepare_index_params()
index_params.add_index(
field_name="doc_id",
index_name="int32_index",
index_type="INVERTED", # or any other index type you want
)
self.client.create_index(
collection_name=self.collection_name, index_params=index_params, sync=True
)
def search(self, data, topk):
# Perform a vector search on the collection to find the top-k most similar documents.
search_params = {"metric_type": "IP", "params": {}}
results = self.client.search(
self.collection_name,
data,
limit=int(50),
output_fields=["vector", "seq_id", "doc_id"],
search_params=search_params,
)
doc_ids = set()
for r_id in range(len(results)):
for r in range(len(results[r_id])):
doc_ids.add(results[r_id][r]["entity"]["doc_id"])
scores = []
def rerank_single_doc(doc_id, data, client, collection_name):
# Rerank a single document by retrieving its embeddings and calculating the similarity with the query.
doc_colbert_vecs = client.query(
collection_name=collection_name,
filter=f"doc_id in [{doc_id}]",
output_fields=["seq_id", "vector", "doc"],
limit=1000,
)
doc_vecs = np.vstack(
[doc_colbert_vecs[i]["vector"] for i in range(len(doc_colbert_vecs))]
)
score = np.dot(data, doc_vecs.T).max(1).sum()
return (score, doc_id)
with concurrent.futures.ThreadPoolExecutor(max_workers=300) as executor:
futures = {
executor.submit(
rerank_single_doc, doc_id, data, client, self.collection_name
): doc_id
for doc_id in doc_ids
}
for future in concurrent.futures.as_completed(futures):
score, doc_id = future.result()
scores.append((score, doc_id))
scores.sort(key=lambda x: x[0], reverse=True)
if len(scores) >= topk:
return scores[:topk]
else:
return scores
def insert(self, data):
# Insert ColBERT embeddings and metadata for a document into the collection.
colbert_vecs = [vec for vec in data["colbert_vecs"]]
seq_length = len(colbert_vecs)
doc_ids = [data["doc_id"] for i in range(seq_length)]
seq_ids = list(range(seq_length))
docs = [""] * seq_length
docs[0] = data["filepath"]
# Insert the data as multiple vectors (one for each sequence) along with the corresponding metadata.
self.client.insert(
self.collection_name,
[
{
"vector": colbert_vecs[i],
"seq_id": seq_ids[i],
"doc_id": doc_ids[i],
"doc": docs[i],
}
for i in range(seq_length)
],
)
سوف نستخدم محرك colpali_engine لاستخراج قوائم التضمين لاستعلامين واسترداد المعلومات ذات الصلة من صفحات PDF.
from colpali_engine.models import ColPali
from colpali_engine.models.paligemma.colpali.processing_colpali import ColPaliProcessor
from colpali_engine.utils.processing_utils import BaseVisualRetrieverProcessor
from colpali_engine.utils.torch_utils import ListDataset, get_torch_device
from torch.utils.data import DataLoader
import torch
from typing import List, cast
device = get_torch_device("cpu")
model_name = "vidore/colpali-v1.2"
model = ColPali.from_pretrained(
model_name,
torch_dtype=torch.bfloat16,
device_map=device,
).eval()
queries = [
"How to end-to-end retrieval with ColBert?",
"Where is ColBERT performance table?",
]
processor = cast(ColPaliProcessor, ColPaliProcessor.from_pretrained(model_name))
dataloader = DataLoader(
dataset=ListDataset[str](queries),
batch_size=1,
shuffle=False,
collate_fn=lambda x: processor.process_queries(x),
)
qs: List[torch.Tensor] = []
for batch_query in dataloader:
with torch.no_grad():
batch_query = {k: v.to(model.device) for k, v in batch_query.items()}
embeddings_query = model(**batch_query)
qs.extend(list(torch.unbind(embeddings_query.to("cpu"))))
بالإضافة إلى ذلك، سنحتاج إلى استخراج قائمة التضمين لكل صفحة، وسيظهر لنا أن هناك 1030 تضمينًا من 128 بُعدًا لكل صفحة.
from tqdm import tqdm
from PIL import Image
import os
images = [Image.open("./pages/" + name) for name in os.listdir("./pages")]
dataloader = DataLoader(
dataset=ListDataset[str](images),
batch_size=1,
shuffle=False,
collate_fn=lambda x: processor.process_images(x),
)
ds: List[torch.Tensor] = []
for batch_doc in tqdm(dataloader):
with torch.no_grad():
batch_doc = {k: v.to(model.device) for k, v in batch_doc.items()}
embeddings_doc = model(**batch_doc)
ds.extend(list(torch.unbind(embeddings_doc.to("cpu"))))
print(ds[0].shape)
0%| | 0/10 [00:00<?, ?it/s]
100%|██████████| 10/10 [01:22<00:00, 8.24s/it]
torch.Size([1030, 128])
سننشئ مجموعة تسمى "colpali" باستخدام MilvusColbertRetriever.
retriever = MilvusColbertRetriever(collection_name="colpali", milvus_client=client)
retriever.create_collection()
retriever.create_index()
سنقوم بإدراج قوائم التضمين في قاعدة بيانات ميلفوس.
filepaths = ["./pages/" + name for name in os.listdir("./pages")]
for i in range(len(filepaths)):
data = {
"colbert_vecs": ds[i].float().numpy(),
"doc_id": i,
"filepath": filepaths[i],
}
retriever.insert(data)
يمكننا الآن البحث عن الصفحة الأكثر صلة باستخدام قائمة تضمين الاستعلام.
for query in qs:
query = query.float().numpy()
result = retriever.search(query, topk=1)
print(filepaths[result[0][1]])
./pages/page_5.png
./pages/page_7.png
أخيرًا، نسترجع اسم الصفحة الأصلية. باستخدام ColPali، يمكننا استرداد المستندات متعددة الوسائط دون الحاجة إلى تقنيات معالجة معقدة لاستخراج النصوص والصور من المستندات. وبالاستفادة من نماذج الرؤية الكبيرة، يمكن تحليل المزيد من المعلومات - مثل الجداول والأشكال - دون فقدان كبير للمعلومات.