Créer une collection externeCompatible with Milvus 3.0.x

Une collection externe est un type de collection de données dans Milvus qui accède aux données à partir de systèmes de stockage externes ou de tables de base de données telles que AWS S3 et Iceberg sans les copier dans Milvus. Elle agit comme une couche de requête sur les lacs de données tout en maintenant la compatibilité avec les interfaces de requête de Milvus.

Vue d'ensemble

Dans un pipeline de données d'IA typique, les utilisateurs peuvent déjà avoir stocké leurs données dans Parquet ou d'autres formats sur leur système de stockage, tel que AWS S3. Pour que Milvus puisse utiliser ces données stockées en externe, les utilisateurs doivent généralement les importer dans le propre stockage de Milvus à l'aide de pipelines d'extraction, de transformation et de chargement (ETL).

Ce flux de travail "apportez vos données à Milvus" crée des données redondantes difficiles à synchroniser et alourdit la charge de maintenance de l'ingénierie pour garantir la cohérence des données.

Bring data to compute workflow Flux de travail "apporter les données au calcul

Pour résoudre ces problèmes, Milvus propose des collections externes qui vous permettent d'accéder à vos données stockées en externe à partir de Milvus sans vous soucier de la synchronisation des données et des pipelines ETL.

Bring compute to data workflow Apporter le calcul au flux de travail des données

Une fois créée, une collection externe peut accéder directement à vos données et les conserver à l'endroit même où vous les stockez. En arrière-plan, Milvus crée des fichiers manifestes pour enregistrer les mappages entre les métadonnées Milvus et les lignes des fichiers de données externes. Une fois les fichiers manifestes prêts, vous pouvez créer des index dans la collection externe comme vous le feriez dans n'importe quelle collection gérée.

Lorsque vos données changent, le déclenchement manuel d'une actualisation en moins d'une seconde met à jour les métadonnées, ce qui permet à Milvus d'être toujours à jour.

Étape 1 : Création du schéma

Comme pour la création d'une collection gérée, vous devez également créer un schéma avant de créer une collection externe. Cependant, le schéma est légèrement différent de celui d'une collection gérée.

from pymilvus import MilvusClient, DataType

schema = MilvusClient.create_schema(
    external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
    external_spec='{
        "format": "parquet",
        "extfs": {
            ...
        }
    }'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;

JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());

CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
        .externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
        .externalSpec(externalSpec)
        .build();
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema := entity.NewSchema().
    WithName("product_embeddings").
    WithExternalSource("s3://my-bucket/embeddings/").
    WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
        {
            "fieldName": "product_id",
            "dataType": "Int64",
            "isPrimary": true
        },
        {
            "fieldName": "embedding",
            "dataType": "FloatVector",
            "elementTypeParams": {
                "dim": "768"
            }
        },
        {
            "fieldName": "product_name",
            "dataType": "VarChar",
            "elementTypeParams": {
                "max_length": 512
            }
        }
    ]'

Pour créer le schéma d'une collection externe, vous devez spécifier l'URI des données sources, le format des données et les paramètres d'authentification.

Nom du paramètre

Description du paramètre

Exemple Valeur

format

Format des fichiers de données source cibles.

parquet

snapshot_id

ID d'instantané de table Iceberg valide. Ce paramètre s'applique uniquement lorsque vous définissez format sur iceberg_table.

473984310232959286

extfs

Paramètres du système de fichiers externe dans une structure JSON chaine.

--

Vous disposez des options suivantes pour définir les paramètres d'authentification :

Utiliser AWS AK/SK

Cette option s'applique à MinIO auto-hébergé ou au scénario dans lequel vous disposez d'AK/SK pour votre travail.

{
    "format": "...",
    "extfs": {
        "access_key_id":     "AKIA..",
        "access_key_value":  "u4Lh...",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true",
        "use_virtual_host":  "true"
    }
}

Nom du paramètre

Description du paramètre

Exemple Valeur

extfs.access_key_id

ID de la clé d'accès

AKIA...

extfs.access_key_value

Valeur de la clé d'accès

u7LH...

extfs.region

ID de la région du nuage

us-west-2

extfs.cloud_provider

ID du fournisseur de cloud

aws

extfs.use_ssl

Si SSL est utilisé pour établir des connexions.

true

extfs.use_virtual_host

Utiliser ou non l'hébergement virtuel pour l'accès à votre bucket.

Pour plus de détails, reportez-vous à cet article.

true

Utiliser AWS IAM

Cette option s'applique au scénario dans lequel Milvus s'exécute sur une instance EC2 ou un cluster EKS. Dans ce cas, il n'est pas nécessaire de coder en dur l'AK/SK.

{
    "format": "...",
    "extfs": {
        "use_iam":           "true",
        "iam_endpoint":      "https://sts.<region>.amazonaws.com",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true"
    }
}

Nom du paramètre

Description du paramètre

Exemple Valeur

extfs.use_iam

Utiliser ou non AWS IAM.

Réglez ce paramètre sur "true" pour cette option.

true

extfs.iam_endpoint

Un point d'extrémité AWS STS valide.

Pour plus d'informations, consultez cet article.

https:*//*sts.<region>.amazonaws.com

extfs.region

ID de la région cloud

us-west-2

extfs.cloud_provider

ID du fournisseur de cloud

aws

extfs.use_ssl

Si SSL est utilisé pour établir des connexions.

true

Utiliser les informations d'identification globales Milvus

Cette option s'applique lorsque vous stockez des données externes dans le seau Milvus et que les paramètres MinIO globaux spécifiés dans milvus.yaml peuvent être utilisés directement pour accéder aux données.

{
    "format": "...",
    "extfs": {
        "storage_type": "remote"
    }
}

Utiliser l'ARN du rôle IAM

Cette option s'applique lorsque votre organisation utilise différents comptes AWS pour gérer le cluster Milvus et le seau qui contient les fichiers de données cibles.

Dans ce cas, le propriétaire du seau doit créer un rôle IAM qui

  • Attache AmazonS3FullAccess ou une politique plus fine pour l'accès au seau.

  • Inclut une adresse sts:ExternalId définie par l'utilisateur dans le champ Condition de la politique de confiance du rôle.

Ensuite, le propriétaire du seau doit vous fournir l'ARN du rôle IAM et l'ID externe afin que vous puissiez appeler sts:AssumeRole avec ces valeurs pour assumer le rôle IAM.

Voici un exemple de politique d'autorisation à attacher au rôle IAM avec les autorisations autorisées. Vous pouvez l'adapter à vos besoins.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
        }
    ]
}

La politique de confiance associée au rôle IAM définit qui est autorisé à l'assumer.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
        }
      }
    }
  ]
}

Une fois que vous avez obtenu l'ARN du rôle IAM et l'ID externe, vous pouvez configurer le paramètre external_spec comme suit :

{
    "format": "...",
    "extfs": {
        "cloud_provider": "aws",
        "region": "us-west-2",
        "storage_type": "remote",
        "use_ssl": "true",
        "use_iam": "true",
        "role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
        "external_id": "YOUR_UNIQUE_EXTERNAL_ID",
        "load_frequency": "900"
    }
}

Nom du paramètre

Description du paramètre

Exemple Valeur

extfs.cloud_provider

ID du fournisseur de cloud

aws

extfs.region

ID de la région du nuage

us-west-2

extfs.use_ssl

Si SSL est utilisé pour établir des connexions.

true

extfs.use_iam

Utiliser ou non AWS IAM.

Définissez cette valeur sur "true" pour cette option.

true

extfs.role_arn

ARN du rôle IAM obtenu auprès du propriétaire du seau.

arn:aws:iam::306787000000:role/...

extfs.external_id

ID externe obtenu auprès du propriétaire du seau.

--

extfs.load_frequency

Intervalle auquel Milvus récupère les informations d'authentification temporaires en secondes.

900

Étape 2 : Ajouter des champs

Une fois que le schéma est prêt, vous pouvez ajouter des champs comme suit :

schema.add_field(
    field_name="product_id",
    datatype=DataType.INT64,
    # highlight-next
    external_field="id" # field name in the external data file
)
schema.add_field(
    field_name="product_name",
    datatype=DataType.VARCHAR,
    max_length=512,
    # highlight-next
    external_field="name"
)
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=768,
    # highlight-next
    external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;

schema.addField(AddFieldReq.builder()
        .fieldName("product_id")
        .dataType(DataType.Int64)
        .externalField("id")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("product_name")
        .dataType(DataType.VarChar)
        .maxLength(512)
        .externalField("name")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("embedding")
        .dataType(DataType.FloatVector)
        .dimension(768)
        .externalField("vector")
        .build());
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema = schema.
    WithField(
        entity.NewField().
            WithName("product_id").
            WithDataType(entity.FieldTypeInt64).
            WithExternalField("id"),
    ).
    WithField(
        entity.NewField().
            WithName("product_name").
            WithDataType(entity.FieldTypeVarChar).
            WithMaxLength(512).
            WithExternalField("name"),
    ).
    WithField(
        entity.NewField().
            WithName("embedding").
            WithDataType(entity.FieldTypeFloatVector).
            WithDim(768).
            WithExternalField("vector"),
    )
// node
export schema="{
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
    \"fields\": $fields
}"

Étape 3 : Créer une collection

Après avoir ajouté tous les champs au schéma, vous pouvez créer la collection externe.

client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

client.create_collection(
    collection_name="test_collection",
    schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;

ConnectConfig connectConfig = ConnectConfig.builder()
        .uri("http://localhost:19530")
        .token("root:Milvus")
        .build();

MilvusClientV2 client = new MilvusClientV2(connectConfig);

CreateCollectionReq createReq = CreateCollectionReq.builder()
        .collectionName("test_collection")
        .collectionSchema(schema)
        .build();
client.createCollection(createReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

milvusAddr := "http://localhost:19530"
token := "root:Milvus"

client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
    Address: milvusAddr,
    APIKey: token
})

err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))

if err != nil {
    fmt.Println(err.Error())
    // handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"schema\": $schema
}"

Étape 4 : Créer des index

Vous pouvez créer des index pour les colonnes de la collection externe comme vous le faites pour les collections gérées.

index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
    field_name="embedding",
    index_type="AUTOINDEX",
    metric_type="COSINE"
)
index_params.add_index(
    field_name="product_name",
    index_type="AUTOINDEX"
)
client.create_index(
    db_name="my_database",
    collection_name="test_collection",
    index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;

IndexParam indexParamForIdField = IndexParam.builder()
        .fieldName("product_name")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .build();
IndexParam indexParamForVectorField = IndexParam.builder()
        .fieldName("embedding")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .metricType(IndexParam.MetricType.COSINE)
        .build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
        .dbName("my_database")
        .collectionName("test_collection")
        .indexParams(indexParams)
        .build();
client.createIndex(createIndexReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    "github.com/milvus-io/milvus/client/v2/index"
    "github.com/milvus-io/milvus/client/v2/milvusclient"
)

collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
    milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
    milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
    // handler err
}
err = indexTask.Await(ctx)
if err != nil {
    // handler err
}
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "product_name",
    index_type: "AUTOINDEX"
})
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "embedding",
    index_type: "AUTOINDEX",
    metric_type: "COSINE"
})
export indexParams='[
        {
            "fieldName": "embedding",
            "indexName": "my_vector",
            "indexType": "AUTOINDEX"
        },
        {
            "fieldName": "product_name",
            "indexName": "my_id",
            "indexType": "AUTOINDEX"
        }
    ]'

curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"indexParams\": $indexParams
}"

Étape 5 : Actualiser les données

Une fois que la collection est prête, rafraîchissez-la pour créer les métadonnées et les index de vos données.

job_id = client.refresh_external_collection(
    db_name="my_database",
    collection_name="test_collection"
)
while True:
    progress = client.get_refresh_external_collection_progress(job_id=job_id)
    print(f"  {progress.state}: {progress.progress}%")
    if progress.state == "RefreshCompleted":
        elapsed = progress.end_time - progress.start_time
        print(f"  Completed in {elapsed}ms")
        break
    elif progress.state == "RefreshFailed":
        print(f"  Failed: {progress.reason}")
        break
    time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;

while (true) {
    GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
            GetRefreshExternalCollectionProgressReq.builder()
                    .jobId(jobId)
                    .build());
    RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
    if ("RefreshCompleted".equals(jobInfo.getState())) {
        long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
        System.out.printf("  Refresh completed in %dms%n", elapsed);
        break;
    } else if ("RefreshFailed".equals(jobInfo.getState())) {
        System.out.printf("  Refresh failed: %s%n", jobInfo.getReason());
    }
    TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
    client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
    progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
        client.NewGetRefreshExternalCollectionProgressOption(jobID))
    fmt.Printf("State: %s\n", progress.State)
    if progress.State == entity.RefreshStateCompleted {
        fmt.Println("Refresh completed!")
        break
    }
    if progress.State == entity.RefreshStateFailed {
        fmt.Printf("Refresh failed: %s\n", progress.Reason)
        break
    }
    time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"

L'opération de rafraîchissement est asynchrone, vous devez donc mettre en place une itération pour suivre sa progression.

  • L'opération de rafraîchissement analyse les métadonnées des fichiers de données et génère les fichiers manifestes en conséquence. Elle prend généralement entre 150 et 250 ms.

  • Les fichiers manifestes enregistrent le mappage entre les métadonnées dans Milvus et les lignes dans les fichiers externes.

  • En cas de mise à jour de vos données sources, vous devez appeler manuellement l'actualisation pour que Milvus reste à jour.

  • Une actualisation qui nécessite la suppression de toutes les métadonnées actives sans aucune insertion entraîne un refus.

Suivi

Une fois que vous avez actualisé la collection externe, vous pouvez charger et libérer la collection et effectuer des recherches et des requêtes de similarité dans la collection externe comme vous le feriez dans n'importe quelle collection gérée, sauf que les collections dans une base de données pour le calcul à la demande doivent être attachées à un cluster à la demande pour les recherches et les requêtes.

Avant d'effectuer des opérations DQL, telles que la recherche, la requête, l'obtention et la recherche hybride, vous devez créer une session pour attacher les ressources de calcul d'un cluster à la demande.