milvus-logo
LFAI
Casa
  • Guida per l'utente

Preparare i dati di origine

In questa pagina vengono illustrati alcuni aspetti da considerare prima di iniziare a inserire i dati nella raccolta.

Prima di iniziare

La raccolta di destinazione richiede la mappatura dei dati di origine al suo schema. Il diagramma seguente mostra come i dati di origine accettabili vengono mappati nello schema di una raccolta di destinazione.

Map data to schema Mappare i dati allo schema

È necessario esaminare attentamente i dati e progettare di conseguenza lo schema della raccolta di destinazione.

Prendendo come esempio i dati JSON nel diagramma precedente, ci sono due entità nell'elenco delle righe e ogni riga ha sei campi. Lo schema della collezione ne include selettivamente quattro: id, vector, scalar_1 e scalar_2.

Ci sono altre due cose da considerare quando si progetta lo schema:

  • Se abilitare l'AutoID

    Il campo id è il campo primario della collezione. Per fare in modo che il campo primario si incrementi automaticamente, si può abilitare l'AutoID nello schema. In questo caso, è necessario escludere il campo id da ogni riga dei dati di origine.

  • Abilitare o meno i campi dinamici

    La collezione di destinazione può memorizzare anche campi non inclusi nel suo schema predefinito, se lo schema abilita i campi dinamici. Il campo $meta è un campo JSON riservato per contenere i campi dinamici e i loro valori in coppie chiave-valore. Nello schema precedente, i campi dynamic_field_1 e dynamic_field_2 e i relativi valori saranno salvati come coppie chiave-valore nel campo $meta.

Il codice seguente mostra come impostare lo schema per la collezione illustrata nel diagramma precedente.

Per ottenere maggiori informazioni, fare riferimento a create_schema() e add_field() nel riferimento dell'SDK.

Per ottenere maggiori informazioni, fare riferimento a CollectionSchema nel riferimento dell'SDK.

from pymilvus import MilvusClient, DataType

# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
    auto_id=False,
    enable_dynamic_field=True
)

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="scalar_1", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="scalar_2", datatype=DataType.INT64)

schema.verify()
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;

// Define schema for the target collection
FieldType id = FieldType.newBuilder()
        .withName("id")
        .withDataType(DataType.Int64)
        .withPrimaryKey(true)
        .withAutoID(false)
        .build();

FieldType vector = FieldType.newBuilder()
        .withName("vector")
        .withDataType(DataType.FloatVector)
        .withDimension(768)
        .build();

FieldType scalar1 = FieldType.newBuilder()
        .withName("scalar_1")
        .withDataType(DataType.VarChar)
        .withMaxLength(512)
        .build();

FieldType scalar2 = FieldType.newBuilder()
        .withName("scalar_2")
        .withDataType(DataType.Int64)
        .build();

CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
        .withEnableDynamicField(true)
        .addFieldType(id)
        .addFieldType(vector)
        .addFieldType(scalar1)
        .addFieldType(scalar2)
        .build();

Configurazione di BulkWriter

BulkWriter è uno strumento progettato per convertire i set di dati grezzi in un formato adatto all'importazione tramite l'API RESTful Import. Offre due tipi di writer:

  • LocalBulkWriter: Legge il set di dati designato e lo trasforma in un formato facile da usare.
  • RemoteBulkWriter: Esegue lo stesso compito del LocalBulkWriter, ma in più trasferisce i file di dati convertiti in un bucket di archiviazione degli oggetti remoto specificato.

RemoteBulkWriter si differenzia da LocalBulkWriter per il fatto che RemoteBulkWriter trasferisce i file di dati convertiti a un bucket di archiviazione degli oggetti di destinazione.

Impostazione di LocalBulkWriter

Un LocalBulkWriter aggiunge righe dal dataset di origine e le impegna in un file locale del formato specificato.

from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
# Use `from pymilvus import LocalBulkWriter, BulkFileType` 
# when you use pymilvus earlier than 2.4.2 

writer = LocalBulkWriter(
    schema=schema,
    local_path='.',
    segment_size=512 * 1024 * 1024, # Default value
    file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;

LocalBulkWriterParam localBulkWriterParam = LocalBulkWriterParam.newBuilder()
    .withCollectionSchema(schema)
    .withLocalPath(".")
    .withChunkSize(512 * 1024 * 1024)
    .withFileType(BulkFileType.PARQUET)
    .build();

LocalBulkWriter localBulkWriter = new LocalBulkWriter(localBulkWriterParam);

Quando si crea un LocalBulkWriter, occorre:

  • Fare riferimento allo schema creato in schema.
  • Impostare local_path come directory di output.
  • Impostare file_type come tipo di file di output.
  • Se il set di dati contiene un gran numero di record, si consiglia di segmentare i dati impostando segment_size su un valore adeguato.

Per informazioni dettagliate sulle impostazioni dei parametri, consultare LocalBulkWriter nel riferimento dell'SDK.

Quando si crea un LocalBulkWriter, occorre:

  • Fare riferimento allo schema creato in CollectionSchema().
  • Impostare la directory di output in withLocalPath().
  • Impostare il tipo di file di output in withFileType().
  • Se il set di dati contiene un gran numero di record, si consiglia di segmentare i dati impostando withChunkSize() su un valore adeguato.

Per informazioni dettagliate sulle impostazioni dei parametri, consultare LocalBulkWriter nel riferimento dell'SDK.

Configurazione di RemoteBulkWriter

Invece di eseguire il commit dei dati aggiunti a un file locale, un RemoteBulkWriter li esegue su un bucket remoto. Pertanto, è necessario impostare un oggetto ConnectParam prima di creare un RemoteBulkWriter.

from pymilvus.bulk_writer import RemoteBulkWriter
# Use `from pymilvus import RemoteBulkWriter` 
# when you use pymilvus earlier than 2.4.2 

# Third-party constants
ACCESS_KEY="minioadmin"
SECRET_KEY="minioadmin"
BUCKET_NAME="milvus-bucket"

# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
    endpoint="localhost:9000", # the default MinIO service started along with Milvus
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    bucket_name=BUCKET_NAME,
    secure=False
)
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;

String ACCESS_KEY = "minioadmin";
String SECRET_KEY = "minioadmin";
String BUCKET_NAME = "milvus-bucket";

StorageConnectParam storageConnectParam = S3ConnectParam.newBuilder()
    .withEndpoint(MINIO_URI)
    .withAccessKey(ACCESS_KEY)
    .withSecretKey(SECRET_KEY)
    .withBucketName(BUCKET_NAME)
    .build();

Una volta che i parametri di connessione sono pronti, si può fare riferimento ad essi nel RemoteBulkWriter come segue:

from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType` 
# when you use pymilvus earlier than 2.4.2 

writer = RemoteBulkWriter(
    schema=schema,
    remote_path="/",
    connect_param=conn,
    file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;

RemoteBulkWriterParam remoteBulkWriterParam = RemoteBulkWriterParam.newBuilder()
    .withCollectionSchema(schema)
    .withConnectParam(storageConnectParam)
    .withChunkSize(512 * 1024 * 1024)
    .withRemotePath("/")
    .withFileType(BulkFileType.PARQUET)
    .build();

RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(remoteBulkWriterParam);

I parametri per la creazione di un RemoteBulkWriter sono praticamente gli stessi di quelli di un LocalBulkWriter, tranne connect_param. Per i dettagli sulle impostazioni dei parametri, consultare RemoteBulkWriter e ConnectParam nel riferimento dell'SDK.

I parametri per la creazione di un RemoteBulkWriter sono quasi identici a quelli di un LocalBulkWriter, tranne che per StorageConnectParam. Per informazioni dettagliate sulle impostazioni dei parametri, consultare RemoteBulkWriter e StorageConnectParam nel riferimento dell'SDK.

Avvio della scrittura

Un BulkWriter ha due metodi: append_row() aggiunge una riga da un set di dati di origine e commit() esegue il commit delle righe aggiunte in un file locale o in un bucket remoto.

Un BulkWriter ha due metodi: appendRow() aggiunge una riga da un set di dati di origine e commit() esegue il commit delle righe aggiunte in un file locale o in un bucket remoto.

A scopo dimostrativo, il codice seguente aggiunge dati generati in modo casuale.

import random
import string

def generate_random_str(length=5):
    letters = string.ascii_uppercase
    digits = string.digits
    
    return ''.join(random.choices(letters + digits, k=length))

for i in range(10000):
    writer.append_row({
        "id": i, 
        "vector": [random.uniform(-1, 1) for _ in range(768)],
        "scalar_1": generate_random_str(random.randint(1, 20)),
        "scalar_2": random.randint(0, 100)
    })
    
writer.commit()
import com.alibaba.fastjson.JSONObject;

for (int i = 0; i < 10000; i++) {
    JSONObject json = new JSONObject();
    json.put("id", i);
    json.put("vector", get_random_vector(768));
    json.put("scalar_1", get_random_string(20));
    json.put("scalar_2", (long) (Math.random() * 100));

    // localBulkWriter.appendRow(json);
    remoteBulkWriter.appendRow(json);
}

// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);

Poiché lo schema definito consente campi dinamici, è possibile includere nei dati da inserire anche campi non definiti dallo schema, come segue.

import random
import string

def generate_random_string(length=5):
    letters = string.ascii_uppercase
    digits = string.digits
    
    return ''.join(random.choices(letters + digits, k=length))

for i in range(10000):
    writer.append_row({
        "id": i, 
        "vector":[random.uniform(-1, 1) for _ in range(768)],
        "scalar_1": generate_random_string(),
        "scalar_2": random.randint(0, 100),
        "dynamic_field_1": random.choice([True, False]),
        "dynamic_field_2": random.randint(0, 100)
    })
    
writer.commit()
for (int i = 0; i < 10000; i++) {
    JSONObject json = new JSONObject();
    json.put("id", i);
    json.put("vector", get_random_vector(768));
    json.put("scalar_1", get_random_string(20));
    json.put("scalar_2", (long) (Math.random() * 100));
    json.put("dynamic_field_1", get_random_boolean());
    json.put("dynamic_field_2", (long) (Math.random() * 100));

    // localBulkWriter.appendRow(json);
    remoteBulkWriter.appendRow(json);
}

// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);

Verifica dei risultati

Per verificare i risultati, è possibile ottenere il percorso di output effettivo stampando la proprietà batch_files del writer.

Per verificare i risultati, è possibile ottenere il percorso di output effettivo stampando il metodo getBatchFiles() del writer.

print(writer.batch_files)

# [['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/1.parquet'],
#  ['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/2.parquet']]
// localBulkWriter.getBatchFiles();
remoteBulkWriter.getBatchFiles();

// 

// Close the BulkWriter
try {
    localBulkWriter.close();
    remoteBulkWriter.close();            
} catch (Exception e) {
    // TODO: handle exception
    e.printStackTrace();
}

BulkWriter genera un UUID, crea una sottocartella utilizzando l'UUID nella directory di output fornita e colloca tutti i file generati nella sottocartella. Fare clic qui per scaricare i dati di esempio preparati.

Le possibili strutture delle cartelle sono le seguenti:

# JSON
├── folder
│   └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│       └── 1.json 

# Parquet
├── folder
│   └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│       └── 1.parquet 

Tradotto daDeepLogo

Feedback

Questa pagina è stata utile?