🚀 免费试用 Zilliz Cloud,完全托管的 Milvus,体验 10 倍的性能提升!立即试用>

milvus-logo
LFAI
  • Home
  • Blog
  • Milvus 支持导入 Apache Parquet 文件,提高数据处理效率

Milvus 支持导入 Apache Parquet 文件,提高数据处理效率

  • Engineering
March 08, 2024
Cai Zhang, Fendy Feng

Milvus是一个高度可扩展的向量数据库,以其处理庞大数据集的能力而闻名,它在2.3.4版本中引入了对Parquet文件的支持,向前迈出了重要的一步。通过采用 Apache Parquet,用户可以简化数据导入流程,并大幅节省存储和计算成本。

在我们的最新文章中,我们探讨了 Parquet 的优势及其给 Milvus 用户带来的好处。我们讨论了集成这一功能背后的动机,并提供了将 Parquet 文件无缝导入 Milvus 的分步指南,为高效数据管理和分析开启了新的可能性。

什么是 Apache Parquet?

Apache Parquet是一种流行的面向列的开源数据文件格式,旨在提高大规模数据集的存储和处理效率。与 CSV 或 JSON 等传统的面向行的数据格式不同,Parquet 按列存储数据,提供更高效的数据压缩和编码方案。这种方法提高了性能,降低了存储要求,增强了处理能力,使其成为批量处理复杂数据的理想选择。

Milvus 用户如何受益于 Parquet 文件导入支持

Milvus 扩展了对 Parquet 文件导入的支持,为用户提供优化的体验和各种优势,包括降低存储和计算费用、精简数据管理和简化导入流程。

优化存储效率和简化数据管理

Parquet 针对不同的数据类型提供灵活的压缩选项和高效的编码方案,确保最佳的存储效率。这种灵活性在云环境中尤为重要,因为在云环境中,每节省一盎司存储空间都直接关系到切实降低成本。有了 Milvus 的这一新功能,用户可以毫不费力地将所有不同的数据整合到一个文件中,从而简化数据管理,提升整体用户体验。这项功能对处理可变长度 Array 数据类型的用户尤其有益,他们现在可以享受简化的数据导入流程。

提高查询性能

Parquet 的列式存储设计和先进的压缩方法大大提高了查询性能。在进行查询时,用户可以只关注相关数据,而无需扫描无关数据。这种有选择性的列读取最大限度地减少了 CPU 占用,从而加快了查询速度。

广泛的语言兼容性

Parquet 支持多种语言,如 Java、C++ 和 Python,并与大量数据处理工具兼容。由于支持 Parquet 文件,使用不同 SDK 的 Milvus 用户可以无缝生成 Parquet 文件,以便在数据库内进行解析。

如何将 Parquet 文件导入 Milvus

如果你的数据已经是 Parquet 文件格式,导入很容易。将 Parquet 文件上传到 MinIO 等对象存储系统,就可以导入了。

下面的代码片段是将 Parquet 文件导入 Milvus 的示例。

remote_files = []
try:
    print("Prepare upload files")
    minio_client = Minio(endpoint=MINIO_ADDRESS, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY,
                         secure=False)
    found = minio_client.bucket_exists(bucket_name)
    if not found:
        minio_client.make_bucket(bucket_name)
        print("MinIO bucket '{}' doesn't exist".format(bucket_name))
        return False, []

    # set your remote data path
    remote_data_path = "milvus_bulkinsert"

    def upload_file(f: str):
        file_name = os.path.basename(f)
        minio_file_path = os.path.join(remote_data_path, "parquet", file_name)
        minio_client.fput_object(bucket_name, minio_file_path, f)
        print("Upload file '{}' to '{}'".format(f, minio_file_path))
        remote_files.append(minio_file_path)

    upload_file(data_file)

except S3Error as e:
    print("Failed to connect MinIO server {}, error: {}".format(MINIO_ADDRESS, e))
    return False, []

print("Successfully upload files: {}".format(remote_files))
return True, remote_files

如果您的数据不是 Parquet 文件或具有 Dynamic Field,您可以利用我们的数据格式转换工具 BulkWriter 来帮助您生成 Parquet 文件。BulkWriter 现在已将 Parquet 作为其默认输出数据格式,确保为开发人员提供更直观的体验。

下面的代码片段是使用 BulkWriter 生成 Parquet 文件的示例。

import numpy as np
import json

from pymilvus import (
    RemoteBulkWriter,
    BulkFileType,
)

remote_writer = RemoteBulkWriter(
        schema=your_collection_schema,
        remote_path="your_remote_data_path",
        connect_param=RemoteBulkWriter.ConnectParam(
            endpoint=YOUR_MINIO_ADDRESS,
            access_key=YOUR_MINIO_ACCESS_KEY,
            secret_key=YOUR_MINIO_SECRET_KEY,
            bucket_name="a-bucket",
        ),
        file_type=BulkFileType.PARQUET,
)

# append your data
batch_count = 10000
for i in range(batch_count):
    row = {
        "id": i,
        "bool": True if i % 5 == 0 else False,
        "int8": i % 128,
        "int16": i % 1000,
        "int32": i % 100000,
        "int64": i,
        "float": i / 3,
        "double": i / 7,
        "varchar": f"varchar_{i}",
        "json": {"dummy": i, "ok": f"name_{i}"},
        "vector": gen_binary_vector() if bin_vec else gen_float_vector(),
        f"dynamic_{i}": i,
    }
    remote_writer.append_row(row)

# append rows by numpy type
for i in range(batch_count):
    remote_writer.append_row({
        "id": np.int64(i + batch_count),
        "bool": True if i % 3 == 0 else False,
        "int8": np.int8(i % 128),
        "int16": np.int16(i % 1000),
        "int32": np.int32(i % 100000),
        "int64": np.int64(i),
        "float": np.float32(i / 3),
        "double": np.float64(i / 7),
        "varchar": f"varchar_{i}",
        "json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
        "vector": gen_binary_vector() if bin_vec else gen_float_vector(),
        f"dynamic_{i}": i,
    })

print(f"{remote_writer.total_row_count} rows appends")
print(f"{remote_writer.buffer_row_count} rows in buffer not flushed")
print("Generate data files...")
remote_writer.commit()
print(f"Data files have been uploaded: {remote_writer.batch_files}")
remote_files = remote_writer.batch_files

然后,您就可以开始将 Parquet 文件导入 Milvus。

remote_files = [remote_file_path]
task_id = utility.do_bulk_insert(collection_name=collection_name,
                                 files=remote_files)

task_ids = [task_id]         
states = wait_tasks_to_state(task_ids, BulkInsertState.ImportCompleted)
complete_count = 0
for state in states:
    if state.state == BulkInsertState.ImportCompleted:
        complete_count = complete_count + 1

现在,您的数据已无缝集成到 Milvus 中。

下一步是什么?

随着 Milvus 继续支持不断增长的数据量,在管理大规模导入方面出现了挑战,特别是当 Parquet 文件超过 10GB 时。为了应对这一挑战,我们计划将导入数据分为标量列和向量列,每次导入创建两个 Parquet 文件,以减轻 I/O 压力。对于超过几百 GB 的数据集,我们建议多次导入数据。

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started

Like the article? Spread the word

扩展阅读