Menyiapkan Data Sumber
Halaman ini membahas sesuatu yang harus Anda pertimbangkan sebelum mulai memasukkan data secara massal ke dalam koleksi Anda.
Sebelum Anda mulai
Koleksi target memerlukan pemetaan data sumber ke skema. Diagram di bawah ini menunjukkan bagaimana data sumber yang dapat diterima dipetakan ke skema koleksi target.
Memetakan data ke skema
Anda harus memeriksa data Anda dengan cermat dan merancang skema koleksi target yang sesuai.
Mengambil data JSON pada diagram di atas sebagai contoh, ada dua entitas dalam daftar baris, setiap baris memiliki enam bidang. Skema koleksi secara selektif mencakup empat: id, vektor, skalar_1, dan skalar_2.
Ada dua hal lagi yang perlu dipertimbangkan ketika mendesain skema:
Apakah akan mengaktifkan AutoID
Bidang id berfungsi sebagai bidang utama koleksi. Untuk membuat field utama bertambah secara otomatis, Anda dapat mengaktifkan AutoID dalam skema. Dalam hal ini, Anda harus mengecualikan field id dari setiap baris dalam data sumber.
Apakah akan mengaktifkan bidang dinamis
Koleksi target juga dapat menyimpan bidang yang tidak termasuk dalam skema yang telah ditentukan sebelumnya jika skema tersebut mengaktifkan bidang dinamis. Bidang $meta adalah bidang JSON yang dicadangkan untuk menyimpan bidang dinamis dan nilainya dalam pasangan kunci-nilai. Pada diagram di atas, bidang dynamic_field_1 dan dynamic_field_2 serta nilainya akan disimpan sebagai pasangan kunci-nilai dalam bidang $meta.
Kode berikut ini menunjukkan cara mengatur skema untuk koleksi yang diilustrasikan dalam diagram di atas.
Untuk mendapatkan informasi lebih lanjut, lihat create_schema()
dan add_field()
dalam referensi SDK.
Untuk mendapatkan informasi lebih lanjut, lihat CollectionSchema
dalam referensi SDK.
from pymilvus import MilvusClient, DataType
# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)
DIM = 512
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True),
schema.add_field(field_name="bool", datatype=DataType.BOOL),
schema.add_field(field_name="int8", datatype=DataType.INT8),
schema.add_field(field_name="int16", datatype=DataType.INT16),
schema.add_field(field_name="int32", datatype=DataType.INT32),
schema.add_field(field_name="int64", datatype=DataType.INT64),
schema.add_field(field_name="float", datatype=DataType.FLOAT),
schema.add_field(field_name="double", datatype=DataType.DOUBLE),
schema.add_field(field_name="varchar", datatype=DataType.VARCHAR, max_length=512),
schema.add_field(field_name="json", datatype=DataType.JSON),
schema.add_field(field_name="array_str", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR, max_length=128)
schema.add_field(field_name="array_int", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.INT64)
schema.add_field(field_name="float_vector", datatype=DataType.FLOAT_VECTOR, dim=DIM),
schema.add_field(field_name="binary_vector", datatype=DataType.BINARY_VECTOR, dim=DIM),
schema.add_field(field_name="float16_vector", datatype=DataType.FLOAT16_VECTOR, dim=DIM),
# schema.add_field(field_name="bfloat16_vector", datatype=DataType.BFLOAT16_VECTOR, dim=DIM),
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)
schema.verify()
print(schema)
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.milvus.bulkwriter.BulkImport;
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.common.clientenum.CloudStorage;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
import io.milvus.bulkwriter.request.describe.MilvusDescribeImportRequest;
import io.milvus.bulkwriter.request.import_.MilvusImportRequest;
import io.milvus.bulkwriter.request.list.MilvusListImportJobsRequest;
import io.milvus.common.utils.Float16Utils;
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
private static final String MINIO_ENDPOINT = CloudStorage.MINIO.getEndpoint("http://127.0.0.1:9000");
private static final String BUCKET_NAME = "a-bucket";
private static final String ACCESS_KEY = "minioadmin";
private static final String SECRET_KEY = "minioadmin";
private static final Integer DIM = 512;
private static final Gson GSON_INSTANCE = new Gson();
private static CreateCollectionReq.CollectionSchema createSchema() {
CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
.enableDynamicField(true)
.build();
schema.addField(AddFieldReq.builder()
.fieldName("id")
.dataType(io.milvus.v2.common.DataType.Int64)
.isPrimaryKey(Boolean.TRUE)
.autoID(false)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("bool")
.dataType(DataType.Bool)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("int8")
.dataType(DataType.Int8)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("int16")
.dataType(DataType.Int16)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("int32")
.dataType(DataType.Int32)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("int64")
.dataType(DataType.Int64)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("float")
.dataType(DataType.Float)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("double")
.dataType(DataType.Double)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("varchar")
.dataType(DataType.VarChar)
.maxLength(512)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("json")
.dataType(io.milvus.v2.common.DataType.JSON)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("array_int")
.dataType(io.milvus.v2.common.DataType.Array)
.maxCapacity(100)
.elementType(io.milvus.v2.common.DataType.Int64)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("array_str")
.dataType(io.milvus.v2.common.DataType.Array)
.maxCapacity(100)
.elementType(io.milvus.v2.common.DataType.VarChar)
.maxLength(128)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("float_vector")
.dataType(io.milvus.v2.common.DataType.FloatVector)
.dimension(DIM)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("binary_vector")
.dataType(io.milvus.v2.common.DataType.BinaryVector)
.dimension(DIM)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("float16_vector")
.dataType(io.milvus.v2.common.DataType.Float16Vector)
.dimension(DIM)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("sparse_vector")
.dataType(io.milvus.v2.common.DataType.SparseFloatVector)
.build());
return schema;
}
Menyiapkan BulkWriter
BulkWriter adalah alat yang dirancang untuk mengonversi set data mentah menjadi format yang cocok untuk mengimpor melalui RESTful Import API. Alat ini menawarkan dua jenis penulis:
- LocalBulkWriter: Membaca kumpulan data yang ditentukan dan mengubahnya menjadi format yang mudah digunakan.
- RemoteBulkWriter: Melakukan tugas yang sama dengan LocalBulkWriter tetapi juga mentransfer file data yang dikonversi ke ember penyimpanan objek jarak jauh yang ditentukan.
RemoteBulkWriter berbeda dengan LocalBulkWriter karena RemoteBulkWriter mentransfer file data yang dikonversi ke ember penyimpanan objek target.
Menyiapkan LocalBulkWriter
LocalBulkWriter menambahkan baris dari kumpulan data sumber dan mengomitnya ke file lokal dengan format yang ditentukan.
from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
# Use `from pymilvus import LocalBulkWriter, BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = LocalBulkWriter(
schema=schema,
local_path='.',
segment_size=512 * 1024 * 1024, # Default value
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
LocalBulkWriterParam localBulkWriterParam = LocalBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withLocalPath(".")
.withChunkSize(512 * 1024 * 1024)
.withFileType(BulkFileType.PARQUET)
.build();
LocalBulkWriter localBulkWriter = new LocalBulkWriter(localBulkWriterParam);
Ketika membuat LocalBulkWriter, Anda harus:
- Merujuk skema yang dibuat di
schema
. - Tetapkan
local_path
ke direktori keluaran. - Atur
file_type
ke jenis file keluaran. - Jika dataset Anda berisi sejumlah besar catatan, Anda disarankan untuk menyegmentasikan data dengan mengatur
segment_size
ke nilai yang tepat.
Untuk detail mengenai pengaturan parameter, lihat LocalBulkWriter dalam referensi SDK.
Ketika membuat LocalBulkWriter, Anda harus:
- Merujuk skema yang dibuat di
CollectionSchema()
. - Mengatur direktori keluaran di
withLocalPath()
. - Mengatur jenis file keluaran di
withFileType()
. - Jika dataset Anda berisi sejumlah besar catatan, Anda disarankan untuk menyegmentasikan data dengan mengatur
withChunkSize()
ke nilai yang tepat.
Untuk detail tentang pengaturan parameter, lihat LocalBulkWriter di referensi SDK.
Menyiapkan RemoteBulkWriter
Alih-alih menambahkan data yang ditambahkan ke berkas lokal, RemoteBulkWriter menambahkannya ke bucket jarak jauh. Oleh karena itu, Anda harus menyiapkan objek ConnectParam sebelum membuat RemoteBulkWriter.
from pymilvus.bulk_writer import RemoteBulkWriter
# Use `from pymilvus import RemoteBulkWriter`
# when you use pymilvus earlier than 2.4.2
# Third-party constants
ACCESS_KEY="minioadmin"
SECRET_KEY="minioadmin"
BUCKET_NAME="a-bucket"
# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
endpoint="localhost:9000", # the default MinIO service started along with Milvus
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
bucket_name=BUCKET_NAME,
secure=False
)
from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = RemoteBulkWriter(
schema=schema,
remote_path="/",
connect_param=conn,
file_type=BulkFileType.PARQUET
)
print('bulk writer created.')
private static RemoteBulkWriter createRemoteBulkWriter(CreateCollectionReq.CollectionSchema collectionSchema) throws IOException {
StorageConnectParam connectParam = S3ConnectParam.newBuilder()
.withEndpoint(MINIO_ENDPOINT)
.withBucketName(BUCKET_NAME)
.withAccessKey(ACCESS_KEY)
.withSecretKey(SECRET_KEY)
.build();
RemoteBulkWriterParam bulkWriterParam = RemoteBulkWriterParam.newBuilder()
.withCollectionSchema(collectionSchema)
.withRemotePath("/")
.withConnectParam(connectParam)
.withFileType(BulkFileType.PARQUET)
.build();
return new RemoteBulkWriter(bulkWriterParam);
}
Setelah parameter koneksi siap, Anda bisa mereferensikannya di RemoteBulkWriter sebagai berikut:
from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = RemoteBulkWriter(
schema=schema,
remote_path="/",
connect_param=conn,
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;
RemoteBulkWriterParam remoteBulkWriterParam = RemoteBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withConnectParam(storageConnectParam)
.withChunkSize(512 * 1024 * 1024)
.withRemotePath("/")
.withFileType(BulkFileType.PARQUET)
.build();
RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(remoteBulkWriterParam);
Parameter untuk membuat RemoteBulkWriter hampir sama dengan parameter untuk LocalBulkWriter, kecuali connect_param
. Untuk detail tentang pengaturan parameter, lihat RemoteBulkWriter dan ConnectParam dalam referensi SDK.
Parameter untuk membuat RemoteBulkWriter hampir sama dengan parameter untuk LocalBulkWriter, kecuali StorageConnectParam
. Untuk detail pengaturan parameter, lihat RemoteBulkWriter dan StorageConnectParam dalam referensi SDK.
Mulai menulis
BulkWriter memiliki dua metode: append_row()
menambahkan baris dari kumpulan data sumber, dan commit()
mengomit baris yang ditambahkan ke file lokal atau bucket jarak jauh.
BulkWriter memiliki dua metode: appendRow()
menambahkan baris dari kumpulan data sumber, dan commit()
mengomit baris yang ditambahkan ke berkas lokal atau remote bucket.
Untuk tujuan demonstrasi, kode berikut ini menambahkan data yang dibuat secara acak.
import random, string, json
import numpy as np
import tensorflow as tf
def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits
return ''.join(random.choices(letters + digits, k=length))
# optional input for binary vector:
# 1. list of int such as [1, 0, 1, 1, 0, 0, 1, 0]
# 2. numpy array of uint8
def gen_binary_vector(to_numpy_arr):
raw_vector = [random.randint(0, 1) for i in range(DIM)]
if to_numpy_arr:
return np.packbits(raw_vector, axis=-1)
return raw_vector
# optional input for float vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float32
def gen_float_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype="float32")
return raw_vector
# # optional input for bfloat16 vector:
# # 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# # 2. numpy array of bfloat16
# def gen_bf16_vector(to_numpy_arr):
# raw_vector = [random.random() for _ in range(DIM)]
# if to_numpy_arr:
# return tf.cast(raw_vector, dtype=tf.bfloat16).numpy()
# return raw_vector
# optional input for float16 vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float16
def gen_fp16_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype=np.float16)
return raw_vector
# optional input for sparse vector:
# only accepts dict like {2: 13.23, 45: 0.54} or {"indices": [1, 2], "values": [0.1, 0.2]}
# note: no need to sort the keys
def gen_sparse_vector(pair_dict: bool):
raw_vector = {}
dim = random.randint(2, 20)
if pair_dict:
raw_vector["indices"] = [i for i in range(dim)]
raw_vector["values"] = [random.random() for _ in range(dim)]
else:
for i in range(dim):
raw_vector[i] = random.random()
return raw_vector
for i in range(10000):
writer.append_row({
"id": np.int64(i),
"bool": True if i % 3 == 0 else False,
"int8": np.int8(i%128),
"int16": np.int16(i%1000),
"int32": np.int32(i%100000),
"int64": np.int64(i),
"float": np.float32(i/3),
"double": np.float64(i/7),
"varchar": f"varchar_{i}",
"json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
"array_str": np.array([f"str_{k}" for k in range(5)], np.dtype("str")),
"array_int": np.array([k for k in range(10)], np.dtype("int64")),
"float_vector": gen_float_vector(True),
"binary_vector": gen_binary_vector(True),
"float16_vector": gen_fp16_vector(True),
# "bfloat16_vector": gen_bf16_vector(True),
"sparse_vector": gen_sparse_vector(True),
f"dynamic_{i}": i,
})
if (i+1)%1000 == 0:
writer.commit()
print('committed')
print(writer.batch_files)
private static byte[] genBinaryVector() {
Random ran = new Random();
int byteCount = DIM / 8;
ByteBuffer vector = ByteBuffer.allocate(byteCount);
for (int i = 0; i < byteCount; ++i) {
vector.put((byte) ran.nextInt(Byte.MAX_VALUE));
}
return vector.array();
}
private static List<Float> genFloatVector() {
Random ran = new Random();
List<Float> vector = new ArrayList<>();
for (int i = 0; i < DIM; ++i) {
vector.add(ran.nextFloat());
}
return vector;
}
private static byte[] genFloat16Vector() {
List<Float> originalVector = genFloatVector();
return Float16Utils.f32VectorToFp16Buffer(originalVector).array();
}
private static SortedMap<Long, Float> genSparseVector() {
Random ran = new Random();
SortedMap<Long, Float> sparse = new TreeMap<>();
int dim = ran.nextInt(18) + 2; // [2, 20)
for (int i = 0; i < dim; ++i) {
sparse.put((long)ran.nextInt(1000000), ran.nextFloat());
}
return sparse;
}
private static List<String> genStringArray(int length) {
List<String> arr = new ArrayList<>();
for (int i = 0; i < length; i++) {
arr.add("str_" + i);
}
return arr;
}
private static List<Long> genIntArray(int length) {
List<Long> arr = new ArrayList<>();
for (long i = 0; i < length; i++) {
arr.add(i);
}
return arr;
}
private static RemoteBulkWriter createRemoteBulkWriter(CreateCollectionReq.CollectionSchema collectionSchema) throws IOException {
StorageConnectParam connectParam = S3ConnectParam.newBuilder()
.withEndpoint(MINIO_ENDPOINT)
.withBucketName(BUCKET_NAME)
.withAccessKey(ACCESS_KEY)
.withSecretKey(SECRET_KEY)
.build();
RemoteBulkWriterParam bulkWriterParam = RemoteBulkWriterParam.newBuilder()
.withCollectionSchema(collectionSchema)
.withRemotePath("/")
.withConnectParam(connectParam)
.withFileType(BulkFileType.PARQUET)
.build();
return new RemoteBulkWriter(bulkWriterParam);
}
private static List<List<String>> uploadData() throws Exception {
CreateCollectionReq.CollectionSchema collectionSchema = createSchema();
try (RemoteBulkWriter remoteBulkWriter = createRemoteBulkWriter(collectionSchema)) {
for (int i = 0; i < 10000; ++i) {
JsonObject rowObject = new JsonObject();
rowObject.addProperty("id", i);
rowObject.addProperty("bool", i % 3 == 0);
rowObject.addProperty("int8", i % 128);
rowObject.addProperty("int16", i % 1000);
rowObject.addProperty("int32", i % 100000);
rowObject.addProperty("int64", i);
rowObject.addProperty("float", i / 3);
rowObject.addProperty("double", i / 7);
rowObject.addProperty("varchar", "varchar_" + i);
rowObject.addProperty("json", String.format("{\"dummy\": %s, \"ok\": \"name_%s\"}", i, i));
rowObject.add("array_str", GSON_INSTANCE.toJsonTree(genStringArray(5)));
rowObject.add("array_int", GSON_INSTANCE.toJsonTree(genIntArray(10)));
rowObject.add("float_vector", GSON_INSTANCE.toJsonTree(genFloatVector()));
rowObject.add("binary_vector", GSON_INSTANCE.toJsonTree(genBinaryVector()));
rowObject.add("float16_vector", GSON_INSTANCE.toJsonTree(genFloat16Vector()));
rowObject.add("sparse_vector", GSON_INSTANCE.toJsonTree(genSparseVector()));
rowObject.addProperty("dynamic", "dynamic_" + i);
remoteBulkWriter.appendRow(rowObject);
if ((i+1)%1000 == 0) {
remoteBulkWriter.commit(false);
}
}
List<List<String>> batchFiles = remoteBulkWriter.getBatchFiles();
System.out.println(batchFiles);
return batchFiles;
} catch (Exception e) {
throw e;
}
}
public static void main(String[] args) throws Exception {
List<List<String>> batchFiles = uploadData();
}
Memverifikasi hasil
Untuk memeriksa hasilnya, Anda dapat memperoleh jalur keluaran yang sebenarnya dengan mencetak properti batch_files
milik penulis.
Untuk memeriksa hasilnya, Anda bisa mendapatkan jalur keluaran yang sebenarnya dengan mencetak metode getBatchFiles()
dari penulis.
print(writer.batch_files)
# [['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/1.parquet'],
# ['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/2.parquet']]
// localBulkWriter.getBatchFiles();
remoteBulkWriter.getBatchFiles();
//
// Close the BulkWriter
try {
localBulkWriter.close();
remoteBulkWriter.close();
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
BulkWriter menghasilkan UUID, membuat sub-folder menggunakan UUID di direktori output yang disediakan, dan menempatkan semua file yang dihasilkan di sub-folder. Klik di sini untuk mengunduh data sampel yang telah disiapkan.
Struktur folder yang mungkin adalah sebagai berikut:
# JSON
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.json
# Parquet
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.parquet