Membuat Koleksi EksternalCompatible with Milvus 3.0.x

Koleksi eksternal adalah jenis koleksi data di Milvus yang mengakses data dari sistem penyimpanan eksternal atau tabel basis data seperti AWS S3 dan Iceberg tanpa menyalinnya ke dalam Milvus. Koleksi ini bertindak sebagai lapisan kueri di atas danau data dengan tetap menjaga kompatibilitas dengan antarmuka kueri Milvus.

Gambaran Umum

Dalam pipeline data AI pada umumnya, pengguna mungkin sudah menyimpan data mereka dalam format Parquet atau format lain di sistem penyimpanan mereka, seperti AWS S3. Untuk membuat Milvus mengonsumsi data yang disimpan secara eksternal ini, pengguna biasanya perlu mengimpornya ke dalam penyimpanan Milvus sendiri menggunakan pipeline Extract-Transform-Load (ETL).

Alur kerja membawa data Anda ke Milvus ini menciptakan data yang berlebihan yang sulit untuk disinkronkan dan menambah beban pemeliharaan teknik untuk memastikan konsistensi data.

Bring data to compute workflow Membawa data untuk menghitung alur kerja

Untuk mengatasi masalah ini, Milvus menyediakan koleksi eksternal yang memungkinkan Anda mengakses data yang tersimpan secara eksternal dari Milvus tanpa perlu mengkhawatirkan sinkronisasi data dan jalur ETL.

Bring compute to data workflow Menghadirkan komputasi ke alur kerja data

Setelah dibuat, koleksi eksternal dapat mengakses data Anda secara langsung dan menyimpannya di tempat yang sama dengan tempat Anda menyimpannya. Di latar belakang, Milvus membuat file manifes untuk mencatat pemetaan antara metadata Milvus dan baris dalam file data eksternal. Setelah file manifes siap, Anda dapat membuat indeks di koleksi eksternal seperti yang Anda lakukan pada koleksi terkelola lainnya.

Ketika data Anda berubah, pemicu penyegaran sub-detik secara manual akan memperbarui metadata, sehingga Milvus akan selalu diperbarui.

Langkah 1: Membuat skema

Seperti halnya membuat koleksi terkelola, Anda juga perlu membuat skema sebelum membuat koleksi eksternal. Namun, skemanya sedikit berbeda dengan koleksi terkelola.

from pymilvus import MilvusClient, DataType

schema = MilvusClient.create_schema(
    external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
    external_spec='{
        "format": "parquet",
        "extfs": {
            ...
        }
    }'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;

JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());

CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
        .externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
        .externalSpec(externalSpec)
        .build();
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema := entity.NewSchema().
    WithName("product_embeddings").
    WithExternalSource("s3://my-bucket/embeddings/").
    WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
        {
            "fieldName": "product_id",
            "dataType": "Int64",
            "isPrimary": true
        },
        {
            "fieldName": "embedding",
            "dataType": "FloatVector",
            "elementTypeParams": {
                "dim": "768"
            }
        },
        {
            "fieldName": "product_name",
            "dataType": "VarChar",
            "elementTypeParams": {
                "max_length": 512
            }
        }
    ]'

Untuk membuat skema untuk koleksi eksternal, Anda perlu menentukan URI data sumber, format data, dan pengaturan autentikasi.

Nama Parameter

Deskripsi Parameter

Nilai Contoh

format

Format berkas data sumber target.

parquet

snapshot_id

ID snapshot tabel Iceberg yang valid. Parameter ini hanya berlaku jika Anda mengatur format ke iceberg_table.

473984310232959286

extfs

Pengaturan sistem file eksternal dalam struktur JSON yang diurutkan.

--

Anda memiliki opsi berikut untuk mengatur pengaturan autentikasi:

Gunakan AWS AK/SK

Opsi ini berlaku untuk MinIO yang dihosting sendiri atau skenario di mana Anda memiliki AK/SK untuk bekerja.

{
    "format": "...",
    "extfs": {
        "access_key_id":     "AKIA..",
        "access_key_value":  "u4Lh...",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true",
        "use_virtual_host":  "true"
    }
}

Nama Parameter

Deskripsi Parameter

Nilai Contoh

extfs.access_key_id

ID kunci akses

AKIA...

extfs.access_key_value

Nilai kunci akses

u7LH...

extfs.region

ID wilayah cloud

us-west-2

extfs.cloud_provider

ID penyedia cloud

aws

extfs.use_ssl

Apakah SSL digunakan untuk membuat koneksi.

true

extfs.use_virtual_host

Apakah akan menggunakan hosting virtual untuk akses ke bucket Anda.

Untuk detailnya, lihat artikel ini.

true

Gunakan AWS IAM

Opsi ini berlaku untuk skenario di mana Milvus berjalan di instance EC2 atau cluster EKS. Dalam kasus ini, Anda tidak perlu meng-hardcode AK/SK.

{
    "format": "...",
    "extfs": {
        "use_iam":           "true",
        "iam_endpoint":      "https://sts.<region>.amazonaws.com",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true"
    }
}

Nama Parameter

Deskripsi Parameter

Contoh Nilai

extfs.use_iam

Apakah akan menggunakan AWS IAM.

Setel ini ke "true" untuk opsi ini.

true

extfs.iam_endpoint

Titik akhir AWS STS yang valid.

Untuk detailnya, lihat artikel ini.

https:*//*sts.<region>.amazonaws.com

extfs.region

ID wilayah cloud

us-west-2

extfs.cloud_provider

ID penyedia cloud

aws

extfs.use_ssl

Apakah SSL digunakan untuk membuat koneksi.

true

Gunakan kredensial global Milvus

Opsi ini berlaku ketika Anda menyimpan data eksternal di bucket Milvus, dan pengaturan MinIO global yang ditentukan di milvus.yaml dapat digunakan secara langsung untuk mengakses data.

{
    "format": "...",
    "extfs": {
        "storage_type": "remote"
    }
}

Gunakan ARN Peran IAM

Opsi ini berlaku ketika organisasi Anda menggunakan akun AWS yang berbeda untuk mengelola cluster Milvus dan bucket yang menyimpan file data target.

Dalam kasus ini, pemilik bucket harus membuat peran IAM yang

  • Melampirkan AmazonS3FullAccess atau kebijakan yang lebih terperinci untuk akses bucket.

  • Menyertakan sts:ExternalId yang ditentukan sendiri di bidang Kondisi pada Kebijakan Kepercayaan peran.

Kemudian, pemilik bucket harus memberi Anda ARN peran IAM dan ID Eksternal sehingga Anda dapat memanggil sts:AssumeRole dengan nilai tersebut untuk mengasumsikan Peran IAM.

Berikut ini adalah contoh kebijakan izin yang akan dilampirkan ke peran IAM dengan izin yang diizinkan. Anda dapat menyesuaikan ini untuk memenuhi kebutuhan Anda.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
        }
    ]
}

Dan kebijakan kepercayaan yang terkait dengan peran IAM mendefinisikan siapa yang diizinkan untuk mengasumsikannya.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
        }
      }
    }
  ]
}

Setelah Anda mendapatkan ARN Peran IAM dan ID Eksternal, Anda dapat mengatur parameter external_spec sebagai berikut:

{
    "format": "...",
    "extfs": {
        "cloud_provider": "aws",
        "region": "us-west-2",
        "storage_type": "remote",
        "use_ssl": "true",
        "use_iam": "true",
        "role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
        "external_id": "YOUR_UNIQUE_EXTERNAL_ID",
        "load_frequency": "900"
    }
}

Nama Parameter

Deskripsi Parameter

Contoh Nilai

extfs.cloud_provider

ID penyedia cloud

aws

extfs.region

ID wilayah cloud

us-west-2

extfs.use_ssl

Apakah SSL digunakan untuk membuat koneksi.

true

extfs.use_iam

Apakah akan menggunakan AWS IAM.

Setel ini ke "true" untuk opsi ini.

true

extfs.role_arn

IAM Role ARN yang diperoleh dari pemilik bucket.

arn:aws:iam::306787000000:role/...

extfs.external_id

ID eksternal yang diperoleh dari pemilik bucket.

--

extfs.load_frequency

Interval di mana Milvus mengambil kredensial autentikasi sementara dalam hitungan detik.

900

Langkah 2: Menambahkan bidang

Setelah skema siap, Anda dapat menambahkan bidang sebagai berikut:

schema.add_field(
    field_name="product_id",
    datatype=DataType.INT64,
    # highlight-next
    external_field="id" # field name in the external data file
)
schema.add_field(
    field_name="product_name",
    datatype=DataType.VARCHAR,
    max_length=512,
    # highlight-next
    external_field="name"
)
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=768,
    # highlight-next
    external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;

schema.addField(AddFieldReq.builder()
        .fieldName("product_id")
        .dataType(DataType.Int64)
        .externalField("id")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("product_name")
        .dataType(DataType.VarChar)
        .maxLength(512)
        .externalField("name")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("embedding")
        .dataType(DataType.FloatVector)
        .dimension(768)
        .externalField("vector")
        .build());
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema = schema.
    WithField(
        entity.NewField().
            WithName("product_id").
            WithDataType(entity.FieldTypeInt64).
            WithExternalField("id"),
    ).
    WithField(
        entity.NewField().
            WithName("product_name").
            WithDataType(entity.FieldTypeVarChar).
            WithMaxLength(512).
            WithExternalField("name"),
    ).
    WithField(
        entity.NewField().
            WithName("embedding").
            WithDataType(entity.FieldTypeFloatVector).
            WithDim(768).
            WithExternalField("vector"),
    )
// node
export schema="{
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
    \"fields\": $fields
}"

Langkah 3: Buat koleksi

Setelah menambahkan semua bidang ke skema, Anda dapat membuat koleksi eksternal.

client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

client.create_collection(
    collection_name="test_collection",
    schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;

ConnectConfig connectConfig = ConnectConfig.builder()
        .uri("http://localhost:19530")
        .token("root:Milvus")
        .build();

MilvusClientV2 client = new MilvusClientV2(connectConfig);

CreateCollectionReq createReq = CreateCollectionReq.builder()
        .collectionName("test_collection")
        .collectionSchema(schema)
        .build();
client.createCollection(createReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

milvusAddr := "http://localhost:19530"
token := "root:Milvus"

client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
    Address: milvusAddr,
    APIKey: token
})

err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))

if err != nil {
    fmt.Println(err.Error())
    // handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"schema\": $schema
}"

Langkah 4: Membuat indeks

Anda dapat membuat indeks untuk kolom koleksi eksternal seperti yang Anda lakukan di koleksi terkelola.

index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
    field_name="embedding",
    index_type="AUTOINDEX",
    metric_type="COSINE"
)
index_params.add_index(
    field_name="product_name",
    index_type="AUTOINDEX"
)
client.create_index(
    db_name="my_database",
    collection_name="test_collection",
    index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;

IndexParam indexParamForIdField = IndexParam.builder()
        .fieldName("product_name")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .build();
IndexParam indexParamForVectorField = IndexParam.builder()
        .fieldName("embedding")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .metricType(IndexParam.MetricType.COSINE)
        .build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
        .dbName("my_database")
        .collectionName("test_collection")
        .indexParams(indexParams)
        .build();
client.createIndex(createIndexReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    "github.com/milvus-io/milvus/client/v2/index"
    "github.com/milvus-io/milvus/client/v2/milvusclient"
)

collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
    milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
    milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
    // handler err
}
err = indexTask.Await(ctx)
if err != nil {
    // handler err
}
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "product_name",
    index_type: "AUTOINDEX"
})
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "embedding",
    index_type: "AUTOINDEX",
    metric_type: "COSINE"
})
export indexParams='[
        {
            "fieldName": "embedding",
            "indexName": "my_vector",
            "indexType": "AUTOINDEX"
        },
        {
            "fieldName": "product_name",
            "indexName": "my_id",
            "indexType": "AUTOINDEX"
        }
    ]'

curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"indexParams\": $indexParams
}"

Langkah 5: Menyegarkan data

Setelah koleksi siap, segarkan kembali untuk membuat metadata dan indeks untuk data Anda.

job_id = client.refresh_external_collection(
    db_name="my_database",
    collection_name="test_collection"
)
while True:
    progress = client.get_refresh_external_collection_progress(job_id=job_id)
    print(f"  {progress.state}: {progress.progress}%")
    if progress.state == "RefreshCompleted":
        elapsed = progress.end_time - progress.start_time
        print(f"  Completed in {elapsed}ms")
        break
    elif progress.state == "RefreshFailed":
        print(f"  Failed: {progress.reason}")
        break
    time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;

while (true) {
    GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
            GetRefreshExternalCollectionProgressReq.builder()
                    .jobId(jobId)
                    .build());
    RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
    if ("RefreshCompleted".equals(jobInfo.getState())) {
        long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
        System.out.printf("  Refresh completed in %dms%n", elapsed);
        break;
    } else if ("RefreshFailed".equals(jobInfo.getState())) {
        System.out.printf("  Refresh failed: %s%n", jobInfo.getReason());
    }
    TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
    client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
    progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
        client.NewGetRefreshExternalCollectionProgressOption(jobID))
    fmt.Printf("State: %s\n", progress.State)
    if progress.State == entity.RefreshStateCompleted {
        fmt.Println("Refresh completed!")
        break
    }
    if progress.State == entity.RefreshStateFailed {
        fmt.Printf("Refresh failed: %s\n", progress.Reason)
        break
    }
    time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"

Operasi penyegaran bersifat asinkron, jadi Anda perlu menyiapkan iterasi untuk memantau kemajuannya.

  • Operasi penyegaran memindai metadata berkas data dan menghasilkan berkas manifes yang sesuai. Biasanya membutuhkan waktu 150-250 ms.

  • File manifes mencatat pemetaan antara metadata di Milvus dan baris-baris di file eksternal.

  • Jika ada pembaruan pada data sumber Anda, Anda perlu melakukan refresh secara manual agar Milvus tetap mutakhir.

  • Refresh yang mengharuskan menghapus semua metadata aktif tanpa penyisipan akan mengakibatkan penolakan.

Tindak lanjut

Setelah Anda menyegarkan koleksi eksternal, Anda dapat memuat dan melepaskan koleksi serta melakukan pencarian dan kueri kemiripan di koleksi eksternal seperti yang Anda lakukan di koleksi terkelola mana pun, kecuali bahwa koleksi di basis data untuk komputasi berdasarkan permintaan harus dilampirkan ke kluster berdasarkan permintaan untuk pencarian dan kueri.

Sebelum melakukan operasi DQL, seperti pencarian, kueri, dapatkan, dan pencarian hybrid, Anda perlu membuat sesi untuk melampirkan sumber daya komputasi dari klaster sesuai permintaan.