外部コレクションの作成Compatible with Milvus 3.0.x

外部コレクションはMilvusのデータコレクションの一種で、Milvusにデータをコピーすることなく、AWS S3やIcebergなどの外部ストレージシステムやデータベーステーブルからデータにアクセスします。Milvusのクエリインタフェースとの互換性を保ちながら、データレイクのクエリレイヤーとして機能します。

概要

一般的なAIデータパイプラインでは、AWS S3などのストレージシステム上にParquetなどの形式でデータを保存している場合があります。このような外部保存データをMilvusで利用するためには、通常、ETL(Extract-Transform-Load)パイプラインを使用してMilvusのストレージにインポートする必要があります。

このBring-your-data-to-Milvusワークフローは、同期が難しい冗長なデータを作成し、データの一貫性を確保するためのエンジニアリングメンテナンスの負担を増やします。

Bring data to compute workflow データから計算ワークフローへ

このような問題を解決するために、Milvusは外部コレクションを提供し、データの同期やETLパイプラインを気にすることなく、Milvusから外部保存データにアクセスできるようにします。

Bring compute to data workflow データワークフローに計算をもたらす

一度作成された外部コレクションは、データに直接アクセスすることができ、データを保存している場所と同じ場所に保管することができます。バックグラウンドでMilvusはマニフェストファイルを作成し、Milvusメタデータと外部データファイルの行のマッピングを記録します。マニフェストファイルの準備ができたら、他の管理コレクションと同様に外部コレクションにインデックスを作成できます。

データが変更されると、手動で秒以下の更新をトリガすることでメタデータが更新され、Milvusは常に最新の状態に保たれます。

ステップ1: スキーマの作成

マネージド・コレクションの作成と同様に、外部コレクションを作成する前にスキーマを作成する必要があります。ただし、スキーマはマネージド・コレクションとは若干異なります。

from pymilvus import MilvusClient, DataType

schema = MilvusClient.create_schema(
    external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
    external_spec='{
        "format": "parquet",
        "extfs": {
            ...
        }
    }'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;

JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());

CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
        .externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
        .externalSpec(externalSpec)
        .build();
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema := entity.NewSchema().
    WithName("product_embeddings").
    WithExternalSource("s3://my-bucket/embeddings/").
    WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
        {
            "fieldName": "product_id",
            "dataType": "Int64",
            "isPrimary": true
        },
        {
            "fieldName": "embedding",
            "dataType": "FloatVector",
            "elementTypeParams": {
                "dim": "768"
            }
        },
        {
            "fieldName": "product_name",
            "dataType": "VarChar",
            "elementTypeParams": {
                "max_length": 512
            }
        }
    ]'

外部コレクションのスキーマを作成するには、ソースデータURI、データ形式、認証設定を指定する必要があります。

パラメータ名

パラメータの説明

例 値

format

ターゲットソースデータファイルの形式。

parquet

snapshot_id

有効な Iceberg テーブルスナップショット ID。このパラメータは、formaticeberg_table に設定した場合にのみ適用されます。

473984310232959286

extfs

文字列化されたJSON構造の外部ファイルシステム設定。

--

認証設定のオプションは以下のとおりです:

AWS AK/SK を使用する。

このオプションは、セルフホスト型のMinIO、または業務でAK/SKを使用するシナリオに適用されます。

{
    "format": "...",
    "extfs": {
        "access_key_id":     "AKIA..",
        "access_key_value":  "u4Lh...",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true",
        "use_virtual_host":  "true"
    }
}

パラメータ名

パラメータの説明

例 値

extfs.access_key_id

アクセスキーID

AKIA...

extfs.access_key_value

アクセスキー値

u7LH...

extfs.region

クラウド領域ID

us-west-2

extfs.cloud_provider

クラウド・プロバイダーID

aws

extfs.use_ssl

接続の確立にSSLを使用するかどうか

true

extfs.use_virtual_host

バケットへのアクセスにバーチャルホスティングを利用するかどうか。

詳細はこちらの記事を参照。

true

AWS IAMを使用する

このオプションは、MilvusがEC2インスタンスまたはEKSクラスタ上で動作する場合に適用されます。この場合、AK/SKをハードコーディングする必要はありません。

{
    "format": "...",
    "extfs": {
        "use_iam":           "true",
        "iam_endpoint":      "https://sts.<region>.amazonaws.com",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true"
    }
}

パラメータ名

パラメータ説明

値の例

extfs.use_iam

AWS IAMを使用するかどうか。

このオプションには"true" を設定する。

true

extfs.iam_endpoint

有効な AWS STS エンドポイント。

詳細はこちらの記事を参照。

https:*//*sts.<region>.amazonaws.com

extfs.region

クラウドリージョンID

us-west-2

extfs.cloud_provider

クラウドプロバイダーID

aws

extfs.use_ssl

接続の確立にSSLを使用するかどうか。

true

Milvusグローバル認証情報を使用する

Milvusバケットに外部データを保存し、milvus.yaml で指定したグローバルMinIO設定を直接使用してデータにアクセスできる場合に適用します。

{
    "format": "...",
    "extfs": {
        "storage_type": "remote"
    }
}

IAMロールARNを使用する

このオプションは、組織がMilvusクラスタとターゲットデータファイルを保持するバケットを管理するために異なるAWSアカウントを使用する場合に適用されます。

この場合、バケットオーナーは以下のような IAM ロールを作成する必要があります。

  • バケットへのアクセスにAmazonS3FullAccess またはよりきめ細かいポリシーをアタッチする。

  • ロールの Trust Policy の Condition フィールドに、自分で定義したsts:ExternalId を含めます。

そして、Bucket の所有者は IAM ロールの ARN と外部 ID をあなたに提供しなければなりません。そうすれば、あなたはそれらの値を使ってsts:AssumeRole を呼び出し、IAM ロールを引き受けることができます。

以下は、IAMロールに付与する許可ポリシーの例です。要件に合わせてこれを調整できる。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
        }
    ]
}

そして、IAMロールに関連付けられた信頼ポリシーは、誰がIAMロールを引き受けることを許可されるかを定義します。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
        }
      }
    }
  ]
}

IAMロールのARNと外部IDを取得したら、external_spec パラメータを以下のように設定します:

{
    "format": "...",
    "extfs": {
        "cloud_provider": "aws",
        "region": "us-west-2",
        "storage_type": "remote",
        "use_ssl": "true",
        "use_iam": "true",
        "role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
        "external_id": "YOUR_UNIQUE_EXTERNAL_ID",
        "load_frequency": "900"
    }
}

パラメータ名

パラメーターの説明

値の例

extfs.cloud_provider

クラウド・プロバイダー ID

aws

extfs.region

クラウド・リージョン ID

us-west-2

extfs.use_ssl

接続の確立にSSLを使用するかどうか

true

extfs.use_iam

AWS IAM を使用するかどうか。

このオプションには"true" を設定する。

true

extfs.role_arn

バケットオーナーから取得した IAM Role ARN。

arn:aws:iam::306787000000:role/...

extfs.external_id

バケットオーナーから取得した外部ID。

--

extfs.load_frequency

Milvusが一時的な認証情報を取得する間隔(秒)。

900

ステップ 2: フィールドの追加

スキーマの準備ができたら、以下のようにフィールドを追加していく:

schema.add_field(
    field_name="product_id",
    datatype=DataType.INT64,
    # highlight-next
    external_field="id" # field name in the external data file
)
schema.add_field(
    field_name="product_name",
    datatype=DataType.VARCHAR,
    max_length=512,
    # highlight-next
    external_field="name"
)
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=768,
    # highlight-next
    external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;

schema.addField(AddFieldReq.builder()
        .fieldName("product_id")
        .dataType(DataType.Int64)
        .externalField("id")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("product_name")
        .dataType(DataType.VarChar)
        .maxLength(512)
        .externalField("name")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("embedding")
        .dataType(DataType.FloatVector)
        .dimension(768)
        .externalField("vector")
        .build());
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema = schema.
    WithField(
        entity.NewField().
            WithName("product_id").
            WithDataType(entity.FieldTypeInt64).
            WithExternalField("id"),
    ).
    WithField(
        entity.NewField().
            WithName("product_name").
            WithDataType(entity.FieldTypeVarChar).
            WithMaxLength(512).
            WithExternalField("name"),
    ).
    WithField(
        entity.NewField().
            WithName("embedding").
            WithDataType(entity.FieldTypeFloatVector).
            WithDim(768).
            WithExternalField("vector"),
    )
// node
export schema="{
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
    \"fields\": $fields
}"

ステップ3:コレクションの作成

スキーマにすべてのフィールドを追加したら、外部コレクションを作成します。

client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

client.create_collection(
    collection_name="test_collection",
    schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;

ConnectConfig connectConfig = ConnectConfig.builder()
        .uri("http://localhost:19530")
        .token("root:Milvus")
        .build();

MilvusClientV2 client = new MilvusClientV2(connectConfig);

CreateCollectionReq createReq = CreateCollectionReq.builder()
        .collectionName("test_collection")
        .collectionSchema(schema)
        .build();
client.createCollection(createReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

milvusAddr := "http://localhost:19530"
token := "root:Milvus"

client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
    Address: milvusAddr,
    APIKey: token
})

err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))

if err != nil {
    fmt.Println(err.Error())
    // handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"schema\": $schema
}"

ステップ 4: インデックスの作成

マネージド・コレクションで行うように、外部コレクションのカラムにインデックスを作成できます。

index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
    field_name="embedding",
    index_type="AUTOINDEX",
    metric_type="COSINE"
)
index_params.add_index(
    field_name="product_name",
    index_type="AUTOINDEX"
)
client.create_index(
    db_name="my_database",
    collection_name="test_collection",
    index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;

IndexParam indexParamForIdField = IndexParam.builder()
        .fieldName("product_name")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .build();
IndexParam indexParamForVectorField = IndexParam.builder()
        .fieldName("embedding")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .metricType(IndexParam.MetricType.COSINE)
        .build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
        .dbName("my_database")
        .collectionName("test_collection")
        .indexParams(indexParams)
        .build();
client.createIndex(createIndexReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    "github.com/milvus-io/milvus/client/v2/index"
    "github.com/milvus-io/milvus/client/v2/milvusclient"
)

collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
    milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
    milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
    // handler err
}
err = indexTask.Await(ctx)
if err != nil {
    // handler err
}
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "product_name",
    index_type: "AUTOINDEX"
})
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "embedding",
    index_type: "AUTOINDEX",
    metric_type: "COSINE"
})
export indexParams='[
        {
            "fieldName": "embedding",
            "indexName": "my_vector",
            "indexType": "AUTOINDEX"
        },
        {
            "fieldName": "product_name",
            "indexName": "my_id",
            "indexType": "AUTOINDEX"
        }
    ]'

curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"indexParams\": $indexParams
}"

ステップ 5: データのリフレッシュ

コレクションの準備ができたら、リフレッシュしてデータのメタデータとインデックスを作成します。

job_id = client.refresh_external_collection(
    db_name="my_database",
    collection_name="test_collection"
)
while True:
    progress = client.get_refresh_external_collection_progress(job_id=job_id)
    print(f"  {progress.state}: {progress.progress}%")
    if progress.state == "RefreshCompleted":
        elapsed = progress.end_time - progress.start_time
        print(f"  Completed in {elapsed}ms")
        break
    elif progress.state == "RefreshFailed":
        print(f"  Failed: {progress.reason}")
        break
    time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;

while (true) {
    GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
            GetRefreshExternalCollectionProgressReq.builder()
                    .jobId(jobId)
                    .build());
    RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
    if ("RefreshCompleted".equals(jobInfo.getState())) {
        long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
        System.out.printf("  Refresh completed in %dms%n", elapsed);
        break;
    } else if ("RefreshFailed".equals(jobInfo.getState())) {
        System.out.printf("  Refresh failed: %s%n", jobInfo.getReason());
    }
    TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
    client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
    progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
        client.NewGetRefreshExternalCollectionProgressOption(jobID))
    fmt.Printf("State: %s\n", progress.State)
    if progress.State == entity.RefreshStateCompleted {
        fmt.Println("Refresh completed!")
        break
    }
    if progress.State == entity.RefreshStateFailed {
        fmt.Printf("Refresh failed: %s\n", progress.Reason)
        break
    }
    time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"

リフレッシュ操作は非同期なので、イテレーションを設定してその進捗を監視する必要がある。

  • リフレッシュ操作はデータファイルのメタデータをスキャンし、それに応じてマニフェストファイルを生成します。通常150~250ミリ秒かかります。

  • マニフェストファイルはMilvusのメタデータと外部ファイルの行のマッピングを記録します。

  • ソース データに更新があった場合、Milvus を最新の状態に保つために手動で再度 refresh を呼び出す必要があります。

  • アクティブなメタデータをすべて削除し、挿入を伴わないリフレッシュは拒否されます。

フォローアップ

外部コレクションのリフレッシュが完了したら、他の管理対象コレクションと同様に、外部コレクションでコレクションのロードと解放、類似検索とクエリを実行できます。

検索、クエリ、取得、ハイブリッド検索などのDQL操作を行う前に、セッションを作成してオンデマンド・クラスターの計算リソースをアタッチする必要があります。