Préparer les données sources
Cette page traite des éléments à prendre en compte avant de commencer à insérer des données en vrac dans votre collection.
Avant de commencer
La collection cible nécessite la mise en correspondance des données sources avec son schéma. Le diagramme ci-dessous montre comment des données sources acceptables sont mises en correspondance avec le schéma d'une collection cible.
Mettre en correspondance les données et le schéma
Vous devez examiner attentivement vos données et concevoir le schéma de la collection cible en conséquence.
Si l'on prend comme exemple les données JSON du diagramme ci-dessus, il y a deux entités dans la liste des lignes, chaque ligne ayant six champs. Le schéma de la collection en inclut sélectivement quatre : id, vector, scalar_1, et scalar_2.
Deux autres éléments doivent être pris en compte lors de la conception du schéma :
L'activation ou non de l'AutoID
Le champ id est le champ primaire de la collection. Pour que le champ primaire s'incrémente automatiquement, vous pouvez activer AutoID dans le schéma. Dans ce cas, vous devez exclure le champ id de chaque ligne des données source.
Activation ou non des champs dynamiques
La collection cible peut également stocker des champs qui ne sont pas inclus dans son schéma prédéfini si ce dernier autorise les champs dynamiques. Le champ $meta est un champ JSON réservé pour contenir les champs dynamiques et leurs valeurs dans des paires clé-valeur. Dans le schéma ci-dessus, les champs dynamic_field_1 et dynamic_field_2 et leurs valeurs seront enregistrés sous forme de paires clé-valeur dans le champ $meta.
Le code suivant montre comment configurer le schéma de la collection illustrée dans le diagramme ci-dessus.
Pour obtenir plus d'informations, reportez-vous à create_schema()
et add_field()
dans la référence du SDK.
Pour obtenir plus d'informations, reportez-vous à CollectionSchema
dans la référence du SDK.
from pymilvus import MilvusClient, DataType
# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="scalar_1", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="scalar_2", datatype=DataType.INT64)
schema.verify()
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;
// Define schema for the target collection
FieldType id = FieldType.newBuilder()
.withName("id")
.withDataType(DataType.Int64)
.withPrimaryKey(true)
.withAutoID(false)
.build();
FieldType vector = FieldType.newBuilder()
.withName("vector")
.withDataType(DataType.FloatVector)
.withDimension(768)
.build();
FieldType scalar1 = FieldType.newBuilder()
.withName("scalar_1")
.withDataType(DataType.VarChar)
.withMaxLength(512)
.build();
FieldType scalar2 = FieldType.newBuilder()
.withName("scalar_2")
.withDataType(DataType.Int64)
.build();
CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
.withEnableDynamicField(true)
.addFieldType(id)
.addFieldType(vector)
.addFieldType(scalar1)
.addFieldType(scalar2)
.build();
Configurer BulkWriter
BulkWriter est un outil conçu pour convertir des ensembles de données brutes dans un format adapté à l'importation via l'API d'importation RESTful. Il propose deux types de rédacteurs :
- LocalBulkWriter: Lit l'ensemble de données désigné et le transforme dans un format facile à utiliser.
- RemoteBulkWriter: Effectue la même tâche que LocalBulkWriter, mais transfère en plus les fichiers de données convertis vers un panier de stockage d'objets distant spécifié.
RemoteBulkWriter diffère de LocalBulkWriter en ce que RemoteBulkWriter transfère les fichiers de données convertis vers une unité de stockage d'objets cible.
Configuration de LocalBulkWriter
Un LocalBulkWriter ajoute des lignes au jeu de données source et les enregistre dans un fichier local au format spécifié.
from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
# Use `from pymilvus import LocalBulkWriter, BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = LocalBulkWriter(
schema=schema,
local_path='.',
segment_size=512 * 1024 * 1024, # Default value
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
LocalBulkWriterParam localBulkWriterParam = LocalBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withLocalPath(".")
.withChunkSize(512 * 1024 * 1024)
.withFileType(BulkFileType.PARQUET)
.build();
LocalBulkWriter localBulkWriter = new LocalBulkWriter(localBulkWriterParam);
Lors de la création d'un LocalBulkWriter, vous devez :
- faire référence au schéma créé dans
schema
. - Définir
local_path
comme répertoire de sortie. - Définir
file_type
comme type de fichier de sortie. - Si votre jeu de données contient un grand nombre d'enregistrements, il est conseillé de segmenter vos données en fixant
segment_size
à une valeur appropriée.
Pour plus de détails sur le paramétrage, reportez-vous à LocalBulkWriter dans la référence SDK.
Lors de la création d'un LocalBulkWriter, vous devez :
- référencer le schéma créé sur
CollectionSchema()
. - Définir le répertoire de sortie sur
withLocalPath()
. - Définir le type de fichier de sortie sur
withFileType()
. - Si votre jeu de données contient un grand nombre d'enregistrements, il est conseillé de segmenter vos données en fixant
withChunkSize()
à une valeur appropriée.
Pour plus de détails sur le paramétrage, reportez-vous à LocalBulkWriter dans la référence SDK.
Configuration de RemoteBulkWriter
Au lieu d'enregistrer les données ajoutées dans un fichier local, un RemoteBulkWriter les enregistre dans un bucket distant. Par conséquent, vous devez configurer un objet ConnectParam avant de créer un RemoteBulkWriter.
from pymilvus.bulk_writer import RemoteBulkWriter
# Use `from pymilvus import RemoteBulkWriter`
# when you use pymilvus earlier than 2.4.2
# Third-party constants
ACCESS_KEY="minioadmin"
SECRET_KEY="minioadmin"
BUCKET_NAME="milvus-bucket"
# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
endpoint="localhost:9000", # the default MinIO service started along with Milvus
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
bucket_name=BUCKET_NAME,
secure=False
)
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
String ACCESS_KEY = "minioadmin";
String SECRET_KEY = "minioadmin";
String BUCKET_NAME = "milvus-bucket";
StorageConnectParam storageConnectParam = S3ConnectParam.newBuilder()
.withEndpoint(MINIO_URI)
.withAccessKey(ACCESS_KEY)
.withSecretKey(SECRET_KEY)
.withBucketName(BUCKET_NAME)
.build();
Une fois que les paramètres de connexion sont prêts, vous pouvez les référencer dans le RemoteBulkWriter comme suit :
from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = RemoteBulkWriter(
schema=schema,
remote_path="/",
connect_param=conn,
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;
RemoteBulkWriterParam remoteBulkWriterParam = RemoteBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withConnectParam(storageConnectParam)
.withChunkSize(512 * 1024 * 1024)
.withRemotePath("/")
.withFileType(BulkFileType.PARQUET)
.build();
RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(remoteBulkWriterParam);
Les paramètres de création d'un RemoteBulkWriter sont à peu près les mêmes que ceux d'un LocalBulkWriter, à l'exception de connect_param
. Pour plus de détails sur le réglage des paramètres, voir RemoteBulkWriter et ConnectParam dans la référence SDK.
Les paramètres de création d'un RemoteBulkWriter sont à peu près les mêmes que ceux d'un LocalBulkWriter, à l'exception de StorageConnectParam
. Pour plus de détails sur les paramètres, voir RemoteBulkWriter et StorageConnectParam dans la référence du SDK.
Démarrer l'écriture
Un BulkWriter possède deux méthodes : append_row()
ajoute une ligne à partir d'un jeu de données source et commit()
valide les lignes ajoutées dans un fichier local ou un bucket distant.
Un BulkWriter possède deux méthodes : appendRow()
ajoute une ligne à partir d'un jeu de données source et commit()
enregistre les lignes ajoutées dans un fichier local ou un panier distant.
À des fins de démonstration, le code suivant ajoute des données générées de manière aléatoire.
import random
import string
def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits
return ''.join(random.choices(letters + digits, k=length))
for i in range(10000):
writer.append_row({
"id": i,
"vector": [random.uniform(-1, 1) for _ in range(768)],
"scalar_1": generate_random_str(random.randint(1, 20)),
"scalar_2": random.randint(0, 100)
})
writer.commit()
import com.alibaba.fastjson.JSONObject;
for (int i = 0; i < 10000; i++) {
JSONObject json = new JSONObject();
json.put("id", i);
json.put("vector", get_random_vector(768));
json.put("scalar_1", get_random_string(20));
json.put("scalar_2", (long) (Math.random() * 100));
// localBulkWriter.appendRow(json);
remoteBulkWriter.appendRow(json);
}
// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);
Étant donné que le schéma défini autorise les champs dynamiques, vous pouvez également inclure des champs non définis par le schéma dans les données à insérer, comme suit.
import random
import string
def generate_random_string(length=5):
letters = string.ascii_uppercase
digits = string.digits
return ''.join(random.choices(letters + digits, k=length))
for i in range(10000):
writer.append_row({
"id": i,
"vector":[random.uniform(-1, 1) for _ in range(768)],
"scalar_1": generate_random_string(),
"scalar_2": random.randint(0, 100),
"dynamic_field_1": random.choice([True, False]),
"dynamic_field_2": random.randint(0, 100)
})
writer.commit()
for (int i = 0; i < 10000; i++) {
JSONObject json = new JSONObject();
json.put("id", i);
json.put("vector", get_random_vector(768));
json.put("scalar_1", get_random_string(20));
json.put("scalar_2", (long) (Math.random() * 100));
json.put("dynamic_field_1", get_random_boolean());
json.put("dynamic_field_2", (long) (Math.random() * 100));
// localBulkWriter.appendRow(json);
remoteBulkWriter.appendRow(json);
}
// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);
Vérifier les résultats
Pour vérifier les résultats, vous pouvez obtenir le chemin de sortie réel en imprimant la propriété batch_files
de l'auteur.
Pour vérifier les résultats, vous pouvez obtenir le chemin de sortie réel en imprimant la méthode getBatchFiles()
du rédacteur.
print(writer.batch_files)
# [['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/1.parquet'],
# ['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/2.parquet']]
// localBulkWriter.getBatchFiles();
remoteBulkWriter.getBatchFiles();
//
// Close the BulkWriter
try {
localBulkWriter.close();
remoteBulkWriter.close();
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
BulkWriter génère un UUID, crée un sous-dossier utilisant l'UUID dans le répertoire de sortie fourni et place tous les fichiers générés dans le sous-dossier. Cliquez ici pour télécharger l'exemple de données préparé.
Les structures de dossier possibles sont les suivantes :
# JSON
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.json
# Parquet
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.parquet