Creare una raccolta esternaCompatible with Milvus 3.0.x
Una raccolta esterna è un tipo di raccolta di dati in Milvus che accede ai dati da sistemi di archiviazione esterni o tabelle di database come AWS S3 e Iceberg senza copiarli in Milvus. Agisce come un livello di query sui data lake, mantenendo la compatibilità con le interfacce di query di Milvus.
Panoramica
In una tipica pipeline di dati AI, gli utenti potrebbero aver già memorizzato i loro dati in Parquet o in altri formati sul loro sistema di archiviazione, come AWS S3. Per far sì che Milvus utilizzi questi dati archiviati esternamente, gli utenti di solito devono importarli nello storage di Milvus utilizzando pipeline ETL (Extract-Transform-Load).
Questo flusso di lavoro "porta i tuoi dati a Milvus" crea dati ridondanti difficili da sincronizzare e aumenta il carico di manutenzione tecnica per garantire la coerenza dei dati.
Flusso di lavoro "porta i dati al calcolo
Per risolvere questi problemi, Milvus offre collezioni esterne che consentono di accedere ai dati archiviati esternamente da Milvus senza preoccuparsi della sincronizzazione dei dati e delle pipeline ETL.
Portare il calcolo nel flusso di lavoro dei dati
Una volta creata, una collezione esterna può accedere direttamente ai vostri dati e conservarli nello stesso luogo in cui li avete memorizzati. In background, Milvus crea file manifest per registrare le mappature tra i metadati Milvus e le righe dei file di dati esterni. Una volta che i file manifest sono pronti, è possibile creare indici nella raccolta esterna come in qualsiasi altra raccolta gestita.
Quando i dati cambiano, l'attivazione manuale di un aggiornamento sub-secondo aggiorna i metadati, mantenendo Milvus sempre aggiornato.
Passo 1: creare lo schema
Come per la creazione di una raccolta gestita, anche per la creazione di una raccolta esterna è necessario creare uno schema. Tuttavia, lo schema è leggermente diverso da quello di una collezione gestita.
from pymilvus import MilvusClient, DataType
schema = MilvusClient.create_schema(
external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
external_spec='{
"format": "parquet",
"extfs": {
...
}
}'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());
CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
.externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
.externalSpec(externalSpec)
.build();
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
schema := entity.NewSchema().
WithName("product_embeddings").
WithExternalSource("s3://my-bucket/embeddings/").
WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
{
"fieldName": "product_id",
"dataType": "Int64",
"isPrimary": true
},
{
"fieldName": "embedding",
"dataType": "FloatVector",
"elementTypeParams": {
"dim": "768"
}
},
{
"fieldName": "product_name",
"dataType": "VarChar",
"elementTypeParams": {
"max_length": 512
}
}
]'
Per creare lo schema di una collezione esterna, è necessario specificare l'URI dei dati di origine, il formato dei dati e le impostazioni di autenticazione.
Nome del parametro |
Descrizione del parametro |
Esempio Valore |
|---|---|---|
|
Formato dei file di dati sorgente di destinazione. |
|
|
Un ID valido dell'istantanea della tabella Iceberg. Questo parametro si applica solo quando si imposta |
|
|
Impostazioni del file system esterno in una struttura JSON stringata. |
-- |
Per impostare le impostazioni di autenticazione sono disponibili le seguenti opzioni:
Usa AWS AK/SK
Questa opzione si applica a MinIO self-hosted o allo scenario in cui si dispone di AK/SK per lavoro.
{
"format": "...",
"extfs": {
"access_key_id": "AKIA..",
"access_key_value": "u4Lh...",
"region": "us-west-2",
"cloud_provider": "aws",
"use_ssl": "true",
"use_virtual_host": "true"
}
}
Nome del parametro |
Descrizione del parametro |
Esempio Valore |
|---|---|---|
|
ID chiave di accesso |
|
|
Valore della chiave di accesso |
|
|
ID regione cloud |
|
|
ID del provider del cloud |
|
|
Se utilizzare SSL per stabilire le connessioni. |
|
|
Se utilizzare l'hosting virtuale per l'accesso al proprio bucket. Per maggiori dettagli, consultare questo articolo. |
|
Usa AWS IAM
Questa opzione si applica allo scenario in cui Milvus viene eseguito su un'istanza EC2 o un cluster EKS. In questo caso, non è necessario codificare l'AK/SK.
{
"format": "...",
"extfs": {
"use_iam": "true",
"iam_endpoint": "https://sts.<region>.amazonaws.com",
"region": "us-west-2",
"cloud_provider": "aws",
"use_ssl": "true"
}
}
Nome del parametro |
Descrizione del parametro |
Esempio Valore |
|---|---|---|
|
Se utilizzare AWS IAM. Impostare su |
|
|
Un endpoint AWS STS valido. Per i dettagli, fare riferimento a questo articolo. |
|
|
ID regione cloud |
|
|
ID del provider del cloud |
|
|
Se si usa SSL per stabilire le connessioni. |
|
Usa le credenziali globali Milvus
Questa opzione si applica quando si memorizzano dati esterni nel bucket Milvus e le impostazioni globali di MinIO specificate in milvus.yaml possono essere utilizzate direttamente per accedere ai dati.
{
"format": "...",
"extfs": {
"storage_type": "remote"
}
}
Usa ARN del ruolo IAM
Questa opzione si applica quando l'organizzazione utilizza diversi account AWS per gestire il cluster Milvus e il bucket che contiene i file di dati di destinazione.
In questo caso, il proprietario del bucket dovrebbe creare un ruolo IAM che
Allega
AmazonS3FullAccesso un criterio più dettagliato per l'accesso al bucket.Include un
sts:ExternalIdautodefinito nel campo Condizione del criterio di fiducia del ruolo.
Quindi, il proprietario del bucket deve fornire l'ARN del ruolo IAM e l'ID esterno, in modo da poter chiamare sts:AssumeRole con questi valori per assumere il ruolo IAM.
Di seguito è riportato un esempio di criterio di autorizzazione da allegare al ruolo IAM con le autorizzazioni consentite. È possibile modificarlo per soddisfare le proprie esigenze.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
}
]
}
Il criterio di fiducia associato al ruolo IAM definisce chi può assumerlo.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
}
}
}
]
}
Una volta ottenuto l'ARN del ruolo IAM e l'ID esterno, è possibile impostare il parametro external_spec come segue:
{
"format": "...",
"extfs": {
"cloud_provider": "aws",
"region": "us-west-2",
"storage_type": "remote",
"use_ssl": "true",
"use_iam": "true",
"role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
"external_id": "YOUR_UNIQUE_EXTERNAL_ID",
"load_frequency": "900"
}
}
Nome del parametro |
Descrizione del parametro |
Esempio Valore |
|---|---|---|
|
ID provider cloud |
|
|
ID regione cloud |
|
|
Se utilizzare SSL per stabilire le connessioni. |
|
|
Se utilizzare AWS IAM. Impostare questo valore su |
|
|
ARN del ruolo IAM ottenuto dal proprietario del bucket. |
|
|
ID esterno ottenuto dal proprietario del bucket. |
-- |
|
Intervallo in cui Milvus recupera le credenziali di autenticazione temporanee, in secondi. |
|
Passo 2: Aggiungere i campi
Una volta che lo schema è pronto, è possibile aggiungere i campi come segue:
schema.add_field(
field_name="product_id",
datatype=DataType.INT64,
# highlight-next
external_field="id" # field name in the external data file
)
schema.add_field(
field_name="product_name",
datatype=DataType.VARCHAR,
max_length=512,
# highlight-next
external_field="name"
)
schema.add_field(
field_name="embedding",
datatype=DataType.FLOAT_VECTOR,
dim=768,
# highlight-next
external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;
schema.addField(AddFieldReq.builder()
.fieldName("product_id")
.dataType(DataType.Int64)
.externalField("id")
.build());
schema.addField(AddFieldReq.builder()
.fieldName("product_name")
.dataType(DataType.VarChar)
.maxLength(512)
.externalField("name")
.build());
schema.addField(AddFieldReq.builder()
.fieldName("embedding")
.dataType(DataType.FloatVector)
.dimension(768)
.externalField("vector")
.build());
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
schema = schema.
WithField(
entity.NewField().
WithName("product_id").
WithDataType(entity.FieldTypeInt64).
WithExternalField("id"),
).
WithField(
entity.NewField().
WithName("product_name").
WithDataType(entity.FieldTypeVarChar).
WithMaxLength(512).
WithExternalField("name"),
).
WithField(
entity.NewField().
WithName("embedding").
WithDataType(entity.FieldTypeFloatVector).
WithDim(768).
WithExternalField("vector"),
)
// node
export schema="{
\"externalSource\": \"volume://my_volume/path/to/a/folder\",
\"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
\"fields\": $fields
}"
Passo 3: creare una collezione
Dopo aver aggiunto tutti i campi allo schema, si può creare la collezione esterna.
client = MilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)
client.create_collection(
collection_name="test_collection",
schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
ConnectConfig connectConfig = ConnectConfig.builder()
.uri("http://localhost:19530")
.token("root:Milvus")
.build();
MilvusClientV2 client = new MilvusClientV2(connectConfig);
CreateCollectionReq createReq = CreateCollectionReq.builder()
.collectionName("test_collection")
.collectionSchema(schema)
.build();
client.createCollection(createReq);
import (
"github.com/milvus-io/milvus/client/v2/entity"
client "github.com/milvus-io/milvus/client/v2/milvusclient"
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
milvusAddr := "http://localhost:19530"
token := "root:Milvus"
client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
Address: milvusAddr,
APIKey: token
})
err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))
if err != nil {
fmt.Println(err.Error())
// handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"schema\": $schema
}"
Passo 4: Creare gli indici
È possibile creare indici per le colonne della collezione esterna come per le collezioni gestite.
index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
field_name="embedding",
index_type="AUTOINDEX",
metric_type="COSINE"
)
index_params.add_index(
field_name="product_name",
index_type="AUTOINDEX"
)
client.create_index(
db_name="my_database",
collection_name="test_collection",
index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;
IndexParam indexParamForIdField = IndexParam.builder()
.fieldName("product_name")
.indexType(IndexParam.IndexType.AUTOINDEX)
.build();
IndexParam indexParamForVectorField = IndexParam.builder()
.fieldName("embedding")
.indexType(IndexParam.IndexType.AUTOINDEX)
.metricType(IndexParam.MetricType.COSINE)
.build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
.dbName("my_database")
.collectionName("test_collection")
.indexParams(indexParams)
.build();
client.createIndex(createIndexReq);
import (
"github.com/milvus-io/milvus/client/v2/entity"
"github.com/milvus-io/milvus/client/v2/index"
"github.com/milvus-io/milvus/client/v2/milvusclient"
)
collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
// handler err
}
err = indexTask.Await(ctx)
if err != nil {
// handler err
}
client.createIndex({
db_name: "my_database",
collection_name: "test_collection",
field_name: "product_name",
index_type: "AUTOINDEX"
})
client.createIndex({
db_name: "my_database",
collection_name: "test_collection",
field_name: "embedding",
index_type: "AUTOINDEX",
metric_type: "COSINE"
})
export indexParams='[
{
"fieldName": "embedding",
"indexName": "my_vector",
"indexType": "AUTOINDEX"
},
{
"fieldName": "product_name",
"indexName": "my_id",
"indexType": "AUTOINDEX"
}
]'
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"indexParams\": $indexParams
}"
Passo 5: Aggiornare i dati
Una volta che la collezione è pronta, aggiornarla per creare i metadati e gli indici per i dati.
job_id = client.refresh_external_collection(
db_name="my_database",
collection_name="test_collection"
)
while True:
progress = client.get_refresh_external_collection_progress(job_id=job_id)
print(f" {progress.state}: {progress.progress}%")
if progress.state == "RefreshCompleted":
elapsed = progress.end_time - progress.start_time
print(f" Completed in {elapsed}ms")
break
elif progress.state == "RefreshFailed":
print(f" Failed: {progress.reason}")
break
time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;
while (true) {
GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
GetRefreshExternalCollectionProgressReq.builder()
.jobId(jobId)
.build());
RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
if ("RefreshCompleted".equals(jobInfo.getState())) {
long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
System.out.printf(" Refresh completed in %dms%n", elapsed);
break;
} else if ("RefreshFailed".equals(jobInfo.getState())) {
System.out.printf(" Refresh failed: %s%n", jobInfo.getReason());
}
TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
client.NewGetRefreshExternalCollectionProgressOption(jobID))
fmt.Printf("State: %s\n", progress.State)
if progress.State == entity.RefreshStateCompleted {
fmt.Println("Refresh completed!")
break
}
if progress.State == entity.RefreshStateFailed {
fmt.Printf("Refresh failed: %s\n", progress.Reason)
break
}
time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"dbName\": \"my_database\",
\"collectionName\": \"test_collection\",
\"externalSource\": \"volume://my_volume/path/to/a/folder\",
\"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"
L'operazione di refresh è asincrona, quindi è necessario impostare un'iterazione per monitorare i suoi progressi.
L'operazione di aggiornamento analizza i metadati dei file di dati e genera i file manifest di conseguenza. Di solito richiede 150-250 ms.
I file manifest registrano la mappatura tra i metadati di Milvus e le righe dei file esterni.
In caso di aggiornamento dei dati di origine, è necessario richiamare manualmente l'aggiornamento per mantenere Milvus aggiornato.
Un aggiornamento che richiede la rimozione di tutti i metadati attivi senza alcun inserimento comporta un rifiuto.
Aggiornamenti successivi
Una volta aggiornata la collezione esterna, è possibile caricarla e rilasciarla ed eseguire ricerche e query simili nella collezione esterna come in qualsiasi collezione gestita, ad eccezione del fatto che le collezioni in un database per l'elaborazione on-demand devono essere collegate a un cluster on-demand per le ricerche e le query.
Prima di eseguire operazioni DQL, quali ricerca, query, get e ricerca ibrida, è necessario creare una sessione per collegare le risorse di calcolo di un cluster on-demand.