准备源数据
本页讨论的是在开始将数据批量插入数据集之前应该考虑的事项。
开始之前
目标数据集需要将源数据映射到其模式。下图显示了如何将可接受的源数据映射到目标集合的模式。
将数据映射到模式
您应仔细检查数据,并据此设计目标数据集的模式。
以上图中的 JSON 数据为例,行列表中有两个实体,每个行有六个字段。Collection Schema 选择性地包括四个:id、向量、标量_1 和标量_2。
在设计模式时,还有两件事需要考虑:
是否启用自动识别
id字段是集合的主字段。要使主字段自动递增,可以在模式中启用AutoID。在这种情况下,应从源数据的每一行中排除id字段。
是否启用Dynamic Field
如果模式启用了Dynamic Field,目标集合也可以存储未包含在预定义模式中的字段。$meta字段是一个保留的 JSON 字段,用于保存Dynamic Field及其键值对中的值。在上图中,字段dynamic_field_1和dynamic_field_2及其值将作为键值对保存在$meta字段中。
下面的代码展示了如何为上图所示的集合设置模式。
要获取更多信息,请参阅 create_schema()
和 add_field()
以获取更多信息。
要获取更多信息,请参阅 CollectionSchema
以获取更多信息。
from pymilvus import MilvusClient, DataType
# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="scalar_1", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="scalar_2", datatype=DataType.INT64)
schema.verify()
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;
// Define schema for the target collection
FieldType id = FieldType.newBuilder()
.withName("id")
.withDataType(DataType.Int64)
.withPrimaryKey(true)
.withAutoID(false)
.build();
FieldType vector = FieldType.newBuilder()
.withName("vector")
.withDataType(DataType.FloatVector)
.withDimension(768)
.build();
FieldType scalar1 = FieldType.newBuilder()
.withName("scalar_1")
.withDataType(DataType.VarChar)
.withMaxLength(512)
.build();
FieldType scalar2 = FieldType.newBuilder()
.withName("scalar_2")
.withDataType(DataType.Int64)
.build();
CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
.withEnableDynamicField(true)
.addFieldType(id)
.addFieldType(vector)
.addFieldType(scalar1)
.addFieldType(scalar2)
.build();
设置 BulkWriter
BulkWriter是一种工具,用于将原始数据集转换为适合通过 RESTful Import API 导入的格式。它提供两种类型的写入器:
- 本地写入器(LocalBulkWriter):读取指定的数据集,并将其转换为易于使用的格式。
- 远程批量写入器:执行与 LocalBulkWriter 相同的任务,但会将转换后的数据文件额外传输到指定的远程对象存储桶。
RemoteBulkWriter与LocalBulkWriter的不同之处在于,RemoteBulkWriter会将转换后的数据文件传输到目标对象存储桶。
设置 LocalBulkWriter
LocalBulkWriter会追加源数据集中的行,并将其提交到指定格式的本地文件中。
from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
# Use `from pymilvus import LocalBulkWriter, BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = LocalBulkWriter(
schema=schema,
local*path='.',
segment_size=512 * 1024 \_ 1024, # Default value
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
LocalBulkWriterParam localBulkWriterParam = LocalBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withLocalPath(".")
.withChunkSize(512 * 1024 * 1024)
.withFileType(BulkFileType.PARQUET)
.build();
LocalBulkWriter localBulkWriter = new LocalBulkWriter(localBulkWriterParam);
创建LocalBulkWriter 时,你应该
- 在
schema
中引用已创建的模式。 - 将
local_path
设置为输出目录。 - 将
file_type
设置为输出文件类型。 - 如果您的数据集包含大量记录,建议您将
segment_size
设置为适当的值,以分割数据。
有关参数设置的详细信息,请参阅 SDK 参考资料中的LocalBulkWriter。
创建LocalBulkWriter 时,应
- 在
CollectionSchema()
中引用已创建的模式。 - 在
withLocalPath()
中设置输出目录。 - 在
withFileType()
中设置输出文件类型。 - 如果您的数据集包含大量记录,建议您通过将
withChunkSize()
设置为适当的值来分割数据。
有关参数设置的详细信息,请参阅 SDK 参考资料中的 LocalBulkWriter。
设置 RemoteBulkWriter
RemoteBulkWriter不会将添加的数据提交到本地文件,而是将它们提交到远程存储桶。因此,在创建RemoteBulkWriter 之前,你应该先设置一个ConnectParam对象。
from pymilvus.bulk_writer import RemoteBulkWriter
# Use `from pymilvus import RemoteBulkWriter`
# when you use pymilvus earlier than 2.4.2
# Third-party constants
ACCESS_KEY="minioadmin"
SECRET_KEY="minioadmin"
BUCKET_NAME="milvus-bucket"
# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
endpoint="localhost:9000", # the default MinIO service started along with Milvus
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
bucket_name=BUCKET_NAME,
secure=False
)
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
String ACCESS_KEY = "minioadmin";
String SECRET_KEY = "minioadmin";
String BUCKET_NAME = "milvus-bucket";
StorageConnectParam storageConnectParam = S3ConnectParam.newBuilder()
.withEndpoint(MINIO_URI)
.withAccessKey(ACCESS_KEY)
.withSecretKey(SECRET_KEY)
.withBucketName(BUCKET_NAME)
.build();
一旦连接参数准备就绪,你就可以在RemoteBulkWriter中引用它,如下所示:
from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = RemoteBulkWriter(
schema=schema,
remote_path="/",
connect_param=conn,
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;
RemoteBulkWriterParam remoteBulkWriterParam = RemoteBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withConnectParam(storageConnectParam)
.withChunkSize(512 * 1024 * 1024)
.withRemotePath("/")
.withFileType(BulkFileType.PARQUET)
.build();
RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(remoteBulkWriterParam);
除了connect_param
之外,创建RemoteBulkWriter 的参数与创建LocalBulkWriter 的参数基本相同。有关参数设置的详细信息,请参阅 SDK 参考资料中的RemoteBulkWriter和ConnectParam。
除StorageConnectParam
外,创建RemoteBulkWriter的参数与创建LocalBulkWriter 的参数基本相同。有关参数设置的详细信息,请参阅 SDK 参考资料中的 RemoteBulkWriter 和 StorageConnectParam。
开始写入
BulkWriter有两个方法:append_row()
从源数据集添加记录,以及commit()
将添加的记录提交到本地文件或远程存储桶。
BulkWriter有两个方法:appendRow()
从源数据集添加行,commit()
将添加的行提交到本地文件或远程数据桶。
为演示起见,下面的代码添加了随机生成的数据。
import random
import string
def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits
return ''.join(random.choices(letters + digits, k=length))
for i in range(10000):
writer.append*row({
"id": i,
"vector": [random.uniform(-1, 1) for * in range(768)],
"scalar_1": generate_random_str(random.randint(1, 20)),
"scalar_2": random.randint(0, 100)
})
writer.commit()
import com.alibaba.fastjson.JSONObject;
for (int i = 0; i < 10000; i++) {
JSONObject json = new JSONObject();
json.put("id", i);
json.put("vector", get_random_vector(768));
json.put("scalar_1", get_random_string(20));
json.put("scalar_2", (long) (Math.random() * 100));
// localBulkWriter.appendRow(json);
remoteBulkWriter.appendRow(json);
}
// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);
由于定义的模式允许Dynamic Field,因此您也可以在要插入的数据中包含非模式定义的字段,如下所示。
import random
import string
def generate_random_string(length=5):
letters = string.ascii_uppercase
digits = string.digits
return ''.join(random.choices(letters + digits, k=length))
for i in range(10000):
writer.append*row({
"id": i,
"vector":[random.uniform(-1, 1) for * in range(768)],
"scalar_1": generate_random_string(),
"scalar_2": random.randint(0, 100),
"dynamic_field_1": random.choice([True, False]),
"dynamic_field_2": random.randint(0, 100)
})
writer.commit()
for (int i = 0; i < 10000; i++) {
JSONObject json = new JSONObject();
json.put("id", i);
json.put("vector", get_random_vector(768));
json.put("scalar_1", get_random_string(20));
json.put("scalar_2", (long) (Math.random() * 100));
json.put("dynamic_field_1", get_random_boolean());
json.put("dynamic_field_2", (long) (Math.random() * 100));
// localBulkWriter.appendRow(json);
remoteBulkWriter.appendRow(json);
}
// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);
验证结果
要检查结果,可以通过打印写入器的batch_files
属性来获取实际输出路径。
要检查结果,可通过打印写入器的getBatchFiles()
方法获取实际输出路径。
print(writer.batch_files)
# [['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/1.parquet'],
# ['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/2.parquet']]
// localBulkWriter.getBatchFiles();
remoteBulkWriter.getBatchFiles();
//
// Close the BulkWriter
try {
localBulkWriter.close();
remoteBulkWriter.close();
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
BulkWriter会生成一个 UUID,在提供的输出目录中使用 UUID 创建一个子文件夹,并将所有生成的文件放入该子文件夹中。单击此处下载准备好的示例数据。
可能的文件夹结构如下
# JSON
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.json
# Parquet
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.parquet