Criar uma coleção externaCompatible with Milvus 3.0.x

Uma coleção externa é um tipo de coleção de dados no Milvus que acede a dados de sistemas de armazenamento externos ou tabelas de bases de dados como o AWS S3 e o Iceberg sem os copiar para o Milvus. Actua como uma camada de consulta sobre os lagos de dados, mantendo a compatibilidade com as interfaces de consulta do Milvus.

Visão geral

Num pipeline de dados de IA típico, os utilizadores podem já ter armazenado os seus dados em Parquet ou noutros formatos no seu sistema de armazenamento, como o AWS S3. Para fazer com que o Milvus consuma esses dados armazenados externamente, os usuários geralmente precisam importá-los para o próprio armazenamento do Milvus usando pipelines Extract-Transform-Load (ETL).

Este fluxo de trabalho "traga os seus dados para o Milvus" cria dados redundantes que são difíceis de sincronizar e aumenta a carga de manutenção da engenharia para garantir a consistência dos dados.

Bring data to compute workflow Fluxo de trabalho "Bring data to compute

Para resolver estes problemas, o Milvus fornece colecções externas que lhe permitem aceder aos seus dados armazenados externamente a partir do Milvus sem se preocupar com a sincronização dos dados e com os pipelines ETL.

Bring compute to data workflow Trazer a computação para o fluxo de trabalho de dados

Uma vez criada, uma coleção externa pode aceder diretamente aos seus dados e mantê-los no mesmo local onde os armazena. Em segundo plano, o Milvus cria ficheiros de manifesto para registar os mapeamentos entre os metadados do Milvus e as linhas nos ficheiros de dados externos. Depois que os arquivos de manifesto estiverem prontos, você pode criar índices na coleção externa como faria em qualquer coleção gerenciada.

Quando os dados são alterados, o acionamento manual de uma atualização de sub-segundo atualiza os metadados, mantendo o Milvus sempre atualizado.

Passo 1: Criar esquema

Tal como acontece com a criação de uma coleção gerida, também é necessário criar um esquema antes de criar uma coleção externa. No entanto, o esquema é ligeiramente diferente do de uma coleção gerida.

from pymilvus import MilvusClient, DataType

schema = MilvusClient.create_schema(
    external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
    external_spec='{
        "format": "parquet",
        "extfs": {
            ...
        }
    }'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;

JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());

CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
        .externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
        .externalSpec(externalSpec)
        .build();
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema := entity.NewSchema().
    WithName("product_embeddings").
    WithExternalSource("s3://my-bucket/embeddings/").
    WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
        {
            "fieldName": "product_id",
            "dataType": "Int64",
            "isPrimary": true
        },
        {
            "fieldName": "embedding",
            "dataType": "FloatVector",
            "elementTypeParams": {
                "dim": "768"
            }
        },
        {
            "fieldName": "product_name",
            "dataType": "VarChar",
            "elementTypeParams": {
                "max_length": 512
            }
        }
    ]'

Para criar o esquema para uma coleção externa, é necessário especificar o URI dos dados de origem, o formato dos dados e as definições de autenticação.

Nome do parâmetro

Descrição do parâmetro

Exemplo Valor

format

Formato dos ficheiros de dados de origem de destino.

parquet

snapshot_id

Um ID de instantâneo de tabela Iceberg válido. Este parâmetro aplica-se apenas quando define format para iceberg_table.

473984310232959286

extfs

Definições do sistema de ficheiros externo numa estrutura JSON encadeada.

--

Tem as seguintes opções para definir as definições de autenticação:

Usar AWS AK/SK

Esta opção aplica-se ao MinIO auto-hospedado ou ao cenário em que tem AK/SK para trabalhar.

{
    "format": "...",
    "extfs": {
        "access_key_id":     "AKIA..",
        "access_key_value":  "u4Lh...",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true",
        "use_virtual_host":  "true"
    }
}

Nome do parâmetro

Descrição do parâmetro

Exemplo Valor

extfs.access_key_id

ID da chave de acesso

AKIA...

extfs.access_key_value

Valor da chave de acesso

u7LH...

extfs.region

ID da região de nuvem

us-west-2

extfs.cloud_provider

ID do fornecedor de serviços de computação em nuvem

aws

extfs.use_ssl

Se o SSL é usado para estabelecer conexões.

true

extfs.use_virtual_host

Se deve usar a hospedagem virtual para acessar o seu bucket.

Para obter detalhes, consulte este artigo.

true

Usar AWS IAM

Esta opção se aplica ao cenário em que o Milvus é executado em uma instância EC2 ou em um cluster EKS. Neste caso, não é necessário codificar o AK/SK.

{
    "format": "...",
    "extfs": {
        "use_iam":           "true",
        "iam_endpoint":      "https://sts.<region>.amazonaws.com",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true"
    }
}

Nome do parâmetro

Descrição do parâmetro

Exemplo Valor

extfs.use_iam

Se deve utilizar o AWS IAM.

Defina este valor como "true" para esta opção.

true

extfs.iam_endpoint

Um ponto de extremidade válido do AWS STS.

Para obter detalhes, consulte este artigo.

https:*//*sts.<region>.amazonaws.com

extfs.region

ID da região de nuvem

us-west-2

extfs.cloud_provider

ID do provedor de nuvem

aws

extfs.use_ssl

Se o SSL é usado para estabelecer conexões.

true

Usar credenciais globais do Milvus

Esta opção aplica-se quando armazena dados externos no bucket do Milvus e as definições globais do MinIO especificadas em milvus.yaml podem ser utilizadas diretamente para aceder aos dados.

{
    "format": "...",
    "extfs": {
        "storage_type": "remote"
    }
}

Usar ARN de função do IAM

Esta opção se aplica quando sua organização usa contas AWS diferentes para gerenciar o cluster do Milvus e o bucket que contém os arquivos de dados de destino.

Nesse caso, o proprietário do bucket deve criar uma função do IAM que

  • Anexa AmazonS3FullAccess ou uma política mais refinada para acesso ao bucket.

  • Inclui uma autodefinição sts:ExternalId no campo Condição da Política de Confiança da função.

Em seguida, o proprietário do bucket deve fornecer o ARN da função de IAM e a ID externa para que você possa chamar sts:AssumeRole com esses valores para assumir a função de IAM.

A seguir, um exemplo de política de permissão a ser anexada à função de IAM com as permissões permitidas. Pode ajustar isto para satisfazer os seus requisitos.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
        }
    ]
}

E a política de confiança associada à função de IAM define quem tem permissão para a assumir.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
        }
      }
    }
  ]
}

Depois de obter o ARN da função de IAM e a ID externa, pode configurar o parâmetro external_spec da seguinte forma:

{
    "format": "...",
    "extfs": {
        "cloud_provider": "aws",
        "region": "us-west-2",
        "storage_type": "remote",
        "use_ssl": "true",
        "use_iam": "true",
        "role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
        "external_id": "YOUR_UNIQUE_EXTERNAL_ID",
        "load_frequency": "900"
    }
}

Nome do parâmetro

Descrição do parâmetro

Exemplo Valor

extfs.cloud_provider

ID do provedor de nuvem

aws

extfs.region

ID da região de nuvem

us-west-2

extfs.use_ssl

Se o SSL é usado para estabelecer conexões.

true

extfs.use_iam

Se deve ser usado o AWS IAM.

Defina isso como "true" para essa opção.

true

extfs.role_arn

ARN de função do IAM obtido do proprietário do bucket.

arn:aws:iam::306787000000:role/...

extfs.external_id

ID externo obtido do proprietário do bucket.

--

extfs.load_frequency

Intervalo em que Milvus recupera credenciais de autenticação temporárias em segundos.

900

Etapa 2: Adicionar campos

Quando o esquema estiver pronto, pode adicionar campos da seguinte forma:

schema.add_field(
    field_name="product_id",
    datatype=DataType.INT64,
    # highlight-next
    external_field="id" # field name in the external data file
)
schema.add_field(
    field_name="product_name",
    datatype=DataType.VARCHAR,
    max_length=512,
    # highlight-next
    external_field="name"
)
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=768,
    # highlight-next
    external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;

schema.addField(AddFieldReq.builder()
        .fieldName("product_id")
        .dataType(DataType.Int64)
        .externalField("id")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("product_name")
        .dataType(DataType.VarChar)
        .maxLength(512)
        .externalField("name")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("embedding")
        .dataType(DataType.FloatVector)
        .dimension(768)
        .externalField("vector")
        .build());
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema = schema.
    WithField(
        entity.NewField().
            WithName("product_id").
            WithDataType(entity.FieldTypeInt64).
            WithExternalField("id"),
    ).
    WithField(
        entity.NewField().
            WithName("product_name").
            WithDataType(entity.FieldTypeVarChar).
            WithMaxLength(512).
            WithExternalField("name"),
    ).
    WithField(
        entity.NewField().
            WithName("embedding").
            WithDataType(entity.FieldTypeFloatVector).
            WithDim(768).
            WithExternalField("vector"),
    )
// node
export schema="{
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
    \"fields\": $fields
}"

Passo 3: Criar uma coleção

Depois de adicionar todos os campos ao esquema, pode criar a coleção externa.

client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

client.create_collection(
    collection_name="test_collection",
    schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;

ConnectConfig connectConfig = ConnectConfig.builder()
        .uri("http://localhost:19530")
        .token("root:Milvus")
        .build();

MilvusClientV2 client = new MilvusClientV2(connectConfig);

CreateCollectionReq createReq = CreateCollectionReq.builder()
        .collectionName("test_collection")
        .collectionSchema(schema)
        .build();
client.createCollection(createReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

milvusAddr := "http://localhost:19530"
token := "root:Milvus"

client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
    Address: milvusAddr,
    APIKey: token
})

err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))

if err != nil {
    fmt.Println(err.Error())
    // handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"schema\": $schema
}"

Passo 4: Criar índices

Pode criar índices para colunas de colecções externas, tal como faz nas colecções geridas.

index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
    field_name="embedding",
    index_type="AUTOINDEX",
    metric_type="COSINE"
)
index_params.add_index(
    field_name="product_name",
    index_type="AUTOINDEX"
)
client.create_index(
    db_name="my_database",
    collection_name="test_collection",
    index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;

IndexParam indexParamForIdField = IndexParam.builder()
        .fieldName("product_name")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .build();
IndexParam indexParamForVectorField = IndexParam.builder()
        .fieldName("embedding")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .metricType(IndexParam.MetricType.COSINE)
        .build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
        .dbName("my_database")
        .collectionName("test_collection")
        .indexParams(indexParams)
        .build();
client.createIndex(createIndexReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    "github.com/milvus-io/milvus/client/v2/index"
    "github.com/milvus-io/milvus/client/v2/milvusclient"
)

collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
    milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
    milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
    // handler err
}
err = indexTask.Await(ctx)
if err != nil {
    // handler err
}
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "product_name",
    index_type: "AUTOINDEX"
})
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "embedding",
    index_type: "AUTOINDEX",
    metric_type: "COSINE"
})
export indexParams='[
        {
            "fieldName": "embedding",
            "indexName": "my_vector",
            "indexType": "AUTOINDEX"
        },
        {
            "fieldName": "product_name",
            "indexName": "my_id",
            "indexType": "AUTOINDEX"
        }
    ]'

curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"indexParams\": $indexParams
}"

Etapa 5: atualizar dados

Quando a coleção estiver pronta, atualize-a para criar os metadados e índices para seus dados.

job_id = client.refresh_external_collection(
    db_name="my_database",
    collection_name="test_collection"
)
while True:
    progress = client.get_refresh_external_collection_progress(job_id=job_id)
    print(f"  {progress.state}: {progress.progress}%")
    if progress.state == "RefreshCompleted":
        elapsed = progress.end_time - progress.start_time
        print(f"  Completed in {elapsed}ms")
        break
    elif progress.state == "RefreshFailed":
        print(f"  Failed: {progress.reason}")
        break
    time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;

while (true) {
    GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
            GetRefreshExternalCollectionProgressReq.builder()
                    .jobId(jobId)
                    .build());
    RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
    if ("RefreshCompleted".equals(jobInfo.getState())) {
        long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
        System.out.printf("  Refresh completed in %dms%n", elapsed);
        break;
    } else if ("RefreshFailed".equals(jobInfo.getState())) {
        System.out.printf("  Refresh failed: %s%n", jobInfo.getReason());
    }
    TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
    client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
    progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
        client.NewGetRefreshExternalCollectionProgressOption(jobID))
    fmt.Printf("State: %s\n", progress.State)
    if progress.State == entity.RefreshStateCompleted {
        fmt.Println("Refresh completed!")
        break
    }
    if progress.State == entity.RefreshStateFailed {
        fmt.Printf("Refresh failed: %s\n", progress.Reason)
        break
    }
    time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"

A operação de atualização é assíncrona, portanto, é necessário configurar uma iteração para monitorar seu progresso.

  • A operação de atualização verifica os metadados dos ficheiros de dados e gera os ficheiros de manifesto em conformidade. Normalmente demora 150-250 ms.

  • Os ficheiros de manifesto registam o mapeamento entre os metadados no Milvus e as linhas nos ficheiros externos.

  • Se houver uma atualização dos dados de origem, é necessário chamar manualmente o refresh novamente para manter o Milvus atualizado.

  • Uma atualização que exija a remoção de todos os metadados activos sem quaisquer inserções resulta numa recusa.

Acompanhamento

Depois de atualizar a coleção externa, pode carregar e libertar a coleção e efetuar pesquisas e consultas semelhantes na coleção externa como faria em qualquer coleção gerida, exceto que as colecções numa base de dados para computação a pedido devem ser anexadas a um cluster a pedido para pesquisas e consultas.

Antes de realizar operações DQL, como pesquisa, consulta, get e pesquisa híbrida, é necessário criar uma sessão para anexar os recursos de computação de um cluster on-demand.