🚀 Попробуйте Zilliz Cloud, полностью управляемый Milvus, бесплатно — ощутите 10-кратное увеличение производительности! Попробовать сейчас>

milvus-logo
LFAI
Главная
  • Home
  • Docs
  • Учебники

  • Используйте ColPali для мультимодального поиска

Использование ColPali для мультимодального поиска с помощью Milvus

Open In Colab GitHub Repository

Современные модели поиска обычно используют одно вложение для представления текста или изображений. Однако ColBERT - это нейронная модель, которая использует список вкраплений для каждого экземпляра данных и применяет операцию "MaxSim" для расчета сходства между двумя текстами. Помимо текстовых данных, рисунки, таблицы и диаграммы также содержат богатую информацию, которая часто игнорируется при поиске информации по тексту.

Функция MaxSim сравнивает запрос и документ (то, что вы ищете), рассматривая их вкрапления лексем. Для каждого слова в запросе она выбирает наиболее похожее слово из документа (используя косинусоидальное сходство или квадратичное расстояние L2) и суммирует эти максимальные сходства по всем словам в запросе.

ColPali - это метод, который объединяет многовекторное представление ColBERT с PaliGemma (мультимодальной моделью большого языка), чтобы использовать ее широкие возможности понимания. Этот подход позволяет представить страницу с текстом и изображениями с помощью единого многовекторного вложения. Вкрапления в этом многовекторном представлении могут захватывать подробную информацию, улучшая производительность генерации с расширенным поиском (RAG) для мультимодальных данных.

В этом блокноте мы называем этот вид многовекторного представления "вкраплениями ColBERT" для общности. Однако на самом деле используется модель ColPali. Мы продемонстрируем, как использовать Milvus для многовекторного поиска. На основе этого мы расскажем, как использовать ColPali для поиска страниц по заданному запросу.

Подготовка

$ pip install pdf2image
$ pip pymilvus
$ pip install colpali_engine
$ pip install tqdm
$ pip instal pillow

Подготовка данных

В качестве примера мы будем использовать PDF RAG. Вы можете скачать документ ColBERT и поместить его в ./pdf. ColPali не обрабатывает текст напрямую; вместо этого вся страница растеризуется в изображение. Модель ColPali отлично справляется с пониманием текстовой информации, содержащейся в этих изображениях. Поэтому мы преобразуем каждую страницу PDF в изображение для обработки.

from pdf2image import convert_from_path

pdf_path = "pdfs/2004.12832v2.pdf"
images = convert_from_path(pdf_path)

for i, image in enumerate(images):
    image.save(f"pages/page_{i + 1}.png", "PNG")

Далее мы инициализируем базу данных с помощью Milvus Lite. Вы можете легко переключиться на полный экземпляр Milvus, установив uri на соответствующий адрес, где размещен ваш сервис Milvus.

from pymilvus import MilvusClient, DataType
import numpy as np
import concurrent.futures

client = MilvusClient(uri="milvus.db")
  • Если вам нужна локальная векторная база данных только для небольших масштабов данных или прототипирования, установка uri в качестве локального файла, например./milvus.db, является наиболее удобным методом, поскольку он автоматически использует Milvus Lite для хранения всех данных в этом файле.
  • Если у вас большой объем данных, скажем, более миллиона векторов, вы можете настроить более производительный сервер Milvus на Docker или Kubernetes. В этом случае используйте адрес и порт сервера в качестве uri, например,http://localhost:19530. Если вы включили функцию аутентификации на Milvus, используйте "<ваше_имя_пользователя>:<ваш_пароль>" в качестве токена, в противном случае не задавайте токен.
  • Если вы используете Zilliz Cloud, полностью управляемый облачный сервис для Milvus, настройте uri и token, которые соответствуют публичной конечной точке и ключу API в Zilliz Cloud.

Мы определим класс MilvusColbertRetriever, чтобы обернуть его вокруг клиента Milvus для получения многовекторных данных. Реализация сплющивает вкрапления ColBERT и вставляет их в коллекцию, где каждая строка представляет собой отдельное вкрапление из списка вкраплений ColBERT. Она также записывает doc_id и seq_id для отслеживания происхождения каждого вкрапления.

При поиске по списку вкраплений ColBERT будет проведено несколько поисков - по одному для каждого вкрапления ColBERT. Полученные идентификаторы doc_ids будут затем дедуплицированы. Будет проведен процесс ранжирования, в ходе которого будут получены полные вкрапления для каждого doc_id и рассчитан балл MaxSim для получения окончательных ранжированных результатов.

class MilvusColbertRetriever:
    def __init__(self, milvus_client, collection_name, dim=128):
        # Initialize the retriever with a Milvus client, collection name, and dimensionality of the vector embeddings.
        # If the collection exists, load it.
        self.collection_name = collection_name
        self.client = milvus_client
        if self.client.has_collection(collection_name=self.collection_name):
            self.client.load_collection(collection_name)
        self.dim = dim

    def create_collection(self):
        # Create a new collection in Milvus for storing embeddings.
        # Drop the existing collection if it already exists and define the schema for the collection.
        if self.client.has_collection(collection_name=self.collection_name):
            self.client.drop_collection(collection_name=self.collection_name)
        schema = self.client.create_schema(
            auto_id=True,
            enable_dynamic_fields=True,
        )
        schema.add_field(field_name="pk", datatype=DataType.INT64, is_primary=True)
        schema.add_field(
            field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=self.dim
        )
        schema.add_field(field_name="seq_id", datatype=DataType.INT16)
        schema.add_field(field_name="doc_id", datatype=DataType.INT64)
        schema.add_field(field_name="doc", datatype=DataType.VARCHAR, max_length=65535)

        self.client.create_collection(
            collection_name=self.collection_name, schema=schema
        )

    def create_index(self):
        # Create an index on the vector field to enable fast similarity search.
        # Releases and drops any existing index before creating a new one with specified parameters.
        self.client.release_collection(collection_name=self.collection_name)
        self.client.drop_index(
            collection_name=self.collection_name, index_name="vector"
        )
        index_params = self.client.prepare_index_params()
        index_params.add_index(
            field_name="vector",
            index_name="vector_index",
            index_type="HNSW",  # or any other index type you want
            metric_type="IP",  # or the appropriate metric type
            params={
                "M": 16,
                "efConstruction": 500,
            },  # adjust these parameters as needed
        )

        self.client.create_index(
            collection_name=self.collection_name, index_params=index_params, sync=True
        )

    def create_scalar_index(self):
        # Create a scalar index for the "doc_id" field to enable fast lookups by document ID.
        self.client.release_collection(collection_name=self.collection_name)

        index_params = self.client.prepare_index_params()
        index_params.add_index(
            field_name="doc_id",
            index_name="int32_index",
            index_type="INVERTED",  # or any other index type you want
        )

        self.client.create_index(
            collection_name=self.collection_name, index_params=index_params, sync=True
        )

    def search(self, data, topk):
        # Perform a vector search on the collection to find the top-k most similar documents.
        search_params = {"metric_type": "IP", "params": {}}
        results = self.client.search(
            self.collection_name,
            data,
            limit=int(50),
            output_fields=["vector", "seq_id", "doc_id"],
            search_params=search_params,
        )
        doc_ids = set()
        for r_id in range(len(results)):
            for r in range(len(results[r_id])):
                doc_ids.add(results[r_id][r]["entity"]["doc_id"])

        scores = []

        def rerank_single_doc(doc_id, data, client, collection_name):
            # Rerank a single document by retrieving its embeddings and calculating the similarity with the query.
            doc_colbert_vecs = client.query(
                collection_name=collection_name,
                filter=f"doc_id in [{doc_id}]",
                output_fields=["seq_id", "vector", "doc"],
                limit=1000,
            )
            doc_vecs = np.vstack(
                [doc_colbert_vecs[i]["vector"] for i in range(len(doc_colbert_vecs))]
            )
            score = np.dot(data, doc_vecs.T).max(1).sum()
            return (score, doc_id)

        with concurrent.futures.ThreadPoolExecutor(max_workers=300) as executor:
            futures = {
                executor.submit(
                    rerank_single_doc, doc_id, data, client, self.collection_name
                ): doc_id
                for doc_id in doc_ids
            }
            for future in concurrent.futures.as_completed(futures):
                score, doc_id = future.result()
                scores.append((score, doc_id))

        scores.sort(key=lambda x: x[0], reverse=True)
        if len(scores) >= topk:
            return scores[:topk]
        else:
            return scores

    def insert(self, data):
        # Insert ColBERT embeddings and metadata for a document into the collection.
        colbert_vecs = [vec for vec in data["colbert_vecs"]]
        seq_length = len(colbert_vecs)
        doc_ids = [data["doc_id"] for i in range(seq_length)]
        seq_ids = list(range(seq_length))
        docs = [""] * seq_length
        docs[0] = data["filepath"]

        # Insert the data as multiple vectors (one for each sequence) along with the corresponding metadata.
        self.client.insert(
            self.collection_name,
            [
                {
                    "vector": colbert_vecs[i],
                    "seq_id": seq_ids[i],
                    "doc_id": doc_ids[i],
                    "doc": docs[i],
                }
                for i in range(seq_length)
            ],
        )

Мы будем использовать colpali_engine для извлечения списков вкраплений для двух запросов и извлечения релевантной информации из PDF-страниц.

from colpali_engine.models import ColPali
from colpali_engine.models.paligemma.colpali.processing_colpali import ColPaliProcessor
from colpali_engine.utils.processing_utils import BaseVisualRetrieverProcessor
from colpali_engine.utils.torch_utils import ListDataset, get_torch_device
from torch.utils.data import DataLoader
import torch
from typing import List, cast

device = get_torch_device("cpu")
model_name = "vidore/colpali-v1.2"

model = ColPali.from_pretrained(
    model_name,
    torch_dtype=torch.bfloat16,
    device_map=device,
).eval()

queries = [
    "How to end-to-end retrieval with ColBert?",
    "Where is ColBERT performance table?",
]

processor = cast(ColPaliProcessor, ColPaliProcessor.from_pretrained(model_name))

dataloader = DataLoader(
    dataset=ListDataset[str](queries),
    batch_size=1,
    shuffle=False,
    collate_fn=lambda x: processor.process_queries(x),
)

qs: List[torch.Tensor] = []
for batch_query in dataloader:
    with torch.no_grad():
        batch_query = {k: v.to(model.device) for k, v in batch_query.items()}
        embeddings_query = model(**batch_query)
    qs.extend(list(torch.unbind(embeddings_query.to("cpu"))))

Кроме того, нам нужно извлечь список вкраплений для каждой страницы, а он показывает, что для каждой страницы имеется 1030 128-мерных вкраплений.

from tqdm import tqdm
from PIL import Image
import os

images = [Image.open("./pages/" + name) for name in os.listdir("./pages")]

dataloader = DataLoader(
    dataset=ListDataset[str](images),
    batch_size=1,
    shuffle=False,
    collate_fn=lambda x: processor.process_images(x),
)

ds: List[torch.Tensor] = []
for batch_doc in tqdm(dataloader):
    with torch.no_grad():
        batch_doc = {k: v.to(model.device) for k, v in batch_doc.items()}
        embeddings_doc = model(**batch_doc)
    ds.extend(list(torch.unbind(embeddings_doc.to("cpu"))))

print(ds[0].shape)
  0%|          | 0/10 [00:00<?, ?it/s]

100%|██████████| 10/10 [01:22<00:00,  8.24s/it]

torch.Size([1030, 128])

Мы создадим коллекцию под названием "colpali" с помощью MilvusColbertRetriever.

retriever = MilvusColbertRetriever(collection_name="colpali", milvus_client=client)
retriever.create_collection()
retriever.create_index()

Мы вставим списки вкраплений в базу данных Milvus.

filepaths = ["./pages/" + name for name in os.listdir("./pages")]
for i in range(len(filepaths)):
    data = {
        "colbert_vecs": ds[i].float().numpy(),
        "doc_id": i,
        "filepath": filepaths[i],
    }
    retriever.insert(data)

Теперь мы можем искать наиболее релевантную страницу, используя список вкраплений по запросу.

for query in qs:
    query = query.float().numpy()
    result = retriever.search(query, topk=1)
    print(filepaths[result[0][1]])
./pages/page_5.png
./pages/page_7.png

Наконец, мы получим оригинальное название страницы. С помощью ColPali мы можем извлекать мультимодальные документы, не прибегая к сложным технологиям обработки для извлечения текста и изображений из документов. Благодаря использованию больших моделей зрения можно анализировать больше информации, например таблицы и рисунки, без существенной потери информации.

Попробуйте Managed Milvus бесплатно

Zilliz Cloud работает без проблем, поддерживается Milvus и в 10 раз быстрее.

Начать
Обратная связь

Была ли эта страница полезной?