Utilizzare ColPali per il recupero multimodale con Milvus
I moderni modelli di recupero utilizzano in genere un singolo embedding per rappresentare il testo o le immagini. ColBERT, invece, è un modello neurale che utilizza un elenco di incorporazioni per ogni istanza di dati e impiega un'operazione "MaxSim" per calcolare la somiglianza tra due testi. Oltre ai dati testuali, anche le figure, le tabelle e i diagrammi contengono informazioni ricche, che spesso non vengono prese in considerazione nel recupero delle informazioni basato sul testo.
La funzione MaxSim confronta una query con un documento (quello che si sta cercando) osservando le loro incorporazioni di token. Per ogni parola della query, sceglie la parola più simile dal documento (usando la somiglianza coseno o la distanza L2 al quadrato) e somma queste somiglianze massime tra tutte le parole della query.
ColPali è un metodo che combina la rappresentazione multivettoriale di ColBERT con PaliGemma (un modello linguistico multimodale di grandi dimensioni) per sfruttare le sue forti capacità di comprensione. Questo approccio consente di rappresentare una pagina con testo e immagini utilizzando un embedding multivettoriale unificato. Gli embedding all'interno di questa rappresentazione multivettoriale possono catturare informazioni dettagliate, migliorando le prestazioni della retrieval-augmented generation (RAG) per i dati multimodali.
In questo quaderno, per generalità, ci riferiamo a questo tipo di rappresentazione multivettoriale come "embeddings ColBERT". Tuttavia, il modello effettivamente utilizzato è il modello ColPali. Dimostreremo come utilizzare Milvus per il reperimento di dati multivettoriali. A partire da questo, introdurremo l'uso di ColPali per il reperimento di pagine basate su una determinata query.
Preparazione
$ pip install pdf2image
$ pip pymilvus
$ pip install colpali_engine
$ pip install tqdm
$ pip instal pillow
Preparare i dati
Come esempio utilizzeremo PDF RAG. È possibile scaricare il documento ColBERT e inserirlo in ./pdf
. ColPali non elabora direttamente il testo; l'intera pagina viene invece rasterizzata in un'immagine. Il modello ColPali eccelle nella comprensione delle informazioni testuali contenute in queste immagini. Pertanto, convertiremo ogni pagina PDF in un'immagine da elaborare.
from pdf2image import convert_from_path
pdf_path = "pdfs/2004.12832v2.pdf"
images = convert_from_path(pdf_path)
for i, image in enumerate(images):
image.save(f"pages/page_{i + 1}.png", "PNG")
Successivamente, inizializzeremo un database utilizzando Milvus Lite. È possibile passare facilmente a un'istanza completa di Milvus impostando l'uri all'indirizzo appropriato in cui è ospitato il servizio Milvus.
from pymilvus import MilvusClient, DataType
import numpy as np
import concurrent.futures
client = MilvusClient(uri="milvus.db")
- Se si ha bisogno di un database vettoriale locale solo per dati su piccola scala o per la prototipazione, l'impostazione dell'uri come file locale, ad esempio
./milvus.db
, è il metodo più conveniente, in quanto utilizza automaticamente Milvus Lite per memorizzare tutti i dati in questo file. - Se si dispone di una grande quantità di dati, ad esempio più di un milione di vettori, è possibile configurare un server Milvus più performante su Docker o Kubernetes. In questa configurazione, utilizzare l'indirizzo e la porta del server come uri, ad esempio
http://localhost:19530
. Se si attiva la funzione di autenticazione su Milvus, utilizzare "<nome_utente>:<password>" come token, altrimenti non impostare il token. - Se si utilizza Zilliz Cloud, il servizio cloud completamente gestito per Milvus, impostare
uri
etoken
, che corrispondono all'endpoint pubblico e alla chiave API di Zilliz Cloud.
Definiremo una classe MilvusColbertRetriever per avvolgere il client Milvus per il recupero di dati multivettoriali. L'implementazione appiattisce le incorporazioni ColBERT e le inserisce in una raccolta, dove ogni riga rappresenta una singola incorporazione dall'elenco delle incorporazioni ColBERT. Inoltre, registra il doc_id e il seq_id per risalire all'origine di ogni embedding.
Quando si effettua una ricerca con un elenco di incorporazioni ColBERT, vengono effettuate più ricerche, una per ogni incorporazione ColBERT. I doc_id recuperati saranno quindi deduplicati. Verrà eseguito un processo di reranking, in cui verranno recuperati gli embedding completi per ogni doc_id e verrà calcolato il punteggio MaxSim per produrre i risultati finali classificati.
class MilvusColbertRetriever:
def __init__(self, milvus_client, collection_name, dim=128):
# Initialize the retriever with a Milvus client, collection name, and dimensionality of the vector embeddings.
# If the collection exists, load it.
self.collection_name = collection_name
self.client = milvus_client
if self.client.has_collection(collection_name=self.collection_name):
self.client.load_collection(collection_name)
self.dim = dim
def create_collection(self):
# Create a new collection in Milvus for storing embeddings.
# Drop the existing collection if it already exists and define the schema for the collection.
if self.client.has_collection(collection_name=self.collection_name):
self.client.drop_collection(collection_name=self.collection_name)
schema = self.client.create_schema(
auto_id=True,
enable_dynamic_fields=True,
)
schema.add_field(field_name="pk", datatype=DataType.INT64, is_primary=True)
schema.add_field(
field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=self.dim
)
schema.add_field(field_name="seq_id", datatype=DataType.INT16)
schema.add_field(field_name="doc_id", datatype=DataType.INT64)
schema.add_field(field_name="doc", datatype=DataType.VARCHAR, max_length=65535)
self.client.create_collection(
collection_name=self.collection_name, schema=schema
)
def create_index(self):
# Create an index on the vector field to enable fast similarity search.
# Releases and drops any existing index before creating a new one with specified parameters.
self.client.release_collection(collection_name=self.collection_name)
self.client.drop_index(
collection_name=self.collection_name, index_name="vector"
)
index_params = self.client.prepare_index_params()
index_params.add_index(
field_name="vector",
index_name="vector_index",
index_type="HNSW", # or any other index type you want
metric_type="IP", # or the appropriate metric type
params={
"M": 16,
"efConstruction": 500,
}, # adjust these parameters as needed
)
self.client.create_index(
collection_name=self.collection_name, index_params=index_params, sync=True
)
def create_scalar_index(self):
# Create a scalar index for the "doc_id" field to enable fast lookups by document ID.
self.client.release_collection(collection_name=self.collection_name)
index_params = self.client.prepare_index_params()
index_params.add_index(
field_name="doc_id",
index_name="int32_index",
index_type="INVERTED", # or any other index type you want
)
self.client.create_index(
collection_name=self.collection_name, index_params=index_params, sync=True
)
def search(self, data, topk):
# Perform a vector search on the collection to find the top-k most similar documents.
search_params = {"metric_type": "IP", "params": {}}
results = self.client.search(
self.collection_name,
data,
limit=int(50),
output_fields=["vector", "seq_id", "doc_id"],
search_params=search_params,
)
doc_ids = set()
for r_id in range(len(results)):
for r in range(len(results[r_id])):
doc_ids.add(results[r_id][r]["entity"]["doc_id"])
scores = []
def rerank_single_doc(doc_id, data, client, collection_name):
# Rerank a single document by retrieving its embeddings and calculating the similarity with the query.
doc_colbert_vecs = client.query(
collection_name=collection_name,
filter=f"doc_id in [{doc_id}, {doc_id + 1}]",
output_fields=["seq_id", "vector", "doc"],
limit=1000,
)
doc_vecs = np.vstack(
[doc_colbert_vecs[i]["vector"] for i in range(len(doc_colbert_vecs))]
)
score = np.dot(data, doc_vecs.T).max(1).sum()
return (score, doc_id)
with concurrent.futures.ThreadPoolExecutor(max_workers=300) as executor:
futures = {
executor.submit(
rerank_single_doc, doc_id, data, client, self.collection_name
): doc_id
for doc_id in doc_ids
}
for future in concurrent.futures.as_completed(futures):
score, doc_id = future.result()
scores.append((score, doc_id))
scores.sort(key=lambda x: x[0], reverse=True)
if len(scores) >= topk:
return scores[:topk]
else:
return scores
def insert(self, data):
# Insert ColBERT embeddings and metadata for a document into the collection.
colbert_vecs = [vec for vec in data["colbert_vecs"]]
seq_length = len(colbert_vecs)
doc_ids = [data["doc_id"] for i in range(seq_length)]
seq_ids = list(range(seq_length))
docs = [""] * seq_length
docs[0] = data["filepath"]
# Insert the data as multiple vectors (one for each sequence) along with the corresponding metadata.
self.client.insert(
self.collection_name,
[
{
"vector": colbert_vecs[i],
"seq_id": seq_ids[i],
"doc_id": doc_ids[i],
"doc": docs[i],
}
for i in range(seq_length)
],
)
Utilizzeremo colpali_engine per estrarre gli elenchi di incorporazioni per due query e recuperare le informazioni rilevanti dalle pagine PDF.
from colpali_engine.models import ColPali
from colpali_engine.models.paligemma.colpali.processing_colpali import ColPaliProcessor
from colpali_engine.utils.processing_utils import BaseVisualRetrieverProcessor
from colpali_engine.utils.torch_utils import ListDataset, get_torch_device
from torch.utils.data import DataLoader
import torch
from typing import List, cast
device = get_torch_device("cpu")
model_name = "vidore/colpali-v1.2"
model = ColPali.from_pretrained(
model_name,
torch_dtype=torch.bfloat16,
device_map=device,
).eval()
queries = [
"How to end-to-end retrieval with ColBert?",
"Where is ColBERT performance table?",
]
processor = cast(ColPaliProcessor, ColPaliProcessor.from_pretrained(model_name))
dataloader = DataLoader(
dataset=ListDataset[str](queries),
batch_size=1,
shuffle=False,
collate_fn=lambda x: processor.process_queries(x),
)
qs: List[torch.Tensor] = []
for batch_query in dataloader:
with torch.no_grad():
batch_query = {k: v.to(model.device) for k, v in batch_query.items()}
embeddings_query = model(**batch_query)
qs.extend(list(torch.unbind(embeddings_query.to("cpu"))))
Inoltre, dovremo estrarre l'elenco di incorporazioni per ogni pagina, che mostra 1030 incorporazioni a 128 dimensioni per ogni pagina.
from tqdm import tqdm
from PIL import Image
import os
images = [Image.open("./pages/" + name) for name in os.listdir("./pages")]
dataloader = DataLoader(
dataset=ListDataset[str](images),
batch_size=1,
shuffle=False,
collate_fn=lambda x: processor.process_images(x),
)
ds: List[torch.Tensor] = []
for batch_doc in tqdm(dataloader):
with torch.no_grad():
batch_doc = {k: v.to(model.device) for k, v in batch_doc.items()}
embeddings_doc = model(**batch_doc)
ds.extend(list(torch.unbind(embeddings_doc.to("cpu"))))
print(ds[0].shape)
0%| | 0/10 [00:00<?, ?it/s]
100%|██████████| 10/10 [01:22<00:00, 8.24s/it]
torch.Size([1030, 128])
Creeremo una raccolta chiamata "colpali" utilizzando MilvusColbertRetriever.
retriever = MilvusColbertRetriever(collection_name="colpali", milvus_client=client)
retriever.create_collection()
retriever.create_index()
Inseriamo gli elenchi di incorporazioni nel database Milvus.
filepaths = ["./pages/" + name for name in os.listdir("./pages")]
for i in range(len(filepaths)):
data = {
"colbert_vecs": ds[i].float().numpy(),
"doc_id": i,
"filepath": filepaths[i],
}
retriever.insert(data)
Ora possiamo cercare la pagina più rilevante utilizzando l'elenco di embedding della query.
for query in qs:
query = query.float().numpy()
result = retriever.search(query, topk=1)
print(filepaths[result[0][1]])
./pages/page_5.png
./pages/page_7.png
Infine, recuperiamo il nome della pagina originale. Con ColPali, possiamo recuperare documenti multimodali senza dover ricorrere a complesse tecniche di elaborazione per estrarre testo e immagini dai documenti. Sfruttando modelli di visione di grandi dimensioni, è possibile analizzare un maggior numero di informazioni, come tabelle e figure, senza una significativa perdita di informazioni.