Funnel Search with Matryoshka Embeddings

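When building efficient vector search systems, one key challenge is managing storage costs while maintaining acceptable latency and recall. Modern embedding models output vectors with hundreds or thousands of dimensions, creating significant storage and computational overhead for both the raw vectors and the index.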
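To make the storage overhead concrete, here is a rough back-of-the-envelope calculation. It assumes float32 components (4 bytes each) and counts only the raw vector data, ignoring index structures and metadata. The collection size of one million vectors and the function name are illustrative assumptions, not figures from this tutorial.

```python
def raw_vector_bytes(num_vectors: int, dim: int, bytes_per_component: int = 4) -> int:
    """Bytes needed to store raw vectors, assuming float32 components by default."""
    return num_vectors * dim * bytes_per_component


# Hypothetical collection of one million embeddings
full = raw_vector_bytes(1_000_000, 768)
head = raw_vector_bytes(1_000_000, 128)

print(f"768-dim: {full / 1e9:.2f} GB")
print(f"128-dim: {head / 1e9:.2f} GB")
print(f"ratio:   {full // head}x")
```

Storage for raw vectors scales linearly with the number of dimensions, so searching over a shorter vector shrinks the indexed data proportionally.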

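Traditionally, quantization or dimensionality reduction is applied just before building the index to reduce storage requirements. For instance, we can save storage by lowering the precision with Product Quantization (PQ) or by lowering the number of dimensions with Principal Component Analysis (PCA). These methods analyze the entire vector set to find a more compact one that preserves the semantic relationships between vectors.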
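As a point of reference, a post-hoc PCA reduction might look like the minimal sketch below. It uses plain NumPy with random data standing in for real embeddings; `pca_reduce` is a hypothetical helper, not part of Milvus or of this tutorial's code. Note that the projection has to be fitted on the whole vector set, which is the property the paragraph above describes.

```python
import numpy as np


def pca_reduce(vectors: np.ndarray, out_dim: int) -> np.ndarray:
    """Project vectors onto their top `out_dim` principal components."""
    centered = vectors - vectors.mean(axis=0, keepdims=True)
    # Rows of vt are the principal directions, sorted by decreasing variance
    _, _, vt = np.linalg.svd(centered, full_matrices=False)
    return centered @ vt[:out_dim].T


rng = np.random.default_rng(0)
vectors = rng.normal(size=(1000, 64)).astype(np.float32)  # stand-in for real embeddings
reduced = pca_reduce(vectors, out_dim=16)
print(reduced.shape)
```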

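While effective, these standard approaches reduce precision or dimensionality only once, and at a single scale. But what if we could maintain multiple layers of detail simultaneously, like a pyramid of increasingly precise representations?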

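Enter Matryoshka embeddings. Named after Russian nesting dolls (see illustration), these clever constructs embed multiple scales of representation within a single vector. Unlike traditional post-processing methods, Matryoshka embeddings learn this multi-scale structure during the initial training process. The result is remarkable: not only does the full embedding capture the semantics of the input, but each nested prefix (the first half, the first quarter, and so on) also provides a coherent, if less detailed, representation.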
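In practice, using a nested prefix amounts to slicing off the leading components and rescaling to unit length, as in the sketch below. The vector here is random and serves only as a placeholder: a random vector has no Matryoshka structure, so the example shows the mechanics of truncation rather than the quality of the resulting representation. `matryoshka_prefix` is a hypothetical helper name.

```python
import numpy as np


def matryoshka_prefix(embedding: np.ndarray, dim: int) -> np.ndarray:
    """Keep the first `dim` components and rescale to unit length."""
    prefix = embedding[:dim]
    return prefix / np.linalg.norm(prefix)


rng = np.random.default_rng(0)
embedding = rng.normal(size=768)  # placeholder for a real Matryoshka embedding

for dim in (768, 384, 192):  # full, first half, first quarter
    prefix = matryoshka_prefix(embedding, dim)
    print(dim, prefix.shape, round(float(np.linalg.norm(prefix)), 6))
```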

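In this notebook, we examine how to use Matryoshka embeddings with Milvus for semantic search. We illustrate an algorithm called "funnel search" that lets us run similarity search over a small subset of the embedding dimensions without a drastic drop in recall.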

import functools

from datasets import load_dataset
import numpy as np
import pandas as pd
import pymilvus
from pymilvus import MilvusClient
from pymilvus import FieldSchema, CollectionSchema, DataType
from sentence_transformers import SentenceTransformer
import torch
import torch.nn.functional as F
from tqdm import tqdm

Load Matryoshka Embedding Model

Instead of a standard embedding model such as sentence-transformers/all-MiniLM-L12-v2, we use a model from Nomic trained specifically to produce Matryoshka embeddings.

model = SentenceTransformer(
    # Remove device="mps" if running on a non-Mac device
    "nomic-ai/nomic-embed-text-v1.5",
    trust_remote_code=True,
    device="mps",
)
<All keys matched successfully>

Loading Dataset, Embedding Items, and Building Vector Database

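The following code is adapted from the documentation page "Movie Search with Sentence Transformers and Milvus." First, we load the dataset from HuggingFace. It contains around 35k entries, each corresponding to a movie that has a Wikipedia article. We will use the Title and PlotSummary fields in this example.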

ds = load_dataset("vishnupriyavr/wiki-movie-plots-with-summaries", split="train")
print(ds)
Dataset({
    features: ['Release Year', 'Title', 'Origin/Ethnicity', 'Director', 'Cast', 'Genre', 'Wiki Page', 'Plot', 'PlotSummary'],
    num_rows: 34886
})

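Next, we connect to a Milvus Lite database, specify the data schema, and create a collection with this schema. We store both the full unnormalized embedding and the first sixth of the embedding in separate fields. The reason is that we need the first 1/6 of the Matryoshka embedding to perform the similarity search, and the remaining 5/6 to rerank and improve the search results.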

embedding_dim = 768
search_dim = 128
collection_name = "movie_embeddings"

client = MilvusClient(uri="./wiki-movie-plots-matryoshka.db")

fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=256),
    # First sixth of unnormalized embedding vector
    FieldSchema(name="head_embedding", dtype=DataType.FLOAT_VECTOR, dim=search_dim),
    # Entire unnormalized embedding vector
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=embedding_dim),
]

schema = CollectionSchema(fields=fields, enable_dynamic_field=False)
client.create_collection(collection_name=collection_name, schema=schema)

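Milvus does not currently support searching over subsets of embeddings, so we break the embeddings into two parts: the head is the initial subset of the vector that we index and search, and the tail is the remainder. The model is trained for cosine similarity search, so the head embeddings would normally be normalized. However, to calculate similarities for larger subsets later on, we would then need to store the norm of the head embedding so that we can unnormalize it before joining it to the tail. The code in this notebook takes a simpler route: it stores the head and the full embedding unnormalized (see the schema comments above) and relies on the COSINE metric at search time.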
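The head/tail bookkeeping described above can be sketched as follows. This is only an illustration of the idea of storing the head norm; the helper names `split_embedding` and `rebuild_embedding` are hypothetical, the vector is a random placeholder, and none of this is taken from the notebook's own code.

```python
import numpy as np


def split_embedding(embedding: np.ndarray, head_dim: int):
    """Return (normalized head, head norm, tail) for one embedding."""
    head, tail = embedding[:head_dim], embedding[head_dim:]
    head_norm = float(np.linalg.norm(head))
    return head / head_norm, head_norm, tail


def rebuild_embedding(unit_head: np.ndarray, head_norm: float, tail: np.ndarray) -> np.ndarray:
    """Undo the head normalization and rejoin head and tail."""
    return np.concatenate([unit_head * head_norm, tail])


rng = np.random.default_rng(0)
embedding = rng.normal(size=768)  # placeholder embedding
unit_head, head_norm, tail = split_embedding(embedding, head_dim=128)
restored = rebuild_embedding(unit_head, head_norm, tail)
print(np.allclose(restored, embedding))
```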

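To search over the first 1/6 of the embedding, we need to create a vector search index on the head_embedding field. Later on, we will compare the results of "funnel search" with a regular vector search, so we also build a search index over the full embedding.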

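Importantly, we use the COSINE rather than the IP distance metric, because otherwise we would need to keep track of the embedding norms, which would complicate the implementation (this will make more sense once the funnel search algorithm has been described).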
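A small numeric check illustrates why COSINE avoids the norm bookkeeping: cosine similarity does not change when a vector is rescaled, so unnormalized prefixes of any length can be compared directly, whereas the inner product scales with the norm. The vectors are random placeholders and the `cosine` helper is written out here only for illustration.

```python
import numpy as np


def cosine(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two vectors."""
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))


rng = np.random.default_rng(0)
query = rng.normal(size=768)
doc = rng.normal(size=768)

for dims in (128, 256, 768):
    q, d = query[:dims], doc[:dims]
    scaled = 3.0 * d  # same direction, three times the norm
    print(
        dims,
        "cosine unchanged:", np.isclose(cosine(q, d), cosine(q, scaled)),
        "inner product tripled:", np.isclose(q @ scaled, 3.0 * (q @ d)),
    )
```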

index_params = client.prepare_index_params()
index_params.add_index(
    field_name="head_embedding", index_type="FLAT", metric_type="COSINE"
)
index_params.add_index(field_name="embedding", index_type="FLAT", metric_type="COSINE")
client.create_index(collection_name, index_params)

Finally, we encode the plot summaries of all 35k movies and insert the corresponding embeddings into the database.

for batch in tqdm(ds.batch(batch_size=512)):
    # This particular model requires us to prefix 'search_document:' to stored entities
    plot_summary = ["search_document: " + x.strip() for x in batch["PlotSummary"]]

    # Output of embedding model is unnormalized
    embeddings = model.encode(plot_summary, convert_to_tensor=True)
    head_embeddings = embeddings[:, :search_dim]

    data = [
        {
            "title": title,
            "head_embedding": head.cpu().numpy(),
            "embedding": embedding.cpu().numpy(),
        }
        for title, head, embedding in zip(batch["Title"], head_embeddings, embeddings)
    ]
    res = client.insert(collection_name=collection_name, data=data)
100%|██████████| 69/69 [05:57<00:00,  5.18s/it]

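Let's now implement a "funnel search" using the first 1/6 of the Matryoshka embedding dimensions. I have three movies in mind for retrieval and have written my own plot summaries to query the database. We embed the queries, then perform a vector search on the head_embedding field, retrieving 128 result candidates.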

queries = [
    "An archaeologist searches for ancient artifacts while fighting Nazis.",
    "A teenager fakes illness to get off school and have adventures with two friends.",
    "A young couple with a kid look after a hotel during winter and the husband goes insane.",
]


# Search the database based on input text
def embed_search(data):
    embeds = model.encode(data)
    return [x for x in embeds]


# This particular model requires us to prefix 'search_query:' to queries
instruct_queries = ["search_query: " + q.strip() for q in queries]
search_data = embed_search(instruct_queries)

# Truncate query embeddings to the head; the COSINE metric makes explicit normalization unnecessary
head_search = [x[:search_dim] for x in search_data]

# Perform standard vector search on first sixth of embedding dimensions
res = client.search(
    collection_name=collection_name,
    data=head_search,
    anns_field="head_embedding",
    limit=128,
    output_fields=["title", "head_embedding", "embedding"],
)

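At this point, we have performed the search over a much smaller vector space, and are therefore likely to have lowered latency and reduced the storage requirements of the index relative to a search over the full space. Let's examine the top 5 matches for each query: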

for query, hits in zip(queries, res):
    rows = [x["entity"] for x in hits][:5]

    print("Query:", query)
    print("Results:")
    for row in rows:
        print(row["title"].strip())
    print()
Query: An archaeologist searches for ancient artifacts while fighting Nazis.
Results:
"Pimpernel" Smith
Black Hunters
The Passage
Counterblast
Dominion: Prequel to the Exorcist

Query: A teenager fakes illness to get off school and have adventures with two friends.
Results:
How to Deal
Shorts
Blackbird
Valentine
Unfriended

Query: A young couple with a kid look after a hotel during winter and the husband goes insane.
Results:
Ghostkeeper
Our Vines Have Tender Grapes
The Ref
Impact
The House in Marsh Road

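As we can see, recall has suffered as a consequence of truncating the embeddings during search. Funnel search fixes this with a clever trick: we use the remaining embedding dimensions to rerank and prune our candidate list, recovering retrieval performance without running any additional expensive vector searches.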

To make the funnel search algorithm easier to explain, we convert the Milvus search hits for each query into a Pandas dataframe.

def hits_to_dataframe(hits: pymilvus.client.abstract.Hits) -> pd.DataFrame:
    """
    Convert a Milvus search result to a Pandas dataframe. This function is specific to our data schema.

    """
    rows = [x["entity"] for x in hits]
    rows_dict = [
        {"title": x["title"], "embedding": torch.tensor(x["embedding"])} for x in rows
    ]
    return pd.DataFrame.from_records(rows_dict)


dfs = [hits_to_dataframe(hits) for hits in res]

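Now, to perform funnel search, we iterate over increasingly larger prefixes of the embeddings. At each iteration, we rerank the candidates according to the new similarities and prune some fraction of the lowest-ranked ones.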

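To make this concrete: in the previous step we retrieved 128 candidates using 1/6 of the embedding and query dimensions. The first step of funnel search is to recompute the similarities between the query and the candidates using the first 1/3 of the dimensions. The bottom 64 candidates are pruned. We then repeat this process with the first 2/3 of the dimensions, and then with all of the dimensions, successively pruning to 32 and 16 candidates.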
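The pruning schedule described above can be tabulated with a few lines of Python. This is a minimal sketch, assuming prefix sizes of 256, 512, and 768 dimensions (1/3, 2/3, and all of a 768-dimensional embedding) and a pruning ratio of one half; `funnel_schedule` is a hypothetical helper used only to show the arithmetic.

```python
def funnel_schedule(num_candidates, scales=(256, 512, 768), prune_ratio=0.5):
    """Return (dims, candidates kept) for each funnel stage."""
    schedule = []
    for dims in scales:
        num_candidates = int(prune_ratio * num_candidates)
        schedule.append((dims, num_candidates))
    return schedule


for dims, kept in funnel_schedule(128):
    print(f"re-rank with first {dims} dims -> keep {kept} candidates")
```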

# An optimized implementation would vectorize the calculation of similarity scores across rows (using a matrix)
def calculate_score(row, query_emb=None, dims=768):
    emb = F.normalize(row["embedding"][:dims], dim=-1)
    return (emb @ query_emb).item()


# You could also add a top-K parameter as a termination condition
def funnel_search(
    df: pd.DataFrame, query_emb, scales=[256, 512, 768], prune_ratio=0.5
) -> pd.DataFrame:
    # Loop over increasing prefixes of the embeddings
    for dims in scales:
        # Query vector must be normalized for each new dimensionality
        emb = torch.tensor(query_emb[:dims] / np.linalg.norm(query_emb[:dims]))

        # Score
        scores = df.apply(
            functools.partial(calculate_score, query_emb=emb, dims=dims), axis=1
        )
        df["scores"] = scores

        # Re-rank
        df = df.sort_values(by="scores", ascending=False)

        # Prune (in our case, remove half of candidates at each step)
        df = df.head(int(prune_ratio * len(df)))

    return df


dfs_results = [
    {"query": query, "results": funnel_search(df, query_emb)}
    for query, df, query_emb in zip(queries, dfs, search_data)
]
for d in dfs_results:
    print(d["query"], "\n", d["results"][:5]["title"], "\n")
An archaeologist searches for ancient artifacts while fighting Nazis. 
 0           "Pimpernel" Smith
1               Black Hunters
29    Raiders of the Lost Ark
34             The Master Key
51            My Gun Is Quick
Name: title, dtype: object 

A teenager fakes illness to get off school and have adventures with two friends. 
 21               How I Live Now
32     On the Edge of Innocence
77             Bratz: The Movie
4                    Unfriended
108                  Simon Says
Name: title, dtype: object 

A young couple with a kid look after a hotel during winter and the husband goes insane. 
 9         The Shining
0         Ghostkeeper
11     Fast and Loose
7      Killing Ground
12         Home Alone
Name: title, dtype: object 

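We have been able to restore recall without performing any additional vector searches! Qualitatively, these results seem to have higher recall for "Raiders of the Lost Ark" and "The Shining" than the standard vector search in the tutorial "Movie Search using Milvus and Sentence Transformers," which uses a different embedding model. However, funnel search fails to find "Ferris Bueller's Day Off," which we return to later in the notebook. (See the paper Matryoshka Representation Learning for more quantitative experiments and benchmarking.)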
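The comment in the funnel search code above notes that an optimized implementation would vectorize the similarity calculation. One possible way to do that with NumPy is sketched below; it is an assumption-laden illustration rather than the notebook's implementation. `funnel_search_vectorized` is a hypothetical name, the candidates are random placeholders, and the function returns candidate indices instead of a dataframe.

```python
import numpy as np


def funnel_search_vectorized(embeddings, query_emb, scales=(256, 512, 768), prune_ratio=0.5):
    """Return indices of the surviving candidates, best first.

    embeddings: (n_candidates, dim) array of unnormalized candidate vectors.
    query_emb:  (dim,) unnormalized query vector.
    """
    keep = np.arange(len(embeddings))
    for dims in scales:
        # Normalize the current prefix of every remaining candidate and of the query
        cands = embeddings[keep, :dims]
        cands = cands / np.linalg.norm(cands, axis=1, keepdims=True)
        query = query_emb[:dims] / np.linalg.norm(query_emb[:dims])
        scores = cands @ query  # cosine similarity for all candidates at once
        order = np.argsort(-scores)  # re-rank, highest similarity first
        keep = keep[order[: int(prune_ratio * len(keep))]]  # prune the lowest-ranked
    return keep


rng = np.random.default_rng(0)
candidates = rng.normal(size=(128, 768))  # placeholder for retrieved candidates
query = candidates[7] + 0.1 * rng.normal(size=768)  # query built to lie close to candidate 7
result = funnel_search_vectorized(candidates, query)
print(len(result), result[0])
```

Replacing the row-wise `apply` with one matrix-vector product per stage keeps the logic identical (score, re-rank, prune) while avoiding a Python-level loop over candidates.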

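Let's compare the results of our funnel search to a standard vector search on the same dataset with the same embedding model. We perform the search on the full embeddings.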

# Search on entire embeddings
res = client.search(
    collection_name=collection_name,
    data=search_data,
    anns_field="embedding",
    limit=5,
    output_fields=["title", "embedding"],
)
for query, hits in zip(queries, res):
    rows = [x["entity"] for x in hits]

    print("Query:", query)
    print("Results:")
    for row in rows:
        print(row["title"].strip())
    print()
Query: An archaeologist searches for ancient artifacts while fighting Nazis.
Results:
"Pimpernel" Smith
Black Hunters
Raiders of the Lost Ark
The Master Key
My Gun Is Quick

Query: A teenager fakes illness to get off school and have adventures with two friends.
Results:
A Walk to Remember
Ferris Bueller's Day Off
How I Live Now
On the Edge of Innocence
Bratz: The Movie

Query: A young couple with a kid look after a hotel during winter and the husband goes insane.
Results:
The Shining
Ghostkeeper
Fast and Loose
Killing Ground
Home Alone

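With the exception of the results for "A teenager fakes illness to get off school...", the funnel search results are almost identical to those of the full search, even though the funnel search ran on a 128-dimensional search space versus 768 dimensions for the regular search.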
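One simple way to quantify "almost identical" is the overlap between the two top-5 lists for each query. The sketch below copies the titles from the outputs above; the short dictionary keys and the `overlap_at_k` helper are labels introduced here purely for illustration.

```python
funnel_top5 = {
    "archaeologist": ['"Pimpernel" Smith', "Black Hunters", "Raiders of the Lost Ark",
                      "The Master Key", "My Gun Is Quick"],
    "teenager": ["How I Live Now", "On the Edge of Innocence", "Bratz: The Movie",
                 "Unfriended", "Simon Says"],
    "hotel": ["The Shining", "Ghostkeeper", "Fast and Loose", "Killing Ground", "Home Alone"],
}
full_top5 = {
    "archaeologist": ['"Pimpernel" Smith', "Black Hunters", "Raiders of the Lost Ark",
                      "The Master Key", "My Gun Is Quick"],
    "teenager": ["A Walk to Remember", "Ferris Bueller's Day Off", "How I Live Now",
                 "On the Edge of Innocence", "Bratz: The Movie"],
    "hotel": ["The Shining", "Ghostkeeper", "Fast and Loose", "Killing Ground", "Home Alone"],
}


def overlap_at_k(found, reference):
    """Fraction of the reference results that also appear in `found`."""
    return len(set(found) & set(reference)) / len(reference)


for key in funnel_top5:
    print(key, overlap_at_k(funnel_top5[key], full_top5[key]))
```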

Investigating Funnel Search Recall Failure for Ferris Bueller's Day Off

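Why didn't funnel search succeed in retrieving Ferris Bueller's Day Off? Let's examine whether it was missing from the original candidate list or was mistakenly filtered out.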

queries2 = [
    "A teenager fakes illness to get off school and have adventures with two friends."
]


# Search the database based on input text
def embed_search(data):
    embeds = model.encode(data)
    return [x for x in embeds]


instruct_queries = ["search_query: " + q.strip() for q in queries2]
search_data2 = embed_search(instruct_queries)
head_search2 = [x[:search_dim] for x in search_data2]

# Perform standard vector search on subset of embeddings
res = client.search(
    collection_name=collection_name,
    data=head_search2,
    anns_field="head_embedding",
    limit=256,
    output_fields=["title", "head_embedding", "embedding"],
)
for query, hits in zip(queries2, res):
    rows = [x["entity"] for x in hits]

    print("Query:", query)
    for idx, row in enumerate(rows):
        if row["title"].strip() == "Ferris Bueller's Day Off":
            print(f"Row {idx}: Ferris Bueller's Day Off")
Query: A teenager fakes illness to get off school and have adventures with two friends.
Row 228: Ferris Bueller's Day Off

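We see that the problem was that the initial candidate list was not large enough, or rather, that the desired hit is not similar enough to the query at the coarsest level of granularity (the 128-dimensional head). Changing the limit from 128 to 256 results in successful retrieval. We should form a rule of thumb for setting the number of candidates by evaluating the trade-off between recall and latency empirically on a held-out set.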

dfs = [hits_to_dataframe(hits) for hits in res]

dfs_results = [
    {"query": query, "results": funnel_search(df, query_emb)}
    for query, df, query_emb in zip(queries2, dfs, search_data2)
]

for d in dfs_results:
    print(d["query"], "\n", d["results"][:7]["title"].to_string(index=False), "\n")
A teenager fakes illness to get off school and have adventures with two friends. 
       A Walk to Remember
Ferris Bueller's Day Off
          How I Live Now
On the Edge of Innocence
        Bratz: The Movie
              Unfriended
              Simon Says 
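The rule of thumb suggested above could be derived by measuring, for a set of held-out queries, how often the desired hit falls within the first N head-only candidates. The sketch below uses the three positions that can be read off the outputs in this tutorial (the dataframe index printed in the funnel search results is the zero-based position in the head-only candidate list, and the Ferris Bueller's Day Off position comes from the check above). Three queries are far too few for a real evaluation, and `recall_by_limit` is a hypothetical helper.

```python
def recall_by_limit(target_ranks, limits):
    """Fraction of queries whose target falls inside the first `limit` candidates.

    target_ranks: zero-based position of the desired hit in the head-only search.
    """
    return {
        limit: sum(rank < limit for rank in target_ranks) / len(target_ranks)
        for limit in limits
    }


# Positions observed in this tutorial: Raiders of the Lost Ark (29),
# Ferris Bueller's Day Off (228), The Shining (9)
target_ranks = [29, 228, 9]

for limit, recall in recall_by_limit(target_ranks, [16, 64, 128, 256]).items():
    print(f"limit={limit:>3}  recall={recall:.2f}")
```

A larger limit raises recall but also increases the number of candidates that must be fetched and reranked, which is the latency side of the trade-off.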

Does the order matter? Prefix vs suffix embeddings.

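The model was trained to perform well when matching recursively smaller prefixes of the embeddings. Does the order of the dimensions we use matter? For instance, could we also take subsets of the embeddings that are suffixes? In this experiment, we flip the order of the dimensions in the Matryoshka embeddings and perform a funnel search.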

client = MilvusClient(uri="./wikiplots-matryoshka-flipped.db")

fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=256),
    FieldSchema(name="head_embedding", dtype=DataType.FLOAT_VECTOR, dim=search_dim),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=embedding_dim),
]

schema = CollectionSchema(fields=fields, enable_dynamic_field=False)
client.create_collection(collection_name=collection_name, schema=schema)

index_params = client.prepare_index_params()
index_params.add_index(
    field_name="head_embedding", index_type="FLAT", metric_type="COSINE"
)
client.create_index(collection_name, index_params)
for batch in tqdm(ds.batch(batch_size=512)):
    plot_summary = ["search_document: " + x.strip() for x in batch["PlotSummary"]]

    # Encode and flip embeddings
    embeddings = model.encode(plot_summary, convert_to_tensor=True)
    embeddings = torch.flip(embeddings, dims=[-1])
    head_embeddings = embeddings[:, :search_dim]

    data = [
        {
            "title": title,
            "head_embedding": head.cpu().numpy(),
            "embedding": embedding.cpu().numpy(),
        }
        for title, head, embedding in zip(batch["Title"], head_embeddings, embeddings)
    ]
    res = client.insert(collection_name=collection_name, data=data)
100%|██████████| 69/69 [05:50<00:00,  5.08s/it]
# Flip the query embeddings to match the flipped document embeddings, then take the head

flip_search_data = [
    torch.flip(torch.tensor(x), dims=[-1]).cpu().numpy() for x in search_data
]
flip_head_search = [x[:search_dim] for x in flip_search_data]

# Perform standard vector search on subset of embeddings
res = client.search(
    collection_name=collection_name,
    data=flip_head_search,
    anns_field="head_embedding",
    limit=128,
    output_fields=["title", "head_embedding", "embedding"],
)
dfs = [hits_to_dataframe(hits) for hits in res]

dfs_results = [
    {"query": query, "results": funnel_search(df, query_emb)}
    for query, df, query_emb in zip(queries, dfs, flip_search_data)
]

for d in dfs_results:
    print(
        d["query"],
        "\n",
        d["results"][:7]["title"].to_string(index=False, header=False),
        "\n",
    )
An archaeologist searches for ancient artifacts while fighting Nazis. 
       "Pimpernel" Smith
          Black Hunters
Raiders of the Lost Ark
         The Master Key
        My Gun Is Quick
            The Passage
        The Mole People 

A teenager fakes illness to get off school and have adventures with two friends. 
                       A Walk to Remember
                          How I Live Now
                              Unfriended
Cirque du Freak: The Vampire's Assistant
                             Last Summer
                                 Contest
                                 Day One 

A young couple with a kid look after a hotel during winter and the husband goes insane. 
         Ghostkeeper
     Killing Ground
Leopard in the Snow
              Stone
          Afterglow
         Unfaithful
     Always a Bride 

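As expected, recall is much poorer than with funnel search or regular search (the embedding model was trained by contrastive learning on prefixes of the embedding dimensions, not suffixes).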
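To see why the flipped experiment amounts to searching over suffixes, note that the head of a flipped vector is simply the original tail in reverse order. The sketch below checks this with NumPy on a stand-in vector whose values equal their own index; it is an illustration only and uses `np.flip` in place of the `torch.flip` call in the code above.

```python
import numpy as np

embedding = np.arange(768, dtype=np.float32)  # stand-in whose values equal their index
search_dim = 128

flipped = np.flip(embedding, axis=-1)
flipped_head = flipped[:search_dim]

# The head of the flipped vector is the original tail, in reverse order
print(np.array_equal(flipped_head, embedding[-search_dim:][::-1]))
print(flipped_head[:3])
```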

Summary

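The outputs in the preceding sections allow a comparison of search results across methods: head-only search, funnel search, full-embedding search, and funnel search over flipped embeddings.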

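We showed how to use Matryoshka embeddings with Milvus to run a more efficient semantic search algorithm called "funnel search." We also explored the importance of the reranking and pruning steps of the algorithm, as well as a failure mode that occurs when the initial candidate list is too small. Finally, we discussed how the order of the dimensions matters when forming sub-embeddings: it must match the order the model was trained with. Or rather, it is only because the model was trained in a certain way that prefixes of the embeddings are meaningful. Now you know how to implement Matryoshka embeddings and funnel search to reduce the storage costs of semantic search without sacrificing too much retrieval performance!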