إنشاء مجموعة خارجيةCompatible with Milvus 3.0.x
المجموعة الخارجية هي نوع من مجموعات البيانات في Milvus التي تصل إلى البيانات من أنظمة التخزين الخارجية أو جداول قواعد البيانات مثل AWS S3 و Iceberg دون نسخها إلى Milvus. تعمل كطبقة استعلام فوق بحيرات البيانات مع الحفاظ على التوافق مع واجهات استعلام Milvus.
نظرة عامة
في خط أنابيب بيانات الذكاء الاصطناعي النموذجي، قد يكون المستخدمون قد قاموا بالفعل بتخزين بياناتهم في Parquet أو بتنسيقات أخرى على نظام التخزين الخاص بهم، مثل AWS S3. لجعل Milvus يستهلك هذه البيانات المخزنة خارجيًا، يحتاج المستخدمون عادةً إلى استيرادها إلى وحدة تخزين Milvus الخاصة باستخدام خطوط أنابيب الاستخراج والتحويل والتحميل (ETL).
يؤدي سير عمل إحضار البيانات إلى ميلفوس هذا إلى إنشاء بيانات زائدة عن الحاجة يصعب مزامنتها ويضيف إلى عبء الصيانة الهندسية لضمان اتساق البيانات.
إحضار البيانات لحساب سير العمل
لحل هذه المشكلات، توفر Milvus مجموعات خارجية تتيح لك الوصول إلى بياناتك المخزنة خارجيًا من Milvus دون القلق بشأن مزامنة البيانات وخطوط أنابيب ETL.
جلب الحوسبة إلى سير عمل البيانات
بمجرد إنشائها، يمكن للمجموعة الخارجية الوصول إلى بياناتك مباشرةً والاحتفاظ بها في نفس المكان الذي تخزنها فيه. في الخلفية، ينشئ Milvus ملفات البيان لتسجيل التعيينات بين بيانات Milvus الوصفية والصفوف في ملفات البيانات الخارجية. بعد أن تصبح ملفات البيان جاهزة، يمكنك إنشاء فهارس في المجموعة الخارجية كما تفعل في أي مجموعة مُدارة.
عندما تتغير بياناتك، يؤدي تشغيل تحديث البيانات الوصفية يدويًا في الثانية إلى تحديث البيانات الوصفية يدويًا، مما يجعل Milvus محدثًا دائمًا.
الخطوة 1: إنشاء مخطط
كما هو الحال مع إنشاء مجموعة مُدارة، تحتاج أيضًا إلى إنشاء مخطط قبل إنشاء مجموعة خارجية. ومع ذلك، يختلف المخطط قليلاً عن مخطط المجموعة المُدارة.
from pymilvus import MilvusClient, DataType
schema = MilvusClient.create_schema(
external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
external_spec='{
"format": "parquet",
"extfs": {
...
}
}'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());
CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
.externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
.externalSpec(externalSpec)
.build();
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
schema := entity.NewSchema().
WithName("product_embeddings").
WithExternalSource("s3://my-bucket/embeddings/").
WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
{
"fieldName": "product_id",
"dataType": "Int64",
"isPrimary": true
},
{
"fieldName": "embedding",
"dataType": "FloatVector",
"elementTypeParams": {
"dim": "768"
}
},
{
"fieldName": "product_name",
"dataType": "VarChar",
"elementTypeParams": {
"max_length": 512
}
}
]'
لإنشاء مخطط لمجموعة خارجية، تحتاج إلى تحديد URI للبيانات المصدر وتنسيق البيانات وإعدادات المصادقة.
اسم المعلمة |
وصف المعلمة |
مثال القيمة |
|---|---|---|
|
تنسيق ملفات بيانات المصدر الهدف المصدر. |
|
|
معرف لقطة جدول Iceberg صالح. تنطبق هذه المعلمة فقط عند تعيين |
|
|
إعدادات نظام الملفات الخارجية في بنية JSON متسلسلة. |
-- |
لديك الخيارات التالية لتعيين إعدادات المصادقة:
استخدام AWS AK/SK
ينطبق هذا الخيار على MinIO المستضاف ذاتيًا أو السيناريو الذي يكون لديك فيه AK/SK للعمل.
{
"format": "...",
"extfs": {
"access_key_id": "AKIA..",
"access_key_value": "u4Lh...",
"region": "us-west-2",
"cloud_provider": "aws",
"use_ssl": "true",
"use_virtual_host": "true"
}
}
اسم المعلمة |
وصف المعلمة |
مثال القيمة |
|---|---|---|
|
معرف مفتاح الوصول |
|
|
قيمة مفتاح الوصول |
|
|
معرّف منطقة السحابة |
|
|
معرف موفر السحابة |
|
|
ما إذا كان يتم استخدام SSL لإنشاء اتصالات. |
|
|
ما إذا كان سيتم استخدام الاستضافة الافتراضية للوصول إلى دلو الخاص بك. لمزيد من التفاصيل، راجع هذه المقالة. |
|
استخدام AWS IAM
ينطبق هذا الخيار على السيناريو الذي يتم فيه تشغيل Milvus على مثيل EC2 أو مجموعة EKS. في هذه الحالة، لا تحتاج إلى ترميز AK/SK.
{
"format": "...",
"extfs": {
"use_iam": "true",
"iam_endpoint": "https://sts.<region>.amazonaws.com",
"region": "us-west-2",
"cloud_provider": "aws",
"use_ssl": "true"
}
}
اسم المعلمة |
وصف المعلمة |
مثال القيمة |
|---|---|---|
|
ما إذا كنت تريد استخدام AWS IAM. اضبط هذا على |
|
|
نقطة نهاية AWS STS صالحة. لمزيد من التفاصيل، راجع هذه المقالة. |
|
|
معرف منطقة السحابة |
|
|
معرف موفر السحابة |
|
|
ما إذا كان يتم استخدام SSL لإنشاء اتصالات. |
|
استخدام بيانات اعتماد Milvus العامة
ينطبق هذا الخيار عندما تقوم بتخزين بيانات خارجية في دلو Milvus، ويمكن استخدام إعدادات MinIO العامة المحددة في milvus.yaml مباشرة للوصول إلى البيانات.
{
"format": "...",
"extfs": {
"storage_type": "remote"
}
}
استخدام دور IAM ARN
يطبق هذا الخيار عندما تستخدم مؤسستك حسابات AWS مختلفة لإدارة مجموعة Milvus والحاوية التي تحتوي على ملفات البيانات المستهدفة.
في هذه الحالة، يجب أن يقوم مالك الدلو بإنشاء دور IAM الذي
إرفاق
AmazonS3FullAccessأو سياسة أكثر دقة للوصول إلى الجرافة.يتضمن
sts:ExternalIdمعرّف ذاتيًا في حقل الحالة في نهج الثقة الخاص بالدور.
بعد ذلك، يجب على مالك الدلو تزويدك بدور IAM ARN والمعرف الخارجي حتى تتمكن من الاتصال بـ sts:AssumeRole بهذه القيم لتولي دور IAM.
فيما يلي مثال على سياسة الأذونات التي سيتم إرفاقها بدور IAM مع الأذونات المسموح بها. يمكنك تعديل ذلك لتلبية متطلباتك.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
}
]
}
ونهج الثقة المرتبط بدور IAM يحدد من المسموح له بتوليه.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
}
}
}
]
}
بمجرد حصولك على ARN دور IAM والمعرف الخارجي، يمكنك إعداد المعلمة external_spec على النحو التالي:
{
"format": "...",
"extfs": {
"cloud_provider": "aws",
"region": "us-west-2",
"storage_type": "remote",
"use_ssl": "true",
"use_iam": "true",
"role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
"external_id": "YOUR_UNIQUE_EXTERNAL_ID",
"load_frequency": "900"
}
}
اسم المعلمة |
وصف المعلمة |
مثال القيمة |
|---|---|---|
|
معرف موفر السحابة |
|
|
معرف منطقة السحابة |
|
|
ما إذا كان يتم استخدام SSL لإنشاء اتصالات. |
|
|
ما إذا كان سيتم استخدام AWS IAM. اضبط هذا على |
|
|
معرف دور IAM الذي تم الحصول عليه من مالك الجرافة. |
|
|
المعرف الخارجي الذي تم الحصول عليه من مالك الجرافة. |
-- |
|
الفاصل الزمني الذي يسترد فيه Milvus بيانات اعتماد المصادقة المؤقتة بالثواني. |
|
الخطوة 2: إضافة الحقول
بمجرد أن يصبح المخطط جاهزًا، يمكنك إضافة حقول على النحو التالي:
schema.add_field(
field_name="product_id",
datatype=DataType.INT64,
# highlight-next
external_field="id" # field name in the external data file
)
schema.add_field(
field_name="product_name",
datatype=DataType.VARCHAR,
max_length=512,
# highlight-next
external_field="name"
)
schema.add_field(
field_name="embedding",
datatype=DataType.FLOAT_VECTOR,
dim=768,
# highlight-next
external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;
schema.addField(AddFieldReq.builder()
.fieldName("product_id")
.dataType(DataType.Int64)
.externalField("id")
.build());
schema.addField(AddFieldReq.builder()
.fieldName("product_name")
.dataType(DataType.VarChar)
.maxLength(512)
.externalField("name")
.build());
schema.addField(AddFieldReq.builder()
.fieldName("embedding")
.dataType(DataType.FloatVector)
.dimension(768)
.externalField("vector")
.build());
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
schema = schema.
WithField(
entity.NewField().
WithName("product_id").
WithDataType(entity.FieldTypeInt64).
WithExternalField("id"),
).
WithField(
entity.NewField().
WithName("product_name").
WithDataType(entity.FieldTypeVarChar).
WithMaxLength(512).
WithExternalField("name"),
).
WithField(
entity.NewField().
WithName("embedding").
WithDataType(entity.FieldTypeFloatVector).
WithDim(768).
WithExternalField("vector"),
)
// node
export schema="{
\"externalSource\": \"volume://my_volume/path/to/a/folder\",
\"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
\"fields\": $fields
}"
الخطوة 3: إنشاء مجموعة
بعد إضافة جميع الحقول إلى المخطط، يمكنك إنشاء المجموعة الخارجية.
client = MilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)
client.create_collection(
collection_name="test_collection",
schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
ConnectConfig connectConfig = ConnectConfig.builder()
.uri("http://localhost:19530")
.token("root:Milvus")
.build();
MilvusClientV2 client = new MilvusClientV2(connectConfig);
CreateCollectionReq createReq = CreateCollectionReq.builder()
.collectionName("test_collection")
.collectionSchema(schema)
.build();
client.createCollection(createReq);
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
milvusAddr := "http://localhost:19530"
token := "root:Milvus"
client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
Address: milvusAddr,
APIKey: token
})
err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))
if err != nil {
fmt.Println(err.Error())
// handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"schema\": $schema
}"
الخطوة 4: إنشاء فهارس
يمكنك إنشاء فهارس لأعمدة المجموعة الخارجية كما تفعل في المجموعات المُدارة.
index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
field_name="embedding",
index_type="AUTOINDEX",
metric_type="COSINE"
)
index_params.add_index(
field_name="product_name",
index_type="AUTOINDEX"
)
client.create_index(
db_name="my_database",
collection_name="test_collection",
index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;
IndexParam indexParamForIdField = IndexParam.builder()
.fieldName("product_name")
.indexType(IndexParam.IndexType.AUTOINDEX)
.build();
IndexParam indexParamForVectorField = IndexParam.builder()
.fieldName("embedding")
.indexType(IndexParam.IndexType.AUTOINDEX)
.metricType(IndexParam.MetricType.COSINE)
.build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
.dbName("my_database")
.collectionName("test_collection")
.indexParams(indexParams)
.build();
client.createIndex(createIndexReq);
import (
"github.com/milvus-io/milvus/client/v2/entity"
"github.com/milvus-io/milvus/client/v2/index"
"github.com/milvus-io/milvus/client/v2/milvusclient"
)
collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
// handler err
}
err = indexTask.Await(ctx)
if err != nil {
// handler err
}
client.createIndex({
db_name: "my_database",
collection_name: "test_collection",
field_name: "product_name",
index_type: "AUTOINDEX"
})
client.createIndex({
db_name: "my_database",
collection_name: "test_collection",
field_name: "embedding",
index_type: "AUTOINDEX",
metric_type: "COSINE"
})
export indexParams='[
{
"fieldName": "embedding",
"indexName": "my_vector",
"indexType": "AUTOINDEX"
},
{
"fieldName": "product_name",
"indexName": "my_id",
"indexType": "AUTOINDEX"
}
]'
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"indexParams\": $indexParams
}"
الخطوة 5: تحديث البيانات
بمجرد أن تصبح المجموعة جاهزة، قم بتحديثها لإنشاء البيانات الوصفية والفهارس لبياناتك.
job_id = client.refresh_external_collection(
db_name="my_database",
collection_name="test_collection"
)
while True:
progress = client.get_refresh_external_collection_progress(job_id=job_id)
print(f" {progress.state}: {progress.progress}%")
if progress.state == "RefreshCompleted":
elapsed = progress.end_time - progress.start_time
print(f" Completed in {elapsed}ms")
break
elif progress.state == "RefreshFailed":
print(f" Failed: {progress.reason}")
break
time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;
while (true) {
GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
GetRefreshExternalCollectionProgressReq.builder()
.jobId(jobId)
.build());
RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
if ("RefreshCompleted".equals(jobInfo.getState())) {
long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
System.out.printf(" Refresh completed in %dms%n", elapsed);
break;
} else if ("RefreshFailed".equals(jobInfo.getState())) {
System.out.printf(" Refresh failed: %s%n", jobInfo.getReason());
}
TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
client.NewGetRefreshExternalCollectionProgressOption(jobID))
fmt.Printf("State: %s\n", progress.State)
if progress.State == entity.RefreshStateCompleted {
fmt.Println("Refresh completed!")
break
}
if progress.State == entity.RefreshStateFailed {
fmt.Printf("Refresh failed: %s\n", progress.Reason)
break
}
time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"externalSource\": \"volume://my_volume/path/to/a/folder\",
\"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"
عملية التحديث غير متزامنة، لذا تحتاج إلى إعداد تكرار لمراقبة تقدمها.
تفحص عملية التحديث البيانات الوصفية لملفات البيانات وتنشئ ملفات البيان وفقًا لذلك. تستغرق عادةً 150-250 مللي ثانية.
تقوم ملفات البيان بتسجيل التعيين بين البيانات الوصفية في ملف Milvus والصفوف في الملفات الخارجية.
إذا كان هناك تحديث لبيانات المصدر، فأنت بحاجة إلى استدعاء التحديث يدويًا مرة أخرى لإبقاء ملف Milvus محدثًا.
يؤدي التحديث الذي يتطلب إزالة جميع البيانات الوصفية النشطة دون أي عمليات إدراج إلى رفض.
المتابعة
بمجرد أن تقوم بتحديث المجموعة الخارجية، يمكنك تحميل المجموعة وإصدارها وإجراء عمليات بحث واستعلامات التشابه في المجموعة الخارجية كما تفعل في أي مجموعة مُدارة، باستثناء أن المجموعات في قاعدة بيانات للحوسبة عند الطلب يجب أن تكون مرفقة بمجموعة عند الطلب لعمليات البحث والاستعلامات.
قبل إجراء عمليات DQL، مثل البحث والاستعلام والحصول والبحث المختلط، تحتاج إلى إنشاء جلسة عمل لإرفاق موارد الحوسبة الخاصة بمجموعة حسب الطلب.