소스 데이터 준비
이 페이지에서는 컬렉션에 데이터를 일괄 삽입하기 전에 고려해야 할 사항에 대해 설명합니다.
시작하기 전에
대상 컬렉션을 사용하려면 소스 데이터를 해당 스키마에 매핑해야 합니다. 아래 다이어그램은 허용되는 소스 데이터를 대상 컬렉션의 스키마에 매핑하는 방법을 보여줍니다.
데이터를 스키마에 매핑
데이터를 면밀히 검토하고 그에 따라 대상 컬렉션의 스키마를 설계해야 합니다.
위 다이어그램의 JSON 데이터를 예로 들어보면 행 목록에 두 개의 엔티티가 있고 각 행에는 6개의 필드가 있습니다. 컬렉션 스키마는 id, 벡터, scalar_1, scalar_2의 네 가지를 선택적으로 포함합니다.
스키마를 설계할 때 고려해야 할 두 가지 사항이 더 있습니다:
자동 ID 활성화 여부
id 필드는 컬렉션의 기본 필드 역할을 합니다. 기본 필드가 자동으로 증가하도록 하려면 스키마에서 AutoID를 활성화하면 됩니다. 이 경우 소스 데이터의 각 행에서 id 필드를 제외해야 합니다.
동적 필드 활성화 여부
스키마에서 동적 필드를 활성화하는 경우 대상 컬렉션은 사전 정의된 스키마에 포함되지 않은 필드도 저장할 수 있습니다. 메타 필드는 동적 필드와 해당 값을 키-값 쌍으로 보유하기 위해 예약된 JSON 필드입니다. 위 다이어그램에서 dynamic_field_1 및 dynamic_field_2 필드와 값은 $meta 필드에 키-값 쌍으로 저장됩니다.
다음 코드는 위 다이어그램에 설명된 컬렉션의 스키마를 설정하는 방법을 보여줍니다.
자세한 정보를 얻으려면 create_schema()
및 add_field()
를 참조하세요.
자세한 정보를 얻으려면 SDK 참조에서 CollectionSchema
를 참조하세요.
from pymilvus import MilvusClient, DataType
# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="scalar_1", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="scalar_2", datatype=DataType.INT64)
schema.verify()
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;
// Define schema for the target collection
FieldType id = FieldType.newBuilder()
.withName("id")
.withDataType(DataType.Int64)
.withPrimaryKey(true)
.withAutoID(false)
.build();
FieldType vector = FieldType.newBuilder()
.withName("vector")
.withDataType(DataType.FloatVector)
.withDimension(768)
.build();
FieldType scalar1 = FieldType.newBuilder()
.withName("scalar_1")
.withDataType(DataType.VarChar)
.withMaxLength(512)
.build();
FieldType scalar2 = FieldType.newBuilder()
.withName("scalar_2")
.withDataType(DataType.Int64)
.build();
CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
.withEnableDynamicField(true)
.addFieldType(id)
.addFieldType(vector)
.addFieldType(scalar1)
.addFieldType(scalar2)
.build();
BulkWriter 설정
BulkWriter는 원시 데이터 세트를 RESTful 가져오기 API를 통해 가져오기에 적합한 형식으로 변환하도록 설계된 도구입니다. 두 가지 유형의 작성기를 제공합니다:
- 로컬 벌크 라이터: 지정된 데이터 집합을 읽고 사용하기 쉬운 형식으로 변환합니다.
- RemoteBulkWriter: LocalBulkWriter와 동일한 작업을 수행하지만 변환된 데이터 파일을 지정된 원격 개체 스토리지 버킷으로 추가로 전송합니다.
RemoteBulkWriter는 변환된 데이터 파일을 대상 오브젝트 스토리지 버킷으로 전송한다는 점에서 LocalBulkWriter와 다릅니다.
LocalBulkWriter 설정
LocalBulkWriter는 소스 데이터 집합의 행을 추가하고 지정된 형식의 로컬 파일에 커밋합니다.
from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
# Use `from pymilvus import LocalBulkWriter, BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = LocalBulkWriter(
schema=schema,
local_path='.',
segment_size=512 * 1024 * 1024, # Default value
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
LocalBulkWriterParam localBulkWriterParam = LocalBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withLocalPath(".")
.withChunkSize(512 * 1024 * 1024)
.withFileType(BulkFileType.PARQUET)
.build();
LocalBulkWriter localBulkWriter = new LocalBulkWriter(localBulkWriterParam);
로컬 대량 작성기를 만들 때는 다음과 같이 해야 합니다:
schema
에서 생성된 스키마를 참조합니다.local_path
을 출력 디렉터리로 설정합니다.file_type
을 출력 파일 유형으로 설정합니다.- 데이터 세트에 많은 수의 레코드가 포함되어 있는 경우
segment_size
을 적절한 값으로 설정하여 데이터를 세분화하는 것이 좋습니다.
매개변수 설정에 대한 자세한 내용은 SDK 참조에서 LocalBulkWriter를 참조하세요.
로컬벌크라이터를 생성할 때는 다음과 같이 해야 합니다:
CollectionSchema()
에서 생성된 스키마를 참조합니다.withLocalPath()
에서 출력 디렉터리를 설정합니다.withFileType()
에서 출력 파일 유형을 설정합니다.- 데이터 세트에 많은 수의 레코드가 포함되어 있는 경우
withChunkSize()
을 적절한 값으로 설정하여 데이터를 세분화하는 것이 좋습니다.
매개변수 설정에 대한 자세한 내용은 SDK 참조에서 LocalBulkWriter를 참조하세요.
RemoteBulkWriter 설정
RemoteBulkWriter는 추가된 데이터를 로컬 파일에 커밋하는 대신 원격 버킷에 커밋합니다. 따라서 RemoteBulkWriter를 만들기 전에 ConnectParam 객체를 설정해야 합니다.
from pymilvus.bulk_writer import RemoteBulkWriter
# Use `from pymilvus import RemoteBulkWriter`
# when you use pymilvus earlier than 2.4.2
# Third-party constants
ACCESS_KEY="minioadmin"
SECRET_KEY="minioadmin"
BUCKET_NAME="milvus-bucket"
# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
endpoint="localhost:9000", # the default MinIO service started along with Milvus
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
bucket_name=BUCKET_NAME,
secure=False
)
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
String ACCESS_KEY = "minioadmin";
String SECRET_KEY = "minioadmin";
String BUCKET_NAME = "milvus-bucket";
StorageConnectParam storageConnectParam = S3ConnectParam.newBuilder()
.withEndpoint(MINIO_URI)
.withAccessKey(ACCESS_KEY)
.withSecretKey(SECRET_KEY)
.withBucketName(BUCKET_NAME)
.build();
연결 매개변수가 준비되면 다음과 같이 RemoteBulkWriter에서 이를 참조할 수 있습니다:
from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = RemoteBulkWriter(
schema=schema,
remote_path="/",
connect_param=conn,
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;
RemoteBulkWriterParam remoteBulkWriterParam = RemoteBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withConnectParam(storageConnectParam)
.withChunkSize(512 * 1024 * 1024)
.withRemotePath("/")
.withFileType(BulkFileType.PARQUET)
.build();
RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(remoteBulkWriterParam);
RemoteBulkWriter를 만들기 위한 매개 변수는 connect_param
을 제외하고는 LocalBulkWriter의 매개 변수와 거의 동일합니다. 매개변수 설정에 대한 자세한 내용은 SDK 참조에서 RemoteBulkWriter 및 ConnectParam을 참조하세요.
RemoteBulkWriter를 만들기 위한 매개변수는 StorageConnectParam
을 제외하고는 LocalBulkWriter의 매개변수와 거의 동일합니다. 매개변수 설정에 대한 자세한 내용은 SDK 참조에서 RemoteBulkWriter 및 StorageConnectParam을 참조하세요.
쓰기 시작
append_row()
는 소스 데이터 세트에서 행을 추가하고 commit()
는 추가된 행을 로컬 파일 또는 원격 버킷에 커밋합니다.
appendRow()
는 소스 데이터 집합에서 행을 추가하고 commit()
는 추가된 행을 로컬 파일 또는 원격 버킷에 커밋합니다.
데모를 위해 다음 코드는 무작위로 생성된 데이터를 추가합니다.
import random
import string
def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits
return ''.join(random.choices(letters + digits, k=length))
for i in range(10000):
writer.append_row({
"id": i,
"vector": [random.uniform(-1, 1) for _ in range(768)],
"scalar_1": generate_random_str(random.randint(1, 20)),
"scalar_2": random.randint(0, 100)
})
writer.commit()
import com.alibaba.fastjson.JSONObject;
for (int i = 0; i < 10000; i++) {
JSONObject json = new JSONObject();
json.put("id", i);
json.put("vector", get_random_vector(768));
json.put("scalar_1", get_random_string(20));
json.put("scalar_2", (long) (Math.random() * 100));
// localBulkWriter.appendRow(json);
remoteBulkWriter.appendRow(json);
}
// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);
정의된 스키마는 동적 필드를 허용하므로, 다음과 같이 데이터에 스키마에 정의되지 않은 필드를 포함하여 삽입할 수도 있습니다.
import random
import string
def generate_random_string(length=5):
letters = string.ascii_uppercase
digits = string.digits
return ''.join(random.choices(letters + digits, k=length))
for i in range(10000):
writer.append_row({
"id": i,
"vector":[random.uniform(-1, 1) for _ in range(768)],
"scalar_1": generate_random_string(),
"scalar_2": random.randint(0, 100),
"dynamic_field_1": random.choice([True, False]),
"dynamic_field_2": random.randint(0, 100)
})
writer.commit()
for (int i = 0; i < 10000; i++) {
JSONObject json = new JSONObject();
json.put("id", i);
json.put("vector", get_random_vector(768));
json.put("scalar_1", get_random_string(20));
json.put("scalar_2", (long) (Math.random() * 100));
json.put("dynamic_field_1", get_random_boolean());
json.put("dynamic_field_2", (long) (Math.random() * 100));
// localBulkWriter.appendRow(json);
remoteBulkWriter.appendRow(json);
}
// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);
결과 확인
결과를 확인하려면 작성자의 batch_files
속성을 인쇄하여 실제 출력 경로를 얻을 수 있습니다.
결과를 확인하려면 작성기의 getBatchFiles()
메서드를 출력하여 실제 출력 경로를 얻을 수 있습니다.
print(writer.batch_files)
# [['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/1.parquet'],
# ['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/2.parquet']]
// localBulkWriter.getBatchFiles();
remoteBulkWriter.getBatchFiles();
//
// Close the BulkWriter
try {
localBulkWriter.close();
remoteBulkWriter.close();
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
벌크라이터는 UUID를 생성하고, 제공된 출력 디렉토리에 UUID를 사용하여 하위 폴더를 생성한 후 생성된 모든 파일을 하위 폴더에 배치합니다. 준비된 샘플 데이터를 다운로드하려면 여기를 클릭하세요.
가능한 폴더 구조는 다음과 같습니다:
# JSON
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.json
# Parquet
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.parquet