milvus-logo
LFAI
首页
  • 用户指南

准备源数据

本页讨论的是在开始将数据批量插入数据集之前应该考虑的事项。

开始之前

目标数据集需要将源数据映射到其模式。下图显示了如何将可接受的源数据映射到目标集合的模式。

Map data to schema 将数据映射到模式

您应仔细检查数据,并据此设计目标数据集的模式。

以上图中的 JSON 数据为例,行列表中有两个实体,每个行有六个字段。Collection Schema 选择性地包括四个:id向量标量_1标量_2

在设计模式时,还有两件事需要考虑:

  • 是否启用自动识别

    id字段是集合的主字段。要使主字段自动递增,可以在模式中启用AutoID。在这种情况下,应从源数据的每一行中排除id字段。

  • 是否启用Dynamic Field

    如果模式启用了Dynamic Field,目标集合也可以存储未包含在预定义模式中的字段。$meta字段是一个保留的 JSON 字段,用于保存Dynamic Field及其键值对中的值。在上图中,字段dynamic_field_1dynamic_field_2及其值将作为键值对保存在$meta字段中。

下面的代码展示了如何为上图所示的集合设置模式。

要获取更多信息,请参阅 create_schema()add_field()以获取更多信息。

要获取更多信息,请参阅 CollectionSchema以获取更多信息。

from pymilvus import MilvusClient, DataType

# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="scalar_1", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="scalar_2", datatype=DataType.INT64)

schema.verify()
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;

// Define schema for the target collection
FieldType id = FieldType.newBuilder()
        .withName("id")
        .withDataType(DataType.Int64)
        .withPrimaryKey(true)
        .withAutoID(false)
        .build();

FieldType vector = FieldType.newBuilder()
        .withName("vector")
        .withDataType(DataType.FloatVector)
        .withDimension(768)
        .build();

FieldType scalar1 = FieldType.newBuilder()
        .withName("scalar_1")
        .withDataType(DataType.VarChar)
        .withMaxLength(512)
        .build();

FieldType scalar2 = FieldType.newBuilder()
        .withName("scalar_2")
        .withDataType(DataType.Int64)
        .build();

CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
        .withEnableDynamicField(true)
        .addFieldType(id)
        .addFieldType(vector)
        .addFieldType(scalar1)
        .addFieldType(scalar2)
        .build();

设置 BulkWriter

BulkWriter是一种工具,用于将原始数据集转换为适合通过 RESTful Import API 导入的格式。它提供两种类型的写入器:

  • 本地写入器(LocalBulkWriter):读取指定的数据集,并将其转换为易于使用的格式。
  • 远程批量写入器:执行与 LocalBulkWriter 相同的任务,但会将转换后的数据文件额外传输到指定的远程对象存储桶。

RemoteBulkWriterLocalBulkWriter的不同之处在于,RemoteBulkWriter会将转换后的数据文件传输到目标对象存储桶。

设置 LocalBulkWriter

LocalBulkWriter会追加源数据集中的行,并将其提交到指定格式的本地文件中。

from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
# Use `from pymilvus import LocalBulkWriter, BulkFileType` 
# when you use pymilvus earlier than 2.4.2 

writer = LocalBulkWriter(
schema=schema,
local*path='.',
segment_size=512 * 1024 \_ 1024, # Default value
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;

LocalBulkWriterParam localBulkWriterParam = LocalBulkWriterParam.newBuilder()
    .withCollectionSchema(schema)
    .withLocalPath(".")
    .withChunkSize(512 * 1024 * 1024)
    .withFileType(BulkFileType.PARQUET)
    .build();

LocalBulkWriter localBulkWriter = new LocalBulkWriter(localBulkWriterParam);

创建LocalBulkWriter 时,你应该

  • schema 中引用已创建的模式。
  • local_path 设置为输出目录。
  • file_type 设置为输出文件类型。
  • 如果您的数据集包含大量记录,建议您将segment_size 设置为适当的值,以分割数据。

有关参数设置的详细信息,请参阅 SDK 参考资料中的LocalBulkWriter

创建LocalBulkWriter 时,应

  • CollectionSchema() 中引用已创建的模式。
  • withLocalPath() 中设置输出目录。
  • withFileType() 中设置输出文件类型。
  • 如果您的数据集包含大量记录,建议您通过将withChunkSize() 设置为适当的值来分割数据。

有关参数设置的详细信息,请参阅 SDK 参考资料中的 LocalBulkWriter。

设置 RemoteBulkWriter

RemoteBulkWriter不会将添加的数据提交到本地文件,而是将它们提交到远程存储桶。因此,在创建RemoteBulkWriter 之前,你应该先设置一个ConnectParam对象。

from pymilvus.bulk_writer import RemoteBulkWriter
# Use `from pymilvus import RemoteBulkWriter` 
# when you use pymilvus earlier than 2.4.2 

# Third-party constants
ACCESS_KEY="minioadmin"
SECRET_KEY="minioadmin"
BUCKET_NAME="milvus-bucket"

# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
endpoint="localhost:9000", # the default MinIO service started along with Milvus
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
bucket_name=BUCKET_NAME,
secure=False
)
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;

String ACCESS_KEY = "minioadmin";
String SECRET_KEY = "minioadmin";
String BUCKET_NAME = "milvus-bucket";

StorageConnectParam storageConnectParam = S3ConnectParam.newBuilder()
    .withEndpoint(MINIO_URI)
    .withAccessKey(ACCESS_KEY)
    .withSecretKey(SECRET_KEY)
    .withBucketName(BUCKET_NAME)
    .build();

一旦连接参数准备就绪,你就可以在RemoteBulkWriter中引用它,如下所示:

from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType` 
# when you use pymilvus earlier than 2.4.2 

writer = RemoteBulkWriter(
schema=schema,
remote_path="/",
connect_param=conn,
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;

RemoteBulkWriterParam remoteBulkWriterParam = RemoteBulkWriterParam.newBuilder()
    .withCollectionSchema(schema)
    .withConnectParam(storageConnectParam)
    .withChunkSize(512 * 1024 * 1024)
    .withRemotePath("/")
    .withFileType(BulkFileType.PARQUET)
    .build();

RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(remoteBulkWriterParam);

除了connect_param 之外,创建RemoteBulkWriter 的参数与创建LocalBulkWriter 的参数基本相同。有关参数设置的详细信息,请参阅 SDK 参考资料中的RemoteBulkWriterConnectParam

StorageConnectParam 外,创建RemoteBulkWriter的参数与创建LocalBulkWriter 的参数基本相同。有关参数设置的详细信息,请参阅 SDK 参考资料中的 RemoteBulkWriter 和 StorageConnectParam。

开始写入

BulkWriter有两个方法:append_row() 从源数据集添加记录,以及commit() 将添加的记录提交到本地文件或远程存储桶。

BulkWriter有两个方法:appendRow() 从源数据集添加行,commit() 将添加的行提交到本地文件或远程数据桶。

为演示起见,下面的代码添加了随机生成的数据。

import random
import string

def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits

    return ''.join(random.choices(letters + digits, k=length))

for i in range(10000):
writer.append*row({
"id": i,
"vector": [random.uniform(-1, 1) for * in range(768)],
"scalar_1": generate_random_str(random.randint(1, 20)),
"scalar_2": random.randint(0, 100)
})

writer.commit()
import com.alibaba.fastjson.JSONObject;

for (int i = 0; i < 10000; i++) {
    JSONObject json = new JSONObject();
    json.put("id", i);
    json.put("vector", get_random_vector(768));
    json.put("scalar_1", get_random_string(20));
    json.put("scalar_2", (long) (Math.random() * 100));

    // localBulkWriter.appendRow(json);
    remoteBulkWriter.appendRow(json);
}

// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);

由于定义的模式允许Dynamic Field,因此您也可以在要插入的数据中包含非模式定义的字段,如下所示。

import random
import string

def generate_random_string(length=5):
letters = string.ascii_uppercase
digits = string.digits

    return ''.join(random.choices(letters + digits, k=length))

for i in range(10000):
writer.append*row({
"id": i,
"vector":[random.uniform(-1, 1) for * in range(768)],
"scalar_1": generate_random_string(),
"scalar_2": random.randint(0, 100),
"dynamic_field_1": random.choice([True, False]),
"dynamic_field_2": random.randint(0, 100)
})

writer.commit()
for (int i = 0; i < 10000; i++) {
    JSONObject json = new JSONObject();
    json.put("id", i);
    json.put("vector", get_random_vector(768));
    json.put("scalar_1", get_random_string(20));
    json.put("scalar_2", (long) (Math.random() * 100));
    json.put("dynamic_field_1", get_random_boolean());
    json.put("dynamic_field_2", (long) (Math.random() * 100));

    // localBulkWriter.appendRow(json);
    remoteBulkWriter.appendRow(json);
}

// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);

验证结果

要检查结果,可以通过打印写入器的batch_files 属性来获取实际输出路径。

要检查结果,可通过打印写入器的getBatchFiles() 方法获取实际输出路径。

print(writer.batch_files)

# [['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/1.parquet'],
#  ['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/2.parquet']]
// localBulkWriter.getBatchFiles();
remoteBulkWriter.getBatchFiles();

// 

// Close the BulkWriter
try {
    localBulkWriter.close();
    remoteBulkWriter.close();            
} catch (Exception e) {
    // TODO: handle exception
    e.printStackTrace();
}

BulkWriter会生成一个 UUID,在提供的输出目录中使用 UUID 创建一个子文件夹,并将所有生成的文件放入该子文件夹中。单击此处下载准备好的示例数据。

可能的文件夹结构如下

# JSON
├── folder
│   └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│       └── 1.json

# Parquet
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.parquet

翻译自DeepLogo

反馈

此页对您是否有帮助?