milvus-logo
LFAI
홈페이지
  • 튜토리얼

Milvus로 다중 모달 검색에 ColPali 사용

Open In Colab GitHub Repository

최신 검색 모델은 일반적으로 텍스트나 이미지를 표현하기 위해 단일 임베딩을 사용합니다. 그러나 ColBERT는 각 데이터 인스턴스에 대한 임베딩 목록을 활용하고 "MaxSim" 연산을 사용하여 두 텍스트 간의 유사성을 계산하는 신경 모델입니다. 텍스트 데이터 외에도 그림, 표, 다이어그램에는 텍스트 기반 정보 검색에서 종종 무시되는 풍부한 정보가 포함되어 있습니다.

MaxSim 함수는 토큰 임베딩을 보고 쿼리와 문서(검색 대상)를 비교합니다. 쿼리의 각 단어에 대해 문서에서 가장 유사한 단어를 선택하고(코사인 유사도 또는 제곱 L2 거리 사용) 쿼리의 모든 단어에 대해 이러한 최대 유사도를 합산합니다.

ColPali는 강력한 이해 능력을 활용하기 위해 ColBERT의 다중 벡터 표현을 PaliGemma(다중 모드 대규모 언어 모델)와 결합한 방법입니다. 이 접근 방식을 사용하면 텍스트와 이미지가 모두 포함된 페이지를 통합된 멀티 벡터 임베딩을 사용하여 표현할 수 있습니다. 이 다중 벡터 표현 내의 임베딩은 상세한 정보를 캡처하여 멀티모달 데이터에 대한 검색 증강 생성(RAG)의 성능을 향상시킬 수 있습니다.

이 노트북에서는 일반성을 위해 이러한 종류의 다중 벡터 표현을 '콜버트 임베딩'이라고 부릅니다. 그러나 실제로 사용되는 모델은 ColPali 모델입니다. 다중 벡터 검색을 위해 Milvus를 사용하는 방법을 보여드리겠습니다. 이를 바탕으로 주어진 쿼리를 기반으로 페이지를 검색하기 위해 ColPali를 사용하는 방법을 소개하겠습니다.

준비

$ pip install pdf2image
$ pip pymilvus
$ pip install colpali_engine
$ pip install tqdm
$ pip instal pillow

데이터 준비

예제로 PDF RAG를 사용하겠습니다. ColBERT 용지를 다운로드하여 ./pdf 에 넣을 수 있습니다. ColPali는 텍스트를 직접 처리하지 않고 대신 전체 페이지를 이미지로 래스터화합니다. ColPali 모델은 이러한 이미지에 포함된 텍스트 정보를 이해하는 데 탁월합니다. 따라서 각 PDF 페이지를 이미지로 변환하여 처리합니다.

from pdf2image import convert_from_path

pdf_path = "pdfs/2004.12832v2.pdf"
images = convert_from_path(pdf_path)

for i, image in enumerate(images):
    image.save(f"pages/page_{i + 1}.png", "PNG")

다음으로 Milvus Lite를 사용하여 데이터베이스를 초기화합니다. Milvus 서비스가 호스팅되는 적절한 주소로 URL을 설정하여 전체 Milvus 인스턴스로 쉽게 전환할 수 있습니다.

from pymilvus import MilvusClient, DataType
import numpy as np
import concurrent.futures

client = MilvusClient(uri="milvus.db")
  • 소규모 데이터나 프로토타이핑을 위해 로컬 벡터 데이터베이스만 필요한 경우, 예를 들어./milvus.db 와 같이 로컬 파일로 uri를 설정하는 것이 가장 편리한 방법이며, 이 파일에 모든 데이터를 저장하기 위해 Milvus Lite를 자동으로 활용하기 때문입니다.
  • 백만 개 이상의 벡터와 같이 대규모 데이터가 있는 경우, Docker 또는 Kubernetes에서 더 성능이 뛰어난 Milvus 서버를 설정할 수 있습니다. 이 설정에서는 서버 주소와 포트를 URI로 사용하세요(예:http://localhost:19530). Milvus에서 인증 기능을 활성화하는 경우 토큰으로 "<사용자 이름>:<사용자 비밀번호>"를 사용하고, 그렇지 않은 경우 토큰을 설정하지 마세요.
  • 밀버스의 완전 관리형 클라우드 서비스인 질리즈 클라우드를 사용하는 경우, 질리즈 클라우드의 퍼블릭 엔드포인트와 API 키에 해당하는 uritoken 을 조정합니다.

멀티벡터 데이터 검색을 위해 Milvus 클라이언트를 감싸는 MilvusColbertRetriever 클래스를 정의합니다. 이 구현은 콜버트 임베딩을 플랫화하여 컬렉션에 삽입하며, 각 행은 콜버트 임베딩 목록에서 개별 임베딩을 나타냅니다. 또한 각 임베딩의 출처를 추적하기 위해 doc_id와 seq_id를 기록합니다.

콜버트 임베딩 목록으로 검색할 경우, 각 콜버트 임베딩에 대해 하나씩 여러 번 검색이 수행됩니다. 그런 다음 검색된 문서 ID는 중복 제거됩니다. 리랭킹 프로세스가 수행되어 각 doc_id에 대한 전체 임베딩이 가져오고 MaxSim 점수가 계산되어 최종 순위가 매겨진 결과가 생성됩니다.

class MilvusColbertRetriever:
    def __init__(self, milvus_client, collection_name, dim=128):
        # Initialize the retriever with a Milvus client, collection name, and dimensionality of the vector embeddings.
        # If the collection exists, load it.
        self.collection_name = collection_name
        self.client = milvus_client
        if self.client.has_collection(collection_name=self.collection_name):
            self.client.load_collection(collection_name)
        self.dim = dim

    def create_collection(self):
        # Create a new collection in Milvus for storing embeddings.
        # Drop the existing collection if it already exists and define the schema for the collection.
        if self.client.has_collection(collection_name=self.collection_name):
            self.client.drop_collection(collection_name=self.collection_name)
        schema = self.client.create_schema(
            auto_id=True,
            enable_dynamic_fields=True,
        )
        schema.add_field(field_name="pk", datatype=DataType.INT64, is_primary=True)
        schema.add_field(
            field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=self.dim
        )
        schema.add_field(field_name="seq_id", datatype=DataType.INT16)
        schema.add_field(field_name="doc_id", datatype=DataType.INT64)
        schema.add_field(field_name="doc", datatype=DataType.VARCHAR, max_length=65535)

        self.client.create_collection(
            collection_name=self.collection_name, schema=schema
        )

    def create_index(self):
        # Create an index on the vector field to enable fast similarity search.
        # Releases and drops any existing index before creating a new one with specified parameters.
        self.client.release_collection(collection_name=self.collection_name)
        self.client.drop_index(
            collection_name=self.collection_name, index_name="vector"
        )
        index_params = self.client.prepare_index_params()
        index_params.add_index(
            field_name="vector",
            index_name="vector_index",
            index_type="HNSW",  # or any other index type you want
            metric_type="IP",  # or the appropriate metric type
            params={
                "M": 16,
                "efConstruction": 500,
            },  # adjust these parameters as needed
        )

        self.client.create_index(
            collection_name=self.collection_name, index_params=index_params, sync=True
        )

    def create_scalar_index(self):
        # Create a scalar index for the "doc_id" field to enable fast lookups by document ID.
        self.client.release_collection(collection_name=self.collection_name)

        index_params = self.client.prepare_index_params()
        index_params.add_index(
            field_name="doc_id",
            index_name="int32_index",
            index_type="INVERTED",  # or any other index type you want
        )

        self.client.create_index(
            collection_name=self.collection_name, index_params=index_params, sync=True
        )

    def search(self, data, topk):
        # Perform a vector search on the collection to find the top-k most similar documents.
        search_params = {"metric_type": "IP", "params": {}}
        results = self.client.search(
            self.collection_name,
            data,
            limit=int(50),
            output_fields=["vector", "seq_id", "doc_id"],
            search_params=search_params,
        )
        doc_ids = set()
        for r_id in range(len(results)):
            for r in range(len(results[r_id])):
                doc_ids.add(results[r_id][r]["entity"]["doc_id"])

        scores = []

        def rerank_single_doc(doc_id, data, client, collection_name):
            # Rerank a single document by retrieving its embeddings and calculating the similarity with the query.
            doc_colbert_vecs = client.query(
                collection_name=collection_name,
                filter=f"doc_id in [{doc_id}, {doc_id + 1}]",
                output_fields=["seq_id", "vector", "doc"],
                limit=1000,
            )
            doc_vecs = np.vstack(
                [doc_colbert_vecs[i]["vector"] for i in range(len(doc_colbert_vecs))]
            )
            score = np.dot(data, doc_vecs.T).max(1).sum()
            return (score, doc_id)

        with concurrent.futures.ThreadPoolExecutor(max_workers=300) as executor:
            futures = {
                executor.submit(
                    rerank_single_doc, doc_id, data, client, self.collection_name
                ): doc_id
                for doc_id in doc_ids
            }
            for future in concurrent.futures.as_completed(futures):
                score, doc_id = future.result()
                scores.append((score, doc_id))

        scores.sort(key=lambda x: x[0], reverse=True)
        if len(scores) >= topk:
            return scores[:topk]
        else:
            return scores

    def insert(self, data):
        # Insert ColBERT embeddings and metadata for a document into the collection.
        colbert_vecs = [vec for vec in data["colbert_vecs"]]
        seq_length = len(colbert_vecs)
        doc_ids = [data["doc_id"] for i in range(seq_length)]
        seq_ids = list(range(seq_length))
        docs = [""] * seq_length
        docs[0] = data["filepath"]

        # Insert the data as multiple vectors (one for each sequence) along with the corresponding metadata.
        self.client.insert(
            self.collection_name,
            [
                {
                    "vector": colbert_vecs[i],
                    "seq_id": seq_ids[i],
                    "doc_id": doc_ids[i],
                    "doc": docs[i],
                }
                for i in range(seq_length)
            ],
        )

콜팔리 엔진을 사용하여 두 쿼리에 대한 임베딩 목록을 추출하고 PDF 페이지에서 관련 정보를 검색합니다.

from colpali_engine.models import ColPali
from colpali_engine.models.paligemma.colpali.processing_colpali import ColPaliProcessor
from colpali_engine.utils.processing_utils import BaseVisualRetrieverProcessor
from colpali_engine.utils.torch_utils import ListDataset, get_torch_device
from torch.utils.data import DataLoader
import torch
from typing import List, cast

device = get_torch_device("cpu")
model_name = "vidore/colpali-v1.2"

model = ColPali.from_pretrained(
    model_name,
    torch_dtype=torch.bfloat16,
    device_map=device,
).eval()

queries = [
    "How to end-to-end retrieval with ColBert?",
    "Where is ColBERT performance table?",
]

processor = cast(ColPaliProcessor, ColPaliProcessor.from_pretrained(model_name))

dataloader = DataLoader(
    dataset=ListDataset[str](queries),
    batch_size=1,
    shuffle=False,
    collate_fn=lambda x: processor.process_queries(x),
)

qs: List[torch.Tensor] = []
for batch_query in dataloader:
    with torch.no_grad():
        batch_query = {k: v.to(model.device) for k, v in batch_query.items()}
        embeddings_query = model(**batch_query)
    qs.extend(list(torch.unbind(embeddings_query.to("cpu"))))

또한 각 페이지에 대한 임베딩 목록을 추출해야 하며, 각 페이지에 1030개의 128차원 임베딩이 있음을 보여줍니다.

from tqdm import tqdm
from PIL import Image
import os

images = [Image.open("./pages/" + name) for name in os.listdir("./pages")]

dataloader = DataLoader(
    dataset=ListDataset[str](images),
    batch_size=1,
    shuffle=False,
    collate_fn=lambda x: processor.process_images(x),
)

ds: List[torch.Tensor] = []
for batch_doc in tqdm(dataloader):
    with torch.no_grad():
        batch_doc = {k: v.to(model.device) for k, v in batch_doc.items()}
        embeddings_doc = model(**batch_doc)
    ds.extend(list(torch.unbind(embeddings_doc.to("cpu"))))

print(ds[0].shape)
  0%|          | 0/10 [00:00<?, ?it/s]

100%|██████████| 10/10 [01:22<00:00,  8.24s/it]

torch.Size([1030, 128])

MilvusColbertRetriever를 사용하여 "colpali"라는 컬렉션을 만들겠습니다.

retriever = MilvusColbertRetriever(collection_name="colpali", milvus_client=client)
retriever.create_collection()
retriever.create_index()

Milvus 데이터베이스에 임베딩 목록을 삽입합니다.

filepaths = ["./pages/" + name for name in os.listdir("./pages")]
for i in range(len(filepaths)):
    data = {
        "colbert_vecs": ds[i].float().numpy(),
        "doc_id": i,
        "filepath": filepaths[i],
    }
    retriever.insert(data)

이제 쿼리 임베딩 목록을 사용하여 가장 관련성이 높은 페이지를 검색할 수 있습니다.

for query in qs:
    query = query.float().numpy()
    result = retriever.search(query, topk=1)
    print(filepaths[result[0][1]])
./pages/page_5.png
./pages/page_7.png

마지막으로 원본 페이지 이름을 검색합니다. ColPali를 사용하면 문서에서 텍스트와 이미지를 추출하기 위한 복잡한 처리 기술 없이도 멀티모달 문서를 검색할 수 있습니다. 대규모 비전 모델을 활용하면 표와 그림과 같은 더 많은 정보를 큰 정보 손실 없이 분석할 수 있습니다.

번역DeepLogo

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
피드백

이 페이지가 도움이 되었나요?