Erstellen einer externen SammlungCompatible with Milvus 3.0.x

Eine externe Sammlung ist eine Art von Datensammlung in Milvus, die auf Daten aus externen Speichersystemen oder Datenbanktabellen wie AWS S3 und Iceberg zugreift, ohne sie in Milvus zu kopieren. Sie fungiert als Abfrageebene über Data Lakes, wobei die Kompatibilität mit den Milvus-Abfrage-Schnittstellen gewahrt bleibt.

Überblick

In einer typischen KI-Datenpipeline haben Benutzer ihre Daten möglicherweise bereits in Parquet- oder anderen Formaten auf ihrem Speichersystem, wie AWS S3, gespeichert. Damit Milvus diese extern gespeicherten Daten nutzen kann, müssen die Benutzer sie in der Regel mithilfe von ETL-Pipelines (Extract-Transform-Load) in den Milvus-eigenen Speicher importieren.

Dieser Bring-your-data-to-Milvus-Workflow erzeugt redundante Daten, die schwer zu synchronisieren sind, und erhöht den technischen Wartungsaufwand zur Gewährleistung der Datenkonsistenz.

Bring data to compute workflow Bringen Sie Daten zum Compute-Workflow

Um diese Probleme zu lösen, bietet Milvus externe Sammlungen, mit denen Sie von Milvus aus auf Ihre extern gespeicherten Daten zugreifen können, ohne sich um die Datensynchronisation und ETL-Pipelines kümmern zu müssen.

Bring compute to data workflow Bringen Sie Datenverarbeitung in den Daten-Workflow

Sobald eine externe Sammlung erstellt wurde, können Sie direkt auf Ihre Daten zugreifen und sie an demselben Ort aufbewahren, an dem Sie sie speichern. Im Hintergrund erstellt Milvus Manifestdateien, um die Mappings zwischen den Milvus-Metadaten und den Zeilen in den externen Datendateien aufzuzeichnen. Nachdem die Manifestdateien fertig sind, können Sie in der externen Sammlung Indizes erstellen, wie Sie es in jeder verwalteten Sammlung tun würden.

Wenn sich Ihre Daten ändern, werden die Metadaten durch das manuelle Auslösen einer sekundengenauen Aktualisierung aktualisiert, so dass Milvus immer auf dem neuesten Stand ist.

Schritt 1: Schema erstellen

Wie bei der Erstellung einer verwalteten Sammlung müssen Sie auch vor der Erstellung einer externen Sammlung ein Schema erstellen. Das Schema unterscheidet sich jedoch geringfügig von dem einer verwalteten Sammlung.

from pymilvus import MilvusClient, DataType

schema = MilvusClient.create_schema(
    external_source='s3://s3.<region-id>.amazonaws.com/<bucket>/',
    external_spec='{
        "format": "parquet",
        "extfs": {
            ...
        }
    }'
)
import com.google.gson.JsonObject;
import io.milvus.v2.service.collection.request.CreateCollectionReq;

JsonObject externalSpec = new JsonObject();
externalSpec.addProperty("format", "parquet");
externalSpec.add("extfs", new JsonObject());

CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
        .externalSource("s3://s3.<region-id>.amazonaws.com/<bucket>/")
        .externalSpec(externalSpec)
        .build();
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema := entity.NewSchema().
    WithName("product_embeddings").
    WithExternalSource("s3://my-bucket/embeddings/").
    WithExternalSpec(`{"format": "parquet", "extfs": { ... }}`)
// node
export fields='[
        {
            "fieldName": "product_id",
            "dataType": "Int64",
            "isPrimary": true
        },
        {
            "fieldName": "embedding",
            "dataType": "FloatVector",
            "elementTypeParams": {
                "dim": "768"
            }
        },
        {
            "fieldName": "product_name",
            "dataType": "VarChar",
            "elementTypeParams": {
                "max_length": 512
            }
        }
    ]'

Um das Schema für eine externe Sammlung zu erstellen, müssen Sie den Quelldaten-URI, das Datenformat und die Authentifizierungseinstellungen angeben.

Parameter Name

Parameter Beschreibung

Beispiel Wert

format

Format der Zielquelldatendateien.

parquet

snapshot_id

Eine gültige Iceberg-Tabellen-Snapshot-ID. Dieser Parameter gilt nur, wenn Sie format auf iceberg_table setzen.

473984310232959286

extfs

Externe Dateisystemeinstellungen in einer stringifizierten JSON-Struktur.

--

Sie haben die folgenden Optionen, um die Authentifizierungseinstellungen festzulegen:

AWS AK/SK verwenden

Diese Option gilt für selbst gehostetes MinIO oder das Szenario, in dem Sie AK/SK für die Arbeit haben.

{
    "format": "...",
    "extfs": {
        "access_key_id":     "AKIA..",
        "access_key_value":  "u4Lh...",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true",
        "use_virtual_host":  "true"
    }
}

Parameter Name

Parameter Beschreibung

Beispiel Wert

extfs.access_key_id

Zugriffsschlüssel-ID

AKIA...

extfs.access_key_value

Wert des Zugriffsschlüssels

u7LH...

extfs.region

ID der Cloud-Region

us-west-2

extfs.cloud_provider

ID des Cloud-Anbieters

aws

extfs.use_ssl

Ob SSL für den Verbindungsaufbau verwendet wird.

true

extfs.use_virtual_host

Ob virtuelles Hosting für den Zugriff auf Ihren Bucket verwendet werden soll.

Einzelheiten finden Sie in diesem Artikel.

true

AWS IAM verwenden

Diese Option gilt für das Szenario, in dem Milvus auf einer EC2-Instanz oder einem EKS-Cluster läuft. In diesem Fall müssen Sie den AK/SK nicht hart codieren.

{
    "format": "...",
    "extfs": {
        "use_iam":           "true",
        "iam_endpoint":      "https://sts.<region>.amazonaws.com",
        "region":            "us-west-2",
        "cloud_provider":    "aws",
        "use_ssl":           "true"
    }
}

Parameter Name

Parameter Beschreibung

Beispiel Wert

extfs.use_iam

Ob AWS IAM verwendet werden soll.

Setzen Sie dies auf "true" für diese Option.

true

extfs.iam_endpoint

Ein gültiger AWS STS-Endpunkt.

Einzelheiten finden Sie in diesem Artikel.

https:*//*sts.<region>.amazonaws.com

extfs.region

ID der Cloud-Region

us-west-2

extfs.cloud_provider

Cloud-Anbieter-ID

aws

extfs.use_ssl

Ob SSL für den Verbindungsaufbau verwendet wird.

true

Globale Milvus-Anmeldeinformationen verwenden

Diese Option gilt, wenn Sie externe Daten im Milvus-Bucket speichern und die unter milvus.yaml angegebenen globalen MinIO-Einstellungen direkt für den Zugriff auf die Daten verwendet werden können.

{
    "format": "...",
    "extfs": {
        "storage_type": "remote"
    }
}

IAM-Rollen-ARN verwenden

Diese Option gilt, wenn Ihr Unternehmen verschiedene AWS-Konten für die Verwaltung des Milvus-Clusters und des Buckets mit den Zieldatendateien verwendet.

In diesem Fall sollte der Bucket-Eigentümer eine IAM-Rolle erstellen, die

  • AmazonS3FullAccess oder eine feiner abgestufte Richtlinie für den Bucket-Zugriff anfügt.

  • Fügt eine selbst definierte sts:ExternalId in das Bedingungsfeld der Vertrauensrichtlinie der Rolle ein.

Dann sollte der Bucket-Besitzer Ihnen den ARN der IAM-Rolle und die externe ID mitteilen, damit Sie sts:AssumeRole mit diesen Werten aufrufen können, um die IAM-Rolle zu übernehmen.

Nachfolgend finden Sie ein Beispiel für eine Berechtigungsrichtlinie, die der IAM-Rolle mit den zulässigen Berechtigungen zugeordnet wird. Sie können diese an Ihre Anforderungen anpassen.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::SOURCE-DATA-BUCKET/*"
        }
    ]
}

Die mit der IAM-Rolle verbundene Vertrauensrichtlinie legt fest, wer die Rolle übernehmen darf.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::ACCOUNT_RUNNING_MILVUS:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "YOUR_UNIQUE_EXTERNAL_ID"
        }
      }
    }
  ]
}

Sobald Sie den ARN der IAM-Rolle und die externe ID erhalten haben, können Sie den Parameter external_spec wie folgt einrichten:

{
    "format": "...",
    "extfs": {
        "cloud_provider": "aws",
        "region": "us-west-2",
        "storage_type": "remote",
        "use_ssl": "true",
        "use_iam": "true",
        "role_arn": "arn:aws:iam::306787000000:role/lentitude-bucket-role",
        "external_id": "YOUR_UNIQUE_EXTERNAL_ID",
        "load_frequency": "900"
    }
}

Parameter Name

Parameter Beschreibung

Beispielwert

extfs.cloud_provider

Cloud-Anbieter-ID

aws

extfs.region

ID der Cloud-Region

us-west-2

extfs.use_ssl

Ob SSL für den Verbindungsaufbau verwendet wird.

true

extfs.use_iam

Ob AWS IAM verwendet werden soll.

Setzen Sie dies auf "true" für diese Option.

true

extfs.role_arn

IAM Role ARN, die vom Bucket-Besitzer erhalten wird.

arn:aws:iam::306787000000:role/...

extfs.external_id

Externe ID, die Sie vom Bucket-Besitzer erhalten.

--

extfs.load_frequency

Intervall, in dem Milvus temporäre Authentifizierungsnachweise in Sekunden abruft.

900

Schritt 2: Felder hinzufügen

Sobald das Schema fertig ist, können Sie Felder wie folgt hinzufügen:

schema.add_field(
    field_name="product_id",
    datatype=DataType.INT64,
    # highlight-next
    external_field="id" # field name in the external data file
)
schema.add_field(
    field_name="product_name",
    datatype=DataType.VARCHAR,
    max_length=512,
    # highlight-next
    external_field="name"
)
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=768,
    # highlight-next
    external_field="vector"
)
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;

schema.addField(AddFieldReq.builder()
        .fieldName("product_id")
        .dataType(DataType.Int64)
        .externalField("id")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("product_name")
        .dataType(DataType.VarChar)
        .maxLength(512)
        .externalField("name")
        .build());
schema.addField(AddFieldReq.builder()
        .fieldName("embedding")
        .dataType(DataType.FloatVector)
        .dimension(768)
        .externalField("vector")
        .build());
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

schema = schema.
    WithField(
        entity.NewField().
            WithName("product_id").
            WithDataType(entity.FieldTypeInt64).
            WithExternalField("id"),
    ).
    WithField(
        entity.NewField().
            WithName("product_name").
            WithDataType(entity.FieldTypeVarChar).
            WithMaxLength(512).
            WithExternalField("name"),
    ).
    WithField(
        entity.NewField().
            WithName("embedding").
            WithDataType(entity.FieldTypeFloatVector).
            WithDim(768).
            WithExternalField("vector"),
    )
// node
export schema="{
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\",
    \"fields\": $fields
}"

Schritt 3: Erstellen einer Sammlung

Nachdem Sie alle Felder zum Schema hinzugefügt haben, können Sie die externe Sammlung erstellen.

client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

client.create_collection(
    collection_name="test_collection",
    schema=schema
)
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;

ConnectConfig connectConfig = ConnectConfig.builder()
        .uri("http://localhost:19530")
        .token("root:Milvus")
        .build();

MilvusClientV2 client = new MilvusClientV2(connectConfig);

CreateCollectionReq createReq = CreateCollectionReq.builder()
        .collectionName("test_collection")
        .collectionSchema(schema)
        .build();
client.createCollection(createReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    client "github.com/milvus-io/milvus/client/v2/milvusclient"
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

milvusAddr := "http://localhost:19530"
token := "root:Milvus"

client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
    Address: milvusAddr,
    APIKey: token
})

err = client.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("test_collection", schema))

if err != nil {
    fmt.Println(err.Error())
    // handle error
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"schema\": $schema
}"

Schritt 4: Indizes erstellen

Für die Spalten der externen Sammlung können Sie wie bei verwalteten Sammlungen Indizes erstellen.

index_params = client.prepare_index_params()
# Add indexes
index_params.add_index(
    field_name="embedding",
    index_type="AUTOINDEX",
    metric_type="COSINE"
)
index_params.add_index(
    field_name="product_name",
    index_type="AUTOINDEX"
)
client.create_index(
    db_name="my_database",
    collection_name="test_collection",
    index_params=index_params
)
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.index.request.CreateIndexReq;
import java.util.*;

IndexParam indexParamForIdField = IndexParam.builder()
        .fieldName("product_name")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .build();
IndexParam indexParamForVectorField = IndexParam.builder()
        .fieldName("embedding")
        .indexType(IndexParam.IndexType.AUTOINDEX)
        .metricType(IndexParam.MetricType.COSINE)
        .build();
List<IndexParam> indexParams = new ArrayList<>();
indexParams.add(indexParamForIdField);
indexParams.add(indexParamForVectorField);
CreateIndexReq createIndexReq = CreateIndexReq.builder()
        .dbName("my_database")
        .collectionName("test_collection")
        .indexParams(indexParams)
        .build();
client.createIndex(createIndexReq);
import (
    "github.com/milvus-io/milvus/client/v2/entity"
    "github.com/milvus-io/milvus/client/v2/index"
    "github.com/milvus-io/milvus/client/v2/milvusclient"
)

collectionName := "test_collection"
indexOptions := []milvusclient.CreateIndexOption{
    milvusclient.NewCreateIndexOption(collectionName, "embedding", index.NewAutoIndex(entity.COSINE)),
    milvusclient.NewCreateIndexOption(collectionName, "product_name", index.NewAutoIndex(index.AUTOINDEX)),
}
indexTask, err := client.CreateIndex(ctx, indexOptions)
if err != nil {
    // handler err
}
err = indexTask.Await(ctx)
if err != nil {
    // handler err
}
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "product_name",
    index_type: "AUTOINDEX"
})
client.createIndex({
    db_name: "my_database",
    collection_name: "test_collection",
    field_name: "embedding",
    index_type: "AUTOINDEX",
    metric_type: "COSINE"
})
export indexParams='[
        {
            "fieldName": "embedding",
            "indexName": "my_vector",
            "indexType": "AUTOINDEX"
        },
        {
            "fieldName": "product_name",
            "indexName": "my_id",
            "indexType": "AUTOINDEX"
        }
    ]'

curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/indexes/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"indexParams\": $indexParams
}"

Schritt 5: Daten auffrischen

Sobald die Sammlung fertig ist, aktualisieren Sie sie, um die Metadaten und Indizes für Ihre Daten zu erstellen.

job_id = client.refresh_external_collection(
    db_name="my_database",
    collection_name="test_collection"
)
while True:
    progress = client.get_refresh_external_collection_progress(job_id=job_id)
    print(f"  {progress.state}: {progress.progress}%")
    if progress.state == "RefreshCompleted":
        elapsed = progress.end_time - progress.start_time
        print(f"  Completed in {elapsed}ms")
        break
    elif progress.state == "RefreshFailed":
        print(f"  Failed: {progress.reason}")
        break
    time.sleep(2)
import io.milvus.v2.service.utility.request.GetRefreshExternalCollectionProgressReq;
import io.milvus.v2.service.utility.request.ListRefreshExternalCollectionJobsReq;
import io.milvus.v2.service.utility.request.RefreshExternalCollectionReq;
import io.milvus.v2.service.utility.response.GetRefreshExternalCollectionProgressResp;
import io.milvus.v2.service.utility.response.ListRefreshExternalCollectionJobsResp;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionJobInfo;
import io.milvus.v2.service.utility.response.RefreshExternalCollectionResp;

while (true) {
    GetRefreshExternalCollectionProgressResp resp = client.getRefreshExternalCollectionProgress(
            GetRefreshExternalCollectionProgressReq.builder()
                    .jobId(jobId)
                    .build());
    RefreshExternalCollectionJobInfo jobInfo = resp.getJobInfo();
    if ("RefreshCompleted".equals(jobInfo.getState())) {
        long elapsed = jobInfo.getEndTime() - jobInfo.getStartTime();
        System.out.printf("  Refresh completed in %dms%n", elapsed);
        break;
    } else if ("RefreshFailed".equals(jobInfo.getState())) {
        System.out.printf("  Refresh failed: %s%n", jobInfo.getReason());
    }
    TimeUnit.SECONDS.sleep(2);
}
refreshResult, err := client.RefreshExternalCollection(ctx,
    client.NewRefreshExternalCollectionOption("test_collection"))
jobID := refreshResult.JobID
for {
    progress, _ := client.GetRefreshExternalCollectionProgress(ctx,
        client.NewGetRefreshExternalCollectionProgressOption(jobID))
    fmt.Printf("State: %s\n", progress.State)
    if progress.State == entity.RefreshStateCompleted {
        fmt.Println("Refresh completed!")
        break
    }
    if progress.State == entity.RefreshStateFailed {
        fmt.Printf("Refresh failed: %s\n", progress.Reason)
        break
    }
    time.Sleep(2 * time.Second)
}
// node
curl --request POST \
--url "${PROJECT_ENDPOINT}/v2/vectordb/jobs/external_collection/refresh" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
    \"dbName\": \"my_database\",
    \"collectionName\": \"test_collection\",
    \"externalSource\": \"volume://my_volume/path/to/a/folder\",
    \"externalSpec\": \"{\\\"format\\\": \\\"parquet\\\"}\"
}"

Der Aktualisierungsvorgang ist asynchron, daher müssen Sie eine Iteration einrichten, um den Fortschritt zu überwachen.

  • Der Aktualisierungsvorgang scannt die Metadaten der Datendateien und erzeugt die entsprechenden Manifestdateien. Er dauert in der Regel 150-250 ms.

  • In den Manifestdateien wird die Zuordnung zwischen den Metadaten in Milvus und den Zeilen in den externen Dateien aufgezeichnet.

  • Wenn Ihre Quelldaten aktualisiert werden, müssen Sie die Aktualisierung manuell erneut aufrufen, um Milvus auf dem neuesten Stand zu halten.

  • Eine Aktualisierung, bei der alle aktiven Metadaten entfernt werden müssen, ohne dass etwas eingefügt wird, führt zu einer Verweigerung.

Folgemaßnahmen

Sobald Sie die externe Sammlung aktualisiert haben, können Sie die Sammlung laden und freigeben und Ähnlichkeitssuchen und -abfragen in der externen Sammlung durchführen, wie Sie es in jeder verwalteten Sammlung tun würden, mit der Ausnahme, dass Sammlungen in einer Datenbank für On-Demand-Computing mit einem On-Demand-Cluster für Suchen und Abfragen verbunden sein müssen.

Bevor Sie DQL-Vorgänge wie Suche, Abfrage, Get und hybride Suche durchführen, müssen Sie eine Sitzung erstellen, um die Rechenressourcen eines On-Demand-Clusters zuzuordnen.