RemoteBulkWriter
A RemoteBulkWriter instance writes your raw data, in a format that Milvus understands, to a remote storage service such as an AWS-S3-compatible bucket or an Azure Blob Storage container.
RemoteBulkWriter(RemoteBulkWriterParam bulkWriterParam)
Methods of RemoteBulkWriter:

Method | Description | Parameters |
---|---|---|
appendRow(JsonObject rowData) | Appends a row to the buffer. Once the buffer size exceeds a threshold, the writer persists the buffer to a data file. | rowData: A gson.JsonObject that stores the data of a row. |
commit(boolean async) | Forces the writer to persist the buffered data to data files and completes the writer. | async: Set to false to block until all data files are persisted; set to true to return without waiting. |
getBatchFiles() | Returns a List<List<String>> of the persisted data files. Each List<String> is a batch of files that can be submitted as one job to the bulkinsert interface. | N/A |
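
To make the buffering behaviour described in the table more concrete, here is a minimal, self-contained sketch of a size-triggered buffer. It is a toy model and not the SDK's implementation: the class name BufferSketch, the use of the UTF-8 byte count as the size measure, and the generated file names are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of a size-triggered row buffer; NOT the Milvus SDK implementation. */
public class BufferSketch {
    private final long chunkSizeBytes;                  // flush threshold (hypothetical)
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> batchFiles = new ArrayList<>();
    private long bufferedBytes = 0;

    public BufferSketch(long chunkSizeBytes) {
        this.chunkSizeBytes = chunkSizeBytes;
    }

    /** Buffers one row and flushes once the buffered size exceeds the threshold. */
    public void appendRow(String rowJson) {
        buffer.add(rowJson);
        bufferedBytes += rowJson.getBytes(StandardCharsets.UTF_8).length;
        if (bufferedBytes > chunkSizeBytes) {
            flush();
        }
    }

    /** Flushes whatever is still buffered, like a blocking commit. */
    public void commit() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    public List<List<String>> getBatchFiles() {
        return batchFiles;
    }

    public int getBufferRowCount() {
        return buffer.size();
    }

    private void flush() {
        // A real writer would serialize and upload the rows here.
        // This sketch only records a made-up file name for the batch.
        String fileName = "bulk_data/" + (batchFiles.size() + 1) + "/1.parquet";
        batchFiles.add(Collections.singletonList(fileName));
        buffer.clear();
        bufferedBytes = 0;
    }

    public static void main(String[] args) {
        BufferSketch writer = new BufferSketch(10);     // 10-byte threshold
        for (int i = 0; i < 7; i++) {
            writer.appendRow("12345");                  // 5 bytes per row
        }
        System.out.println("batches before commit: " + writer.getBatchFiles().size());
        writer.commit();
        System.out.println("batches after commit: " + writer.getBatchFiles());
    }
}
```

In this model each inner list of getBatchFiles() corresponds to one flush, which matches the table's description of a batch as the unit handed to a single bulk-insert job.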
RemoteBulkWriterParam
Use the RemoteBulkWriterParam.Builder to construct a RemoteBulkWriterParam object.
import io.milvus.bulkwriter.RemoteBulkWriterParam;
RemoteBulkWriterParam.Builder builder = RemoteBulkWriterParam.newBuilder();
Methods of RemoteBulkWriterParam.Builder:

Method | Description | Parameters |
---|---|---|
withCollectionSchema(CollectionSchemaParam collectionSchema) | Sets the collection schema. See the CollectionSchemaParam description in the Collection.createCollection() section. | collectionSchema: The collection schema. |
withConnectParam(StorageConnectParam connectParam) | Sets the connection parameters for the remote storage service. Currently, two options are available: S3ConnectParam and AzureConnectParam. | connectParam: The connection parameters for the remote storage service. |
withRemotePath(String remotePath) | Sets the path on the remote storage service to which the data files are uploaded. | remotePath: A path on the remote storage service. |
withChunkSize(int chunkSize) | Sets the maximum size of a data chunk. | chunkSize: The maximum size of a data chunk. |
withFileType(BulkFileType fileType) | Sets the type of the output file. Currently, only PARQUET is available. | fileType: The output file type. |
build() | Constructs a RemoteBulkWriterParam object. | N/A |
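
Because withChunkSize() takes an int, the value is limited to what fits in a Java int. The sketch below shows one way to compute a chunk size safely and to get a rough feel for how many rows fit in a chunk. It assumes the schema used in the example on this page (an Int64 key plus a float vector) and counts raw, uncompressed bytes only; how the SDK actually measures buffer size is not stated here, so treat the row estimate as a ballpark figure. The class and method names are hypothetical.

```java
/** Back-of-the-envelope chunk sizing; names and the raw-size model are illustrative only. */
public class ChunkSizeSketch {

    /** Raw bytes per row for an Int64 key plus a float vector of the given dimension. */
    public static long rawRowBytes(int dim) {
        return Long.BYTES + (long) Float.BYTES * dim;
    }

    /** Rough number of rows whose raw size fits within one chunk. */
    public static long rowsPerChunk(int chunkSizeBytes, int dim) {
        return chunkSizeBytes / rawRowBytes(dim);
    }

    /** Converts mebibytes to an int byte count; throws ArithmeticException on int overflow. */
    public static int mebibytes(int mib) {
        return Math.multiplyExact(mib, 1024 * 1024);
    }

    public static void main(String[] args) {
        int chunkSize = mebibytes(512);   // same value as 512 * 1024 * 1024
        System.out.println("chunk size in bytes: " + chunkSize);
        System.out.println("approx. rows per chunk at dim 768: " + rowsPerChunk(chunkSize, 768));
    }
}
```

Using Math.multiplyExact makes an oversized request such as 2048 MiB fail loudly instead of silently wrapping to a negative int, which plain 2048 * 1024 * 1024 would do.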
AzureConnectParam
Use the AzureConnectParam.Builder to construct an AzureConnectParam object.
import io.milvus.bulkwriter.connect.AzureConnectParam;
AzureConnectParam.Builder builder = AzureConnectParam.newBuilder();
Methods of AzureConnectParam.Builder:

Method | Description | Parameters |
---|---|---|
withContainerName(String containerName) | Sets the Azure container name. | containerName: The target container name. |
withConnStr(String connStr) | Sets the connection string. | connStr: A connection string to an Azure Storage account, which can be parsed into an account_url and a credential. To generate a connection string, see https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string |
withAccountUrl(String accountUrl) | Sets the account URL. | accountUrl: A string in a format like https://<storage-account>.blob.core.windows.net. For more information, see https://learn.microsoft.com/en-us/azure/storage/common/storage-account-overview |
withCredential(TokenCredential credential) | Sets the credential. | credential: The account access key for the account. For more information, see https://learn.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal#view-account-access-keys |
build() | Constructs an AzureConnectParam object. | N/A |
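
The table notes that a connection string can be parsed into an account URL and a credential. As a rough illustration of what that means, the sketch below splits a connection string in the common `Key=Value;Key=Value` layout and derives a Blob service URL from it. This is not how the SDK parses the string; the class name ConnStrSketch, the placeholder account name and key, and the decision to ignore explicit endpoint fields such as BlobEndpoint are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative parser for "Key=Value;Key=Value" connection strings; not the SDK's parser. */
public class ConnStrSketch {

    /** Splits the string into pairs, cutting each pair at its FIRST '=' only. */
    public static Map<String, String> parse(String connStr) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String pair : connStr.split(";")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // skip empty or malformed segments
            }
            // Base64 account keys may end in '=', so keep everything after the first '='.
            parts.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return parts;
    }

    /** Builds a Blob service URL from the parsed fields, with assumed defaults. */
    public static String blobAccountUrl(Map<String, String> parts) {
        String protocol = parts.getOrDefault("DefaultEndpointsProtocol", "https");
        String suffix = parts.getOrDefault("EndpointSuffix", "core.windows.net");
        return protocol + "://" + parts.get("AccountName") + ".blob." + suffix;
    }

    public static void main(String[] args) {
        // Placeholder values only; never hard-code a real account key.
        String connStr = "DefaultEndpointsProtocol=https;AccountName=mystorageaccount;"
                + "AccountKey=ZmFrZS1rZXk=;EndpointSuffix=core.windows.net";
        Map<String, String> parts = parse(connStr);
        System.out.println("account URL: " + blobAccountUrl(parts));
        System.out.println("has account key: " + parts.containsKey("AccountKey"));
    }
}
```

In practice you would pass either the whole connection string to withConnStr() or the separate values to withAccountUrl() and withCredential(), rather than parsing the string yourself.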
S3ConnectParam
Use the S3ConnectParam.Builder to construct an S3ConnectParam object.
import io.milvus.bulkwriter.connect.S3ConnectParam;
S3ConnectParam.Builder builder = S3ConnectParam.newBuilder();
Methods of S3ConnectParam.Builder:

Method | Description | Parameters |
---|---|---|
withCloudName(String cloudName) | Sets the cloud name of the S3 service. | cloudName: The cloud name. |
withBucketName(String bucketName) | Sets the bucket name. | bucketName: The bucket name. |
withEndpoint(String endpoint) | Sets the endpoint. | endpoint: The endpoint. |
withAccessKey(String accessKey) | Sets the access key. | accessKey: The access key. |
withSecretKey(String secretKey) | Sets the secret key. | secretKey: The secret key. |
withSessionToken(String sessionToken) | Sets the session token. | sessionToken: The session token. |
withRegion(String region) | Sets the region name. | region: The region name. |
withHttpClient(OkHttpClient httpClient) | Sets the HTTP client if necessary. | httpClient: The HTTP client. |
build() | Constructs an S3ConnectParam object. | N/A |
Example
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.milvus.bulkwriter.*;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.grpc.DataType;
import io.milvus.grpc.ImportResponse;
import io.milvus.param.R;
import io.milvus.param.bulkinsert.BulkInsertParam;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;
import java.util.List;

// DIM, COLLECTION_NAME, CLOUD_NAME, the STORAGE_* constants, and milvusClient
// are assumed to be defined elsewhere in your application.

// Define a schema with an Int64 primary key and a float vector field.
CollectionSchemaParam collectionSchema = CollectionSchemaParam.newBuilder()
        .addFieldType(FieldType.newBuilder()
                .withName("id")
                .withDataType(DataType.Int64)
                .withPrimaryKey(true)
                .withAutoID(false)
                .build())
        .addFieldType(FieldType.newBuilder()
                .withName("vector")
                .withDataType(DataType.FloatVector)
                .withDimension(DIM)
                .build())
        .build();

// Describe how to reach the S3-compatible storage service.
StorageConnectParam connectParam = S3ConnectParam.newBuilder()
        .withEndpoint(STORAGE_ENDPOINT)
        .withCloudName(CLOUD_NAME)
        .withBucketName(STORAGE_BUCKET)
        .withAccessKey(STORAGE_ACCESS_KEY)
        .withSecretKey(STORAGE_SECRET_KEY)
        .withRegion(STORAGE_REGION)
        .build();

// Configure the writer: target path, file type, and a 512 MB chunk size.
RemoteBulkWriterParam bulkWriterParam = RemoteBulkWriterParam.newBuilder()
        .withCollectionSchema(collectionSchema)
        .withRemotePath("bulk_data")
        .withFileType(BulkFileType.PARQUET)
        .withChunkSize(512 * 1024 * 1024)
        .withConnectParam(connectParam)
        .build();

try (RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(bulkWriterParam)) {
    Gson gson = new Gson();
    for (int i = 0; i < 10000; ++i) {
        JsonObject row = new JsonObject();
        row.addProperty("id", i);
        // CommonUtils.generateFloatVector is assumed to be a helper that returns a random vector of length DIM.
        row.add("vector", gson.toJsonTree(CommonUtils.generateFloatVector(DIM)));
        remoteBulkWriter.appendRow(row);
    }

    System.out.printf("%s rows appended%n", remoteBulkWriter.getTotalRowCount());
    System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());

    // Persist the remaining buffered rows and wait for the upload to finish.
    System.out.println("Generate data files...");
    remoteBulkWriter.commit(false);

    List<List<String>> batchFiles = remoteBulkWriter.getBatchFiles();
    System.out.printf("Data files have been uploaded: %s%n", batchFiles);

    // Submit each batch of files as a separate bulk-insert job.
    for (List<String> files : batchFiles) {
        R<ImportResponse> response = milvusClient.bulkInsert(BulkInsertParam.newBuilder()
                .withCollectionName(COLLECTION_NAME)
                .withFiles(files)
                .build());
    }
} catch (Exception e) {
    System.out.println("allTypesRemoteWriter catch exception: " + e);
    throw e;
}