Создание внешней коллекцииCompatible with Milvus 3.0.x
Внешняя коллекция - это тип коллекции данных в Milvus, которая получает доступ к данным из внешних систем хранения или таблиц баз данных, таких как AWS S3 и Iceberg, не копируя их в Milvus. Она действует как слой запросов над озерами данных, сохраняя совместимость с интерфейсами запросов Milvus.
Обзор
В типичном конвейере данных AI пользователи могут уже хранить свои данные в формате Parquet или других форматах в своей системе хранения, например в AWS S3. Чтобы заставить Milvus использовать эти внешние данные, пользователям обычно нужно импортировать их в собственное хранилище Milvus с помощью конвейеров извлечения-трансформирования-загрузки (ETL).
Такой рабочий процесс "принеси свои данные в Milvus" создает избыточные данные, которые сложно синхронизировать, и увеличивает нагрузку по техническому обслуживанию для обеспечения согласованности данных.
Рабочий процесс "принеси данные на вычисление
Чтобы решить эти проблемы, Milvus предоставляет внешние коллекции, которые позволяют вам получать доступ к внешним данным из Milvus, не заботясь о синхронизации данных и ETL-конвейерах.
Перенесите вычисления в рабочий процесс с данными
После создания внешняя коллекция может получить прямой доступ к вашим данным и хранить их там же, где вы их храните. В фоновом режиме Milvus создает файлы манифеста для записи сопоставлений между метаданными Milvus и строками во внешних файлах данных. После того как файлы манифеста будут готовы, вы можете создавать индексы во внешней коллекции, как и в любой другой управляемой коллекции.
Когда данные изменяются, вручную запускается посекундное обновление метаданных, благодаря чему Milvus всегда остается в актуальном состоянии.
Шаг 1: Создание схемы
Как и при создании управляемой коллекции, перед созданием внешней коллекции вам также необходимо создать схему. Однако схема немного отличается от схемы управляемой коллекции.
from pymilvus import MilvusClient, DataType
schema = MilvusClient.create_schema(
external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
external_spec='{
"format": "parquet",
"extfs": {
...
}
}'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());
CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
.externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
.externalSpec(externalSpec)
.build();
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
schema := entity.NewSchema().
WithName("product_embeddings").
WithExternalSource("s3://my-bucket/embeddings/").
WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
{
"fieldName": "product_id",
"dataType": "Int64",
"isPrimary": true
},
{
"fieldName": "embedding",
"dataType": "FloatVector",
"elementTypeParams": {
"dim": "768"
}
},
{
"fieldName": "product_name",
"dataType": "VarChar",
"elementTypeParams": {
"max_length": 512
}
}
]'
Чтобы создать схему для внешней коллекции, необходимо указать URI источника данных, формат данных и параметры аутентификации.
Имя параметра |
Описание параметра |
Пример Значение |
|---|---|---|
|
Формат файлов целевых исходных данных. |
|
|
Действительный идентификатор снимка таблицы Iceberg. Этот параметр применяется только при установке значения |
|
|
Параметры внешней файловой системы в строковой JSON-структуре. |
-- |
У вас есть следующие варианты настройки параметров аутентификации:
Использовать AWS AK/SK
Эта опция применяется к самостоятельному хостингу MinIO или сценарию, в котором у вас есть AK/SK для работы.
{
"format": "...",
"extfs": {
"access_key_id": "AKIA..",
"access_key_value": "u4Lh...",
"region": "us-west-2",
"cloud_provider": "aws",
"use_ssl": "true",
"use_virtual_host": "true"
}
}
Имя параметра |
Описание параметра |
Пример Значение |
|---|---|---|
|
Идентификатор ключа доступа |
|
|
Значение ключа доступа |
|
|
Идентификатор региона облака |
|
|
Идентификатор облачного провайдера |
|
|
Используется ли SSL для установления соединений. |
|
|
Использовать ли виртуальный хостинг для доступа к вашему ведру. Для получения подробной информации см. эту статью. |
|
Использовать AWS IAM
Эта опция применима к сценарию, когда Milvus работает на экземпляре EC2 или кластере EKS. В этом случае вам не нужно жестко кодировать AK/SK.
{
"format": "...",
"extfs": {
"use_iam": "true",
"iam_endpoint": "https://sts.<region>.amazonaws.com",
"region": "us-west-2",
"cloud_provider": "aws",
"use_ssl": "true"
}
}
Имя параметра |
Описание параметра |
Пример Значение |
|---|---|---|
|
Использовать ли AWS IAM. Для этой опции установите значение |
|
|
Действительная конечная точка AWS STS. Подробнее см. в этой статье. |
|
|
Идентификатор региона облака |
|
|
Идентификатор облачного провайдера |
|
|
Используется ли SSL для установления соединений. |
|
Использовать глобальные учетные данные Milvus
Этот параметр применяется, когда вы храните внешние данные в ведре Milvus, и глобальные настройки MinIO, указанные на milvus.yaml, могут использоваться непосредственно для доступа к данным.
{
"format": "...",
"extfs": {
"storage_type": "remote"
}
}
Использовать ARN роли IAM
Этот вариант применяется, если ваша организация использует разные учетные записи AWS для управления кластером Milvus и ведром, в котором хранятся файлы целевых данных.
В этом случае владелец ведра должен создать роль IAM, которая
Прикрепляет
AmazonS3FullAccessили более тонкую политику для доступа к ведру.Включает самоопределенный
sts:ExternalIdв поле "Условие" политики доверия роли.
Затем владелец ведра должен предоставить вам ARN роли IAM и внешний идентификатор, чтобы вы могли обратиться к sts:AssumeRole с этими значениями для принятия роли IAM.
Ниже приведен пример политики разрешений, которая должна быть прикреплена к роли IAM с разрешенными разрешениями. Вы можете настроить ее в соответствии с вашими требованиями.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
}
]
}
А политика доверия, связанная с ролью IAM, определяет, кому разрешено принимать ее.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
}
}
}
]
}
Получив ARN роли IAM и внешний идентификатор, вы можете настроить параметр external_spec следующим образом:
{
"format": "...",
"extfs": {
"cloud_provider": "aws",
"region": "us-west-2",
"storage_type": "remote",
"use_ssl": "true",
"use_iam": "true",
"role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
"external_id": "YOUR_UNIQUE_EXTERNAL_ID",
"load_frequency": "900"
}
}
Имя параметра |
Описание параметра |
Пример значения |
|---|---|---|
|
Идентификатор облачного провайдера |
|
|
Идентификатор региона облака |
|
|
Используется ли SSL для установки соединений. |
|
|
Использовать ли AWS IAM. Для этой опции установите значение |
|
|
ARN роли IAM, полученный от владельца ведра. |
|
|
Внешний идентификатор, полученный от владельца ведра. |
-- |
|
Интервал, с которым Milvus извлекает временные учетные данные аутентификации, в секундах. |
|
Шаг 2: Добавьте поля
После того как схема готова, вы можете добавить поля следующим образом:
schema.add_field(
field_name="product_id",
datatype=DataType.INT64,
# highlight-next
external_field="id" # field name in the external data file
)
schema.add_field(
field_name="product_name",
datatype=DataType.VARCHAR,
max_length=512,
# highlight-next
external_field="name"
)
schema.add_field(
field_name="embedding",
datatype=DataType.FLOAT_VECTOR,
dim=768,
# highlight-next
external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;
schema.addField(AddFieldReq.builder()
.fieldName("product_id")
.dataType(DataType.Int64)
.externalField("id")
.build());
schema.addField(AddFieldReq.builder()
.fieldName("product_name")
.dataType(DataType.VarChar)
.maxLength(512)
.externalField("name")
.build());
schema.addField(AddFieldReq.builder()
.fieldName("embedding")
.dataType(DataType.FloatVector)
.dimension(768)
.externalField("vector")
.build());
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
schema = schema.
WithField(
entity.NewField().
WithName("product_id").
WithDataType(entity.FieldTypeInt64).
WithExternalField("id"),
).
WithField(
entity.NewField().
WithName("product_name").
WithDataType(entity.FieldTypeVarChar).
WithMaxLength(512).
WithExternalField("name"),
).
WithField(
entity.NewField().
WithName("embedding").
WithDataType(entity.FieldTypeFloatVector).
WithDim(768).
WithExternalField("vector"),
)
// node
export schema="{
\"externalSource\": \"volume://my_volume/path/to/a/folder\",
\"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
\"fields\": $fields
}"
Шаг 3: Создайте коллекцию
После добавления всех полей в схему можно создать внешнюю коллекцию.
client = MilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)
client.create_collection(
collection_name="test_collection",
schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
ConnectConfig connectConfig = ConnectConfig.builder()
.uri("http://localhost:19530")
.token("root:Milvus")
.build();
MilvusClientV2 client = new MilvusClientV2(connectConfig);
CreateCollectionReq createReq = CreateCollectionReq.builder()
.collectionName("test_collection")
.collectionSchema(schema)
.build();
client.createCollection(createReq);
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
milvusAddr := "http://localhost:19530"
token := "root:Milvus"
client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
Address: milvusAddr,
APIKey: token
})
err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))
if err != nil {
fmt.Println(err.Error())
// handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"schema\": $schema
}"
Шаг 4: Создание индексов
Вы можете создавать индексы для столбцов внешней коллекции, как это делается в управляемых коллекциях.
index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
field_name="embedding",
index_type="AUTOINDEX",
metric_type="COSINE"
)
index_params.add_index(
field_name="product_name",
index_type="AUTOINDEX"
)
client.create_index(
db_name="my_database",
collection_name="test_collection",
index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;
IndexParam indexParamForIdField = IndexParam.builder()
.fieldName("product_name")
.indexType(IndexParam.IndexType.AUTOINDEX)
.build();
IndexParam indexParamForVectorField = IndexParam.builder()
.fieldName("embedding")
.indexType(IndexParam.IndexType.AUTOINDEX)
.metricType(IndexParam.MetricType.COSINE)
.build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
.dbName("my_database")
.collectionName("test_collection")
.indexParams(indexParams)
.build();
client.createIndex(createIndexReq);
import (
"github.com/milvus-io/milvus/client/v2/entity"
"github.com/milvus-io/milvus/client/v2/index"
"github.com/milvus-io/milvus/client/v2/milvusclient"
)
collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
// handler err
}
err = indexTask.Await(ctx)
if err != nil {
// handler err
}
client.createIndex({
db_name: "my_database",
collection_name: "test_collection",
field_name: "product_name",
index_type: "AUTOINDEX"
})
client.createIndex({
db_name: "my_database",
collection_name: "test_collection",
field_name: "embedding",
index_type: "AUTOINDEX",
metric_type: "COSINE"
})
export indexParams='[
{
"fieldName": "embedding",
"indexName": "my_vector",
"indexType": "AUTOINDEX"
},
{
"fieldName": "product_name",
"indexName": "my_id",
"indexType": "AUTOINDEX"
}
]'
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"indexParams\": $indexParams
}"
Шаг 5: Обновление данных
Когда коллекция будет готова, обновите ее, чтобы создать метаданные и индексы для ваших данных.
job_id = client.refresh_external_collection(
db_name="my_database",
collection_name="test_collection"
)
while True:
progress = client.get_refresh_external_collection_progress(job_id=job_id)
print(f" {progress.state}: {progress.progress}%")
if progress.state == "RefreshCompleted":
elapsed = progress.end_time - progress.start_time
print(f" Completed in {elapsed}ms")
break
elif progress.state == "RefreshFailed":
print(f" Failed: {progress.reason}")
break
time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;
while (true) {
GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
GetRefreshExternalCollectionProgressReq.builder()
.jobId(jobId)
.build());
RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
if ("RefreshCompleted".equals(jobInfo.getState())) {
long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
System.out.printf(" Refresh completed in %dms%n", elapsed);
break;
} else if ("RefreshFailed".equals(jobInfo.getState())) {
System.out.printf(" Refresh failed: %s%n", jobInfo.getReason());
}
TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
client.NewGetRefreshExternalCollectionProgressOption(jobID))
fmt.Printf("State: %s\n", progress.State)
if progress.State == entity.RefreshStateCompleted {
fmt.Println("Refresh completed!")
break
}
if progress.State == entity.RefreshStateFailed {
fmt.Printf("Refresh failed: %s\n", progress.Reason)
break
}
time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"externalSource\": \"volume://my_volume/path/to/a/folder\",
\"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"
Операция обновления является асинхронной, поэтому вам нужно настроить итерацию, чтобы следить за ее ходом.
Операция refresh сканирует метаданные файлов данных и генерирует соответствующие файлы манифеста. Обычно это занимает 150-250 мс.
В файлах манифеста записывается соответствие между метаданными в Milvus и строками во внешних файлах.
При обновлении исходных данных необходимо вручную вызвать обновление, чтобы поддерживать Milvus в актуальном состоянии.
Обновление, требующее удаления всех активных метаданных без каких-либо вставок, приводит к отказу.
Последующие действия
После обновления внешней коллекции можно загружать и освобождать коллекцию и выполнять поиск и запросы по аналогии с любой управляемой коллекцией, за исключением того, что коллекции в базе данных для вычислений по требованию должны быть подключены к кластеру по требованию для поиска и запросов.
Перед выполнением операций DQL, таких как поиск, запрос, получение и гибридный поиск, необходимо создать сеанс для подключения вычислительных ресурсов кластера по требованию.