Creare una raccolta esternaCompatible with Milvus 3.0.x

Una raccolta esterna è un tipo di raccolta di dati in Milvus che accede ai dati da sistemi di archiviazione esterni o tabelle di database come AWS S3 e Iceberg senza copiarli in Milvus. Agisce come un livello di query sui data lake, mantenendo la compatibilità con le interfacce di query di Milvus.

Panoramica

In una tipica pipeline di dati AI, gli utenti potrebbero aver già memorizzato i loro dati in Parquet o in altri formati sul loro sistema di archiviazione, come AWS S3. Per far sì che Milvus utilizzi questi dati archiviati esternamente, gli utenti di solito devono importarli nello storage di Milvus utilizzando pipeline ETL (Extract-Transform-Load).

Questo flusso di lavoro "porta i tuoi dati a Milvus" crea dati ridondanti difficili da sincronizzare e aumenta il carico di manutenzione tecnica per garantire la coerenza dei dati.

Bring data to compute workflow Flusso di lavoro "porta i dati al calcolo

Per risolvere questi problemi, Milvus offre collezioni esterne che consentono di accedere ai dati archiviati esternamente da Milvus senza preoccuparsi della sincronizzazione dei dati e delle pipeline ETL.

Bring compute to data workflow Portare il calcolo nel flusso di lavoro dei dati

Una volta creata, una collezione esterna può accedere direttamente ai vostri dati e conservarli nello stesso luogo in cui li avete memorizzati. In background, Milvus crea file manifest per registrare le mappature tra i metadati Milvus e le righe dei file di dati esterni. Una volta che i file manifest sono pronti, è possibile creare indici nella raccolta esterna come in qualsiasi altra raccolta gestita.

Quando i dati cambiano, l'attivazione manuale di un aggiornamento sub-secondo aggiorna i metadati, mantenendo Milvus sempre aggiornato.

Passo 1: creare lo schema

Come per la creazione di una raccolta gestita, anche per la creazione di una raccolta esterna è necessario creare uno schema. Tuttavia, lo schema è leggermente diverso da quello di una collezione gestita.

from pymilvus import MilvusClient, DataType

schema = MilvusClient.create_schema(
    external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
    external_spec='{
        "format": "parquet",
        "extfs": {
            ...
        }
    }'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;

JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());

CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
        .externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
        .externalSpec(externalSpec)
        .build();
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema := entity.NewSchema().
    WithName("product_embeddings").
    WithExternalSource("s3://my-bucket/embeddings/").
    WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
        {
            "fieldName": "product_id",
            "dataType": "Int64",
            "isPrimary": true
        },
        {
            "fieldName": "embedding",
            "dataType": "FloatVector",
            "elementTypeParams": {
                "dim": "768"
            }
        },
        {
            "fieldName": "product_name",
            "dataType": "VarChar",
            "elementTypeParams": {
                "max_length": 512
            }
        }
    ]'

Per creare lo schema di una collezione esterna, è necessario specificare l'URI dei dati di origine, il formato dei dati e le impostazioni di autenticazione.

Nome del parametro

Descrizione del parametro

Esempio Valore

format

Formato dei file di dati sorgente di destinazione.

parquet

snapshot_id

Un ID valido dell'istantanea della tabella Iceberg. Questo parametro si applica solo quando si imposta format su iceberg_table.

473984310232959286

extfs

Impostazioni del file system esterno in una struttura JSON stringata.

--

Per impostare le impostazioni di autenticazione sono disponibili le seguenti opzioni:

Usa AWS AK/SK

Questa opzione si applica a MinIO self-hosted o allo scenario in cui si dispone di AK/SK per lavoro.

{
    "format": "...",
    "extfs": {
        "access_key_id":     "AKIA..",
        "access_key_value":  "u4Lh...",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true",
        "use_virtual_host":  "true"
    }
}

Nome del parametro

Descrizione del parametro

Esempio Valore

extfs.access_key_id

ID chiave di accesso

AKIA...

extfs.access_key_value

Valore della chiave di accesso

u7LH...

extfs.region

ID regione cloud

us-west-2

extfs.cloud_provider

ID del provider del cloud

aws

extfs.use_ssl

Se utilizzare SSL per stabilire le connessioni.

true

extfs.use_virtual_host

Se utilizzare l'hosting virtuale per l'accesso al proprio bucket.

Per maggiori dettagli, consultare questo articolo.

true

Usa AWS IAM

Questa opzione si applica allo scenario in cui Milvus viene eseguito su un'istanza EC2 o un cluster EKS. In questo caso, non è necessario codificare l'AK/SK.

{
    "format": "...",
    "extfs": {
        "use_iam":           "true",
        "iam_endpoint":      "https://sts.<region>.amazonaws.com",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true"
    }
}

Nome del parametro

Descrizione del parametro

Esempio Valore

extfs.use_iam

Se utilizzare AWS IAM.

Impostare su "true" per questa opzione.

true

extfs.iam_endpoint

Un endpoint AWS STS valido.

Per i dettagli, fare riferimento a questo articolo.

https:*//*sts.<region>.amazonaws.com

extfs.region

ID regione cloud

us-west-2

extfs.cloud_provider

ID del provider del cloud

aws

extfs.use_ssl

Se si usa SSL per stabilire le connessioni.

true

Usa le credenziali globali Milvus

Questa opzione si applica quando si memorizzano dati esterni nel bucket Milvus e le impostazioni globali di MinIO specificate in milvus.yaml possono essere utilizzate direttamente per accedere ai dati.

{
    "format": "...",
    "extfs": {
        "storage_type": "remote"
    }
}

Usa ARN del ruolo IAM

Questa opzione si applica quando l'organizzazione utilizza diversi account AWS per gestire il cluster Milvus e il bucket che contiene i file di dati di destinazione.

In questo caso, il proprietario del bucket dovrebbe creare un ruolo IAM che

  • Allega AmazonS3FullAccess o un criterio più dettagliato per l'accesso al bucket.

  • Include un sts:ExternalId autodefinito nel campo Condizione del criterio di fiducia del ruolo.

Quindi, il proprietario del bucket deve fornire l'ARN del ruolo IAM e l'ID esterno, in modo da poter chiamare sts:AssumeRole con questi valori per assumere il ruolo IAM.

Di seguito è riportato un esempio di criterio di autorizzazione da allegare al ruolo IAM con le autorizzazioni consentite. È possibile modificarlo per soddisfare le proprie esigenze.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
        }
    ]
}

Il criterio di fiducia associato al ruolo IAM definisce chi può assumerlo.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
        }
      }
    }
  ]
}

Una volta ottenuto l'ARN del ruolo IAM e l'ID esterno, è possibile impostare il parametro external_spec come segue:

{
    "format": "...",
    "extfs": {
        "cloud_provider": "aws",
        "region": "us-west-2",
        "storage_type": "remote",
        "use_ssl": "true",
        "use_iam": "true",
        "role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
        "external_id": "YOUR_UNIQUE_EXTERNAL_ID",
        "load_frequency": "900"
    }
}

Nome del parametro

Descrizione del parametro

Esempio Valore

extfs.cloud_provider

ID provider cloud

aws

extfs.region

ID regione cloud

us-west-2

extfs.use_ssl

Se utilizzare SSL per stabilire le connessioni.

true

extfs.use_iam

Se utilizzare AWS IAM.

Impostare questo valore su "true" per questa opzione.

true

extfs.role_arn

ARN del ruolo IAM ottenuto dal proprietario del bucket.

arn:aws:iam::306787000000:role/...

extfs.external_id

ID esterno ottenuto dal proprietario del bucket.

--

extfs.load_frequency

Intervallo in cui Milvus recupera le credenziali di autenticazione temporanee, in secondi.

900

Passo 2: Aggiungere i campi

Una volta che lo schema è pronto, è possibile aggiungere i campi come segue:

schema.add_field(
    field_name="product_id",
    datatype=DataType.INT64,
    # highlight-next
    external_field="id" # field name in the external data file
)
schema.add_field(
    field_name="product_name",
    datatype=DataType.VARCHAR,
    max_length=512,
    # highlight-next
    external_field="name"
)
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=768,
    # highlight-next
    external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;

schema.addField(AddFieldReq.builder()
        .fieldName("product_id")
        .dataType(DataType.Int64)
        .externalField("id")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("product_name")
        .dataType(DataType.VarChar)
        .maxLength(512)
        .externalField("name")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("embedding")
        .dataType(DataType.FloatVector)
        .dimension(768)
        .externalField("vector")
        .build());
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema = schema.
    WithField(
        entity.NewField().
            WithName("product_id").
            WithDataType(entity.FieldTypeInt64).
            WithExternalField("id"),
    ).
    WithField(
        entity.NewField().
            WithName("product_name").
            WithDataType(entity.FieldTypeVarChar).
            WithMaxLength(512).
            WithExternalField("name"),
    ).
    WithField(
        entity.NewField().
            WithName("embedding").
            WithDataType(entity.FieldTypeFloatVector).
            WithDim(768).
            WithExternalField("vector"),
    )
// node
export schema="{
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
    \"fields\": $fields
}"

Passo 3: creare una collezione

Dopo aver aggiunto tutti i campi allo schema, si può creare la collezione esterna.

client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

client.create_collection(
    collection_name="test_collection",
    schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;

ConnectConfig connectConfig = ConnectConfig.builder()
        .uri("http://localhost:19530")
        .token("root:Milvus")
        .build();

MilvusClientV2 client = new MilvusClientV2(connectConfig);

CreateCollectionReq createReq = CreateCollectionReq.builder()
        .collectionName("test_collection")
        .collectionSchema(schema)
        .build();
client.createCollection(createReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

milvusAddr := "http://localhost:19530"
token := "root:Milvus"

client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
    Address: milvusAddr,
    APIKey: token
})

err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))

if err != nil {
    fmt.Println(err.Error())
    // handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"schema\": $schema
}"

Passo 4: Creare gli indici

È possibile creare indici per le colonne della collezione esterna come per le collezioni gestite.

index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
    field_name="embedding",
    index_type="AUTOINDEX",
    metric_type="COSINE"
)
index_params.add_index(
    field_name="product_name",
    index_type="AUTOINDEX"
)
client.create_index(
    db_name="my_database",
    collection_name="test_collection",
    index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;

IndexParam indexParamForIdField = IndexParam.builder()
        .fieldName("product_name")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .build();
IndexParam indexParamForVectorField = IndexParam.builder()
        .fieldName("embedding")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .metricType(IndexParam.MetricType.COSINE)
        .build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
        .dbName("my_database")
        .collectionName("test_collection")
        .indexParams(indexParams)
        .build();
client.createIndex(createIndexReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    "github.com/milvus-io/milvus/client/v2/index"
    "github.com/milvus-io/milvus/client/v2/milvusclient"
)

collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
    milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
    milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
    // handler err
}
err = indexTask.Await(ctx)
if err != nil {
    // handler err
}
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "product_name",
    index_type: "AUTOINDEX"
})
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "embedding",
    index_type: "AUTOINDEX",
    metric_type: "COSINE"
})
export indexParams='[
        {
            "fieldName": "embedding",
            "indexName": "my_vector",
            "indexType": "AUTOINDEX"
        },
        {
            "fieldName": "product_name",
            "indexName": "my_id",
            "indexType": "AUTOINDEX"
        }
    ]'

curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"indexParams\": $indexParams
}"

Passo 5: Aggiornare i dati

Una volta che la collezione è pronta, aggiornarla per creare i metadati e gli indici per i dati.

job_id = client.refresh_external_collection(
    db_name="my_database",
    collection_name="test_collection"
)
while True:
    progress = client.get_refresh_external_collection_progress(job_id=job_id)
    print(f"  {progress.state}: {progress.progress}%")
    if progress.state == "RefreshCompleted":
        elapsed = progress.end_time - progress.start_time
        print(f"  Completed in {elapsed}ms")
        break
    elif progress.state == "RefreshFailed":
        print(f"  Failed: {progress.reason}")
        break
    time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;

while (true) {
    GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
            GetRefreshExternalCollectionProgressReq.builder()
                    .jobId(jobId)
                    .build());
    RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
    if ("RefreshCompleted".equals(jobInfo.getState())) {
        long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
        System.out.printf("  Refresh completed in %dms%n", elapsed);
        break;
    } else if ("RefreshFailed".equals(jobInfo.getState())) {
        System.out.printf("  Refresh failed: %s%n", jobInfo.getReason());
    }
    TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
    client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
    progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
        client.NewGetRefreshExternalCollectionProgressOption(jobID))
    fmt.Printf("State: %s\n", progress.State)
    if progress.State == entity.RefreshStateCompleted {
        fmt.Println("Refresh completed!")
        break
    }
    if progress.State == entity.RefreshStateFailed {
        fmt.Printf("Refresh failed: %s\n", progress.Reason)
        break
    }
    time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"

L'operazione di refresh è asincrona, quindi è necessario impostare un'iterazione per monitorare i suoi progressi.

  • L'operazione di aggiornamento analizza i metadati dei file di dati e genera i file manifest di conseguenza. Di solito richiede 150-250 ms.

  • I file manifest registrano la mappatura tra i metadati di Milvus e le righe dei file esterni.

  • In caso di aggiornamento dei dati di origine, è necessario richiamare manualmente l'aggiornamento per mantenere Milvus aggiornato.

  • Un aggiornamento che richiede la rimozione di tutti i metadati attivi senza alcun inserimento comporta un rifiuto.

Aggiornamenti successivi

Una volta aggiornata la collezione esterna, è possibile caricarla e rilasciarla ed eseguire ricerche e query simili nella collezione esterna come in qualsiasi collezione gestita, ad eccezione del fatto che le collezioni in un database per l'elaborazione on-demand devono essere collegate a un cluster on-demand per le ricerche e le query.

Prima di eseguire operazioni DQL, quali ricerca, query, get e ricerca ibrida, è necessario creare una sessione per collegare le risorse di calcolo di un cluster on-demand.