创建外部 CollectionsCompatible with Milvus 3.0.x

外部 Collections 是 Milvus 中的一种数据收集类型,可访问外部存储系统或数据库表(如 AWS S3 和 Iceberg)中的数据,而无需将其复制到 Milvus 中。它充当数据湖的查询层,同时保持与 Milvus 查询接口的兼容性。

概述

在典型的人工智能数据管道中,用户可能已经在 AWS S3 等存储系统上以 Parquet 或其他格式存储了数据。要让 Milvus 使用这些外部存储的数据,用户通常需要使用提取-转换-加载(ETL)管道将其导入 Milvus 自己的存储系统。

这种将数据导入 Milvus 的工作流程会产生难以同步的冗余数据,并增加确保数据一致性的工程维护负担。

Bring data to compute workflow 将数据引入计算工作流程

为了解决这些问题,Milvus 提供外部 Collections,让您从 Milvus 访问外部存储的数据,而无需担心数据同步和 ETL 管道。

Bring compute to data workflow 将计算带入数据工作流

外部 Collections 创建后,可直接访问您的数据,并将其保存在您存储数据的相同位置。在后台,Milvus 会创建清单文件,记录 Milvus 元数据与外部数据文件中的行之间的映射关系。清单文件准备就绪后,你可以像在任何管理 Collections 中一样,在外部 Collections 中创建索引。

当数据发生变化时,手动触发次秒级刷新即可更新元数据,使 Milvus 始终保持最新状态。

第 1 步:创建 Schema

与创建管理 Collections 一样,在创建外部 Collections 之前也需要创建模式。不过,模式与管理 Collections 略有不同。

from pymilvus import MilvusClient, DataType

schema = MilvusClient.create_schema(
    external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
    external_spec='{
        "format": "parquet",
        "extfs": {
            ...
        }
    }'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;

JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());

CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
        .externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
        .externalSpec(externalSpec)
        .build();
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema := entity.NewSchema().
    WithName("product_embeddings").
    WithExternalSource("s3://my-bucket/embeddings/").
    WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
        {
            "fieldName": "product_id",
            "dataType": "Int64",
            "isPrimary": true
        },
        {
            "fieldName": "embedding",
            "dataType": "FloatVector",
            "elementTypeParams": {
                "dim": "768"
            }
        },
        {
            "fieldName": "product_name",
            "dataType": "VarChar",
            "elementTypeParams": {
                "max_length": 512
            }
        }
    ]'

要为外部 Collections 创建模式,需要指定源数据 URI、数据格式和身份验证设置。

参数名称

参数描述

示例值

format

目标源数据文件的格式。

parquet

snapshot_id

有效的 Iceberg 表快照 ID。只有将format 设置为iceberg_table 时,此参数才适用。

473984310232959286

extfs

以字符串化 JSON 结构表示的外部文件系统设置。

--

您可以使用以下选项设置身份验证设置:

使用 AWS AK/SK

此选项适用于自托管 MinIO 或使用 AK/SK 工作的情况。

{
    "format": "...",
    "extfs": {
        "access_key_id":     "AKIA..",
        "access_key_value":  "u4Lh...",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true",
        "use_virtual_host":  "true"
    }
}

参数名称

参数描述

示例 值

extfs.access_key_id

访问密钥 ID

AKIA...

extfs.access_key_value

访问密钥值

u7LH...

extfs.region

云区域 ID

us-west-2

extfs.cloud_provider

云提供商 ID

aws

extfs.use_ssl

是否使用 SSL 建立连接。

true

extfs.use_virtual_host

是否使用虚拟主机访问您的存储桶。

有关详细信息,请参阅本文

true

使用 AWS IAM

该选项适用于 Milvus 在 EC2 实例或 EKS 集群上运行的情况。在这种情况下,无需对 AK/SK 进行硬编码。

{
    "format": "...",
    "extfs": {
        "use_iam":           "true",
        "iam_endpoint":      "https://sts.<region>.amazonaws.com",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true"
    }
}

参数名称

参数描述

示例 值

extfs.use_iam

是否使用 AWS IAM。

为此选项将其设置为"true"

true

extfs.iam_endpoint

有效的 AWS STS 端点。

有关详细信息,请参阅本文

https:*//*sts.<region>.amazonaws.com

extfs.region

云区域 ID

us-west-2

extfs.cloud_provider

云提供商 ID

aws

extfs.use_ssl

是否使用 SSL 建立连接。

true

使用 Milvus 全局凭据

此选项适用于在 Milvus 存储桶中存储外部数据的情况,在milvus.yaml 中指定的全局 MinIO 设置可直接用于访问数据。

{
    "format": "...",
    "extfs": {
        "storage_type": "remote"
    }
}

使用 IAM 角色 ARN

当你的组织使用不同的 AWS 账户来管理 Milvus 集群和保存目标数据文件的存储桶时,适用此选项。

在这种情况下,数据桶所有者应创建一个 IAM 角色,该角色应

  • 为桶访问附加AmazonS3FullAccess 或更精细的策略。

  • 在角色的信任策略的条件字段中包含一个自定义的sts:ExternalId

然后,水桶所有者应向您提供 IAM 角色的 ARN 和外部 ID,这样您就可以使用这些值调用sts:AssumeRole 来承担 IAM 角色。

下面是一个权限策略示例,可附加到 IAM 角色并提供允许的权限。你可以根据自己的需要进行调整。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
        }
    ]
}

与 IAM 角色相关的信任策略定义了谁可以担任该角色。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
        }
      }
    }
  ]
}

获得 IAM 角色 ARN 和外部 ID 后,就可以设置external_spec 参数,如下所示:

{
    "format": "...",
    "extfs": {
        "cloud_provider": "aws",
        "region": "us-west-2",
        "storage_type": "remote",
        "use_ssl": "true",
        "use_iam": "true",
        "role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
        "external_id": "YOUR_UNIQUE_EXTERNAL_ID",
        "load_frequency": "900"
    }
}

参数名称

参数描述

示例值

extfs.cloud_provider

云提供商 ID

aws

extfs.region

云区域 ID

us-west-2

extfs.use_ssl

是否使用 SSL 建立连接。

true

extfs.use_iam

是否使用 AWS IAM。

将此设置为"true"

true

extfs.role_arn

从数据桶所有者处获得的 IAM 角色 ARN。

arn:aws:iam::306787000000:role/...

extfs.external_id

从数据桶所有者处获得的外部 ID。

--

extfs.load_frequency

Milvus 检索临时身份验证凭据的间隔(秒)。

900

第 2 步:添加字段

Schema 准备就绪后,就可以按如下步骤添加字段:

schema.add_field(
    field_name="product_id",
    datatype=DataType.INT64,
    # highlight-next
    external_field="id" # field name in the external data file
)
schema.add_field(
    field_name="product_name",
    datatype=DataType.VARCHAR,
    max_length=512,
    # highlight-next
    external_field="name"
)
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=768,
    # highlight-next
    external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;

schema.addField(AddFieldReq.builder()
        .fieldName("product_id")
        .dataType(DataType.Int64)
        .externalField("id")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("product_name")
        .dataType(DataType.VarChar)
        .maxLength(512)
        .externalField("name")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("embedding")
        .dataType(DataType.FloatVector)
        .dimension(768)
        .externalField("vector")
        .build());
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema = schema.
    WithField(
        entity.NewField().
            WithName("product_id").
            WithDataType(entity.FieldTypeInt64).
            WithExternalField("id"),
    ).
    WithField(
        entity.NewField().
            WithName("product_name").
            WithDataType(entity.FieldTypeVarChar).
            WithMaxLength(512).
            WithExternalField("name"),
    ).
    WithField(
        entity.NewField().
            WithName("embedding").
            WithDataType(entity.FieldTypeFloatVector).
            WithDim(768).
            WithExternalField("vector"),
    )
// node
export schema="{
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
    \"fields\": $fields
}"

第 3 步:创建 Collections

将所有字段添加到 Schema 后,就可以创建外部 Collections 了。

client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

client.create_collection(
    collection_name="test_collection",
    schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;

ConnectConfig connectConfig = ConnectConfig.builder()
        .uri("http://localhost:19530")
        .token("root:Milvus")
        .build();

MilvusClientV2 client = new MilvusClientV2(connectConfig);

CreateCollectionReq createReq = CreateCollectionReq.builder()
        .collectionName("test_collection")
        .collectionSchema(schema)
        .build();
client.createCollection(createReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

milvusAddr := "http://localhost:19530"
token := "root:Milvus"

client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
    Address: milvusAddr,
    APIKey: token
})

err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))

if err != nil {
    fmt.Println(err.Error())
    // handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"schema\": $schema
}"

第 4 步:创建索引

您可以像在管理集合中一样为外部集合列创建索引。

index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
    field_name="embedding",
    index_type="AUTOINDEX",
    metric_type="COSINE"
)
index_params.add_index(
    field_name="product_name",
    index_type="AUTOINDEX"
)
client.create_index(
    db_name="my_database",
    collection_name="test_collection",
    index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;

IndexParam indexParamForIdField = IndexParam.builder()
        .fieldName("product_name")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .build();
IndexParam indexParamForVectorField = IndexParam.builder()
        .fieldName("embedding")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .metricType(IndexParam.MetricType.COSINE)
        .build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
        .dbName("my_database")
        .collectionName("test_collection")
        .indexParams(indexParams)
        .build();
client.createIndex(createIndexReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    "github.com/milvus-io/milvus/client/v2/index"
    "github.com/milvus-io/milvus/client/v2/milvusclient"
)

collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
    milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
    milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
    // handler err
}
err = indexTask.Await(ctx)
if err != nil {
    // handler err
}
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "product_name",
    index_type: "AUTOINDEX"
})
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "embedding",
    index_type: "AUTOINDEX",
    metric_type: "COSINE"
})
export indexParams='[
        {
            "fieldName": "embedding",
            "indexName": "my_vector",
            "indexType": "AUTOINDEX"
        },
        {
            "fieldName": "product_name",
            "indexName": "my_id",
            "indexType": "AUTOINDEX"
        }
    ]'

curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"indexParams\": $indexParams
}"

第 5 步:刷新数据

集合准备就绪后,刷新集合,为数据创建元数据和索引。

job_id = client.refresh_external_collection(
    db_name="my_database",
    collection_name="test_collection"
)
while True:
    progress = client.get_refresh_external_collection_progress(job_id=job_id)
    print(f"  {progress.state}: {progress.progress}%")
    if progress.state == "RefreshCompleted":
        elapsed = progress.end_time - progress.start_time
        print(f"  Completed in {elapsed}ms")
        break
    elif progress.state == "RefreshFailed":
        print(f"  Failed: {progress.reason}")
        break
    time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;

while (true) {
    GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
            GetRefreshExternalCollectionProgressReq.builder()
                    .jobId(jobId)
                    .build());
    RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
    if ("RefreshCompleted".equals(jobInfo.getState())) {
        long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
        System.out.printf("  Refresh completed in %dms%n", elapsed);
        break;
    } else if ("RefreshFailed".equals(jobInfo.getState())) {
        System.out.printf("  Refresh failed: %s%n", jobInfo.getReason());
    }
    TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
    client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
    progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
        client.NewGetRefreshExternalCollectionProgressOption(jobID))
    fmt.Printf("State: %s\n", progress.State)
    if progress.State == entity.RefreshStateCompleted {
        fmt.Println("Refresh completed!")
        break
    }
    if progress.State == entity.RefreshStateFailed {
        fmt.Printf("Refresh failed: %s\n", progress.Reason)
        break
    }
    time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"

刷新操作是异步的,因此需要设置一个迭代来监控其进度。

  • 刷新操作会扫描数据文件的元数据并生成相应的清单文件。通常需要 150-250 毫秒。

  • 清单文件记录了 Milvus 中的元数据与外部文件中的行之间的映射。

  • 如果源数据有更新,就需要再次手动调用刷新,使 Milvus 保持最新。

  • 如果刷新需要删除所有活动元数据,而不插入任何内容,则会导致拒绝。

后续操作

刷新外部集合后,您可以加载和释放集合,并像在任何管理集合中一样在外部集合中执行相似性搜索和查询,但用于按需计算的数据库中的集合必须附加到按需集群上才能进行搜索和查询。

在进行搜索、查询、获取和混合搜索等 DQL 操作前,需要创建会话以附加按需群集的计算资源。

想要更快、更简单、更好用的 Milvus SaaS服务 ?

Zilliz Cloud是基于Milvus的全托管向量数据库,拥有更高性能,更易扩展,以及卓越性价比

免费试用 Zilliz Cloud
反馈

此页对您是否有帮助?