外部コレクションの作成Compatible with Milvus 3.0.x
外部コレクションはMilvusのデータコレクションの一種で、Milvusにデータをコピーすることなく、AWS S3やIcebergなどの外部ストレージシステムやデータベーステーブルからデータにアクセスします。Milvusのクエリインタフェースとの互換性を保ちながら、データレイクのクエリレイヤーとして機能します。
概要
一般的なAIデータパイプラインでは、AWS S3などのストレージシステム上にParquetなどの形式でデータを保存している場合があります。このような外部保存データをMilvusで利用するためには、通常、ETL(Extract-Transform-Load)パイプラインを使用してMilvusのストレージにインポートする必要があります。
このBring-your-data-to-Milvusワークフローは、同期が難しい冗長なデータを作成し、データの一貫性を確保するためのエンジニアリングメンテナンスの負担を増やします。
データから計算ワークフローへ
このような問題を解決するために、Milvusは外部コレクションを提供し、データの同期やETLパイプラインを気にすることなく、Milvusから外部保存データにアクセスできるようにします。
データワークフローに計算をもたらす
一度作成された外部コレクションは、データに直接アクセスすることができ、データを保存している場所と同じ場所に保管することができます。バックグラウンドでMilvusはマニフェストファイルを作成し、Milvusメタデータと外部データファイルの行のマッピングを記録します。マニフェストファイルの準備ができたら、他の管理コレクションと同様に外部コレクションにインデックスを作成できます。
データが変更されると、手動で秒以下の更新をトリガすることでメタデータが更新され、Milvusは常に最新の状態に保たれます。
ステップ1: スキーマの作成
マネージド・コレクションの作成と同様に、外部コレクションを作成する前にスキーマを作成する必要があります。ただし、スキーマはマネージド・コレクションとは若干異なります。
from pymilvus import MilvusClient, DataType
schema = MilvusClient.create_schema(
external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
external_spec='{
"format": "parquet",
"extfs": {
...
}
}'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());
CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
.externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
.externalSpec(externalSpec)
.build();
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
schema := entity.NewSchema().
WithName("product_embeddings").
WithExternalSource("s3://my-bucket/embeddings/").
WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
{
"fieldName": "product_id",
"dataType": "Int64",
"isPrimary": true
},
{
"fieldName": "embedding",
"dataType": "FloatVector",
"elementTypeParams": {
"dim": "768"
}
},
{
"fieldName": "product_name",
"dataType": "VarChar",
"elementTypeParams": {
"max_length": 512
}
}
]'
外部コレクションのスキーマを作成するには、ソースデータURI、データ形式、認証設定を指定する必要があります。
パラメータ名 |
パラメータの説明 |
例 値 |
|---|---|---|
|
ターゲットソースデータファイルの形式。 |
|
|
有効な Iceberg テーブルスナップショット ID。このパラメータは、 |
|
|
文字列化されたJSON構造の外部ファイルシステム設定。 |
-- |
認証設定のオプションは以下のとおりです:
AWS AK/SK を使用する。
このオプションは、セルフホスト型のMinIO、または業務でAK/SKを使用するシナリオに適用されます。
{
"format": "...",
"extfs": {
"access_key_id": "AKIA..",
"access_key_value": "u4Lh...",
"region": "us-west-2",
"cloud_provider": "aws",
"use_ssl": "true",
"use_virtual_host": "true"
}
}
パラメータ名 |
パラメータの説明 |
例 値 |
|---|---|---|
|
アクセスキーID |
|
|
アクセスキー値 |
|
|
クラウド領域ID |
|
|
クラウド・プロバイダーID |
|
|
接続の確立にSSLを使用するかどうか |
|
|
バケットへのアクセスにバーチャルホスティングを利用するかどうか。 詳細はこちらの記事を参照。 |
|
AWS IAMを使用する
このオプションは、MilvusがEC2インスタンスまたはEKSクラスタ上で動作する場合に適用されます。この場合、AK/SKをハードコーディングする必要はありません。
{
"format": "...",
"extfs": {
"use_iam": "true",
"iam_endpoint": "https://sts.<region>.amazonaws.com",
"region": "us-west-2",
"cloud_provider": "aws",
"use_ssl": "true"
}
}
パラメータ名 |
パラメータ説明 |
値の例 |
|---|---|---|
|
AWS IAMを使用するかどうか。 このオプションには |
|
|
有効な AWS STS エンドポイント。 詳細はこちらの記事を参照。 |
|
|
クラウドリージョンID |
|
|
クラウドプロバイダーID |
|
|
接続の確立にSSLを使用するかどうか。 |
|
Milvusグローバル認証情報を使用する
Milvusバケットに外部データを保存し、milvus.yaml で指定したグローバルMinIO設定を直接使用してデータにアクセスできる場合に適用します。
{
"format": "...",
"extfs": {
"storage_type": "remote"
}
}
IAMロールARNを使用する
このオプションは、組織がMilvusクラスタとターゲットデータファイルを保持するバケットを管理するために異なるAWSアカウントを使用する場合に適用されます。
この場合、バケットオーナーは以下のような IAM ロールを作成する必要があります。
バケットへのアクセスに
AmazonS3FullAccessまたはよりきめ細かいポリシーをアタッチする。ロールの Trust Policy の Condition フィールドに、自分で定義した
sts:ExternalIdを含めます。
そして、Bucket の所有者は IAM ロールの ARN と外部 ID をあなたに提供しなければなりません。そうすれば、あなたはそれらの値を使ってsts:AssumeRole を呼び出し、IAM ロールを引き受けることができます。
以下は、IAMロールに付与する許可ポリシーの例です。要件に合わせてこれを調整できる。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
}
]
}
そして、IAMロールに関連付けられた信頼ポリシーは、誰がIAMロールを引き受けることを許可されるかを定義します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
}
}
}
]
}
IAMロールのARNと外部IDを取得したら、external_spec パラメータを以下のように設定します:
{
"format": "...",
"extfs": {
"cloud_provider": "aws",
"region": "us-west-2",
"storage_type": "remote",
"use_ssl": "true",
"use_iam": "true",
"role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
"external_id": "YOUR_UNIQUE_EXTERNAL_ID",
"load_frequency": "900"
}
}
パラメータ名 |
パラメーターの説明 |
値の例 |
|---|---|---|
|
クラウド・プロバイダー ID |
|
|
クラウド・リージョン ID |
|
|
接続の確立にSSLを使用するかどうか |
|
|
AWS IAM を使用するかどうか。 このオプションには |
|
|
バケットオーナーから取得した IAM Role ARN。 |
|
|
バケットオーナーから取得した外部ID。 |
-- |
|
Milvusが一時的な認証情報を取得する間隔(秒)。 |
|
ステップ 2: フィールドの追加
スキーマの準備ができたら、以下のようにフィールドを追加していく:
schema.add_field(
field_name="product_id",
datatype=DataType.INT64,
# highlight-next
external_field="id" # field name in the external data file
)
schema.add_field(
field_name="product_name",
datatype=DataType.VARCHAR,
max_length=512,
# highlight-next
external_field="name"
)
schema.add_field(
field_name="embedding",
datatype=DataType.FLOAT_VECTOR,
dim=768,
# highlight-next
external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;
schema.addField(AddFieldReq.builder()
.fieldName("product_id")
.dataType(DataType.Int64)
.externalField("id")
.build());
schema.addField(AddFieldReq.builder()
.fieldName("product_name")
.dataType(DataType.VarChar)
.maxLength(512)
.externalField("name")
.build());
schema.addField(AddFieldReq.builder()
.fieldName("embedding")
.dataType(DataType.FloatVector)
.dimension(768)
.externalField("vector")
.build());
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
schema = schema.
WithField(
entity.NewField().
WithName("product_id").
WithDataType(entity.FieldTypeInt64).
WithExternalField("id"),
).
WithField(
entity.NewField().
WithName("product_name").
WithDataType(entity.FieldTypeVarChar).
WithMaxLength(512).
WithExternalField("name"),
).
WithField(
entity.NewField().
WithName("embedding").
WithDataType(entity.FieldTypeFloatVector).
WithDim(768).
WithExternalField("vector"),
)
// node
export schema="{
\"externalSource\": \"volume://my_volume/path/to/a/folder\",
\"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
\"fields\": $fields
}"
ステップ3:コレクションの作成
スキーマにすべてのフィールドを追加したら、外部コレクションを作成します。
client = MilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)
client.create_collection(
collection_name="test_collection",
schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
ConnectConfig connectConfig = ConnectConfig.builder()
.uri("http://localhost:19530")
.token("root:Milvus")
.build();
MilvusClientV2 client = new MilvusClientV2(connectConfig);
CreateCollectionReq createReq = CreateCollectionReq.builder()
.collectionName("test_collection")
.collectionSchema(schema)
.build();
client.createCollection(createReq);
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
milvusAddr := "http://localhost:19530"
token := "root:Milvus"
client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
Address: milvusAddr,
APIKey: token
})
err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))
if err != nil {
fmt.Println(err.Error())
// handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"schema\": $schema
}"
ステップ 4: インデックスの作成
マネージド・コレクションで行うように、外部コレクションのカラムにインデックスを作成できます。
index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
field_name="embedding",
index_type="AUTOINDEX",
metric_type="COSINE"
)
index_params.add_index(
field_name="product_name",
index_type="AUTOINDEX"
)
client.create_index(
db_name="my_database",
collection_name="test_collection",
index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;
IndexParam indexParamForIdField = IndexParam.builder()
.fieldName("product_name")
.indexType(IndexParam.IndexType.AUTOINDEX)
.build();
IndexParam indexParamForVectorField = IndexParam.builder()
.fieldName("embedding")
.indexType(IndexParam.IndexType.AUTOINDEX)
.metricType(IndexParam.MetricType.COSINE)
.build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
.dbName("my_database")
.collectionName("test_collection")
.indexParams(indexParams)
.build();
client.createIndex(createIndexReq);
import (
"github.com/milvus-io/milvus/client/v2/entity"
"github.com/milvus-io/milvus/client/v2/index"
"github.com/milvus-io/milvus/client/v2/milvusclient"
)
collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
// handler err
}
err = indexTask.Await(ctx)
if err != nil {
// handler err
}
client.createIndex({
db_name: "my_database",
collection_name: "test_collection",
field_name: "product_name",
index_type: "AUTOINDEX"
})
client.createIndex({
db_name: "my_database",
collection_name: "test_collection",
field_name: "embedding",
index_type: "AUTOINDEX",
metric_type: "COSINE"
})
export indexParams='[
{
"fieldName": "embedding",
"indexName": "my_vector",
"indexType": "AUTOINDEX"
},
{
"fieldName": "product_name",
"indexName": "my_id",
"indexType": "AUTOINDEX"
}
]'
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"indexParams\": $indexParams
}"
ステップ 5: データのリフレッシュ
コレクションの準備ができたら、リフレッシュしてデータのメタデータとインデックスを作成します。
job_id = client.refresh_external_collection(
db_name="my_database",
collection_name="test_collection"
)
while True:
progress = client.get_refresh_external_collection_progress(job_id=job_id)
print(f" {progress.state}: {progress.progress}%")
if progress.state == "RefreshCompleted":
elapsed = progress.end_time - progress.start_time
print(f" Completed in {elapsed}ms")
break
elif progress.state == "RefreshFailed":
print(f" Failed: {progress.reason}")
break
time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;
while (true) {
GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
GetRefreshExternalCollectionProgressReq.builder()
.jobId(jobId)
.build());
RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
if ("RefreshCompleted".equals(jobInfo.getState())) {
long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
System.out.printf(" Refresh completed in %dms%n", elapsed);
break;
} else if ("RefreshFailed".equals(jobInfo.getState())) {
System.out.printf(" Refresh failed: %s%n", jobInfo.getReason());
}
TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
client.NewGetRefreshExternalCollectionProgressOption(jobID))
fmt.Printf("State: %s\n", progress.State)
if progress.State == entity.RefreshStateCompleted {
fmt.Println("Refresh completed!")
break
}
if progress.State == entity.RefreshStateFailed {
fmt.Printf("Refresh failed: %s\n", progress.Reason)
break
}
time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"externalSource\": \"volume://my_volume/path/to/a/folder\",
\"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"
リフレッシュ操作は非同期なので、イテレーションを設定してその進捗を監視する必要がある。
リフレッシュ操作はデータファイルのメタデータをスキャンし、それに応じてマニフェストファイルを生成します。通常150~250ミリ秒かかります。
マニフェストファイルはMilvusのメタデータと外部ファイルの行のマッピングを記録します。
ソース データに更新があった場合、Milvus を最新の状態に保つために手動で再度 refresh を呼び出す必要があります。
アクティブなメタデータをすべて削除し、挿入を伴わないリフレッシュは拒否されます。
フォローアップ
外部コレクションのリフレッシュが完了したら、他の管理対象コレクションと同様に、外部コレクションでコレクションのロードと解放、類似検索とクエリを実行できます。
検索、クエリ、取得、ハイブリッド検索などのDQL操作を行う前に、セッションを作成してオンデマンド・クラスターの計算リソースをアタッチする必要があります。