Membuat Koleksi EksternalCompatible with Milvus 3.0.x
Koleksi eksternal adalah jenis koleksi data di Milvus yang mengakses data dari sistem penyimpanan eksternal atau tabel basis data seperti AWS S3 dan Iceberg tanpa menyalinnya ke dalam Milvus. Koleksi ini bertindak sebagai lapisan kueri di atas danau data dengan tetap menjaga kompatibilitas dengan antarmuka kueri Milvus.
Gambaran Umum
Dalam pipeline data AI pada umumnya, pengguna mungkin sudah menyimpan data mereka dalam format Parquet atau format lain di sistem penyimpanan mereka, seperti AWS S3. Untuk membuat Milvus mengonsumsi data yang disimpan secara eksternal ini, pengguna biasanya perlu mengimpornya ke dalam penyimpanan Milvus sendiri menggunakan pipeline Extract-Transform-Load (ETL).
Alur kerja membawa data Anda ke Milvus ini menciptakan data yang berlebihan yang sulit untuk disinkronkan dan menambah beban pemeliharaan teknik untuk memastikan konsistensi data.
Membawa data untuk menghitung alur kerja
Untuk mengatasi masalah ini, Milvus menyediakan koleksi eksternal yang memungkinkan Anda mengakses data yang tersimpan secara eksternal dari Milvus tanpa perlu mengkhawatirkan sinkronisasi data dan jalur ETL.
Menghadirkan komputasi ke alur kerja data
Setelah dibuat, koleksi eksternal dapat mengakses data Anda secara langsung dan menyimpannya di tempat yang sama dengan tempat Anda menyimpannya. Di latar belakang, Milvus membuat file manifes untuk mencatat pemetaan antara metadata Milvus dan baris dalam file data eksternal. Setelah file manifes siap, Anda dapat membuat indeks di koleksi eksternal seperti yang Anda lakukan pada koleksi terkelola lainnya.
Ketika data Anda berubah, pemicu penyegaran sub-detik secara manual akan memperbarui metadata, sehingga Milvus akan selalu diperbarui.
Langkah 1: Membuat skema
Seperti halnya membuat koleksi terkelola, Anda juga perlu membuat skema sebelum membuat koleksi eksternal. Namun, skemanya sedikit berbeda dengan koleksi terkelola.
from pymilvus import MilvusClient, DataType
schema = MilvusClient.create_schema(
external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
external_spec='{
"format": "parquet",
"extfs": {
...
}
}'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());
CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
.externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
.externalSpec(externalSpec)
.build();
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
schema := entity.NewSchema().
WithName("product_embeddings").
WithExternalSource("s3://my-bucket/embeddings/").
WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
{
"fieldName": "product_id",
"dataType": "Int64",
"isPrimary": true
},
{
"fieldName": "embedding",
"dataType": "FloatVector",
"elementTypeParams": {
"dim": "768"
}
},
{
"fieldName": "product_name",
"dataType": "VarChar",
"elementTypeParams": {
"max_length": 512
}
}
]'
Untuk membuat skema untuk koleksi eksternal, Anda perlu menentukan URI data sumber, format data, dan pengaturan autentikasi.
Nama Parameter |
Deskripsi Parameter |
Nilai Contoh |
|---|---|---|
|
Format berkas data sumber target. |
|
|
ID snapshot tabel Iceberg yang valid. Parameter ini hanya berlaku jika Anda mengatur |
|
|
Pengaturan sistem file eksternal dalam struktur JSON yang diurutkan. |
-- |
Anda memiliki opsi berikut untuk mengatur pengaturan autentikasi:
Gunakan AWS AK/SK
Opsi ini berlaku untuk MinIO yang dihosting sendiri atau skenario di mana Anda memiliki AK/SK untuk bekerja.
{
"format": "...",
"extfs": {
"access_key_id": "AKIA..",
"access_key_value": "u4Lh...",
"region": "us-west-2",
"cloud_provider": "aws",
"use_ssl": "true",
"use_virtual_host": "true"
}
}
Nama Parameter |
Deskripsi Parameter |
Nilai Contoh |
|---|---|---|
|
ID kunci akses |
|
|
Nilai kunci akses |
|
|
ID wilayah cloud |
|
|
ID penyedia cloud |
|
|
Apakah SSL digunakan untuk membuat koneksi. |
|
|
Apakah akan menggunakan hosting virtual untuk akses ke bucket Anda. Untuk detailnya, lihat artikel ini. |
|
Gunakan AWS IAM
Opsi ini berlaku untuk skenario di mana Milvus berjalan di instance EC2 atau cluster EKS. Dalam kasus ini, Anda tidak perlu meng-hardcode AK/SK.
{
"format": "...",
"extfs": {
"use_iam": "true",
"iam_endpoint": "https://sts.<region>.amazonaws.com",
"region": "us-west-2",
"cloud_provider": "aws",
"use_ssl": "true"
}
}
Nama Parameter |
Deskripsi Parameter |
Contoh Nilai |
|---|---|---|
|
Apakah akan menggunakan AWS IAM. Setel ini ke |
|
|
Titik akhir AWS STS yang valid. Untuk detailnya, lihat artikel ini. |
|
|
ID wilayah cloud |
|
|
ID penyedia cloud |
|
|
Apakah SSL digunakan untuk membuat koneksi. |
|
Gunakan kredensial global Milvus
Opsi ini berlaku ketika Anda menyimpan data eksternal di bucket Milvus, dan pengaturan MinIO global yang ditentukan di milvus.yaml dapat digunakan secara langsung untuk mengakses data.
{
"format": "...",
"extfs": {
"storage_type": "remote"
}
}
Gunakan ARN Peran IAM
Opsi ini berlaku ketika organisasi Anda menggunakan akun AWS yang berbeda untuk mengelola cluster Milvus dan bucket yang menyimpan file data target.
Dalam kasus ini, pemilik bucket harus membuat peran IAM yang
Melampirkan
AmazonS3FullAccessatau kebijakan yang lebih terperinci untuk akses bucket.Menyertakan
sts:ExternalIdyang ditentukan sendiri di bidang Kondisi pada Kebijakan Kepercayaan peran.
Kemudian, pemilik bucket harus memberi Anda ARN peran IAM dan ID Eksternal sehingga Anda dapat memanggil sts:AssumeRole dengan nilai tersebut untuk mengasumsikan Peran IAM.
Berikut ini adalah contoh kebijakan izin yang akan dilampirkan ke peran IAM dengan izin yang diizinkan. Anda dapat menyesuaikan ini untuk memenuhi kebutuhan Anda.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
}
]
}
Dan kebijakan kepercayaan yang terkait dengan peran IAM mendefinisikan siapa yang diizinkan untuk mengasumsikannya.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
}
}
}
]
}
Setelah Anda mendapatkan ARN Peran IAM dan ID Eksternal, Anda dapat mengatur parameter external_spec sebagai berikut:
{
"format": "...",
"extfs": {
"cloud_provider": "aws",
"region": "us-west-2",
"storage_type": "remote",
"use_ssl": "true",
"use_iam": "true",
"role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
"external_id": "YOUR_UNIQUE_EXTERNAL_ID",
"load_frequency": "900"
}
}
Nama Parameter |
Deskripsi Parameter |
Contoh Nilai |
|---|---|---|
|
ID penyedia cloud |
|
|
ID wilayah cloud |
|
|
Apakah SSL digunakan untuk membuat koneksi. |
|
|
Apakah akan menggunakan AWS IAM. Setel ini ke |
|
|
IAM Role ARN yang diperoleh dari pemilik bucket. |
|
|
ID eksternal yang diperoleh dari pemilik bucket. |
-- |
|
Interval di mana Milvus mengambil kredensial autentikasi sementara dalam hitungan detik. |
|
Langkah 2: Menambahkan bidang
Setelah skema siap, Anda dapat menambahkan bidang sebagai berikut:
schema.add_field(
field_name="product_id",
datatype=DataType.INT64,
# highlight-next
external_field="id" # field name in the external data file
)
schema.add_field(
field_name="product_name",
datatype=DataType.VARCHAR,
max_length=512,
# highlight-next
external_field="name"
)
schema.add_field(
field_name="embedding",
datatype=DataType.FLOAT_VECTOR,
dim=768,
# highlight-next
external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;
schema.addField(AddFieldReq.builder()
.fieldName("product_id")
.dataType(DataType.Int64)
.externalField("id")
.build());
schema.addField(AddFieldReq.builder()
.fieldName("product_name")
.dataType(DataType.VarChar)
.maxLength(512)
.externalField("name")
.build());
schema.addField(AddFieldReq.builder()
.fieldName("embedding")
.dataType(DataType.FloatVector)
.dimension(768)
.externalField("vector")
.build());
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
schema = schema.
WithField(
entity.NewField().
WithName("product_id").
WithDataType(entity.FieldTypeInt64).
WithExternalField("id"),
).
WithField(
entity.NewField().
WithName("product_name").
WithDataType(entity.FieldTypeVarChar).
WithMaxLength(512).
WithExternalField("name"),
).
WithField(
entity.NewField().
WithName("embedding").
WithDataType(entity.FieldTypeFloatVector).
WithDim(768).
WithExternalField("vector"),
)
// node
export schema="{
\"externalSource\": \"volume://my_volume/path/to/a/folder\",
\"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
\"fields\": $fields
}"
Langkah 3: Buat koleksi
Setelah menambahkan semua bidang ke skema, Anda dapat membuat koleksi eksternal.
client = MilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)
client.create_collection(
collection_name="test_collection",
schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
ConnectConfig connectConfig = ConnectConfig.builder()
.uri("http://localhost:19530")
.token("root:Milvus")
.build();
MilvusClientV2 client = new MilvusClientV2(connectConfig);
CreateCollectionReq createReq = CreateCollectionReq.builder()
.collectionName("test_collection")
.collectionSchema(schema)
.build();
client.createCollection(createReq);
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
milvusAddr := "http://localhost:19530"
token := "root:Milvus"
client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
Address: milvusAddr,
APIKey: token
})
err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))
if err != nil {
fmt.Println(err.Error())
// handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"schema\": $schema
}"
Langkah 4: Membuat indeks
Anda dapat membuat indeks untuk kolom koleksi eksternal seperti yang Anda lakukan di koleksi terkelola.
index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
field_name="embedding",
index_type="AUTOINDEX",
metric_type="COSINE"
)
index_params.add_index(
field_name="product_name",
index_type="AUTOINDEX"
)
client.create_index(
db_name="my_database",
collection_name="test_collection",
index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;
IndexParam indexParamForIdField = IndexParam.builder()
.fieldName("product_name")
.indexType(IndexParam.IndexType.AUTOINDEX)
.build();
IndexParam indexParamForVectorField = IndexParam.builder()
.fieldName("embedding")
.indexType(IndexParam.IndexType.AUTOINDEX)
.metricType(IndexParam.MetricType.COSINE)
.build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
.dbName("my_database")
.collectionName("test_collection")
.indexParams(indexParams)
.build();
client.createIndex(createIndexReq);
import (
"github.com/milvus-io/milvus/client/v2/entity"
"github.com/milvus-io/milvus/client/v2/index"
"github.com/milvus-io/milvus/client/v2/milvusclient"
)
collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
// handler err
}
err = indexTask.Await(ctx)
if err != nil {
// handler err
}
client.createIndex({
db_name: "my_database",
collection_name: "test_collection",
field_name: "product_name",
index_type: "AUTOINDEX"
})
client.createIndex({
db_name: "my_database",
collection_name: "test_collection",
field_name: "embedding",
index_type: "AUTOINDEX",
metric_type: "COSINE"
})
export indexParams='[
{
"fieldName": "embedding",
"indexName": "my_vector",
"indexType": "AUTOINDEX"
},
{
"fieldName": "product_name",
"indexName": "my_id",
"indexType": "AUTOINDEX"
}
]'
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"indexParams\": $indexParams
}"
Langkah 5: Menyegarkan data
Setelah koleksi siap, segarkan kembali untuk membuat metadata dan indeks untuk data Anda.
job_id = client.refresh_external_collection(
db_name="my_database",
collection_name="test_collection"
)
while True:
progress = client.get_refresh_external_collection_progress(job_id=job_id)
print(f" {progress.state}: {progress.progress}%")
if progress.state == "RefreshCompleted":
elapsed = progress.end_time - progress.start_time
print(f" Completed in {elapsed}ms")
break
elif progress.state == "RefreshFailed":
print(f" Failed: {progress.reason}")
break
time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;
while (true) {
GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
GetRefreshExternalCollectionProgressReq.builder()
.jobId(jobId)
.build());
RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
if ("RefreshCompleted".equals(jobInfo.getState())) {
long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
System.out.printf(" Refresh completed in %dms%n", elapsed);
break;
} else if ("RefreshFailed".equals(jobInfo.getState())) {
System.out.printf(" Refresh failed: %s%n", jobInfo.getReason());
}
TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
client.NewGetRefreshExternalCollectionProgressOption(jobID))
fmt.Printf("State: %s\n", progress.State)
if progress.State == entity.RefreshStateCompleted {
fmt.Println("Refresh completed!")
break
}
if progress.State == entity.RefreshStateFailed {
fmt.Printf("Refresh failed: %s\n", progress.Reason)
break
}
time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"externalSource\": \"volume://my_volume/path/to/a/folder\",
\"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"
Operasi penyegaran bersifat asinkron, jadi Anda perlu menyiapkan iterasi untuk memantau kemajuannya.
Operasi penyegaran memindai metadata berkas data dan menghasilkan berkas manifes yang sesuai. Biasanya membutuhkan waktu 150-250 ms.
File manifes mencatat pemetaan antara metadata di Milvus dan baris-baris di file eksternal.
Jika ada pembaruan pada data sumber Anda, Anda perlu melakukan refresh secara manual agar Milvus tetap mutakhir.
Refresh yang mengharuskan menghapus semua metadata aktif tanpa penyisipan akan mengakibatkan penolakan.
Tindak lanjut
Setelah Anda menyegarkan koleksi eksternal, Anda dapat memuat dan melepaskan koleksi serta melakukan pencarian dan kueri kemiripan di koleksi eksternal seperti yang Anda lakukan di koleksi terkelola mana pun, kecuali bahwa koleksi di basis data untuk komputasi berdasarkan permintaan harus dilampirkan ke kluster berdasarkan permintaan untuk pencarian dan kueri.
Sebelum melakukan operasi DQL, seperti pencarian, kueri, dapatkan, dan pencarian hybrid, Anda perlu membuat sesi untuk melampirkan sumber daya komputasi dari klaster sesuai permintaan.