🚀 Zilliz Cloudを無料で試す、完全管理型のMilvus—10倍の高速パフォーマンスを体験しよう!今すぐ試す>>

milvus-logo
LFAI
  • Home
  • Blog
  • Milvus、Apache Parquetファイルのインポートをサポートし、データ処理効率を向上

Milvus、Apache Parquetファイルのインポートをサポートし、データ処理効率を向上

  • Engineering
March 08, 2024
Cai Zhang, Fendy Feng

Milvusは、膨大なデータセットを扱うことができることで有名なスケーラブルなベクトルデータベースですが、バージョン2.3.4でParquetファイルのサポートを導入し、大きな一歩を踏み出しました。Apache Parquetを採用することで、ユーザーはデータのインポートプロセスを合理化し、ストレージと計算コストの大幅な削減を享受することができます。

最新の投稿では、Parquetの利点とMilvusユーザーにもたらすメリットを探ります。この機能を統合した動機について説明し、ParquetファイルをMilvusにシームレスにインポートするためのステップバイステップのガイドを提供します。

Apache Parquetとは?

Apache Parquetは、大規模データセットの保存と処理の効率を高めるために設計された、オープンソースの列指向データファイルフォーマットです。CSVやJSONのような従来の行指向のデータ形式とは対照的に、Parquetは列単位でデータを保存し、より効率的なデータ圧縮とエンコード方式を提供します。このアプローチは、パフォーマンスの向上、ストレージ要件の削減、処理能力の強化につながり、複雑なデータを大量に扱うのに理想的です。

ParquetファイルインポートのサポートによるMilvusユーザのメリット

MilvusはParquetファイルインポートのサポートを拡張し、ストレージや計算コストの削減、データ管理の合理化、インポートプロセスの簡素化など、最適化されたエクスペリエンスと様々な利点をユーザーに提供します。

ストレージ効率の最適化とデータ管理の合理化

Parquetは、さまざまなデータタイプに対応する柔軟な圧縮オプションと効率的なエンコーディングスキームを提供し、最適なストレージ効率を実現します。この柔軟性は、ストレージを1オンスでも節約することが具体的なコスト削減に直結するクラウド環境では特に価値があります。Milvusのこの新機能により、ユーザーはすべての多様なデータを単一のファイルに簡単に統合することができ、データ管理を合理化し、全体的なユーザーエクスペリエンスを向上させることができます。この機能は、可変長のArrayデータタイプを扱うユーザーにとって特に有益であり、データのインポートプロセスを簡素化することができます。

クエリパフォーマンスの向上

Parquetのカラム型ストレージ設計と高度な圧縮方法により、クエリパフォーマンスが大幅に向上しました。クエリを実行する際、無関係なデータをスキャンすることなく、必要なデータのみに集中することができます。この選択的な列の読み取りにより、CPU使用率が最小限に抑えられ、クエリ時間が短縮されます。

幅広い言語互換性

ParquetはJava、C++、Pythonなど複数の言語で利用可能で、多数のデータ処理ツールと互換性があります。Parquetファイルのサポートにより、異なるSDKを使用しているMilvusユーザは、データベース内で解析するためのParquetファイルをシームレスに生成することができます。

MilvusへのParquetファイルのインポート方法

データが既にParquetファイル形式である場合、インポートは簡単です。ParquetファイルをMinIOのようなオブジェクトストレージシステムにアップロードすれば、インポートの準備は完了です。

下のコードはParquetファイルをMilvusにインポートする例です。

remote_files = []
try:
    print("Prepare upload files")
    minio_client = Minio(endpoint=MINIO_ADDRESS, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY,
                         secure=False)
    found = minio_client.bucket_exists(bucket_name)
    if not found:
        minio_client.make_bucket(bucket_name)
        print("MinIO bucket '{}' doesn't exist".format(bucket_name))
        return False, []

    # set your remote data path
    remote_data_path = "milvus_bulkinsert"

    def upload_file(f: str):
        file_name = os.path.basename(f)
        minio_file_path = os.path.join(remote_data_path, "parquet", file_name)
        minio_client.fput_object(bucket_name, minio_file_path, f)
        print("Upload file '{}' to '{}'".format(f, minio_file_path))
        remote_files.append(minio_file_path)

    upload_file(data_file)

except S3Error as e:
    print("Failed to connect MinIO server {}, error: {}".format(MINIO_ADDRESS, e))
    return False, []

print("Successfully upload files: {}".format(remote_files))
return True, remote_files

データがParquetファイルでない場合やダイナミックフィールドを持つ場合は、データフォーマット変換ツールであるBulkWriterを活用してParquetファイルを生成することができます。BulkWriterはデフォルトの出力データ形式としてParquetを採用し、開発者にとってより直感的な操作性を実現しています。

以下のコードスニペットは、BulkWriterを使用してParquetファイルを生成する例です。

import numpy as np
import json

from pymilvus import (
    RemoteBulkWriter,
    BulkFileType,
)

remote_writer = RemoteBulkWriter(
        schema=your_collection_schema,
        remote_path="your_remote_data_path",
        connect_param=RemoteBulkWriter.ConnectParam(
            endpoint=YOUR_MINIO_ADDRESS,
            access_key=YOUR_MINIO_ACCESS_KEY,
            secret_key=YOUR_MINIO_SECRET_KEY,
            bucket_name="a-bucket",
        ),
        file_type=BulkFileType.PARQUET,
)

# append your data
batch_count = 10000
for i in range(batch_count):
    row = {
        "id": i,
        "bool": True if i % 5 == 0 else False,
        "int8": i % 128,
        "int16": i % 1000,
        "int32": i % 100000,
        "int64": i,
        "float": i / 3,
        "double": i / 7,
        "varchar": f"varchar_{i}",
        "json": {"dummy": i, "ok": f"name_{i}"},
        "vector": gen_binary_vector() if bin_vec else gen_float_vector(),
        f"dynamic_{i}": i,
    }
    remote_writer.append_row(row)

# append rows by numpy type
for i in range(batch_count):
    remote_writer.append_row({
        "id": np.int64(i + batch_count),
        "bool": True if i % 3 == 0 else False,
        "int8": np.int8(i % 128),
        "int16": np.int16(i % 1000),
        "int32": np.int32(i % 100000),
        "int64": np.int64(i),
        "float": np.float32(i / 3),
        "double": np.float64(i / 7),
        "varchar": f"varchar_{i}",
        "json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
        "vector": gen_binary_vector() if bin_vec else gen_float_vector(),
        f"dynamic_{i}": i,
    })

print(f"{remote_writer.total_row_count} rows appends")
print(f"{remote_writer.buffer_row_count} rows in buffer not flushed")
print("Generate data files...")
remote_writer.commit()
print(f"Data files have been uploaded: {remote_writer.batch_files}")
remote_files = remote_writer.batch_files

次に、Parquetファイルをmilvusにインポートします。

remote_files = [remote_file_path]
task_id = utility.do_bulk_insert(collection_name=collection_name,
                                 files=remote_files)

task_ids = [task_id]         
states = wait_tasks_to_state(task_ids, BulkInsertState.ImportCompleted)
complete_count = 0
for state in states:
    if state.state == BulkInsertState.ImportCompleted:
        complete_count = complete_count + 1

これで、あなたのデータはMilvusにシームレスに統合されました。

今後の展開

Milvusが増え続けるデータ量に対応し続けるにつれ、特にParquetファイルが10GBを超えるような大規模なインポートの管理という課題が生じます。この課題に取り組むため、インポートデータをスカラー列とベクトル列に分離し、インポートごとに2つのParquetファイルを作成してI/Oプレッシャーを軽減する予定です。数百ギガバイトを超えるデータセットの場合は、複数回インポートすることをお勧めします。

Like the article? Spread the word

続けて読む