🚀 免費嘗試 Zilliz Cloud,完全托管的 Milvus,體驗速度提升 10 倍!立即嘗試

milvus-logo
LFAI
  • Home
  • Blog
  • Milvus 支援 Apache Parquet 檔案的匯入,以提升資料處理效率

Milvus 支援 Apache Parquet 檔案的匯入,以提升資料處理效率

  • Engineering
March 08, 2024
Cai Zhang, Fendy Feng

Milvus 是高度可擴充的向量資料庫,以能夠處理龐大的資料集而聞名,在2.3.4 版中引入 Parquet 檔案支援,向前邁進了一大步。透過採用 Apache Parquet,使用者可以簡化資料匯入程序,並大幅節省儲存與計算成本。

在我們最新的文章中,我們探討了 Parquet 的優勢以及它為 Milvus 用戶帶來的好處。我們討論整合此功能背後的動機,並提供逐步指南,說明如何將 Parquet 檔案無縫匯入 Milvus,為有效率的資料管理和分析開啟新的可能性。

什麼是 Apache Parquet?

Apache Parquet是一種流行的開放原始碼列導向資料檔案格式,設計用來提高大規模資料集的儲存和處理效率。與 CSV 或 JSON 等傳統面向行的資料格式不同,Parquet 以列儲存資料,提供更有效率的資料壓縮和編碼方案。此方法可提升效能、降低儲存需求並增強處理能力,使其成為處理大量複雜資料的理想選擇。

Milvus 用戶如何從 Parquet 檔案匯入支援中獲益

Milvus 擴展了對 Parquet 檔案匯入的支援,提供使用者最佳化的體驗和各種優勢,包括降低儲存和計算費用、簡化資料管理和簡化匯入流程。

最佳化儲存效率與簡化資料管理

Parquet 提供彈性的壓縮選項和有效率的編碼方案,以迎合不同的資料類型,確保最佳的儲存效率。這種彈性在雲端環境中特別有價值,因為在雲端環境中,每一分儲存空間的節省都直接關係到實質成本的降低。有了 Milvus 的這項新功能,使用者可以毫不費力地將所有不同的資料整合成單一檔案,簡化資料管理並提升整體使用者體驗。這項功能對於使用可變長度 Array 資料類型的使用者尤其有利,他們現在可以享受簡化的資料匯入程序。

改善查詢效能

Parquet 的列式儲存設計和先進的壓縮方法可大幅提升查詢效能。在進行查詢時,使用者可專注於相關資料,而無需掃描不相關的資料。這種選擇性的列讀取方式可將 CPU 使用量降至最低,從而加快查詢速度。

廣泛的語言相容性

Parquet 可用於多種語言,例如 Java、C++ 和 Python,並與大量資料處理工具相容。有了 Parquet 檔案的支援,使用不同 SDK 的 Milvus 使用者可以無縫產生 Parquet 檔案,以便在資料庫內進行解析。

如何將 Parquet 檔案匯入 Milvus

如果您的資料已經是 Parquet 檔案格式,匯入很容易。將 Parquet 檔案上傳到物件儲存系統,例如 MinIO,就可以匯入了。

下面的程式碼片段是匯入 Parquet 檔案到 Milvus 的範例。

remote_files = []
try:
    print("Prepare upload files")
    minio_client = Minio(endpoint=MINIO_ADDRESS, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY,
                         secure=False)
    found = minio_client.bucket_exists(bucket_name)
    if not found:
        minio_client.make_bucket(bucket_name)
        print("MinIO bucket '{}' doesn't exist".format(bucket_name))
        return False, []

    # set your remote data path
    remote_data_path = "milvus_bulkinsert"

    def upload_file(f: str):
        file_name = os.path.basename(f)
        minio_file_path = os.path.join(remote_data_path, "parquet", file_name)
        minio_client.fput_object(bucket_name, minio_file_path, f)
        print("Upload file '{}' to '{}'".format(f, minio_file_path))
        remote_files.append(minio_file_path)

    upload_file(data_file)

except S3Error as e:
    print("Failed to connect MinIO server {}, error: {}".format(MINIO_ADDRESS, e))
    return False, []

print("Successfully upload files: {}".format(remote_files))
return True, remote_files

如果您的資料不是 Parquet 檔案或具有動態欄位,您可以利用我們的資料格式轉換工具 BulkWriter 來幫助您產生 Parquet 檔案。BulkWriter 現在已採用 Parquet 作為其預設的輸出資料格式,確保開發人員有更直覺的體驗。

下面的程式碼片段是使用 BulkWriter 產生 Parquet 檔案的範例。

import numpy as np
import json

from pymilvus import (
    RemoteBulkWriter,
    BulkFileType,
)

remote_writer = RemoteBulkWriter(
        schema=your_collection_schema,
        remote_path="your_remote_data_path",
        connect_param=RemoteBulkWriter.ConnectParam(
            endpoint=YOUR_MINIO_ADDRESS,
            access_key=YOUR_MINIO_ACCESS_KEY,
            secret_key=YOUR_MINIO_SECRET_KEY,
            bucket_name="a-bucket",
        ),
        file_type=BulkFileType.PARQUET,
)

# append your data
batch_count = 10000
for i in range(batch_count):
    row = {
        "id": i,
        "bool": True if i % 5 == 0 else False,
        "int8": i % 128,
        "int16": i % 1000,
        "int32": i % 100000,
        "int64": i,
        "float": i / 3,
        "double": i / 7,
        "varchar": f"varchar_{i}",
        "json": {"dummy": i, "ok": f"name_{i}"},
        "vector": gen_binary_vector() if bin_vec else gen_float_vector(),
        f"dynamic_{i}": i,
    }
    remote_writer.append_row(row)

# append rows by numpy type
for i in range(batch_count):
    remote_writer.append_row({
        "id": np.int64(i + batch_count),
        "bool": True if i % 3 == 0 else False,
        "int8": np.int8(i % 128),
        "int16": np.int16(i % 1000),
        "int32": np.int32(i % 100000),
        "int64": np.int64(i),
        "float": np.float32(i / 3),
        "double": np.float64(i / 7),
        "varchar": f"varchar_{i}",
        "json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
        "vector": gen_binary_vector() if bin_vec else gen_float_vector(),
        f"dynamic_{i}": i,
    })

print(f"{remote_writer.total_row_count} rows appends")
print(f"{remote_writer.buffer_row_count} rows in buffer not flushed")
print("Generate data files...")
remote_writer.commit()
print(f"Data files have been uploaded: {remote_writer.batch_files}")
remote_files = remote_writer.batch_files

然後,您就可以開始將 Parquet 檔案匯入 Milvus。

remote_files = [remote_file_path]
task_id = utility.do_bulk_insert(collection_name=collection_name,
                                 files=remote_files)

task_ids = [task_id]         
states = wait_tasks_to_state(task_ids, BulkInsertState.ImportCompleted)
complete_count = 0
for state in states:
    if state.state == BulkInsertState.ImportCompleted:
        complete_count = complete_count + 1

現在,您的資料已無縫整合至 Milvus。

下一步是什麼?

隨著 Milvus 持續支援不斷成長的資料量,在管理龐大的匯入,尤其是 Parquet 檔案超過 10GB 時,就會出現挑戰。為了解決這個挑戰,我們計劃將匯入資料分隔為標量列和向量列,每次匯入建立兩個 Parquet 檔案,以減輕 I/O 壓力。對於超過數百 GB 的資料集,我們建議多次匯入資料。

Like the article? Spread the word

繼續閱讀