Gunakan ColPali untuk Pengambilan Multi-Modal dengan Milvus
Model pengambilan modern biasanya menggunakan satu penyematan untuk merepresentasikan teks atau gambar. Akan tetapi, ColBERT adalah model neural yang menggunakan daftar penyematan untuk setiap contoh data dan menggunakan operasi "MaxSim" untuk menghitung kemiripan antara dua teks. Selain data tekstual, gambar, tabel, dan diagram juga mengandung informasi yang kaya, yang sering diabaikan dalam pencarian informasi berbasis teks.
Fungsi MaxSim membandingkan kueri dengan dokumen (yang Anda cari) dengan melihat penyematan tokennya. Untuk setiap kata dalam kueri, ia memilih kata yang paling mirip dari dokumen (menggunakan cosine similarity atau jarak L2 kuadrat) dan menjumlahkan kemiripan maksimum ini di semua kata dalam kueri
ColPali adalah metode yang menggabungkan representasi multi-vektor ColBERT dengan PaliGemma (model bahasa besar multimodal) untuk meningkatkan kemampuan pemahamannya yang kuat. Pendekatan ini memungkinkan sebuah halaman dengan teks dan gambar direpresentasikan menggunakan penyematan multi-vektor terpadu. Penyematan dalam representasi multi-vektor ini dapat menangkap informasi yang terperinci, meningkatkan kinerja pengambilan-penambahan generasi (RAG) untuk data multimodal.
Dalam buku catatan ini, kami menyebut representasi multi-vektor semacam ini sebagai "embeddings ColBERT" untuk keumumannya. Namun, model sebenarnya yang digunakan adalah model ColPali. Kami akan mendemonstrasikan bagaimana cara menggunakan Milvus untuk pengambilan multi-vektor. Selanjutnya, kami akan memperkenalkan cara menggunakan ColPali untuk mengambil halaman berdasarkan kueri yang diberikan.
Persiapan
$ pip install pdf2image
$ pip pymilvus
$ pip install colpali_engine
$ pip install tqdm
$ pip instal pillow
Siapkan data
Kita akan menggunakan PDF RAG sebagai contoh. Anda dapat mengunduh dokumen ColBERT dan memasukkannya ke dalam ./pdf
. ColPali tidak memproses teks secara langsung; sebaliknya, seluruh halaman di-raster menjadi sebuah gambar. Model ColPali unggul dalam memahami informasi tekstual yang terkandung di dalam gambar-gambar ini. Oleh karena itu, kita akan mengubah setiap halaman PDF menjadi gambar untuk diproses.
from pdf2image import convert_from_path
pdf_path = "pdfs/2004.12832v2.pdf"
images = convert_from_path(pdf_path)
for i, image in enumerate(images):
image.save(f"pages/page_{i + 1}.png", "PNG")
Selanjutnya, kita akan menginisialisasi database menggunakan Milvus Lite. Anda dapat dengan mudah beralih ke instance Milvus penuh dengan mengatur uri ke alamat yang sesuai di mana layanan Milvus Anda dihosting.
from pymilvus import MilvusClient, DataType
import numpy as np
import concurrent.futures
client = MilvusClient(uri="milvus.db")
- Jika Anda hanya membutuhkan basis data vektor lokal untuk data skala kecil atau pembuatan prototipe, mengatur uri sebagai file lokal, misalnya
./milvus.db
, adalah metode yang paling mudah, karena secara otomatis menggunakan Milvus Lite untuk menyimpan semua data dalam file ini. - Jika Anda memiliki data berskala besar, misalnya lebih dari satu juta vektor, Anda dapat menyiapkan server Milvus yang lebih berkinerja tinggi di Docker atau Kubernetes. Dalam pengaturan ini, gunakan alamat dan port server sebagai uri Anda, misalnya
http://localhost:19530
. Jika Anda mengaktifkan fitur autentikasi pada Milvus, gunakan "<nama_user Anda>:<kata sandi Anda>" sebagai token, jika tidak, jangan setel token. - Jika Anda menggunakan Zilliz Cloud, layanan cloud yang dikelola sepenuhnya untuk Milvus, sesuaikan
uri
dantoken
, yang sesuai dengan Public Endpoint dan API key di Zilliz Cloud.
Kita akan mendefinisikan kelas MilvusColbertRetriever untuk membungkus klien Milvus untuk pengambilan data multi-vektor. Implementasi ini meratakan penyematan ColBERT dan memasukkannya ke dalam sebuah koleksi, di mana setiap baris mewakili penyematan individu dari daftar penyematan ColBERT. Implementasi ini juga mencatat doc_id dan seq_id untuk melacak asal usul setiap embedding.
Ketika mencari dengan daftar embedding ColBERT, beberapa pencarian akan dilakukan-satu untuk setiap embedding ColBERT. Doc_id yang diambil kemudian akan diduplikasi. Proses pemeringkatan ulang akan dilakukan, di mana penyematan lengkap untuk setiap doc_id diambil, dan skor MaxSim dihitung untuk menghasilkan hasil peringkat akhir.
class MilvusColbertRetriever:
def __init__(self, milvus_client, collection_name, dim=128):
# Initialize the retriever with a Milvus client, collection name, and dimensionality of the vector embeddings.
# If the collection exists, load it.
self.collection_name = collection_name
self.client = milvus_client
if self.client.has_collection(collection_name=self.collection_name):
self.client.load_collection(collection_name)
self.dim = dim
def create_collection(self):
# Create a new collection in Milvus for storing embeddings.
# Drop the existing collection if it already exists and define the schema for the collection.
if self.client.has_collection(collection_name=self.collection_name):
self.client.drop_collection(collection_name=self.collection_name)
schema = self.client.create_schema(
auto_id=True,
enable_dynamic_fields=True,
)
schema.add_field(field_name="pk", datatype=DataType.INT64, is_primary=True)
schema.add_field(
field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=self.dim
)
schema.add_field(field_name="seq_id", datatype=DataType.INT16)
schema.add_field(field_name="doc_id", datatype=DataType.INT64)
schema.add_field(field_name="doc", datatype=DataType.VARCHAR, max_length=65535)
self.client.create_collection(
collection_name=self.collection_name, schema=schema
)
def create_index(self):
# Create an index on the vector field to enable fast similarity search.
# Releases and drops any existing index before creating a new one with specified parameters.
self.client.release_collection(collection_name=self.collection_name)
self.client.drop_index(
collection_name=self.collection_name, index_name="vector"
)
index_params = self.client.prepare_index_params()
index_params.add_index(
field_name="vector",
index_name="vector_index",
index_type="HNSW", # or any other index type you want
metric_type="IP", # or the appropriate metric type
params={
"M": 16,
"efConstruction": 500,
}, # adjust these parameters as needed
)
self.client.create_index(
collection_name=self.collection_name, index_params=index_params, sync=True
)
def create_scalar_index(self):
# Create a scalar index for the "doc_id" field to enable fast lookups by document ID.
self.client.release_collection(collection_name=self.collection_name)
index_params = self.client.prepare_index_params()
index_params.add_index(
field_name="doc_id",
index_name="int32_index",
index_type="INVERTED", # or any other index type you want
)
self.client.create_index(
collection_name=self.collection_name, index_params=index_params, sync=True
)
def search(self, data, topk):
# Perform a vector search on the collection to find the top-k most similar documents.
search_params = {"metric_type": "IP", "params": {}}
results = self.client.search(
self.collection_name,
data,
limit=int(50),
output_fields=["vector", "seq_id", "doc_id"],
search_params=search_params,
)
doc_ids = set()
for r_id in range(len(results)):
for r in range(len(results[r_id])):
doc_ids.add(results[r_id][r]["entity"]["doc_id"])
scores = []
def rerank_single_doc(doc_id, data, client, collection_name):
# Rerank a single document by retrieving its embeddings and calculating the similarity with the query.
doc_colbert_vecs = client.query(
collection_name=collection_name,
filter=f"doc_id in [{doc_id}]",
output_fields=["seq_id", "vector", "doc"],
limit=1000,
)
doc_vecs = np.vstack(
[doc_colbert_vecs[i]["vector"] for i in range(len(doc_colbert_vecs))]
)
score = np.dot(data, doc_vecs.T).max(1).sum()
return (score, doc_id)
with concurrent.futures.ThreadPoolExecutor(max_workers=300) as executor:
futures = {
executor.submit(
rerank_single_doc, doc_id, data, client, self.collection_name
): doc_id
for doc_id in doc_ids
}
for future in concurrent.futures.as_completed(futures):
score, doc_id = future.result()
scores.append((score, doc_id))
scores.sort(key=lambda x: x[0], reverse=True)
if len(scores) >= topk:
return scores[:topk]
else:
return scores
def insert(self, data):
# Insert ColBERT embeddings and metadata for a document into the collection.
colbert_vecs = [vec for vec in data["colbert_vecs"]]
seq_length = len(colbert_vecs)
doc_ids = [data["doc_id"] for i in range(seq_length)]
seq_ids = list(range(seq_length))
docs = [""] * seq_length
docs[0] = data["filepath"]
# Insert the data as multiple vectors (one for each sequence) along with the corresponding metadata.
self.client.insert(
self.collection_name,
[
{
"vector": colbert_vecs[i],
"seq_id": seq_ids[i],
"doc_id": doc_ids[i],
"doc": docs[i],
}
for i in range(seq_length)
],
)
Kita akan menggunakan colpali_engine untuk mengekstrak daftar sematan untuk dua kueri dan mengambil informasi yang relevan dari halaman PDF.
from colpali_engine.models import ColPali
from colpali_engine.models.paligemma.colpali.processing_colpali import ColPaliProcessor
from colpali_engine.utils.processing_utils import BaseVisualRetrieverProcessor
from colpali_engine.utils.torch_utils import ListDataset, get_torch_device
from torch.utils.data import DataLoader
import torch
from typing import List, cast
device = get_torch_device("cpu")
model_name = "vidore/colpali-v1.2"
model = ColPali.from_pretrained(
model_name,
torch_dtype=torch.bfloat16,
device_map=device,
).eval()
queries = [
"How to end-to-end retrieval with ColBert?",
"Where is ColBERT performance table?",
]
processor = cast(ColPaliProcessor, ColPaliProcessor.from_pretrained(model_name))
dataloader = DataLoader(
dataset=ListDataset[str](queries),
batch_size=1,
shuffle=False,
collate_fn=lambda x: processor.process_queries(x),
)
qs: List[torch.Tensor] = []
for batch_query in dataloader:
with torch.no_grad():
batch_query = {k: v.to(model.device) for k, v in batch_query.items()}
embeddings_query = model(**batch_query)
qs.extend(list(torch.unbind(embeddings_query.to("cpu"))))
Selain itu, kita perlu mengekstrak daftar penyematan untuk setiap halaman dan ini menunjukkan bahwa ada 1030 penyematan 128 dimensi untuk setiap halaman.
from tqdm import tqdm
from PIL import Image
import os
images = [Image.open("./pages/" + name) for name in os.listdir("./pages")]
dataloader = DataLoader(
dataset=ListDataset[str](images),
batch_size=1,
shuffle=False,
collate_fn=lambda x: processor.process_images(x),
)
ds: List[torch.Tensor] = []
for batch_doc in tqdm(dataloader):
with torch.no_grad():
batch_doc = {k: v.to(model.device) for k, v in batch_doc.items()}
embeddings_doc = model(**batch_doc)
ds.extend(list(torch.unbind(embeddings_doc.to("cpu"))))
print(ds[0].shape)
0%| | 0/10 [00:00<?, ?it/s]
100%|██████████| 10/10 [01:22<00:00, 8.24s/it]
torch.Size([1030, 128])
Kita akan membuat koleksi bernama "colpali" menggunakan MilvusColbertRetriever.
retriever = MilvusColbertRetriever(collection_name="colpali", milvus_client=client)
retriever.create_collection()
retriever.create_index()
Kita akan memasukkan daftar penyematan ke dalam basis data Milvus.
filepaths = ["./pages/" + name for name in os.listdir("./pages")]
for i in range(len(filepaths)):
data = {
"colbert_vecs": ds[i].float().numpy(),
"doc_id": i,
"filepath": filepaths[i],
}
retriever.insert(data)
Sekarang kita dapat mencari halaman yang paling relevan menggunakan daftar sematan kueri.
for query in qs:
query = query.float().numpy()
result = retriever.search(query, topk=1)
print(filepaths[result[0][1]])
./pages/page_5.png
./pages/page_7.png
Terakhir, kita mengambil nama halaman asli. Dengan ColPali, kita dapat mengambil dokumen multimodal tanpa memerlukan teknik pemrosesan yang rumit untuk mengekstrak teks dan gambar dari dokumen. Dengan memanfaatkan model visi besar, lebih banyak informasi - seperti tabel dan gambar - dapat dianalisis tanpa kehilangan informasi yang signifikan.