Préparer les données sources
Cette page traite des éléments à prendre en compte avant de commencer à insérer des données en vrac dans votre collection.
Avant de commencer
La collection cible nécessite la mise en correspondance des données sources avec son schéma. Le diagramme ci-dessous montre comment des données sources acceptables sont mises en correspondance avec le schéma d'une collection cible.
Mettre en correspondance les données et le schéma
Vous devez examiner attentivement vos données et concevoir le schéma de la collection cible en conséquence.
Si l'on prend comme exemple les données JSON du diagramme ci-dessus, il y a deux entités dans la liste des lignes, chaque ligne ayant six champs. Le schéma de la collection en inclut sélectivement quatre : id, vector, scalar_1, et scalar_2.
Deux autres éléments doivent être pris en compte lors de la conception du schéma :
L'activation ou non de l'AutoID
Le champ id est le champ primaire de la collection. Pour que le champ primaire s'incrémente automatiquement, vous pouvez activer AutoID dans le schéma. Dans ce cas, vous devez exclure le champ id de chaque ligne des données source.
Activation ou non des champs dynamiques
La collection cible peut également stocker des champs qui ne sont pas inclus dans son schéma prédéfini si ce dernier autorise les champs dynamiques. Le champ $meta est un champ JSON réservé pour contenir les champs dynamiques et leurs valeurs dans des paires clé-valeur. Dans le schéma ci-dessus, les champs dynamic_field_1 et dynamic_field_2 et leurs valeurs seront enregistrés sous forme de paires clé-valeur dans le champ $meta.
Le code suivant montre comment configurer le schéma de la collection illustrée dans le diagramme ci-dessus.
Pour obtenir plus d'informations, reportez-vous à create_schema()
et add_field()
dans la référence du SDK.
Pour obtenir plus d'informations, reportez-vous à CollectionSchema
dans la référence du SDK.
from pymilvus import MilvusClient, DataType
# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)
DIM = 512
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True),
schema.add_field(field_name="bool", datatype=DataType.BOOL),
schema.add_field(field_name="int8", datatype=DataType.INT8),
schema.add_field(field_name="int16", datatype=DataType.INT16),
schema.add_field(field_name="int32", datatype=DataType.INT32),
schema.add_field(field_name="int64", datatype=DataType.INT64),
schema.add_field(field_name="float", datatype=DataType.FLOAT),
schema.add_field(field_name="double", datatype=DataType.DOUBLE),
schema.add_field(field_name="varchar", datatype=DataType.VARCHAR, max_length=512),
schema.add_field(field_name="json", datatype=DataType.JSON),
schema.add_field(field_name="array_str", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR, max_length=128)
schema.add_field(field_name="array_int", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.INT64)
schema.add_field(field_name="float_vector", datatype=DataType.FLOAT_VECTOR, dim=DIM),
schema.add_field(field_name="binary_vector", datatype=DataType.BINARY_VECTOR, dim=DIM),
schema.add_field(field_name="float16_vector", datatype=DataType.FLOAT16_VECTOR, dim=DIM),
# schema.add_field(field_name="bfloat16_vector", datatype=DataType.BFLOAT16_VECTOR, dim=DIM),
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)
schema.verify()
print(schema)
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.milvus.bulkwriter.BulkImport;
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.common.clientenum.CloudStorage;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
import io.milvus.bulkwriter.request.describe.MilvusDescribeImportRequest;
import io.milvus.bulkwriter.request.import_.MilvusImportRequest;
import io.milvus.bulkwriter.request.list.MilvusListImportJobsRequest;
import io.milvus.common.utils.Float16Utils;
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
private static final String MINIO_ENDPOINT = CloudStorage.MINIO.getEndpoint("http://127.0.0.1:9000");
private static final String BUCKET_NAME = "a-bucket";
private static final String ACCESS_KEY = "minioadmin";
private static final String SECRET_KEY = "minioadmin";
private static final Integer DIM = 512;
private static final Gson GSON_INSTANCE = new Gson();
private static CreateCollectionReq.CollectionSchema createSchema() {
CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
.enableDynamicField(true)
.build();
schema.addField(AddFieldReq.builder()
.fieldName("id")
.dataType(io.milvus.v2.common.DataType.Int64)
.isPrimaryKey(Boolean.TRUE)
.autoID(false)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("bool")
.dataType(DataType.Bool)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("int8")
.dataType(DataType.Int8)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("int16")
.dataType(DataType.Int16)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("int32")
.dataType(DataType.Int32)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("int64")
.dataType(DataType.Int64)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("float")
.dataType(DataType.Float)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("double")
.dataType(DataType.Double)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("varchar")
.dataType(DataType.VarChar)
.maxLength(512)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("json")
.dataType(io.milvus.v2.common.DataType.JSON)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("array_int")
.dataType(io.milvus.v2.common.DataType.Array)
.maxCapacity(100)
.elementType(io.milvus.v2.common.DataType.Int64)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("array_str")
.dataType(io.milvus.v2.common.DataType.Array)
.maxCapacity(100)
.elementType(io.milvus.v2.common.DataType.VarChar)
.maxLength(128)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("float_vector")
.dataType(io.milvus.v2.common.DataType.FloatVector)
.dimension(DIM)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("binary_vector")
.dataType(io.milvus.v2.common.DataType.BinaryVector)
.dimension(DIM)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("float16_vector")
.dataType(io.milvus.v2.common.DataType.Float16Vector)
.dimension(DIM)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("sparse_vector")
.dataType(io.milvus.v2.common.DataType.SparseFloatVector)
.build());
return schema;
}
Configurer BulkWriter
BulkWriter est un outil conçu pour convertir des ensembles de données brutes dans un format adapté à l'importation via l'API d'importation RESTful. Il propose deux types de rédacteurs :
- LocalBulkWriter: Lit l'ensemble de données désigné et le transforme dans un format facile à utiliser.
- RemoteBulkWriter: Effectue la même tâche que LocalBulkWriter, mais transfère en plus les fichiers de données convertis vers un panier de stockage d'objets distant spécifié.
RemoteBulkWriter diffère de LocalBulkWriter en ce que RemoteBulkWriter transfère les fichiers de données convertis vers une unité de stockage d'objets cible.
Configuration de LocalBulkWriter
Un LocalBulkWriter ajoute des lignes au jeu de données source et les enregistre dans un fichier local au format spécifié.
from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
# Use `from pymilvus import LocalBulkWriter, BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = LocalBulkWriter(
schema=schema,
local_path='.',
segment_size=512 * 1024 * 1024, # Default value
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
LocalBulkWriterParam localBulkWriterParam = LocalBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withLocalPath(".")
.withChunkSize(512 * 1024 * 1024)
.withFileType(BulkFileType.PARQUET)
.build();
LocalBulkWriter localBulkWriter = new LocalBulkWriter(localBulkWriterParam);
Lors de la création d'un LocalBulkWriter, vous devez :
- Faire référence au schéma créé dans
schema
. - Définir
local_path
comme répertoire de sortie. - Définir
file_type
comme type de fichier de sortie. - Si votre jeu de données contient un grand nombre d'enregistrements, il est conseillé de segmenter vos données en fixant
segment_size
à une valeur appropriée.
Pour plus de détails sur le paramétrage, reportez-vous à LocalBulkWriter dans la référence SDK.
Lors de la création d'un LocalBulkWriter, vous devez :
- référencer le schéma créé sur
CollectionSchema()
. - Définir le répertoire de sortie sur
withLocalPath()
. - Définir le type de fichier de sortie sur
withFileType()
. - Si votre jeu de données contient un grand nombre d'enregistrements, il est conseillé de segmenter vos données en fixant
withChunkSize()
à une valeur appropriée.
Pour plus de détails sur le paramétrage, reportez-vous à LocalBulkWriter dans la référence SDK.
Configuration de RemoteBulkWriter
Au lieu d'enregistrer les données ajoutées dans un fichier local, un RemoteBulkWriter les enregistre dans un panier distant. Par conséquent, vous devez configurer un objet ConnectParam avant de créer un RemoteBulkWriter.
from pymilvus.bulk_writer import RemoteBulkWriter
# Use `from pymilvus import RemoteBulkWriter`
# when you use pymilvus earlier than 2.4.2
# Third-party constants
ACCESS_KEY="minioadmin"
SECRET_KEY="minioadmin"
BUCKET_NAME="a-bucket"
# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
endpoint="localhost:9000", # the default MinIO service started along with Milvus
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
bucket_name=BUCKET_NAME,
secure=False
)
from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = RemoteBulkWriter(
schema=schema,
remote_path="/",
connect_param=conn,
file_type=BulkFileType.PARQUET
)
print('bulk writer created.')
private static RemoteBulkWriter createRemoteBulkWriter(CreateCollectionReq.CollectionSchema collectionSchema) throws IOException {
StorageConnectParam connectParam = S3ConnectParam.newBuilder()
.withEndpoint(MINIO_ENDPOINT)
.withBucketName(BUCKET_NAME)
.withAccessKey(ACCESS_KEY)
.withSecretKey(SECRET_KEY)
.build();
RemoteBulkWriterParam bulkWriterParam = RemoteBulkWriterParam.newBuilder()
.withCollectionSchema(collectionSchema)
.withRemotePath("/")
.withConnectParam(connectParam)
.withFileType(BulkFileType.PARQUET)
.build();
return new RemoteBulkWriter(bulkWriterParam);
}
Une fois que les paramètres de connexion sont prêts, vous pouvez les référencer dans le RemoteBulkWriter comme suit :
from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = RemoteBulkWriter(
schema=schema,
remote_path="/",
connect_param=conn,
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;
RemoteBulkWriterParam remoteBulkWriterParam = RemoteBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withConnectParam(storageConnectParam)
.withChunkSize(512 * 1024 * 1024)
.withRemotePath("/")
.withFileType(BulkFileType.PARQUET)
.build();
RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(remoteBulkWriterParam);
Les paramètres de création d'un RemoteBulkWriter sont à peu près les mêmes que ceux d'un LocalBulkWriter, à l'exception de connect_param
. Pour plus de détails sur le réglage des paramètres, voir RemoteBulkWriter et ConnectParam dans la référence SDK.
Les paramètres de création d'un RemoteBulkWriter sont à peu près les mêmes que ceux d'un LocalBulkWriter, à l'exception de StorageConnectParam
. Pour plus de détails sur les paramètres, voir RemoteBulkWriter et StorageConnectParam dans la référence du SDK.
Démarrer l'écriture
Un BulkWriter possède deux méthodes : append_row()
ajoute une ligne à partir d'un jeu de données source et commit()
valide les lignes ajoutées dans un fichier local ou un bucket distant.
Un BulkWriter possède deux méthodes : appendRow()
ajoute une ligne à partir d'un jeu de données source et commit()
enregistre les lignes ajoutées dans un fichier local ou un panier distant.
À des fins de démonstration, le code suivant ajoute des données générées de manière aléatoire.
import random, string, json
import numpy as np
import tensorflow as tf
def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits
return ''.join(random.choices(letters + digits, k=length))
# optional input for binary vector:
# 1. list of int such as [1, 0, 1, 1, 0, 0, 1, 0]
# 2. numpy array of uint8
def gen_binary_vector(to_numpy_arr):
raw_vector = [random.randint(0, 1) for i in range(DIM)]
if to_numpy_arr:
return np.packbits(raw_vector, axis=-1)
return raw_vector
# optional input for float vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float32
def gen_float_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype="float32")
return raw_vector
# # optional input for bfloat16 vector:
# # 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# # 2. numpy array of bfloat16
# def gen_bf16_vector(to_numpy_arr):
# raw_vector = [random.random() for _ in range(DIM)]
# if to_numpy_arr:
# return tf.cast(raw_vector, dtype=tf.bfloat16).numpy()
# return raw_vector
# optional input for float16 vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float16
def gen_fp16_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype=np.float16)
return raw_vector
# optional input for sparse vector:
# only accepts dict like {2: 13.23, 45: 0.54} or {"indices": [1, 2], "values": [0.1, 0.2]}
# note: no need to sort the keys
def gen_sparse_vector(pair_dict: bool):
raw_vector = {}
dim = random.randint(2, 20)
if pair_dict:
raw_vector["indices"] = [i for i in range(dim)]
raw_vector["values"] = [random.random() for _ in range(dim)]
else:
for i in range(dim):
raw_vector[i] = random.random()
return raw_vector
for i in range(10000):
writer.append_row({
"id": np.int64(i),
"bool": True if i % 3 == 0 else False,
"int8": np.int8(i%128),
"int16": np.int16(i%1000),
"int32": np.int32(i%100000),
"int64": np.int64(i),
"float": np.float32(i/3),
"double": np.float64(i/7),
"varchar": f"varchar_{i}",
"json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
"array_str": np.array([f"str_{k}" for k in range(5)], np.dtype("str")),
"array_int": np.array([k for k in range(10)], np.dtype("int64")),
"float_vector": gen_float_vector(True),
"binary_vector": gen_binary_vector(True),
"float16_vector": gen_fp16_vector(True),
# "bfloat16_vector": gen_bf16_vector(True),
"sparse_vector": gen_sparse_vector(True),
f"dynamic_{i}": i,
})
if (i+1)%1000 == 0:
writer.commit()
print('committed')
print(writer.batch_files)
private static byte[] genBinaryVector() {
Random ran = new Random();
int byteCount = DIM / 8;
ByteBuffer vector = ByteBuffer.allocate(byteCount);
for (int i = 0; i < byteCount; ++i) {
vector.put((byte) ran.nextInt(Byte.MAX_VALUE));
}
return vector.array();
}
private static List<Float> genFloatVector() {
Random ran = new Random();
List<Float> vector = new ArrayList<>();
for (int i = 0; i < DIM; ++i) {
vector.add(ran.nextFloat());
}
return vector;
}
private static byte[] genFloat16Vector() {
List<Float> originalVector = genFloatVector();
return Float16Utils.f32VectorToFp16Buffer(originalVector).array();
}
private static SortedMap<Long, Float> genSparseVector() {
Random ran = new Random();
SortedMap<Long, Float> sparse = new TreeMap<>();
int dim = ran.nextInt(18) + 2; // [2, 20)
for (int i = 0; i < dim; ++i) {
sparse.put((long)ran.nextInt(1000000), ran.nextFloat());
}
return sparse;
}
private static List<String> genStringArray(int length) {
List<String> arr = new ArrayList<>();
for (int i = 0; i < length; i++) {
arr.add("str_" + i);
}
return arr;
}
private static List<Long> genIntArray(int length) {
List<Long> arr = new ArrayList<>();
for (long i = 0; i < length; i++) {
arr.add(i);
}
return arr;
}
private static RemoteBulkWriter createRemoteBulkWriter(CreateCollectionReq.CollectionSchema collectionSchema) throws IOException {
StorageConnectParam connectParam = S3ConnectParam.newBuilder()
.withEndpoint(MINIO_ENDPOINT)
.withBucketName(BUCKET_NAME)
.withAccessKey(ACCESS_KEY)
.withSecretKey(SECRET_KEY)
.build();
RemoteBulkWriterParam bulkWriterParam = RemoteBulkWriterParam.newBuilder()
.withCollectionSchema(collectionSchema)
.withRemotePath("/")
.withConnectParam(connectParam)
.withFileType(BulkFileType.PARQUET)
.build();
return new RemoteBulkWriter(bulkWriterParam);
}
private static List<List<String>> uploadData() throws Exception {
CreateCollectionReq.CollectionSchema collectionSchema = createSchema();
try (RemoteBulkWriter remoteBulkWriter = createRemoteBulkWriter(collectionSchema)) {
for (int i = 0; i < 10000; ++i) {
JsonObject rowObject = new JsonObject();
rowObject.addProperty("id", i);
rowObject.addProperty("bool", i % 3 == 0);
rowObject.addProperty("int8", i % 128);
rowObject.addProperty("int16", i % 1000);
rowObject.addProperty("int32", i % 100000);
rowObject.addProperty("int64", i);
rowObject.addProperty("float", i / 3);
rowObject.addProperty("double", i / 7);
rowObject.addProperty("varchar", "varchar_" + i);
rowObject.addProperty("json", String.format("{\"dummy\": %s, \"ok\": \"name_%s\"}", i, i));
rowObject.add("array_str", GSON_INSTANCE.toJsonTree(genStringArray(5)));
rowObject.add("array_int", GSON_INSTANCE.toJsonTree(genIntArray(10)));
rowObject.add("float_vector", GSON_INSTANCE.toJsonTree(genFloatVector()));
rowObject.add("binary_vector", GSON_INSTANCE.toJsonTree(genBinaryVector()));
rowObject.add("float16_vector", GSON_INSTANCE.toJsonTree(genFloat16Vector()));
rowObject.add("sparse_vector", GSON_INSTANCE.toJsonTree(genSparseVector()));
rowObject.addProperty("dynamic", "dynamic_" + i);
remoteBulkWriter.appendRow(rowObject);
if ((i+1)%1000 == 0) {
remoteBulkWriter.commit(false);
}
}
List<List<String>> batchFiles = remoteBulkWriter.getBatchFiles();
System.out.println(batchFiles);
return batchFiles;
} catch (Exception e) {
throw e;
}
}
public static void main(String[] args) throws Exception {
List<List<String>> batchFiles = uploadData();
}
Vérifier les résultats
Pour vérifier les résultats, vous pouvez obtenir le chemin de sortie réel en imprimant la propriété batch_files
de l'auteur.
Pour vérifier les résultats, vous pouvez obtenir le chemin de sortie réel en imprimant la méthode getBatchFiles()
de l'auteur.
print(writer.batch_files)
# [['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/1.parquet'],
# ['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/2.parquet']]
// localBulkWriter.getBatchFiles();
remoteBulkWriter.getBatchFiles();
//
// Close the BulkWriter
try {
localBulkWriter.close();
remoteBulkWriter.close();
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
BulkWriter génère un UUID, crée un sous-dossier utilisant l'UUID dans le répertoire de sortie fourni et place tous les fichiers générés dans le sous-dossier. Cliquez ici pour télécharger l'exemple de données préparé.
Les structures de dossier possibles sont les suivantes :
# JSON
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.json
# Parquet
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.parquet