Criar uma coleção externaCompatible with Milvus 3.0.x
Uma coleção externa é um tipo de coleção de dados no Milvus que acede a dados de sistemas de armazenamento externos ou tabelas de bases de dados como o AWS S3 e o Iceberg sem os copiar para o Milvus. Actua como uma camada de consulta sobre os lagos de dados, mantendo a compatibilidade com as interfaces de consulta do Milvus.
Visão geral
Num pipeline de dados de IA típico, os utilizadores podem já ter armazenado os seus dados em Parquet ou noutros formatos no seu sistema de armazenamento, como o AWS S3. Para fazer com que o Milvus consuma esses dados armazenados externamente, os usuários geralmente precisam importá-los para o próprio armazenamento do Milvus usando pipelines Extract-Transform-Load (ETL).
Este fluxo de trabalho "traga os seus dados para o Milvus" cria dados redundantes que são difíceis de sincronizar e aumenta a carga de manutenção da engenharia para garantir a consistência dos dados.
Fluxo de trabalho "Bring data to compute
Para resolver estes problemas, o Milvus fornece colecções externas que lhe permitem aceder aos seus dados armazenados externamente a partir do Milvus sem se preocupar com a sincronização dos dados e com os pipelines ETL.
Trazer a computação para o fluxo de trabalho de dados
Uma vez criada, uma coleção externa pode aceder diretamente aos seus dados e mantê-los no mesmo local onde os armazena. Em segundo plano, o Milvus cria ficheiros de manifesto para registar os mapeamentos entre os metadados do Milvus e as linhas nos ficheiros de dados externos. Depois que os arquivos de manifesto estiverem prontos, você pode criar índices na coleção externa como faria em qualquer coleção gerenciada.
Quando os dados são alterados, o acionamento manual de uma atualização de sub-segundo atualiza os metadados, mantendo o Milvus sempre atualizado.
Passo 1: Criar esquema
Tal como acontece com a criação de uma coleção gerida, também é necessário criar um esquema antes de criar uma coleção externa. No entanto, o esquema é ligeiramente diferente do de uma coleção gerida.
from pymilvus import MilvusClient, DataType
schema = MilvusClient.create_schema(
external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
external_spec='{
"format": "parquet",
"extfs": {
...
}
}'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());
CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
.externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
.externalSpec(externalSpec)
.build();
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
schema := entity.NewSchema().
WithName("product_embeddings").
WithExternalSource("s3://my-bucket/embeddings/").
WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
{
"fieldName": "product_id",
"dataType": "Int64",
"isPrimary": true
},
{
"fieldName": "embedding",
"dataType": "FloatVector",
"elementTypeParams": {
"dim": "768"
}
},
{
"fieldName": "product_name",
"dataType": "VarChar",
"elementTypeParams": {
"max_length": 512
}
}
]'
Para criar o esquema para uma coleção externa, é necessário especificar o URI dos dados de origem, o formato dos dados e as definições de autenticação.
Nome do parâmetro |
Descrição do parâmetro |
Exemplo Valor |
|---|---|---|
|
Formato dos ficheiros de dados de origem de destino. |
|
|
Um ID de instantâneo de tabela Iceberg válido. Este parâmetro aplica-se apenas quando define |
|
|
Definições do sistema de ficheiros externo numa estrutura JSON encadeada. |
-- |
Tem as seguintes opções para definir as definições de autenticação:
Usar AWS AK/SK
Esta opção aplica-se ao MinIO auto-hospedado ou ao cenário em que tem AK/SK para trabalhar.
{
"format": "...",
"extfs": {
"access_key_id": "AKIA..",
"access_key_value": "u4Lh...",
"region": "us-west-2",
"cloud_provider": "aws",
"use_ssl": "true",
"use_virtual_host": "true"
}
}
Nome do parâmetro |
Descrição do parâmetro |
Exemplo Valor |
|---|---|---|
|
ID da chave de acesso |
|
|
Valor da chave de acesso |
|
|
ID da região de nuvem |
|
|
ID do fornecedor de serviços de computação em nuvem |
|
|
Se o SSL é usado para estabelecer conexões. |
|
|
Se deve usar a hospedagem virtual para acessar o seu bucket. Para obter detalhes, consulte este artigo. |
|
Usar AWS IAM
Esta opção se aplica ao cenário em que o Milvus é executado em uma instância EC2 ou em um cluster EKS. Neste caso, não é necessário codificar o AK/SK.
{
"format": "...",
"extfs": {
"use_iam": "true",
"iam_endpoint": "https://sts.<region>.amazonaws.com",
"region": "us-west-2",
"cloud_provider": "aws",
"use_ssl": "true"
}
}
Nome do parâmetro |
Descrição do parâmetro |
Exemplo Valor |
|---|---|---|
|
Se deve utilizar o AWS IAM. Defina este valor como |
|
|
Um ponto de extremidade válido do AWS STS. Para obter detalhes, consulte este artigo. |
|
|
ID da região de nuvem |
|
|
ID do provedor de nuvem |
|
|
Se o SSL é usado para estabelecer conexões. |
|
Usar credenciais globais do Milvus
Esta opção aplica-se quando armazena dados externos no bucket do Milvus e as definições globais do MinIO especificadas em milvus.yaml podem ser utilizadas diretamente para aceder aos dados.
{
"format": "...",
"extfs": {
"storage_type": "remote"
}
}
Usar ARN de função do IAM
Esta opção se aplica quando sua organização usa contas AWS diferentes para gerenciar o cluster do Milvus e o bucket que contém os arquivos de dados de destino.
Nesse caso, o proprietário do bucket deve criar uma função do IAM que
Anexa
AmazonS3FullAccessou uma política mais refinada para acesso ao bucket.Inclui uma autodefinição
sts:ExternalIdno campo Condição da Política de Confiança da função.
Em seguida, o proprietário do bucket deve fornecer o ARN da função de IAM e a ID externa para que você possa chamar sts:AssumeRole com esses valores para assumir a função de IAM.
A seguir, um exemplo de política de permissão a ser anexada à função de IAM com as permissões permitidas. Pode ajustar isto para satisfazer os seus requisitos.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
}
]
}
E a política de confiança associada à função de IAM define quem tem permissão para a assumir.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
}
}
}
]
}
Depois de obter o ARN da função de IAM e a ID externa, pode configurar o parâmetro external_spec da seguinte forma:
{
"format": "...",
"extfs": {
"cloud_provider": "aws",
"region": "us-west-2",
"storage_type": "remote",
"use_ssl": "true",
"use_iam": "true",
"role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
"external_id": "YOUR_UNIQUE_EXTERNAL_ID",
"load_frequency": "900"
}
}
Nome do parâmetro |
Descrição do parâmetro |
Exemplo Valor |
|---|---|---|
|
ID do provedor de nuvem |
|
|
ID da região de nuvem |
|
|
Se o SSL é usado para estabelecer conexões. |
|
|
Se deve ser usado o AWS IAM. Defina isso como |
|
|
ARN de função do IAM obtido do proprietário do bucket. |
|
|
ID externo obtido do proprietário do bucket. |
-- |
|
Intervalo em que Milvus recupera credenciais de autenticação temporárias em segundos. |
|
Etapa 2: Adicionar campos
Quando o esquema estiver pronto, pode adicionar campos da seguinte forma:
schema.add_field(
field_name="product_id",
datatype=DataType.INT64,
# highlight-next
external_field="id" # field name in the external data file
)
schema.add_field(
field_name="product_name",
datatype=DataType.VARCHAR,
max_length=512,
# highlight-next
external_field="name"
)
schema.add_field(
field_name="embedding",
datatype=DataType.FLOAT_VECTOR,
dim=768,
# highlight-next
external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;
schema.addField(AddFieldReq.builder()
.fieldName("product_id")
.dataType(DataType.Int64)
.externalField("id")
.build());
schema.addField(AddFieldReq.builder()
.fieldName("product_name")
.dataType(DataType.VarChar)
.maxLength(512)
.externalField("name")
.build());
schema.addField(AddFieldReq.builder()
.fieldName("embedding")
.dataType(DataType.FloatVector)
.dimension(768)
.externalField("vector")
.build());
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
schema = schema.
WithField(
entity.NewField().
WithName("product_id").
WithDataType(entity.FieldTypeInt64).
WithExternalField("id"),
).
WithField(
entity.NewField().
WithName("product_name").
WithDataType(entity.FieldTypeVarChar).
WithMaxLength(512).
WithExternalField("name"),
).
WithField(
entity.NewField().
WithName("embedding").
WithDataType(entity.FieldTypeFloatVector).
WithDim(768).
WithExternalField("vector"),
)
// node
export schema="{
\"externalSource\": \"volume://my_volume/path/to/a/folder\",
\"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
\"fields\": $fields
}"
Passo 3: Criar uma coleção
Depois de adicionar todos os campos ao esquema, pode criar a coleção externa.
client = MilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)
client.create_collection(
collection_name="test_collection",
schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
ConnectConfig connectConfig = ConnectConfig.builder()
.uri("http://localhost:19530")
.token("root:Milvus")
.build();
MilvusClientV2 client = new MilvusClientV2(connectConfig);
CreateCollectionReq createReq = CreateCollectionReq.builder()
.collectionName("test_collection")
.collectionSchema(schema)
.build();
client.createCollection(createReq);
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
milvusAddr := "http://localhost:19530"
token := "root:Milvus"
client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
Address: milvusAddr,
APIKey: token
})
err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))
if err != nil {
fmt.Println(err.Error())
// handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"schema\": $schema
}"
Passo 4: Criar índices
Pode criar índices para colunas de colecções externas, tal como faz nas colecções geridas.
index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
field_name="embedding",
index_type="AUTOINDEX",
metric_type="COSINE"
)
index_params.add_index(
field_name="product_name",
index_type="AUTOINDEX"
)
client.create_index(
db_name="my_database",
collection_name="test_collection",
index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;
IndexParam indexParamForIdField = IndexParam.builder()
.fieldName("product_name")
.indexType(IndexParam.IndexType.AUTOINDEX)
.build();
IndexParam indexParamForVectorField = IndexParam.builder()
.fieldName("embedding")
.indexType(IndexParam.IndexType.AUTOINDEX)
.metricType(IndexParam.MetricType.COSINE)
.build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
.dbName("my_database")
.collectionName("test_collection")
.indexParams(indexParams)
.build();
client.createIndex(createIndexReq);
import (
"github.com/milvus-io/milvus/client/v2/entity"
"github.com/milvus-io/milvus/client/v2/index"
"github.com/milvus-io/milvus/client/v2/milvusclient"
)
collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
// handler err
}
err = indexTask.Await(ctx)
if err != nil {
// handler err
}
client.createIndex({
db_name: "my_database",
collection_name: "test_collection",
field_name: "product_name",
index_type: "AUTOINDEX"
})
client.createIndex({
db_name: "my_database",
collection_name: "test_collection",
field_name: "embedding",
index_type: "AUTOINDEX",
metric_type: "COSINE"
})
export indexParams='[
{
"fieldName": "embedding",
"indexName": "my_vector",
"indexType": "AUTOINDEX"
},
{
"fieldName": "product_name",
"indexName": "my_id",
"indexType": "AUTOINDEX"
}
]'
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"indexParams\": $indexParams
}"
Etapa 5: atualizar dados
Quando a coleção estiver pronta, atualize-a para criar os metadados e índices para seus dados.
job_id = client.refresh_external_collection(
db_name="my_database",
collection_name="test_collection"
)
while True:
progress = client.get_refresh_external_collection_progress(job_id=job_id)
print(f" {progress.state}: {progress.progress}%")
if progress.state == "RefreshCompleted":
elapsed = progress.end_time - progress.start_time
print(f" Completed in {elapsed}ms")
break
elif progress.state == "RefreshFailed":
print(f" Failed: {progress.reason}")
break
time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;
while (true) {
GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
GetRefreshExternalCollectionProgressReq.builder()
.jobId(jobId)
.build());
RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
if ("RefreshCompleted".equals(jobInfo.getState())) {
long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
System.out.printf(" Refresh completed in %dms%n", elapsed);
break;
} else if ("RefreshFailed".equals(jobInfo.getState())) {
System.out.printf(" Refresh failed: %s%n", jobInfo.getReason());
}
TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
client.NewGetRefreshExternalCollectionProgressOption(jobID))
fmt.Printf("State: %s\n", progress.State)
if progress.State == entity.RefreshStateCompleted {
fmt.Println("Refresh completed!")
break
}
if progress.State == entity.RefreshStateFailed {
fmt.Printf("Refresh failed: %s\n", progress.Reason)
break
}
time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"externalSource\": \"volume://my_volume/path/to/a/folder\",
\"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"
A operação de atualização é assíncrona, portanto, é necessário configurar uma iteração para monitorar seu progresso.
A operação de atualização verifica os metadados dos ficheiros de dados e gera os ficheiros de manifesto em conformidade. Normalmente demora 150-250 ms.
Os ficheiros de manifesto registam o mapeamento entre os metadados no Milvus e as linhas nos ficheiros externos.
Se houver uma atualização dos dados de origem, é necessário chamar manualmente o refresh novamente para manter o Milvus atualizado.
Uma atualização que exija a remoção de todos os metadados activos sem quaisquer inserções resulta numa recusa.
Acompanhamento
Depois de atualizar a coleção externa, pode carregar e libertar a coleção e efetuar pesquisas e consultas semelhantes na coleção externa como faria em qualquer coleção gerida, exceto que as colecções numa base de dados para computação a pedido devem ser anexadas a um cluster a pedido para pesquisas e consultas.
Antes de realizar operações DQL, como pesquisa, consulta, get e pesquisa híbrida, é necessário criar uma sessão para anexar os recursos de computação de um cluster on-demand.