milvus-logo
LFAI
フロントページへ
  • ユーザーガイド

ソースデータの準備

このページでは、データをコレクションに一括挿入する前に考慮すべきことについて説明します。

開始する前に

ターゲット・コレクションでは、ソース・データをそのスキーマにマッピングする必要があります。下図は、許容可能なソース・データをターゲット・コレクションのスキーマにマッピングする方法を示しています。

Map data to schema データをスキーマにマップする

データを慎重に調べ、それに応じてターゲット・コレクションのスキーマを設計する必要があります。

上図の JSON データを例にとると、行リストには 2 つのエンティティがあり、各行には 6 つのフィールドがあります。コレクションのスキーマには、idvectorscalar_1scalar_2の4つが選択的に含まれます。

スキーマを設計する際に考慮すべきことがさらに2つあります:

  • AutoIDを有効にするかどうか。

    idフィールドはコレクションのプライマリフィールドとして機能します。プライマリフィールドを自動的にインクリメントするには、スキーマでAutoID を有効にします。この場合、ソースデータの各行からidフィールドを除外する必要があります。

  • ダイナミック・フィールドを有効にするかどうか

    スキーマでダイナミック・フィールドを有効にすると、ターゲット・コレクションは、事前に定義されたスキーマに含まれないフィールドも格納できます。metaフィールドは、動的フィールドとその値をキーと値のペアで保持するための予約済み JSON フィールドです。上の図では、フィールドdynamic_field_1とdynamic_field_2とその値がキーと値のペアとして$metaフィールドに保存されます。

以下のコードでは、上図のコレクションのスキーマを設定する方法を示します。

より詳細な情報を得るには create_schema()および add_field()を参照してください。

詳細については、SDKリファレンスの CollectionSchemaを参照してください。

from pymilvus import MilvusClient, DataType

# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
    auto_id=False,
    enable_dynamic_field=True
)

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="scalar_1", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="scalar_2", datatype=DataType.INT64)

schema.verify()
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;

// Define schema for the target collection
FieldType id = FieldType.newBuilder()
        .withName("id")
        .withDataType(DataType.Int64)
        .withPrimaryKey(true)
        .withAutoID(false)
        .build();

FieldType vector = FieldType.newBuilder()
        .withName("vector")
        .withDataType(DataType.FloatVector)
        .withDimension(768)
        .build();

FieldType scalar1 = FieldType.newBuilder()
        .withName("scalar_1")
        .withDataType(DataType.VarChar)
        .withMaxLength(512)
        .build();

FieldType scalar2 = FieldType.newBuilder()
        .withName("scalar_2")
        .withDataType(DataType.Int64)
        .build();

CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
        .withEnableDynamicField(true)
        .addFieldType(id)
        .addFieldType(vector)
        .addFieldType(scalar1)
        .addFieldType(scalar2)
        .build();

BulkWriterのセットアップ

BulkWriterは、生のデータセットをRESTful Import API経由でインポートするのに適した形式に変換するためのツールです。2種類のライターを提供しています:

  • LocalBulkWriter:指定されたデータセットを読み込み、使いやすい形式に変換します。
  • RemoteBulkWriter:LocalBulkWriterと同じタスクを実行しますが、変換されたデータファイルを指定のリモート・オブジェクト・ストレージ・バケットに転送します。

RemoteBulkWriterが LocalBulkWriterと異なる点は、RemoteBulkWriterが変換されたデータファイルをターゲット・オブジェクト・ストレージ・バケットに転送する点です。

LocalBulkWriterのセットアップ

LocalBulkWriterは、ソースデータセットから行を追加し、指定された形式のローカルファイルにコミットします。

from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
# Use `from pymilvus import LocalBulkWriter, BulkFileType` 
# when you use pymilvus earlier than 2.4.2 

writer = LocalBulkWriter(
    schema=schema,
    local_path='.',
    segment_size=512 * 1024 * 1024, # Default value
    file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;

LocalBulkWriterParam localBulkWriterParam = LocalBulkWriterParam.newBuilder()
    .withCollectionSchema(schema)
    .withLocalPath(".")
    .withChunkSize(512 * 1024 * 1024)
    .withFileType(BulkFileType.PARQUET)
    .build();

LocalBulkWriter localBulkWriter = new LocalBulkWriter(localBulkWriterParam);

LocalBulkWriter を作成するときは、次のようにします:

  • schema で作成したスキーマを参照する。
  • local_path を出力ディレクトリに設定します。
  • file_type を出力ファイル・タイプに設定する。
  • データセットに多数のレコードが含まれる場合は、segment_size を適切な値に設定してデータを分割することをお勧めします。

パラメータ設定の詳細については、SDKリファレンスのLocalBulkWriterを参照してください。

LocalBulkWriterを作成するときは、次のようにしてください:

  • CollectionSchema() で作成したスキーマを参照する。
  • withLocalPath() で出力ディレクトリを設定します。
  • withFileType() で出力ファイル タイプを設定する。
  • データセットに多数のレコードが含まれる場合は、withChunkSize() を適切な値に設定してデータを分割することをお勧めします。

パラメータ設定の詳細については、SDKリファレンスのLocalBulkWriterを参照してください。

RemoteBulkWriterのセットアップ

RemoteBulkWriterは、追加されたデータをローカルファイルにコミットする代わりに、リモートバケットにコミットします。したがって、RemoteBulkWriterを作成する前にConnectParamオブジェクトを設定する必要があります。

from pymilvus.bulk_writer import RemoteBulkWriter
# Use `from pymilvus import RemoteBulkWriter` 
# when you use pymilvus earlier than 2.4.2 

# Third-party constants
ACCESS_KEY="minioadmin"
SECRET_KEY="minioadmin"
BUCKET_NAME="milvus-bucket"

# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
    endpoint="localhost:9000", # the default MinIO service started along with Milvus
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    bucket_name=BUCKET_NAME,
    secure=False
)
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;

String ACCESS_KEY = "minioadmin";
String SECRET_KEY = "minioadmin";
String BUCKET_NAME = "milvus-bucket";

StorageConnectParam storageConnectParam = S3ConnectParam.newBuilder()
    .withEndpoint(MINIO_URI)
    .withAccessKey(ACCESS_KEY)
    .withSecretKey(SECRET_KEY)
    .withBucketName(BUCKET_NAME)
    .build();

接続パラメータが準備できたら、以下のようにRemoteBulkWriterで参照できます:

from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType` 
# when you use pymilvus earlier than 2.4.2 

writer = RemoteBulkWriter(
    schema=schema,
    remote_path="/",
    connect_param=conn,
    file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;

RemoteBulkWriterParam remoteBulkWriterParam = RemoteBulkWriterParam.newBuilder()
    .withCollectionSchema(schema)
    .withConnectParam(storageConnectParam)
    .withChunkSize(512 * 1024 * 1024)
    .withRemotePath("/")
    .withFileType(BulkFileType.PARQUET)
    .build();

RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(remoteBulkWriterParam);

RemoteBulkWriter を作成するためのパラメータは、connect_param を除き、LocalBulkWriter のパラメータとほとんど同じです。パラメータ設定の詳細については、SDKリファレンスのRemoteBulkWriterと ConnectParamを参照してください。

RemoteBulkWriter を作成するためのパラメータは、StorageConnectParam を除き、LocalBulkWriter のパラメータとほとんど同じです。パラメータ設定の詳細については、SDK リファレンスの RemoteBulkWriter および StorageConnectParam を参照してください。

書き込みの開始

BulkWriterにはappend_row() ソースデータセットから行を追加するメソッドと、commit() 追加した行をローカルファイルまたはリモートバケットにコミットするメソッドがあります。

appendRow() はソースデータセットから行を追加し、commit() は追加した行をローカルファイルあるいはリモートバケットにコミットする。

デモンストレーションのために、以下のコードではランダムに生成されたデータを追加している。

import random
import string

def generate_random_str(length=5):
    letters = string.ascii_uppercase
    digits = string.digits
    
    return ''.join(random.choices(letters + digits, k=length))

for i in range(10000):
    writer.append_row({
        "id": i, 
        "vector": [random.uniform(-1, 1) for _ in range(768)],
        "scalar_1": generate_random_str(random.randint(1, 20)),
        "scalar_2": random.randint(0, 100)
    })
    
writer.commit()
import com.alibaba.fastjson.JSONObject;

for (int i = 0; i < 10000; i++) {
    JSONObject json = new JSONObject();
    json.put("id", i);
    json.put("vector", get_random_vector(768));
    json.put("scalar_1", get_random_string(20));
    json.put("scalar_2", (long) (Math.random() * 100));

    // localBulkWriter.appendRow(json);
    remoteBulkWriter.appendRow(json);
}

// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);

定義されているスキーマは動的フィールドを許可しているので、以下のように、挿入するデータにスキーマで定義されていないフィールドを含めることもできる。

import random
import string

def generate_random_string(length=5):
    letters = string.ascii_uppercase
    digits = string.digits
    
    return ''.join(random.choices(letters + digits, k=length))

for i in range(10000):
    writer.append_row({
        "id": i, 
        "vector":[random.uniform(-1, 1) for _ in range(768)],
        "scalar_1": generate_random_string(),
        "scalar_2": random.randint(0, 100),
        "dynamic_field_1": random.choice([True, False]),
        "dynamic_field_2": random.randint(0, 100)
    })
    
writer.commit()
for (int i = 0; i < 10000; i++) {
    JSONObject json = new JSONObject();
    json.put("id", i);
    json.put("vector", get_random_vector(768));
    json.put("scalar_1", get_random_string(20));
    json.put("scalar_2", (long) (Math.random() * 100));
    json.put("dynamic_field_1", get_random_boolean());
    json.put("dynamic_field_2", (long) (Math.random() * 100));

    // localBulkWriter.appendRow(json);
    remoteBulkWriter.appendRow(json);
}

// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);

結果の確認

結果を確認するには、ライターのbatch_files プロパティを表示することで、実際の出力パスを取得できます。

結果を確認するには、ライターのgetBatchFiles() メソッドを表示することで、実際の出力パスを取得できます。

print(writer.batch_files)

# [['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/1.parquet'],
#  ['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/2.parquet']]
// localBulkWriter.getBatchFiles();
remoteBulkWriter.getBatchFiles();

// 

// Close the BulkWriter
try {
    localBulkWriter.close();
    remoteBulkWriter.close();            
} catch (Exception e) {
    // TODO: handle exception
    e.printStackTrace();
}

BulkWriterはUUIDを生成し、提供された出力ディレクトリにUUIDを使用したサブフォルダを作成し、生成されたすべてのファイルをサブフォルダに配置します。用意されているサンプルデータのダウンロードはこちら

可能なフォルダ構造は以下の通りです:

# JSON
├── folder
│   └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│       └── 1.json 

# Parquet
├── folder
│   └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│       └── 1.parquet 

翻訳DeepLogo

フィードバック

このページは役に立ちましたか ?