Создание внешней коллекцииCompatible with Milvus 3.0.x

Внешняя коллекция - это тип коллекции данных в Milvus, которая получает доступ к данным из внешних систем хранения или таблиц баз данных, таких как AWS S3 и Iceberg, не копируя их в Milvus. Она действует как слой запросов над озерами данных, сохраняя совместимость с интерфейсами запросов Milvus.

Обзор

В типичном конвейере данных AI пользователи могут уже хранить свои данные в формате Parquet или других форматах в своей системе хранения, например в AWS S3. Чтобы заставить Milvus использовать эти внешние данные, пользователям обычно нужно импортировать их в собственное хранилище Milvus с помощью конвейеров извлечения-трансформирования-загрузки (ETL).

Такой рабочий процесс "принеси свои данные в Milvus" создает избыточные данные, которые сложно синхронизировать, и увеличивает нагрузку по техническому обслуживанию для обеспечения согласованности данных.

Bring data to compute workflow Рабочий процесс "принеси данные на вычисление

Чтобы решить эти проблемы, Milvus предоставляет внешние коллекции, которые позволяют вам получать доступ к внешним данным из Milvus, не заботясь о синхронизации данных и ETL-конвейерах.

Bring compute to data workflow Перенесите вычисления в рабочий процесс с данными

После создания внешняя коллекция может получить прямой доступ к вашим данным и хранить их там же, где вы их храните. В фоновом режиме Milvus создает файлы манифеста для записи сопоставлений между метаданными Milvus и строками во внешних файлах данных. После того как файлы манифеста будут готовы, вы можете создавать индексы во внешней коллекции, как и в любой другой управляемой коллекции.

Когда данные изменяются, вручную запускается посекундное обновление метаданных, благодаря чему Milvus всегда остается в актуальном состоянии.

Шаг 1: Создание схемы

Как и при создании управляемой коллекции, перед созданием внешней коллекции вам также необходимо создать схему. Однако схема немного отличается от схемы управляемой коллекции.

from pymilvus import MilvusClient, DataType

schema = MilvusClient.create_schema(
    external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
    external_spec='{
        "format": "parquet",
        "extfs": {
            ...
        }
    }'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;

JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());

CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
        .externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
        .externalSpec(externalSpec)
        .build();
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema := entity.NewSchema().
    WithName("product_embeddings").
    WithExternalSource("s3://my-bucket/embeddings/").
    WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
        {
            "fieldName": "product_id",
            "dataType": "Int64",
            "isPrimary": true
        },
        {
            "fieldName": "embedding",
            "dataType": "FloatVector",
            "elementTypeParams": {
                "dim": "768"
            }
        },
        {
            "fieldName": "product_name",
            "dataType": "VarChar",
            "elementTypeParams": {
                "max_length": 512
            }
        }
    ]'

Чтобы создать схему для внешней коллекции, необходимо указать URI источника данных, формат данных и параметры аутентификации.

Имя параметра

Описание параметра

Пример Значение

format

Формат файлов целевых исходных данных.

parquet

snapshot_id

Действительный идентификатор снимка таблицы Iceberg. Этот параметр применяется только при установке значения format на iceberg_table.

473984310232959286

extfs

Параметры внешней файловой системы в строковой JSON-структуре.

--

У вас есть следующие варианты настройки параметров аутентификации:

Использовать AWS AK/SK

Эта опция применяется к самостоятельному хостингу MinIO или сценарию, в котором у вас есть AK/SK для работы.

{
    "format": "...",
    "extfs": {
        "access_key_id":     "AKIA..",
        "access_key_value":  "u4Lh...",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true",
        "use_virtual_host":  "true"
    }
}

Имя параметра

Описание параметра

Пример Значение

extfs.access_key_id

Идентификатор ключа доступа

AKIA...

extfs.access_key_value

Значение ключа доступа

u7LH...

extfs.region

Идентификатор региона облака

us-west-2

extfs.cloud_provider

Идентификатор облачного провайдера

aws

extfs.use_ssl

Используется ли SSL для установления соединений.

true

extfs.use_virtual_host

Использовать ли виртуальный хостинг для доступа к вашему ведру.

Для получения подробной информации см. эту статью.

true

Использовать AWS IAM

Эта опция применима к сценарию, когда Milvus работает на экземпляре EC2 или кластере EKS. В этом случае вам не нужно жестко кодировать AK/SK.

{
    "format": "...",
    "extfs": {
        "use_iam":           "true",
        "iam_endpoint":      "https://sts.<region>.amazonaws.com",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true"
    }
}

Имя параметра

Описание параметра

Пример Значение

extfs.use_iam

Использовать ли AWS IAM.

Для этой опции установите значение "true".

true

extfs.iam_endpoint

Действительная конечная точка AWS STS.

Подробнее см. в этой статье.

https:*//*sts.<region>.amazonaws.com

extfs.region

Идентификатор региона облака

us-west-2

extfs.cloud_provider

Идентификатор облачного провайдера

aws

extfs.use_ssl

Используется ли SSL для установления соединений.

true

Использовать глобальные учетные данные Milvus

Этот параметр применяется, когда вы храните внешние данные в ведре Milvus, и глобальные настройки MinIO, указанные на milvus.yaml, могут использоваться непосредственно для доступа к данным.

{
    "format": "...",
    "extfs": {
        "storage_type": "remote"
    }
}

Использовать ARN роли IAM

Этот вариант применяется, если ваша организация использует разные учетные записи AWS для управления кластером Milvus и ведром, в котором хранятся файлы целевых данных.

В этом случае владелец ведра должен создать роль IAM, которая

  • Прикрепляет AmazonS3FullAccess или более тонкую политику для доступа к ведру.

  • Включает самоопределенный sts:ExternalId в поле "Условие" политики доверия роли.

Затем владелец ведра должен предоставить вам ARN роли IAM и внешний идентификатор, чтобы вы могли обратиться к sts:AssumeRole с этими значениями для принятия роли IAM.

Ниже приведен пример политики разрешений, которая должна быть прикреплена к роли IAM с разрешенными разрешениями. Вы можете настроить ее в соответствии с вашими требованиями.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
        }
    ]
}

А политика доверия, связанная с ролью IAM, определяет, кому разрешено принимать ее.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
        }
      }
    }
  ]
}

Получив ARN роли IAM и внешний идентификатор, вы можете настроить параметр external_spec следующим образом:

{
    "format": "...",
    "extfs": {
        "cloud_provider": "aws",
        "region": "us-west-2",
        "storage_type": "remote",
        "use_ssl": "true",
        "use_iam": "true",
        "role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
        "external_id": "YOUR_UNIQUE_EXTERNAL_ID",
        "load_frequency": "900"
    }
}

Имя параметра

Описание параметра

Пример значения

extfs.cloud_provider

Идентификатор облачного провайдера

aws

extfs.region

Идентификатор региона облака

us-west-2

extfs.use_ssl

Используется ли SSL для установки соединений.

true

extfs.use_iam

Использовать ли AWS IAM.

Для этой опции установите значение "true".

true

extfs.role_arn

ARN роли IAM, полученный от владельца ведра.

arn:aws:iam::306787000000:role/...

extfs.external_id

Внешний идентификатор, полученный от владельца ведра.

--

extfs.load_frequency

Интервал, с которым Milvus извлекает временные учетные данные аутентификации, в секундах.

900

Шаг 2: Добавьте поля

После того как схема готова, вы можете добавить поля следующим образом:

schema.add_field(
    field_name="product_id",
    datatype=DataType.INT64,
    # highlight-next
    external_field="id" # field name in the external data file
)
schema.add_field(
    field_name="product_name",
    datatype=DataType.VARCHAR,
    max_length=512,
    # highlight-next
    external_field="name"
)
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=768,
    # highlight-next
    external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;

schema.addField(AddFieldReq.builder()
        .fieldName("product_id")
        .dataType(DataType.Int64)
        .externalField("id")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("product_name")
        .dataType(DataType.VarChar)
        .maxLength(512)
        .externalField("name")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("embedding")
        .dataType(DataType.FloatVector)
        .dimension(768)
        .externalField("vector")
        .build());
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema = schema.
    WithField(
        entity.NewField().
            WithName("product_id").
            WithDataType(entity.FieldTypeInt64).
            WithExternalField("id"),
    ).
    WithField(
        entity.NewField().
            WithName("product_name").
            WithDataType(entity.FieldTypeVarChar).
            WithMaxLength(512).
            WithExternalField("name"),
    ).
    WithField(
        entity.NewField().
            WithName("embedding").
            WithDataType(entity.FieldTypeFloatVector).
            WithDim(768).
            WithExternalField("vector"),
    )
// node
export schema="{
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
    \"fields\": $fields
}"

Шаг 3: Создайте коллекцию

После добавления всех полей в схему можно создать внешнюю коллекцию.

client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

client.create_collection(
    collection_name="test_collection",
    schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;

ConnectConfig connectConfig = ConnectConfig.builder()
        .uri("http://localhost:19530")
        .token("root:Milvus")
        .build();

MilvusClientV2 client = new MilvusClientV2(connectConfig);

CreateCollectionReq createReq = CreateCollectionReq.builder()
        .collectionName("test_collection")
        .collectionSchema(schema)
        .build();
client.createCollection(createReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

milvusAddr := "http://localhost:19530"
token := "root:Milvus"

client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
    Address: milvusAddr,
    APIKey: token
})

err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))

if err != nil {
    fmt.Println(err.Error())
    // handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"schema\": $schema
}"

Шаг 4: Создание индексов

Вы можете создавать индексы для столбцов внешней коллекции, как это делается в управляемых коллекциях.

index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
    field_name="embedding",
    index_type="AUTOINDEX",
    metric_type="COSINE"
)
index_params.add_index(
    field_name="product_name",
    index_type="AUTOINDEX"
)
client.create_index(
    db_name="my_database",
    collection_name="test_collection",
    index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;

IndexParam indexParamForIdField = IndexParam.builder()
        .fieldName("product_name")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .build();
IndexParam indexParamForVectorField = IndexParam.builder()
        .fieldName("embedding")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .metricType(IndexParam.MetricType.COSINE)
        .build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
        .dbName("my_database")
        .collectionName("test_collection")
        .indexParams(indexParams)
        .build();
client.createIndex(createIndexReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    "github.com/milvus-io/milvus/client/v2/index"
    "github.com/milvus-io/milvus/client/v2/milvusclient"
)

collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
    milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
    milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
    // handler err
}
err = indexTask.Await(ctx)
if err != nil {
    // handler err
}
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "product_name",
    index_type: "AUTOINDEX"
})
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "embedding",
    index_type: "AUTOINDEX",
    metric_type: "COSINE"
})
export indexParams='[
        {
            "fieldName": "embedding",
            "indexName": "my_vector",
            "indexType": "AUTOINDEX"
        },
        {
            "fieldName": "product_name",
            "indexName": "my_id",
            "indexType": "AUTOINDEX"
        }
    ]'

curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"indexParams\": $indexParams
}"

Шаг 5: Обновление данных

Когда коллекция будет готова, обновите ее, чтобы создать метаданные и индексы для ваших данных.

job_id = client.refresh_external_collection(
    db_name="my_database",
    collection_name="test_collection"
)
while True:
    progress = client.get_refresh_external_collection_progress(job_id=job_id)
    print(f"  {progress.state}: {progress.progress}%")
    if progress.state == "RefreshCompleted":
        elapsed = progress.end_time - progress.start_time
        print(f"  Completed in {elapsed}ms")
        break
    elif progress.state == "RefreshFailed":
        print(f"  Failed: {progress.reason}")
        break
    time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;

while (true) {
    GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
            GetRefreshExternalCollectionProgressReq.builder()
                    .jobId(jobId)
                    .build());
    RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
    if ("RefreshCompleted".equals(jobInfo.getState())) {
        long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
        System.out.printf("  Refresh completed in %dms%n", elapsed);
        break;
    } else if ("RefreshFailed".equals(jobInfo.getState())) {
        System.out.printf("  Refresh failed: %s%n", jobInfo.getReason());
    }
    TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
    client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
    progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
        client.NewGetRefreshExternalCollectionProgressOption(jobID))
    fmt.Printf("State: %s\n", progress.State)
    if progress.State == entity.RefreshStateCompleted {
        fmt.Println("Refresh completed!")
        break
    }
    if progress.State == entity.RefreshStateFailed {
        fmt.Printf("Refresh failed: %s\n", progress.Reason)
        break
    }
    time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"

Операция обновления является асинхронной, поэтому вам нужно настроить итерацию, чтобы следить за ее ходом.

  • Операция refresh сканирует метаданные файлов данных и генерирует соответствующие файлы манифеста. Обычно это занимает 150-250 мс.

  • В файлах манифеста записывается соответствие между метаданными в Milvus и строками во внешних файлах.

  • При обновлении исходных данных необходимо вручную вызвать обновление, чтобы поддерживать Milvus в актуальном состоянии.

  • Обновление, требующее удаления всех активных метаданных без каких-либо вставок, приводит к отказу.

Последующие действия

После обновления внешней коллекции можно загружать и освобождать коллекцию и выполнять поиск и запросы по аналогии с любой управляемой коллекцией, за исключением того, что коллекции в базе данных для вычислений по требованию должны быть подключены к кластеру по требованию для поиска и запросов.

Перед выполнением операций DQL, таких как поиск, запрос, получение и гибридный поиск, необходимо создать сеанс для подключения вычислительных ресурсов кластера по требованию.