외부 컬렉션 만들기Compatible with Milvus 3.0.x

외부 컬렉션은 Milvus에 데이터를 복사하지 않고 외부 스토리지 시스템이나 AWS S3, Iceberg와 같은 데이터베이스 테이블의 데이터에 액세스하는 Milvus의 데이터 컬렉션 유형입니다. Milvus 쿼리 인터페이스와의 호환성을 유지하면서 데이터 레이크에 대한 쿼리 레이어 역할을 합니다.

개요

일반적인 AI 데이터 파이프라인에서, 사용자는 이미 AWS S3와 같은 스토리지 시스템에 Parquet 또는 다른 형식으로 데이터를 저장했을 수 있습니다. Milvus가 외부에 저장된 데이터를 사용하려면 일반적으로 사용자는 추출-변환-로드(ETL) 파이프라인을 사용해 데이터를 Milvus의 자체 스토리지로 가져와야 합니다.

이러한 데이터 가져오기 워크플로우는 동기화하기 어려운 중복 데이터를 생성하고 데이터 일관성을 보장하기 위한 엔지니어링 유지 관리 부담을 가중시킵니다.

Bring data to compute workflow 데이터 가져오기를 통한 워크플로우 계산

이러한 문제를 해결하기 위해 Milvus는 데이터 동기화 및 ETL 파이프라인에 대한 걱정 없이 Milvus에서 외부에 저장된 데이터에 액세스할 수 있는 외부 컬렉션을 제공합니다.

Bring compute to data workflow 데이터 워크플로우에 컴퓨팅 도입

외부 컬렉션이 생성되면 데이터에 직접 액세스하여 데이터를 저장하는 동일한 위치에 보관할 수 있습니다. 밀버스는 백그라운드에서 매니페스트 파일을 생성하여 밀버스 메타데이터와 외부 데이터 파일의 행 간의 매핑을 기록합니다. 매니페스트 파일이 준비되면 관리되는 컬렉션에서와 마찬가지로 외부 컬렉션에 인덱스를 만들 수 있습니다.

데이터가 변경되면 1초 미만의 새로 고침을 수동으로 트리거하면 메타데이터가 업데이트되어 Milvus가 항상 최신 상태로 유지됩니다.

1단계: 스키마 만들기

관리 컬렉션을 만들 때와 마찬가지로 외부 컬렉션을 만들기 전에 스키마도 만들어야 합니다. 그러나 스키마는 관리되는 컬렉션의 스키마와 약간 다릅니다.

from pymilvus import MilvusClient, DataType

schema = MilvusClient.create_schema(
    external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
    external_spec='{
        "format": "parquet",
        "extfs": {
            ...
        }
    }'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;

JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());

CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
        .externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
        .externalSpec(externalSpec)
        .build();
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema := entity.NewSchema().
    WithName("product_embeddings").
    WithExternalSource("s3://my-bucket/embeddings/").
    WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
        {
            "fieldName": "product_id",
            "dataType": "Int64",
            "isPrimary": true
        },
        {
            "fieldName": "embedding",
            "dataType": "FloatVector",
            "elementTypeParams": {
                "dim": "768"
            }
        },
        {
            "fieldName": "product_name",
            "dataType": "VarChar",
            "elementTypeParams": {
                "max_length": 512
            }
        }
    ]'

외부 컬렉션의 스키마를 만들려면 소스 데이터 URI, 데이터 형식 및 인증 설정을 지정해야 합니다.

매개변수 이름

매개변수 설명

예제 값

format

대상 소스 데이터 파일의 형식입니다.

parquet

snapshot_id

유효한 Iceberg 테이블 스냅샷 ID입니다. 이 매개변수는 formaticeberg_table 으로 설정한 경우에만 적용됩니다.

473984310232959286

extfs

문자열화된 JSON 구조의 외부 파일 시스템 설정.

--

인증 설정을 설정하는 옵션은 다음과 같습니다:

AWS AK/SK 사용

이 옵션은 자체 호스팅 MinIO 또는 업무용 AK/SK가 있는 시나리오에 적용됩니다.

{
    "format": "...",
    "extfs": {
        "access_key_id":     "AKIA..",
        "access_key_value":  "u4Lh...",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true",
        "use_virtual_host":  "true"
    }
}

매개변수 이름

매개변수 설명

예제 값

extfs.access_key_id

액세스 키 ID

AKIA...

extfs.access_key_value

액세스 키 값

u7LH...

extfs.region

클라우드 지역 ID

us-west-2

extfs.cloud_provider

클라우드 공급자 ID

aws

extfs.use_ssl

SSL을 사용하여 연결을 설정할지 여부입니다.

true

extfs.use_virtual_host

버킷 액세스를 위해 가상 호스팅을 사용할지 여부입니다.

자세한 내용은 이 문서를 참조하세요.

true

AWS IAM 사용

이 옵션은 Milvus가 EC2 인스턴스 또는 EKS 클러스터에서 실행되는 시나리오에 적용됩니다. 이 경우 AK/SK를 하드코딩할 필요가 없습니다.

{
    "format": "...",
    "extfs": {
        "use_iam":           "true",
        "iam_endpoint":      "https://sts.<region>.amazonaws.com",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true"
    }
}

파라미터 이름

파라미터 설명

예제 값

extfs.use_iam

AWS IAM 사용 여부.

이 옵션의 경우 "true" 로 설정합니다.

true

extfs.iam_endpoint

유효한 AWS STS 엔드포인트입니다.

자세한 내용은 이 문서를 참조하세요.

https:*//*sts.<region>.amazonaws.com

extfs.region

클라우드 리전 ID

us-west-2

extfs.cloud_provider

클라우드 공급자 ID

aws

extfs.use_ssl

연결 설정에 SSL을 사용할지 여부입니다.

true

Milvus 글로벌 자격 증명 사용

이 옵션은 Milvus 버킷에 외부 데이터를 저장할 때 적용되며, milvus.yaml 에 지정된 글로벌 MinIO 설정을 사용하여 데이터에 직접 액세스할 수 있습니다.

{
    "format": "...",
    "extfs": {
        "storage_type": "remote"
    }
}

IAM 역할 ARN 사용

이 옵션은 조직에서 서로 다른 AWS 계정을 사용하여 Milvus 클러스터와 대상 데이터 파일을 보관하는 버킷을 관리하는 경우에 적용됩니다.

이 경우 버킷 소유자는 다음과 같은 IAM 역할을 만들어야 합니다.

  • AmazonS3FullAccess 또는 버킷 액세스에 대한 보다 세분화된 정책을 첨부합니다.

  • 역할의 신뢰 정책의 조건 필드에 자체 정의된 sts:ExternalId 을 포함합니다.

그런 다음 버킷 소유자가 IAM 역할 ARN과 외부 ID를 제공해야 해당 값으로 sts:AssumeRole 을 호출하여 IAM 역할을 맡을 수 있습니다.

다음은 허용된 권한과 함께 IAM 역할에 첨부할 권한 정책의 예시입니다. 요구 사항에 맞게 이를 조정할 수 있습니다.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
        }
    ]
}

그리고 IAM 역할과 연결된 신뢰 정책은 해당 역할을 맡을 수 있는 사람을 정의합니다.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
        }
      }
    }
  ]
}

IAM 역할 ARN과 외부 ID를 얻은 후에는 다음과 같이 external_spec 매개 변수를 설정할 수 있습니다:

{
    "format": "...",
    "extfs": {
        "cloud_provider": "aws",
        "region": "us-west-2",
        "storage_type": "remote",
        "use_ssl": "true",
        "use_iam": "true",
        "role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
        "external_id": "YOUR_UNIQUE_EXTERNAL_ID",
        "load_frequency": "900"
    }
}

매개 변수 이름

매개 변수 설명

예제 값

extfs.cloud_provider

클라우드 공급자 ID

aws

extfs.region

클라우드 리전 ID

us-west-2

extfs.use_ssl

SSL을 사용하여 연결을 설정할지 여부입니다.

true

extfs.use_iam

AWS IAM 사용 여부.

이 옵션의 경우 "true" 로 설정합니다.

true

extfs.role_arn

버킷 소유자로부터 얻은 IAM 역할 ARN입니다.

arn:aws:iam::306787000000:role/...

extfs.external_id

버킷 소유자로부터 얻은 외부 ID.

--

extfs.load_frequency

Milvus가 임시 인증 자격 증명을 검색하는 간격(초)입니다.

900

2단계: 필드 추가

스키마가 준비되면 다음과 같이 필드를 추가할 수 있습니다:

schema.add_field(
    field_name="product_id",
    datatype=DataType.INT64,
    # highlight-next
    external_field="id" # field name in the external data file
)
schema.add_field(
    field_name="product_name",
    datatype=DataType.VARCHAR,
    max_length=512,
    # highlight-next
    external_field="name"
)
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=768,
    # highlight-next
    external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;

schema.addField(AddFieldReq.builder()
        .fieldName("product_id")
        .dataType(DataType.Int64)
        .externalField("id")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("product_name")
        .dataType(DataType.VarChar)
        .maxLength(512)
        .externalField("name")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("embedding")
        .dataType(DataType.FloatVector)
        .dimension(768)
        .externalField("vector")
        .build());
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema = schema.
    WithField(
        entity.NewField().
            WithName("product_id").
            WithDataType(entity.FieldTypeInt64).
            WithExternalField("id"),
    ).
    WithField(
        entity.NewField().
            WithName("product_name").
            WithDataType(entity.FieldTypeVarChar).
            WithMaxLength(512).
            WithExternalField("name"),
    ).
    WithField(
        entity.NewField().
            WithName("embedding").
            WithDataType(entity.FieldTypeFloatVector).
            WithDim(768).
            WithExternalField("vector"),
    )
// node
export schema="{
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
    \"fields\": $fields
}"

3단계: 컬렉션 만들기

스키마에 모든 필드를 추가한 후 외부 컬렉션을 만들 수 있습니다.

client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

client.create_collection(
    collection_name="test_collection",
    schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;

ConnectConfig connectConfig = ConnectConfig.builder()
        .uri("http://localhost:19530")
        .token("root:Milvus")
        .build();

MilvusClientV2 client = new MilvusClientV2(connectConfig);

CreateCollectionReq createReq = CreateCollectionReq.builder()
        .collectionName("test_collection")
        .collectionSchema(schema)
        .build();
client.createCollection(createReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

milvusAddr := "http://localhost:19530"
token := "root:Milvus"

client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
    Address: milvusAddr,
    APIKey: token
})

err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))

if err != nil {
    fmt.Println(err.Error())
    // handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"schema\": $schema
}"

4단계: 인덱스 생성

관리되는 컬렉션에서와 마찬가지로 외부 컬렉션 열에 대한 인덱스를 만들 수 있습니다.

index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
    field_name="embedding",
    index_type="AUTOINDEX",
    metric_type="COSINE"
)
index_params.add_index(
    field_name="product_name",
    index_type="AUTOINDEX"
)
client.create_index(
    db_name="my_database",
    collection_name="test_collection",
    index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;

IndexParam indexParamForIdField = IndexParam.builder()
        .fieldName("product_name")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .build();
IndexParam indexParamForVectorField = IndexParam.builder()
        .fieldName("embedding")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .metricType(IndexParam.MetricType.COSINE)
        .build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
        .dbName("my_database")
        .collectionName("test_collection")
        .indexParams(indexParams)
        .build();
client.createIndex(createIndexReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    "github.com/milvus-io/milvus/client/v2/index"
    "github.com/milvus-io/milvus/client/v2/milvusclient"
)

collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
    milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
    milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
    // handler err
}
err = indexTask.Await(ctx)
if err != nil {
    // handler err
}
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "product_name",
    index_type: "AUTOINDEX"
})
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "embedding",
    index_type: "AUTOINDEX",
    metric_type: "COSINE"
})
export indexParams='[
        {
            "fieldName": "embedding",
            "indexName": "my_vector",
            "indexType": "AUTOINDEX"
        },
        {
            "fieldName": "product_name",
            "indexName": "my_id",
            "indexType": "AUTOINDEX"
        }
    ]'

curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"indexParams\": $indexParams
}"

5단계: 데이터 새로 고침

컬렉션이 준비되면 컬렉션을 새로 고쳐 데이터에 대한 메타데이터와 인덱스를 만듭니다.

job_id = client.refresh_external_collection(
    db_name="my_database",
    collection_name="test_collection"
)
while True:
    progress = client.get_refresh_external_collection_progress(job_id=job_id)
    print(f"  {progress.state}: {progress.progress}%")
    if progress.state == "RefreshCompleted":
        elapsed = progress.end_time - progress.start_time
        print(f"  Completed in {elapsed}ms")
        break
    elif progress.state == "RefreshFailed":
        print(f"  Failed: {progress.reason}")
        break
    time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;

while (true) {
    GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
            GetRefreshExternalCollectionProgressReq.builder()
                    .jobId(jobId)
                    .build());
    RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
    if ("RefreshCompleted".equals(jobInfo.getState())) {
        long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
        System.out.printf("  Refresh completed in %dms%n", elapsed);
        break;
    } else if ("RefreshFailed".equals(jobInfo.getState())) {
        System.out.printf("  Refresh failed: %s%n", jobInfo.getReason());
    }
    TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
    client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
    progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
        client.NewGetRefreshExternalCollectionProgressOption(jobID))
    fmt.Printf("State: %s\n", progress.State)
    if progress.State == entity.RefreshStateCompleted {
        fmt.Println("Refresh completed!")
        break
    }
    if progress.State == entity.RefreshStateFailed {
        fmt.Printf("Refresh failed: %s\n", progress.Reason)
        break
    }
    time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"

새로 고침 작업은 비동기적이므로 진행 상황을 모니터링하기 위해 반복을 설정해야 합니다.

  • 새로 고침 작업은 데이터 파일의 메타데이터를 스캔하고 그에 따라 매니페스트 파일을 생성합니다. 일반적으로 150-250ms가 소요됩니다.

  • 매니페스트 파일은 Milvus의 메타데이터와 외부 파일의 행 간의 매핑을 기록합니다.

  • 소스 데이터에 업데이트가 있는 경우 수동으로 새로 고침을 다시 호출하여 Milvus를 최신 상태로 유지해야 합니다.

  • 삽입 없이 모든 활성 메타데이터를 제거해야 하는 새로 고침은 거부됩니다.

후속 조치

외부 컬렉션을 새로 고친 후에는 관리되는 컬렉션에서와 마찬가지로 컬렉션을 로드 및 해제하고 외부 컬렉션에서 유사성 검색 및 쿼리를 수행할 수 있지만, 온디맨드 컴퓨팅용 데이터베이스의 컬렉션은 검색 및 쿼리를 위해 온디맨드 클러스터에 연결해야 한다는 점을 제외하고는 그렇지 않습니다.

검색, 쿼리, 가져오기 및 하이브리드 검색과 같은 DQL 작업을 수행하기 전에 먼저 세션을 만들어 온디맨드 클러스터의 컴퓨팅 리소스를 연결해야 합니다.