Preparare i dati di origine
In questa pagina vengono illustrati alcuni aspetti da considerare prima di iniziare a inserire i dati nella raccolta.
Prima di iniziare
La raccolta di destinazione richiede la mappatura dei dati di origine al suo schema. Il diagramma seguente mostra come i dati di origine accettabili vengono mappati nello schema di una raccolta di destinazione.
Mappare i dati allo schema
È necessario esaminare attentamente i dati e progettare di conseguenza lo schema della raccolta di destinazione.
Prendendo come esempio i dati JSON nel diagramma precedente, ci sono due entità nell'elenco delle righe e ogni riga ha sei campi. Lo schema della collezione ne include selettivamente quattro: id, vector, scalar_1 e scalar_2.
Ci sono altre due cose da considerare quando si progetta lo schema:
Se abilitare l'AutoID
Il campo id è il campo primario della collezione. Per fare in modo che il campo primario si incrementi automaticamente, si può abilitare l'AutoID nello schema. In questo caso, è necessario escludere il campo id da ogni riga dei dati di origine.
Abilitare o meno i campi dinamici
La collezione di destinazione può memorizzare anche campi non inclusi nel suo schema predefinito, se lo schema abilita i campi dinamici. Il campo $meta è un campo JSON riservato per contenere i campi dinamici e i loro valori in coppie chiave-valore. Nello schema precedente, i campi dynamic_field_1 e dynamic_field_2 e i relativi valori saranno salvati come coppie chiave-valore nel campo $meta.
Il codice seguente mostra come impostare lo schema per la collezione illustrata nel diagramma precedente.
Per ottenere maggiori informazioni, fare riferimento a create_schema()
e add_field()
nel riferimento dell'SDK.
Per ottenere maggiori informazioni, fare riferimento a CollectionSchema
nel riferimento dell'SDK.
from pymilvus import MilvusClient, DataType
# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="scalar_1", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="scalar_2", datatype=DataType.INT64)
schema.verify()
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;
// Define schema for the target collection
FieldType id = FieldType.newBuilder()
.withName("id")
.withDataType(DataType.Int64)
.withPrimaryKey(true)
.withAutoID(false)
.build();
FieldType vector = FieldType.newBuilder()
.withName("vector")
.withDataType(DataType.FloatVector)
.withDimension(768)
.build();
FieldType scalar1 = FieldType.newBuilder()
.withName("scalar_1")
.withDataType(DataType.VarChar)
.withMaxLength(512)
.build();
FieldType scalar2 = FieldType.newBuilder()
.withName("scalar_2")
.withDataType(DataType.Int64)
.build();
CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
.withEnableDynamicField(true)
.addFieldType(id)
.addFieldType(vector)
.addFieldType(scalar1)
.addFieldType(scalar2)
.build();
Configurazione di BulkWriter
BulkWriter è uno strumento progettato per convertire i set di dati grezzi in un formato adatto all'importazione tramite l'API RESTful Import. Offre due tipi di writer:
- LocalBulkWriter: Legge il set di dati designato e lo trasforma in un formato facile da usare.
- RemoteBulkWriter: Esegue lo stesso compito del LocalBulkWriter, ma in più trasferisce i file di dati convertiti in un bucket di archiviazione degli oggetti remoto specificato.
RemoteBulkWriter si differenzia da LocalBulkWriter per il fatto che RemoteBulkWriter trasferisce i file di dati convertiti a un bucket di archiviazione degli oggetti di destinazione.
Impostazione di LocalBulkWriter
Un LocalBulkWriter aggiunge righe dal dataset di origine e le impegna in un file locale del formato specificato.
from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
# Use `from pymilvus import LocalBulkWriter, BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = LocalBulkWriter(
schema=schema,
local_path='.',
segment_size=512 * 1024 * 1024, # Default value
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
LocalBulkWriterParam localBulkWriterParam = LocalBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withLocalPath(".")
.withChunkSize(512 * 1024 * 1024)
.withFileType(BulkFileType.PARQUET)
.build();
LocalBulkWriter localBulkWriter = new LocalBulkWriter(localBulkWriterParam);
Quando si crea un LocalBulkWriter, occorre:
- Fare riferimento allo schema creato in
schema
. - Impostare
local_path
come directory di output. - Impostare
file_type
come tipo di file di output. - Se il set di dati contiene un gran numero di record, si consiglia di segmentare i dati impostando
segment_size
su un valore adeguato.
Per informazioni dettagliate sulle impostazioni dei parametri, consultare LocalBulkWriter nel riferimento dell'SDK.
Quando si crea un LocalBulkWriter, occorre:
- Fare riferimento allo schema creato in
CollectionSchema()
. - Impostare la directory di output in
withLocalPath()
. - Impostare il tipo di file di output in
withFileType()
. - Se il set di dati contiene un gran numero di record, si consiglia di segmentare i dati impostando
withChunkSize()
su un valore adeguato.
Per informazioni dettagliate sulle impostazioni dei parametri, consultare LocalBulkWriter nel riferimento dell'SDK.
Configurazione di RemoteBulkWriter
Invece di eseguire il commit dei dati aggiunti a un file locale, un RemoteBulkWriter li esegue su un bucket remoto. Pertanto, è necessario impostare un oggetto ConnectParam prima di creare un RemoteBulkWriter.
from pymilvus.bulk_writer import RemoteBulkWriter
# Use `from pymilvus import RemoteBulkWriter`
# when you use pymilvus earlier than 2.4.2
# Third-party constants
ACCESS_KEY="minioadmin"
SECRET_KEY="minioadmin"
BUCKET_NAME="milvus-bucket"
# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
endpoint="localhost:9000", # the default MinIO service started along with Milvus
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
bucket_name=BUCKET_NAME,
secure=False
)
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
String ACCESS_KEY = "minioadmin";
String SECRET_KEY = "minioadmin";
String BUCKET_NAME = "milvus-bucket";
StorageConnectParam storageConnectParam = S3ConnectParam.newBuilder()
.withEndpoint(MINIO_URI)
.withAccessKey(ACCESS_KEY)
.withSecretKey(SECRET_KEY)
.withBucketName(BUCKET_NAME)
.build();
Una volta che i parametri di connessione sono pronti, si può fare riferimento ad essi nel RemoteBulkWriter come segue:
from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = RemoteBulkWriter(
schema=schema,
remote_path="/",
connect_param=conn,
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;
RemoteBulkWriterParam remoteBulkWriterParam = RemoteBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withConnectParam(storageConnectParam)
.withChunkSize(512 * 1024 * 1024)
.withRemotePath("/")
.withFileType(BulkFileType.PARQUET)
.build();
RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(remoteBulkWriterParam);
I parametri per la creazione di un RemoteBulkWriter sono praticamente gli stessi di quelli di un LocalBulkWriter, tranne connect_param
. Per i dettagli sulle impostazioni dei parametri, consultare RemoteBulkWriter e ConnectParam nel riferimento dell'SDK.
I parametri per la creazione di un RemoteBulkWriter sono quasi identici a quelli di un LocalBulkWriter, tranne che per StorageConnectParam
. Per informazioni dettagliate sulle impostazioni dei parametri, consultare RemoteBulkWriter e StorageConnectParam nel riferimento dell'SDK.
Avvio della scrittura
Un BulkWriter ha due metodi: append_row()
aggiunge una riga da un set di dati di origine e commit()
esegue il commit delle righe aggiunte in un file locale o in un bucket remoto.
Un BulkWriter ha due metodi: appendRow()
aggiunge una riga da un set di dati di origine e commit()
esegue il commit delle righe aggiunte in un file locale o in un bucket remoto.
A scopo dimostrativo, il codice seguente aggiunge dati generati in modo casuale.
import random
import string
def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits
return ''.join(random.choices(letters + digits, k=length))
for i in range(10000):
writer.append_row({
"id": i,
"vector": [random.uniform(-1, 1) for _ in range(768)],
"scalar_1": generate_random_str(random.randint(1, 20)),
"scalar_2": random.randint(0, 100)
})
writer.commit()
import com.alibaba.fastjson.JSONObject;
for (int i = 0; i < 10000; i++) {
JSONObject json = new JSONObject();
json.put("id", i);
json.put("vector", get_random_vector(768));
json.put("scalar_1", get_random_string(20));
json.put("scalar_2", (long) (Math.random() * 100));
// localBulkWriter.appendRow(json);
remoteBulkWriter.appendRow(json);
}
// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);
Poiché lo schema definito consente campi dinamici, è possibile includere nei dati da inserire anche campi non definiti dallo schema, come segue.
import random
import string
def generate_random_string(length=5):
letters = string.ascii_uppercase
digits = string.digits
return ''.join(random.choices(letters + digits, k=length))
for i in range(10000):
writer.append_row({
"id": i,
"vector":[random.uniform(-1, 1) for _ in range(768)],
"scalar_1": generate_random_string(),
"scalar_2": random.randint(0, 100),
"dynamic_field_1": random.choice([True, False]),
"dynamic_field_2": random.randint(0, 100)
})
writer.commit()
for (int i = 0; i < 10000; i++) {
JSONObject json = new JSONObject();
json.put("id", i);
json.put("vector", get_random_vector(768));
json.put("scalar_1", get_random_string(20));
json.put("scalar_2", (long) (Math.random() * 100));
json.put("dynamic_field_1", get_random_boolean());
json.put("dynamic_field_2", (long) (Math.random() * 100));
// localBulkWriter.appendRow(json);
remoteBulkWriter.appendRow(json);
}
// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);
Verifica dei risultati
Per verificare i risultati, è possibile ottenere il percorso di output effettivo stampando la proprietà batch_files
del writer.
Per verificare i risultati, è possibile ottenere il percorso di output effettivo stampando il metodo getBatchFiles()
del writer.
print(writer.batch_files)
# [['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/1.parquet'],
# ['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/2.parquet']]
// localBulkWriter.getBatchFiles();
remoteBulkWriter.getBatchFiles();
//
// Close the BulkWriter
try {
localBulkWriter.close();
remoteBulkWriter.close();
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
BulkWriter genera un UUID, crea una sottocartella utilizzando l'UUID nella directory di output fornita e colloca tutti i file generati nella sottocartella. Fare clic qui per scaricare i dati di esempio preparati.
Le possibili strutture delle cartelle sono le seguenti:
# JSON
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.json
# Parquet
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.parquet