建立外部資料集Compatible with Milvus 3.0.x

外部集合是 Milvus 中的一種資料集合類型,可從 AWS S3 和 Iceberg 等外部儲存系統或資料庫表存取資料,而無需將資料複製到 Milvus 中。它充當資料湖上的查詢層,同時保持與 Milvus 查詢介面的相容性。

概述

在典型的人工智能資料管道中,使用者可能已將資料以 Parquet 或其他格式儲存在他們的儲存系統中,例如 AWS S3。為了讓 Milvus 使用這些外部儲存的資料,使用者通常需要使用抽取-轉換-載入 (ETL) 管道將資料匯入 Milvus 自己的儲存系統。

這種將您的資料帶到 Milvus 的工作流程會產生難以同步的冗餘資料,並增加工程維護的負擔,以確保資料的一致性。

Bring data to compute workflow 將資料帶到計算工作流程

為了解決這些問題,Milvus 提供外部集合,讓您從 Milvus 存取外部儲存的資料,而不必擔心資料同步和 ETL 管道。

Bring compute to data workflow 將計算帶入資料工作流程

一旦建立,外部資料集可直接存取您的資料,並將資料保存在您儲存資料的相同位置。在後台,Milvus 會建立艙單檔案,記錄 Milvus 元資料與外部資料檔案中的資料行之間的對應關係。清單檔案就緒後,您就可以在外部資料集中建立索引,就像在任何受管理的資料集中一樣。

當您的資料變更時,手動觸發次秒級刷新即可更新元資料,讓 Milvus 永遠保持最新狀態。

步驟 1:建立模式

與建立管理式資料集一樣,在建立外部資料集之前,您也需要建立模式。然而,模式與管理式集合略有不同。

from pymilvus import MilvusClient, DataType

schema = MilvusClient.create_schema(
    external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
    external_spec='{
        "format": "parquet",
        "extfs": {
            ...
        }
    }'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;

JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());

CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
        .externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
        .externalSpec(externalSpec)
        .build();
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema := entity.NewSchema().
    WithName("product_embeddings").
    WithExternalSource("s3://my-bucket/embeddings/").
    WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
        {
            "fieldName": "product_id",
            "dataType": "Int64",
            "isPrimary": true
        },
        {
            "fieldName": "embedding",
            "dataType": "FloatVector",
            "elementTypeParams": {
                "dim": "768"
            }
        },
        {
            "fieldName": "product_name",
            "dataType": "VarChar",
            "elementTypeParams": {
                "max_length": 512
            }
        }
    ]'

要為外部集合建立模式,您需要指定來源資料 URI、資料格式和驗證設定。

參數名稱

參數說明

範例 值

format

目標來源資料檔案的格式。

parquet

snapshot_id

有效的 Iceberg 表快照 ID。此參數僅在您將format 設為iceberg_table 時適用。

473984310232959286

extfs

以字符串化 JSON 結構的外部檔案系統設定。

--

您有以下選項來設定驗證設定:

使用 AWS AK/SK

此選項適用於自託管 MinIO 或有 AK/SK 工作的情況。

{
    "format": "...",
    "extfs": {
        "access_key_id":     "AKIA..",
        "access_key_value":  "u4Lh...",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true",
        "use_virtual_host":  "true"
    }
}

參數名稱

參數說明

範例 值

extfs.access_key_id

存取金鑰 ID

AKIA...

extfs.access_key_value

存取權限值

u7LH...

extfs.region

雲區域 ID

us-west-2

extfs.cloud_provider

雲端提供者 ID

aws

extfs.use_ssl

是否使用 SSL 建立連線。

true

extfs.use_virtual_host

是否使用虛擬主機存取您的儲存桶。

如需詳細資訊,請參閱本文

true

使用 AWS IAM

此選項適用於 Milvus 在 EC2 實例或 EKS 集群上執行的情況。在這種情況下,您不需要硬體編碼 AK/SK。

{
    "format": "...",
    "extfs": {
        "use_iam":           "true",
        "iam_endpoint":      "https://sts.<region>.amazonaws.com",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true"
    }
}

參數名稱

參數說明

範例 值

extfs.use_iam

是否使用 AWS IAM。

此選項設定為"true"

true

extfs.iam_endpoint

有效的 AWS STS 端點。

如需詳細資訊,請參閱本文

https:*//*sts.<region>.amazonaws.com

extfs.region

雲區域 ID

us-west-2

extfs.cloud_provider

雲提供商 ID

aws

extfs.use_ssl

是否使用 SSL 建立連線。

true

使用 Milvus 全局憑證

此選項適用於在 Milvus 儲存桶中儲存外部資料時,可直接使用milvus.yaml 中指定的全局 MinIO 設定來存取資料。

{
    "format": "...",
    "extfs": {
        "storage_type": "remote"
    }
}

使用 IAM 角色 ARN

當您的組織使用不同的 AWS 帳戶來管理 Milvus 叢集和存放目標資料檔案的儲存桶時,此選項適用。

在這種情況下,水桶擁有者應該建立一個 IAM 角色,以

  • 附加AmazonS3FullAccess 或針對水桶存取的更精細政策。

  • 在角色的信任政策的 Condition 欄位中包含一個自定義的sts:ExternalId

然後,水桶所有者應該提供您 IAM 角色的 ARN 和 External ID,這樣您就可以使用這些值呼叫sts:AssumeRole ,以承擔 IAM 角色。

以下是要附加到 IAM 角色的允許權限政策範例。您可以調整此政策以符合您的需求。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
        }
    ]
}

而與 IAM 角色相關的信任政策則定義了誰可以擔任該角色。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
        }
      }
    }
  ]
}

取得 IAM 角色 ARN 和外部 ID 後,就可以設定external_spec 參數,如下所示:

{
    "format": "...",
    "extfs": {
        "cloud_provider": "aws",
        "region": "us-west-2",
        "storage_type": "remote",
        "use_ssl": "true",
        "use_iam": "true",
        "role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
        "external_id": "YOUR_UNIQUE_EXTERNAL_ID",
        "load_frequency": "900"
    }
}

參數名稱

參數說明

示例值

extfs.cloud_provider

雲提供商 ID

aws

extfs.region

雲區域 ID

us-west-2

extfs.use_ssl

是否使用 SSL 建立連線。

true

extfs.use_iam

是否使用 AWS IAM。

此選項設定為"true"

true

extfs.role_arn

從水桶所有者取得的 IAM 角色 ARN。

arn:aws:iam::306787000000:role/...

extfs.external_id

從水桶所有者取得的外部 ID。

--

extfs.load_frequency

Milvus 擷取臨時認證憑證的間隔,以秒為單位。

900

步驟 2:新增欄位

模式準備就緒後,您可以如下方式新增欄位:

schema.add_field(
    field_name="product_id",
    datatype=DataType.INT64,
    # highlight-next
    external_field="id" # field name in the external data file
)
schema.add_field(
    field_name="product_name",
    datatype=DataType.VARCHAR,
    max_length=512,
    # highlight-next
    external_field="name"
)
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=768,
    # highlight-next
    external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;

schema.addField(AddFieldReq.builder()
        .fieldName("product_id")
        .dataType(DataType.Int64)
        .externalField("id")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("product_name")
        .dataType(DataType.VarChar)
        .maxLength(512)
        .externalField("name")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("embedding")
        .dataType(DataType.FloatVector)
        .dimension(768)
        .externalField("vector")
        .build());
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema = schema.
    WithField(
        entity.NewField().
            WithName("product_id").
            WithDataType(entity.FieldTypeInt64).
            WithExternalField("id"),
    ).
    WithField(
        entity.NewField().
            WithName("product_name").
            WithDataType(entity.FieldTypeVarChar).
            WithMaxLength(512).
            WithExternalField("name"),
    ).
    WithField(
        entity.NewField().
            WithName("embedding").
            WithDataType(entity.FieldTypeFloatVector).
            WithDim(768).
            WithExternalField("vector"),
    )
// node
export schema="{
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
    \"fields\": $fields
}"

步驟 3:建立集合

將所有欄位加入模式後,您就可以建立外部集合。

client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

client.create_collection(
    collection_name="test_collection",
    schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;

ConnectConfig connectConfig = ConnectConfig.builder()
        .uri("http://localhost:19530")
        .token("root:Milvus")
        .build();

MilvusClientV2 client = new MilvusClientV2(connectConfig);

CreateCollectionReq createReq = CreateCollectionReq.builder()
        .collectionName("test_collection")
        .collectionSchema(schema)
        .build();
client.createCollection(createReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

milvusAddr := "http://localhost:19530"
token := "root:Milvus"

client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
    Address: milvusAddr,
    APIKey: token
})

err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))

if err != nil {
    fmt.Println(err.Error())
    // handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"schema\": $schema
}"

步驟 4:建立索引

您可以為外部集合欄位建立索引,就像在管理集合中所做的一樣。

index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
    field_name="embedding",
    index_type="AUTOINDEX",
    metric_type="COSINE"
)
index_params.add_index(
    field_name="product_name",
    index_type="AUTOINDEX"
)
client.create_index(
    db_name="my_database",
    collection_name="test_collection",
    index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;

IndexParam indexParamForIdField = IndexParam.builder()
        .fieldName("product_name")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .build();
IndexParam indexParamForVectorField = IndexParam.builder()
        .fieldName("embedding")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .metricType(IndexParam.MetricType.COSINE)
        .build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
        .dbName("my_database")
        .collectionName("test_collection")
        .indexParams(indexParams)
        .build();
client.createIndex(createIndexReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    "github.com/milvus-io/milvus/client/v2/index"
    "github.com/milvus-io/milvus/client/v2/milvusclient"
)

collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
    milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
    milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
    // handler err
}
err = indexTask.Await(ctx)
if err != nil {
    // handler err
}
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "product_name",
    index_type: "AUTOINDEX"
})
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "embedding",
    index_type: "AUTOINDEX",
    metric_type: "COSINE"
})
export indexParams='[
        {
            "fieldName": "embedding",
            "indexName": "my_vector",
            "indexType": "AUTOINDEX"
        },
        {
            "fieldName": "product_name",
            "indexName": "my_id",
            "indexType": "AUTOINDEX"
        }
    ]'

curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"indexParams\": $indexParams
}"

步驟 5:刷新資料

一旦集合準備就緒,刷新它以為您的資料建立元資料和索引。

job_id = client.refresh_external_collection(
    db_name="my_database",
    collection_name="test_collection"
)
while True:
    progress = client.get_refresh_external_collection_progress(job_id=job_id)
    print(f"  {progress.state}: {progress.progress}%")
    if progress.state == "RefreshCompleted":
        elapsed = progress.end_time - progress.start_time
        print(f"  Completed in {elapsed}ms")
        break
    elif progress.state == "RefreshFailed":
        print(f"  Failed: {progress.reason}")
        break
    time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;

while (true) {
    GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
            GetRefreshExternalCollectionProgressReq.builder()
                    .jobId(jobId)
                    .build());
    RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
    if ("RefreshCompleted".equals(jobInfo.getState())) {
        long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
        System.out.printf("  Refresh completed in %dms%n", elapsed);
        break;
    } else if ("RefreshFailed".equals(jobInfo.getState())) {
        System.out.printf("  Refresh failed: %s%n", jobInfo.getReason());
    }
    TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
    client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
    progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
        client.NewGetRefreshExternalCollectionProgressOption(jobID))
    fmt.Printf("State: %s\n", progress.State)
    if progress.State == entity.RefreshStateCompleted {
        fmt.Println("Refresh completed!")
        break
    }
    if progress.State == entity.RefreshStateFailed {
        fmt.Printf("Refresh failed: %s\n", progress.Reason)
        break
    }
    time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"

刷新作業是異步的,因此您需要設定一個迭代來監控其進度。

  • 刷新作業會掃描資料檔案的元資料,並據此產生清單檔案。通常需要 150-250 毫秒。

  • 艙單檔記錄了 Milvus 中的元資料與外部檔案中的行之間的映射。

  • 如果源資料有更新,您需要再次手動呼叫刷新,以保持 Milvus 為最新。

  • 如果刷新需要移除所有活動的元資料,但沒有插入任何元資料,會導致拒絕。

跟進

一旦您刷新了外部資料集,您就可以載入和釋放資料集,並在外部資料集中執行相似性搜尋和查詢,就像在任何受管理的資料集中一樣,除了用於隨選運算的資料庫中的資料集必須附加到隨選叢集以進行搜尋和查詢。

在執行搜尋、查詢、獲取和混合搜尋等 DQL 作業之前,您需要建立一個會話,以附加隨選群集的計算資源。