🚀 완전 관리형 Milvus인 Zilliz Cloud를 무료로 체험해보세요—10배 더 빠른 성능을 경험하세요! 지금 체험하기>>

milvus-logo
LFAI
  • Home
  • Blog
  • Milvus, 데이터 처리 효율성 향상을 위한 Apache Parquet 파일 가져오기 지원

Milvus, 데이터 처리 효율성 향상을 위한 Apache Parquet 파일 가져오기 지원

  • Engineering
March 08, 2024
Cai Zhang, Fendy Feng

방대한 데이터 세트를 처리하는 능력으로 유명한 확장성이 뛰어난 벡터 데이터베이스인Milvus는 버전 2.3.4에서 Parquet 파일 지원을 도입하여 한 단계 더 발전했습니다. Apache Parquet을 도입함으로써 사용자는 데이터 가져오기 프로세스를 간소화하고 스토리지 및 계산 비용을 크게 절감할 수 있습니다.

최신 게시물에서는 Parquet의 장점과 Milvus 사용자에게 제공되는 이점을 살펴봅니다. 이 기능을 통합하게 된 동기를 설명하고, Parquet 파일을 Milvus로 원활하게 가져와 효율적인 데이터 관리 및 분석을 위한 새로운 가능성을 열어주는 단계별 가이드를 제공합니다.

Apache Parquet이란?

Apache Parquet은 대규모 데이터 세트의 저장 및 처리 효율성을 높이기 위해 설계된 널리 사용되는 오픈 소스 열 지향 데이터 파일 형식입니다. CSV나 JSON과 같은 기존의 행 중심 데이터 형식과 달리 Parquet은 데이터를 열 단위로 저장하여 보다 효율적인 데이터 압축 및 인코딩 체계를 제공합니다. 이러한 접근 방식은 성능 향상, 스토리지 요구 사항 감소, 처리 능력 향상으로 이어져 복잡한 데이터를 대량으로 처리하는 데 이상적입니다.

Milvus 사용자가 Parquet 파일 가져오기 지원의 혜택을 누리는 방법

Milvus는 Parquet 파일 가져오기 지원을 확장하여 사용자에게 스토리지 및 컴퓨팅 비용 절감, 데이터 관리 간소화, 가져오기 프로세스 간소화 등 최적화된 환경과 다양한 이점을 제공합니다.

최적화된 스토리지 효율성 및 간소화된 데이터 관리

Parquet은 다양한 데이터 유형에 맞는 유연한 압축 옵션과 효율적인 인코딩 체계를 제공하여 최적의 스토리지 효율성을 보장합니다. 이러한 유연성은 모든 스토리지 절약이 실질적인 비용 절감으로 직결되는 클라우드 환경에서 특히 유용합니다. Milvus의 이 새로운 기능을 통해 사용자는 다양한 데이터를 모두 하나의 파일로 손쉽게 통합하여 데이터 관리를 간소화하고 전반적인 사용자 경험을 향상시킬 수 있습니다. 이 기능은 특히 가변 길이 배열 데이터 유형으로 작업하는 사용자에게 유용하며, 이제 데이터 가져오기 프로세스를 간소화할 수 있습니다.

쿼리 성능 향상

Parquet의 컬럼형 스토리지 설계와 고급 압축 방식은 쿼리 성능을 크게 향상시킵니다. 쿼리를 수행할 때 사용자는 관련 없는 데이터를 스캔할 필요 없이 관련 데이터에만 집중할 수 있습니다. 이러한 선택적 열 읽기는 CPU 사용량을 최소화하여 쿼리 시간을 단축합니다.

광범위한 언어 호환성

Parquet은 Java, C++, Python 등 여러 언어로 제공되며 수많은 데이터 처리 도구와 호환됩니다. Parquet 파일 지원으로 다른 SDK를 사용하는 Milvus 사용자도 데이터베이스 내에서 구문 분석을 위한 Parquet 파일을 원활하게 생성할 수 있습니다.

Parquet 파일을 Milvus로 가져오는 방법

데이터가 이미 Parquet 파일 형식인 경우 가져오기는 쉽습니다. Parquet 파일을 MinIO와 같은 객체 스토리지 시스템에 업로드하면 가져올 준비가 완료됩니다.

아래 코드 스니펫은 Parquet 파일을 Milvus로 가져오는 예제입니다.

remote_files = []
try:
    print("Prepare upload files")
    minio_client = Minio(endpoint=MINIO_ADDRESS, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY,
                         secure=False)
    found = minio_client.bucket_exists(bucket_name)
    if not found:
        minio_client.make_bucket(bucket_name)
        print("MinIO bucket '{}' doesn't exist".format(bucket_name))
        return False, []

    # set your remote data path
    remote_data_path = "milvus_bulkinsert"

    def upload_file(f: str):
        file_name = os.path.basename(f)
        minio_file_path = os.path.join(remote_data_path, "parquet", file_name)
        minio_client.fput_object(bucket_name, minio_file_path, f)
        print("Upload file '{}' to '{}'".format(f, minio_file_path))
        remote_files.append(minio_file_path)

    upload_file(data_file)

except S3Error as e:
    print("Failed to connect MinIO server {}, error: {}".format(MINIO_ADDRESS, e))
    return False, []

print("Successfully upload files: {}".format(remote_files))
return True, remote_files

데이터가 Parquet 파일이 아니거나 동적 필드가 있는 경우, 데이터 형식 변환 도구인 BulkWriter를 활용하여 Parquet 파일을 생성할 수 있습니다. 이제 BulkWriter는 기본 출력 데이터 형식으로 Parquet을 채택하여 개발자에게 보다 직관적인 환경을 제공합니다.

아래 코드 스니펫은 BulkWriter를 사용하여 Parquet 파일을 생성하는 예시입니다.

import numpy as np
import json

from pymilvus import (
    RemoteBulkWriter,
    BulkFileType,
)

remote_writer = RemoteBulkWriter(
        schema=your_collection_schema,
        remote_path="your_remote_data_path",
        connect_param=RemoteBulkWriter.ConnectParam(
            endpoint=YOUR_MINIO_ADDRESS,
            access_key=YOUR_MINIO_ACCESS_KEY,
            secret_key=YOUR_MINIO_SECRET_KEY,
            bucket_name="a-bucket",
        ),
        file_type=BulkFileType.PARQUET,
)

# append your data
batch_count = 10000
for i in range(batch_count):
    row = {
        "id": i,
        "bool": True if i % 5 == 0 else False,
        "int8": i % 128,
        "int16": i % 1000,
        "int32": i % 100000,
        "int64": i,
        "float": i / 3,
        "double": i / 7,
        "varchar": f"varchar_{i}",
        "json": {"dummy": i, "ok": f"name_{i}"},
        "vector": gen_binary_vector() if bin_vec else gen_float_vector(),
        f"dynamic_{i}": i,
    }
    remote_writer.append_row(row)

# append rows by numpy type
for i in range(batch_count):
    remote_writer.append_row({
        "id": np.int64(i + batch_count),
        "bool": True if i % 3 == 0 else False,
        "int8": np.int8(i % 128),
        "int16": np.int16(i % 1000),
        "int32": np.int32(i % 100000),
        "int64": np.int64(i),
        "float": np.float32(i / 3),
        "double": np.float64(i / 7),
        "varchar": f"varchar_{i}",
        "json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
        "vector": gen_binary_vector() if bin_vec else gen_float_vector(),
        f"dynamic_{i}": i,
    })

print(f"{remote_writer.total_row_count} rows appends")
print(f"{remote_writer.buffer_row_count} rows in buffer not flushed")
print("Generate data files...")
remote_writer.commit()
print(f"Data files have been uploaded: {remote_writer.batch_files}")
remote_files = remote_writer.batch_files

그런 다음 Parquet 파일을 Milvus로 가져오기 시작할 수 있습니다.

remote_files = [remote_file_path]
task_id = utility.do_bulk_insert(collection_name=collection_name,
                                 files=remote_files)

task_ids = [task_id]         
states = wait_tasks_to_state(task_ids, BulkInsertState.ImportCompleted)
complete_count = 0
for state in states:
    if state.state == BulkInsertState.ImportCompleted:
        complete_count = complete_count + 1

이제 데이터가 Milvus에 원활하게 통합되었습니다.

다음 단계는 무엇인가요?

Milvus가 계속해서 증가하는 데이터 용량을 지원함에 따라, 특히 Parquet 파일이 10GB를 초과하는 경우 상당한 크기의 가져오기를 관리하는 데 어려움이 있습니다. 이 문제를 해결하기 위해 가져오기 데이터를 스칼라 열과 벡터 열로 분리하여 가져오기당 2개의 Parquet 파일을 생성하여 I/O 부담을 완화할 계획입니다. 수백 기가바이트를 초과하는 데이터 세트의 경우, 데이터를 여러 번 가져오는 것이 좋습니다.

Like the article? Spread the word

계속 읽기