使用 Milvus 進行 HDBSCAN 聚類
使用深度學習模型可以將資料轉換為嵌入,從而捕捉原始資料的有意義表示。透過應用無監督聚類演算法,我們可以根據類似資料點的固有模式將其歸類在一起。HDBSCAN (Hierarchical Density-Based Spatial Clustering of Applications with Noise) 是一種廣泛使用的聚類演算法,可透過分析資料點的密度和距離,將資料點有效地分組。它對於發現不同形狀和大小的聚類特別有用。在本筆記簿中,我們將使用 HDBSCAN 搭配高效能向量資料庫 Milvus,根據資料點的內嵌,將資料點聚類為不同的群組。
HDBSCAN (Hierarchical Density-Based Spatial Clustering of Applications with Noise) 是一種聚類演算法,依靠計算資料點在嵌入空間中的距離。這些由深度學習模型建立的內嵌以高維形式表示資料。為了將相似的資料點歸類,HDBSCAN 會判斷它們的接近度和密度,但有效率地計算這些距離,特別是對於大型資料集而言,可能是一項挑戰。
Milvus 是一個高效能向量資料庫,透過儲存和索引嵌入來優化這個過程,讓相似向量的快速檢索成為可能。HDBSCAN 和 Milvus 一起使用時,可以在嵌入空間中對大規模資料集進行有效的聚類。
在本筆記簿中,我們將使用 BGE-M3 嵌入模型從新聞標題資料集中萃取嵌入資料,利用 Milvus 有效計算嵌入資料之間的距離,以協助 HDBSCAN 進行聚類,然後將結果可視化,以便使用 UMAP 方法進行分析。本筆記本是Dylan Castillo 文章的 Milvus 改編版本。
準備工作
從 https://www.kaggle.com/datasets/dylanjcastillo/news-headlines-2024/ 下載新聞資料集
$ pip install "pymilvus[model]"
$ pip install hdbscan
$ pip install plotly
$ pip install umap-learn
下載資料
從 https://www.kaggle.com/datasets/dylanjcastillo/news-headlines-2024/ 下載新聞資料集,萃取news_data_dedup.csv
並放入目前目錄。
萃取嵌入到 Milvus
我們將使用 Milvus 建立一個集合,並使用 BGE-M3 模型抽取密集嵌入。
import pandas as pd
from dotenv import load_dotenv
from pymilvus.model.hybrid import BGEM3EmbeddingFunction
from pymilvus import FieldSchema, Collection, connections, CollectionSchema, DataType
load_dotenv()
df = pd.read_csv("news_data_dedup.csv")
docs = [
f"{title}\n{description}" for title, description in zip(df.title, df.description)
]
ef = BGEM3EmbeddingFunction()
embeddings = ef(docs)["dense"]
connections.connect(uri="milvus.db")
- 如果你只需要一個本機向量資料庫來做小規模的資料或原型設計,將 uri 設定為一個本機檔案,例如
./milvus.db
,是最方便的方法,因為它會自動利用Milvus Lite將所有資料儲存在這個檔案中。 - 如果您有大規模的資料,例如超過一百萬個向量,您可以在Docker 或 Kubernetes 上架設效能更高的 Milvus 伺服器。在此設定中,請使用伺服器位址和連接埠作為您的 uri,例如
http://localhost:19530
。如果您在 Milvus 上啟用認證功能,請使用「<your_username>:<your_password>」作為令牌,否則請勿設定令牌。 - 如果您使用Zilliz Cloud(Milvus 的完全管理雲端服務),請調整
uri
和token
,它們對應於 Zilliz Cloud 中的Public Endpoint 和 API key。
fields = [
FieldSchema(
name="id", dtype=DataType.INT64, is_primary=True, auto_id=True
), # Primary ID field
FieldSchema(
name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024
), # Float vector field (embedding)
FieldSchema(
name="text", dtype=DataType.VARCHAR, max_length=65535
), # Float vector field (embedding)
]
schema = CollectionSchema(fields=fields, description="Embedding collection")
collection = Collection(name="news_data", schema=schema)
for doc, embedding in zip(docs, embeddings):
collection.insert({"text": doc, "embedding": embedding})
print(doc)
index_params = {"index_type": "FLAT", "metric_type": "L2", "params": {}}
collection.create_index(field_name="embedding", index_params=index_params)
collection.flush()
建構 HDBSCAN 的距離矩陣
HDBSCAN 需要計算點與點之間的距離,以進行聚類,而這可能是計算密集的工作。由於較遠的點對聚類分派的影響較小,我們可以透過計算前 k 個最近鄰居來提高效率。在這個範例中,我們使用 FLAT 索引,但對於大規模的資料集,Milvus 支援更進階的索引方法來加速搜尋過程。 首先,我們需要取得一個迭代器來迭代之前建立的 Milvus 集合。
import hdbscan
import numpy as np
import pandas as pd
import plotly.express as px
from umap import UMAP
from pymilvus import Collection
collection = Collection(name="news_data")
collection.load()
iterator = collection.query_iterator(
batch_size=10, expr="id > 0", output_fields=["id", "embedding"]
)
search_params = {
"metric_type": "L2",
"params": {"nprobe": 10},
} # L2 is Euclidean distance
ids = []
dist = {}
embeddings = []
我們將遍历 Milvus 集合中的所有嵌入。對於每個嵌入,我們會搜尋它在同一集合中的前 k 個鄰居,並取得它們的 ID 和距離。之後,我們還要建立字典,將原始 ID 對應至距離矩陣中的連續索引。完成後,我們需要建立一個距離矩陣,初始化所有元素為無窮大,並填充我們搜尋到的元素。如此一來,遠距離的點之間的距離將會被忽略。最後,我們使用 HDBSCAN 函式庫,利用我們建立的距離矩陣對點進行聚類。我們需要設定 metric 為「precomputed」,以表示資料是距離矩陣,而不是原始的 embeddings。
while True:
batch = iterator.next()
batch_ids = [data["id"] for data in batch]
ids.extend(batch_ids)
query_vectors = [data["embedding"] for data in batch]
embeddings.extend(query_vectors)
results = collection.search(
data=query_vectors,
limit=50,
anns_field="embedding",
param=search_params,
output_fields=["id"],
)
for i, batch_id in enumerate(batch_ids):
dist[batch_id] = []
for result in results[i]:
dist[batch_id].append((result.id, result.distance))
if len(batch) == 0:
break
ids2index = {}
for id in dist:
ids2index[id] = len(ids2index)
dist_metric = np.full((len(ids), len(ids)), np.inf, dtype=np.float64)
for id in dist:
for result in dist[id]:
dist_metric[ids2index[id]][ids2index[result[0]]] = result[1]
h = hdbscan.HDBSCAN(min_samples=3, min_cluster_size=3, metric="precomputed")
hdb = h.fit(dist_metric)
之後,HDBSCAN 聚類就完成了。我們可以得到一些資料,並顯示其聚類。請注意,有些資料不會被分配到任何叢集,這表示它們是雜訊,因為它們位於一些稀疏的區域。
使用 UMAP 進行聚類可視化
我們已經使用 HDBSCAN 對資料進行聚類,並取得每個資料點的標籤。然而,使用一些可視化技術,我們可以得到叢集的全貌,以便進行直觀的分析。現在我們要使用 UMAP 來視覺化聚類。UMAP 是一種用於降維的有效方法,在保留高維資料結構的同時,將其投影到低維空間以進行可視化或進一步分析。在這裡,我們再次遍歷資料點,並取得原始資料的 ID 和文字,然後我們使用 ploty 將資料點與這些 metainfo 繪製成圖形,並使用不同的顏色代表不同的叢集。
import plotly.io as pio
pio.renderers.default = "notebook"
umap = UMAP(n_components=2, random_state=42, n_neighbors=80, min_dist=0.1)
df_umap = (
pd.DataFrame(umap.fit_transform(np.array(embeddings)), columns=["x", "y"])
.assign(cluster=lambda df: hdb.labels_.astype(str))
.query('cluster != "-1"')
.sort_values(by="cluster")
)
iterator = collection.query_iterator(
batch_size=10, expr="id > 0", output_fields=["id", "text"]
)
ids = []
texts = []
while True:
batch = iterator.next()
if len(batch) == 0:
break
batch_ids = [data["id"] for data in batch]
batch_texts = [data["text"] for data in batch]
ids.extend(batch_ids)
texts.extend(batch_texts)
show_texts = [texts[i] for i in df_umap.index]
df_umap["hover_text"] = show_texts
fig = px.scatter(
df_umap, x="x", y="y", color="cluster", hover_data={"hover_text": True}
)
fig.show()
圖像
在這裡,我們展示了資料很好的聚類,您可以將滑鼠懸停在點上,查看它們所代表的文字。透過這本筆記,我們希望您能學習如何使用 HDBSCAN 對 Milvus 的嵌入式資料進行有效率的聚類,這也可以應用在其他類型的資料上。結合大型語言模型,這個方法可以讓您在大規模下對資料進行更深入的分析。