Preparar los datos de origen
En esta página se trata algo que debe tener en cuenta antes de empezar a insertar datos en bloque en su colección.
Antes de empezar
La colección de destino requiere la asignación de los datos de origen a su esquema. El siguiente diagrama muestra cómo se asignan los datos de origen aceptables al esquema de una colección de destino.
Asignación de datos al esquema
Debe examinar cuidadosamente sus datos y diseñar el esquema de la colección de destino en consecuencia.
Tomando como ejemplo los datos JSON del diagrama anterior, hay dos entidades en la lista de filas y cada fila tiene seis campos. El esquema de la colección incluye selectivamente cuatro: id, vector, scalar_1 y scalar_2.
Hay dos cosas más a considerar cuando se diseña el esquema:
Si activar AutoID
El campo id sirve como campo primario de la colección. Para que el campo primario se incremente automáticamente, puede activar AutoID en el esquema. En este caso, debe excluir el campo id de cada fila de los datos de origen.
Habilitar o no campos dinámicos
La colección de destino también puede almacenar campos no incluidos en su esquema predefinido si el esquema habilita los campos dinámicos. El campo $meta es un campo JSON reservado para contener campos dinámicos y sus valores en pares clave-valor. En el diagrama anterior, los campos dynamic_field_1 y dynamic_field_2 y los valores se guardarán como pares clave-valor en el campo $meta.
El siguiente código muestra cómo configurar el esquema para la colección ilustrada en el diagrama anterior.
Para obtener más información, consulte create_schema()
y add_field()
en la referencia del SDK.
Para obtener más información, consulte CollectionSchema
en la referencia del SDK.
from pymilvus import MilvusClient, DataType
# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="scalar_1", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="scalar_2", datatype=DataType.INT64)
schema.verify()
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;
// Define schema for the target collection
FieldType id = FieldType.newBuilder()
.withName("id")
.withDataType(DataType.Int64)
.withPrimaryKey(true)
.withAutoID(false)
.build();
FieldType vector = FieldType.newBuilder()
.withName("vector")
.withDataType(DataType.FloatVector)
.withDimension(768)
.build();
FieldType scalar1 = FieldType.newBuilder()
.withName("scalar_1")
.withDataType(DataType.VarChar)
.withMaxLength(512)
.build();
FieldType scalar2 = FieldType.newBuilder()
.withName("scalar_2")
.withDataType(DataType.Int64)
.build();
CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
.withEnableDynamicField(true)
.addFieldType(id)
.addFieldType(vector)
.addFieldType(scalar1)
.addFieldType(scalar2)
.build();
Configuración de BulkWriter
BulkWriter es una herramienta diseñada para convertir conjuntos de datos sin procesar en un formato adecuado para su importación a través de la API de importación RESTful. Ofrece dos tipos de escritores:
- LocalBulkWriter: Lee el conjunto de datos designado y lo transforma en un formato fácil de usar.
- RemoteBulkWriter: Realiza la misma tarea que LocalBulkWriter pero, además, transfiere los archivos de datos convertidos a un bucket de almacenamiento de objetos remoto especificado.
RemoteBulkWriter se diferencia de LocalBulkWriter en que RemoteBulkWriter transfiere los archivos de datos convertidos a un bucket de almacenamiento de objetos de destino.
Configuración de LocalBulkWriter
Un LocalBulkWriter añade filas del conjunto de datos de origen y las consigna en un archivo local del formato especificado.
from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
# Use `from pymilvus import LocalBulkWriter, BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = LocalBulkWriter(
schema=schema,
local_path='.',
segment_size=512 * 1024 * 1024, # Default value
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
LocalBulkWriterParam localBulkWriterParam = LocalBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withLocalPath(".")
.withChunkSize(512 * 1024 * 1024)
.withFileType(BulkFileType.PARQUET)
.build();
LocalBulkWriter localBulkWriter = new LocalBulkWriter(localBulkWriterParam);
Al crear un LocalBulkWriter, debe:
- Hacer referencia al esquema creado en
schema
. - Establecer
local_path
como directorio de salida. - Establecer
file_type
como tipo de archivo de salida. - Si el conjunto de datos contiene un gran número de registros, se recomienda segmentar los datos configurando
segment_size
con un valor adecuado.
Para obtener más información sobre la configuración de los parámetros, consulte LocalBulkWriter en la referencia del SDK.
Al crear un LocalBulkWriter, debe:
- Hacer referencia al esquema creado en
CollectionSchema()
. - Establecer el directorio de salida en
withLocalPath()
. - Establecer el tipo de archivo de salida en
withFileType()
. - Si su conjunto de datos contiene un gran número de registros, se recomienda segmentar los datos configurando
withChunkSize()
con un valor adecuado.
Para más detalles sobre la configuración de parámetros, consulte LocalBulkWriter en la referencia SDK.
Configuración de RemoteBulkWriter
En lugar de enviar los datos añadidos a un archivo local, RemoteBulkWriter los envía a un bucket remoto. Por lo tanto, debe configurar un objeto ConnectParam antes de crear un RemoteBulkWriter.
from pymilvus.bulk_writer import RemoteBulkWriter
# Use `from pymilvus import RemoteBulkWriter`
# when you use pymilvus earlier than 2.4.2
# Third-party constants
ACCESS_KEY="minioadmin"
SECRET_KEY="minioadmin"
BUCKET_NAME="milvus-bucket"
# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
endpoint="localhost:9000", # the default MinIO service started along with Milvus
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
bucket_name=BUCKET_NAME,
secure=False
)
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
String ACCESS_KEY = "minioadmin";
String SECRET_KEY = "minioadmin";
String BUCKET_NAME = "milvus-bucket";
StorageConnectParam storageConnectParam = S3ConnectParam.newBuilder()
.withEndpoint(MINIO_URI)
.withAccessKey(ACCESS_KEY)
.withSecretKey(SECRET_KEY)
.withBucketName(BUCKET_NAME)
.build();
Una vez que los parámetros de conexión están listos, puede hacer referencia a ellos en el RemoteBulkWriter de la siguiente manera:
from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = RemoteBulkWriter(
schema=schema,
remote_path="/",
connect_param=conn,
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;
RemoteBulkWriterParam remoteBulkWriterParam = RemoteBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withConnectParam(storageConnectParam)
.withChunkSize(512 * 1024 * 1024)
.withRemotePath("/")
.withFileType(BulkFileType.PARQUET)
.build();
RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(remoteBulkWriterParam);
Los parámetros para crear un RemoteBulkWriter son apenas los mismos que para un LocalBulkWriter, excepto connect_param
. Para obtener más información sobre la configuración de los parámetros, consulte RemoteBulkWriter y ConnectParam en la referencia del SDK.
Los parámetros para crear un RemoteBulkWriter son apenas los mismos que los de un LocalBulkWriter, excepto StorageConnectParam
. Para más detalles sobre la configuración de los parámetros, consulte RemoteBulkWriter y StorageConnectParam en la referencia del SDK.
Empezar a escribir
Un BulkWriter tiene dos métodos: append_row()
añade una fila desde un conjunto de datos de origen y commit()
consigna las filas añadidas en un archivo local o en un bucket remoto.
Un BulkWriter tiene dos métodos: appendRow()
añade una fila desde un conjunto de datos de origen y commit()
consigna las filas añadidas en un archivo local o en un bucket remoto.
A modo de demostración, el siguiente código añade datos generados aleatoriamente.
import random
import string
def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits
return ''.join(random.choices(letters + digits, k=length))
for i in range(10000):
writer.append_row({
"id": i,
"vector": [random.uniform(-1, 1) for _ in range(768)],
"scalar_1": generate_random_str(random.randint(1, 20)),
"scalar_2": random.randint(0, 100)
})
writer.commit()
import com.alibaba.fastjson.JSONObject;
for (int i = 0; i < 10000; i++) {
JSONObject json = new JSONObject();
json.put("id", i);
json.put("vector", get_random_vector(768));
json.put("scalar_1", get_random_string(20));
json.put("scalar_2", (long) (Math.random() * 100));
// localBulkWriter.appendRow(json);
remoteBulkWriter.appendRow(json);
}
// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);
Dado que el esquema definido permite campos dinámicos, también se pueden incluir campos no definidos por el esquema en los datos a insertar, como se indica a continuación.
import random
import string
def generate_random_string(length=5):
letters = string.ascii_uppercase
digits = string.digits
return ''.join(random.choices(letters + digits, k=length))
for i in range(10000):
writer.append_row({
"id": i,
"vector":[random.uniform(-1, 1) for _ in range(768)],
"scalar_1": generate_random_string(),
"scalar_2": random.randint(0, 100),
"dynamic_field_1": random.choice([True, False]),
"dynamic_field_2": random.randint(0, 100)
})
writer.commit()
for (int i = 0; i < 10000; i++) {
JSONObject json = new JSONObject();
json.put("id", i);
json.put("vector", get_random_vector(768));
json.put("scalar_1", get_random_string(20));
json.put("scalar_2", (long) (Math.random() * 100));
json.put("dynamic_field_1", get_random_boolean());
json.put("dynamic_field_2", (long) (Math.random() * 100));
// localBulkWriter.appendRow(json);
remoteBulkWriter.appendRow(json);
}
// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);
Verificar los resultados
Para comprobar los resultados, puedes obtener la ruta de salida real imprimiendo la propiedad batch_files
del escritor.
Para comprobar los resultados, puede obtener la ruta de salida real imprimiendo el método getBatchFiles()
del escritor.
print(writer.batch_files)
# [['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/1.parquet'],
# ['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/2.parquet']]
// localBulkWriter.getBatchFiles();
remoteBulkWriter.getBatchFiles();
//
// Close the BulkWriter
try {
localBulkWriter.close();
remoteBulkWriter.close();
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
BulkWriter genera un UUID, crea una subcarpeta utilizando el UUID en el directorio de salida proporcionado y coloca todos los archivos generados en la subcarpeta. Haga clic aquí para descargar los datos de muestra preparados.
Las posibles estructuras de carpetas son las siguientes
# JSON
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.json
# Parquet
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.parquet