Milvus 支持导入 Apache Parquet 文件,提高数据处理效率
Milvus是一个高度可扩展的向量数据库,以其处理庞大数据集的能力而闻名,它在2.3.4版本中引入了对Parquet文件的支持,向前迈出了重要的一步。通过采用 Apache Parquet,用户可以简化数据导入流程,并大幅节省存储和计算成本。
在我们的最新文章中,我们探讨了 Parquet 的优势及其给 Milvus 用户带来的好处。我们讨论了集成这一功能背后的动机,并提供了将 Parquet 文件无缝导入 Milvus 的分步指南,为高效数据管理和分析开启了新的可能性。
什么是 Apache Parquet?
Apache Parquet是一种流行的面向列的开源数据文件格式,旨在提高大规模数据集的存储和处理效率。与 CSV 或 JSON 等传统的面向行的数据格式不同,Parquet 按列存储数据,提供更高效的数据压缩和编码方案。这种方法提高了性能,降低了存储要求,增强了处理能力,使其成为批量处理复杂数据的理想选择。
Milvus 用户如何受益于 Parquet 文件导入支持
Milvus 扩展了对 Parquet 文件导入的支持,为用户提供优化的体验和各种优势,包括降低存储和计算费用、精简数据管理和简化导入流程。
优化存储效率和简化数据管理
Parquet 针对不同的数据类型提供灵活的压缩选项和高效的编码方案,确保最佳的存储效率。这种灵活性在云环境中尤为重要,因为在云环境中,每节省一盎司存储空间都直接关系到切实降低成本。有了 Milvus 的这一新功能,用户可以毫不费力地将所有不同的数据整合到一个文件中,从而简化数据管理,提升整体用户体验。这项功能对处理可变长度 Array 数据类型的用户尤其有益,他们现在可以享受简化的数据导入流程。
提高查询性能
Parquet 的列式存储设计和先进的压缩方法大大提高了查询性能。在进行查询时,用户可以只关注相关数据,而无需扫描无关数据。这种有选择性的列读取最大限度地减少了 CPU 占用,从而加快了查询速度。
广泛的语言兼容性
Parquet 支持多种语言,如 Java、C++ 和 Python,并与大量数据处理工具兼容。由于支持 Parquet 文件,使用不同 SDK 的 Milvus 用户可以无缝生成 Parquet 文件,以便在数据库内进行解析。
如何将 Parquet 文件导入 Milvus
如果你的数据已经是 Parquet 文件格式,导入很容易。将 Parquet 文件上传到 MinIO 等对象存储系统,就可以导入了。
下面的代码片段是将 Parquet 文件导入 Milvus 的示例。
remote_files = []
try:
print("Prepare upload files")
minio_client = Minio(endpoint=MINIO_ADDRESS, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY,
secure=False)
found = minio_client.bucket_exists(bucket_name)
if not found:
minio_client.make_bucket(bucket_name)
print("MinIO bucket '{}' doesn't exist".format(bucket_name))
return False, []
# set your remote data path
remote_data_path = "milvus_bulkinsert"
def upload_file(f: str):
file_name = os.path.basename(f)
minio_file_path = os.path.join(remote_data_path, "parquet", file_name)
minio_client.fput_object(bucket_name, minio_file_path, f)
print("Upload file '{}' to '{}'".format(f, minio_file_path))
remote_files.append(minio_file_path)
upload_file(data_file)
except S3Error as e:
print("Failed to connect MinIO server {}, error: {}".format(MINIO_ADDRESS, e))
return False, []
print("Successfully upload files: {}".format(remote_files))
return True, remote_files
如果您的数据不是 Parquet 文件或具有 Dynamic Field,您可以利用我们的数据格式转换工具 BulkWriter 来帮助您生成 Parquet 文件。BulkWriter 现在已将 Parquet 作为其默认输出数据格式,确保为开发人员提供更直观的体验。
下面的代码片段是使用 BulkWriter 生成 Parquet 文件的示例。
import numpy as np
import json
from pymilvus import (
RemoteBulkWriter,
BulkFileType,
)
remote_writer = RemoteBulkWriter(
schema=your_collection_schema,
remote_path="your_remote_data_path",
connect_param=RemoteBulkWriter.ConnectParam(
endpoint=YOUR_MINIO_ADDRESS,
access_key=YOUR_MINIO_ACCESS_KEY,
secret_key=YOUR_MINIO_SECRET_KEY,
bucket_name="a-bucket",
),
file_type=BulkFileType.PARQUET,
)
# append your data
batch_count = 10000
for i in range(batch_count):
row = {
"id": i,
"bool": True if i % 5 == 0 else False,
"int8": i % 128,
"int16": i % 1000,
"int32": i % 100000,
"int64": i,
"float": i / 3,
"double": i / 7,
"varchar": f"varchar_{i}",
"json": {"dummy": i, "ok": f"name_{i}"},
"vector": gen_binary_vector() if bin_vec else gen_float_vector(),
f"dynamic_{i}": i,
}
remote_writer.append_row(row)
# append rows by numpy type
for i in range(batch_count):
remote_writer.append_row({
"id": np.int64(i + batch_count),
"bool": True if i % 3 == 0 else False,
"int8": np.int8(i % 128),
"int16": np.int16(i % 1000),
"int32": np.int32(i % 100000),
"int64": np.int64(i),
"float": np.float32(i / 3),
"double": np.float64(i / 7),
"varchar": f"varchar_{i}",
"json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
"vector": gen_binary_vector() if bin_vec else gen_float_vector(),
f"dynamic_{i}": i,
})
print(f"{remote_writer.total_row_count} rows appends")
print(f"{remote_writer.buffer_row_count} rows in buffer not flushed")
print("Generate data files...")
remote_writer.commit()
print(f"Data files have been uploaded: {remote_writer.batch_files}")
remote_files = remote_writer.batch_files
然后,您就可以开始将 Parquet 文件导入 Milvus。
remote_files = [remote_file_path]
task_id = utility.do_bulk_insert(collection_name=collection_name,
files=remote_files)
task_ids = [task_id]
states = wait_tasks_to_state(task_ids, BulkInsertState.ImportCompleted)
complete_count = 0
for state in states:
if state.state == BulkInsertState.ImportCompleted:
complete_count = complete_count + 1
现在,您的数据已无缝集成到 Milvus 中。
下一步是什么?
随着 Milvus 继续支持不断增长的数据量,在管理大规模导入方面出现了挑战,特别是当 Parquet 文件超过 10GB 时。为了应对这一挑战,我们计划将导入数据分为标量列和向量列,每次导入创建两个 Parquet 文件,以减轻 I/O 压力。对于超过几百 GB 的数据集,我们建议多次导入数据。
Try Managed Milvus for Free
Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.
Get StartedLike the article? Spread the word