RemoteBulkWriter
A RemoteBulkWriter instance writes your raw data, in a format that Milvus understands, to a remote storage service such as an AWS-S3-compatible bucket or an Azure Blob Storage container.
RemoteBulkWriter(RemoteBulkWriterParam bulkWriterParam)
Methods of RemoteBulkWriter:
| Method | Description | Parameters | 
|---|---|---|
| appendRow(JsonObject rowData) | Appends a row to the buffer. Once the buffer size exceeds a threshold, the writer persists the buffer to a data file. | rowData: A gson.JsonObject that stores the data of a row. | 
| commit(boolean async) | Forces the writer to persist the buffered data to data files and completes the writer. | async: Set to false to block until all data files are persisted. Set to true to return without waiting. | 
| getBatchFiles() | Returns a List<List<String>> of the persisted data files. Each List<String> is a batch of files that can be submitted as one job to the bulkinsert interface. | N/A | 
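
The lifecycle described in the table above (append, flush at a threshold, commit, list the batches) can be sketched with a small in-memory model. This is a toy illustration, not the SDK's RemoteBulkWriter: the class name, the row-count threshold (the real writer uses a buffer size), and the file names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: NOT the SDK's RemoteBulkWriter. It mimics the
// append -> threshold flush -> commit -> getBatchFiles lifecycle in memory.
final class BufferedWriterSketch {
    private final int maxBufferedRows;                 // stand-in for the buffer size threshold
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> batchFiles = new ArrayList<>();
    private int fileCounter = 0;

    BufferedWriterSketch(int maxBufferedRows) {
        this.maxBufferedRows = maxBufferedRows;
    }

    // Buffer a row; flush as soon as the buffer reaches the threshold.
    void appendRow(String row) {
        buffer.add(row);
        if (buffer.size() >= maxBufferedRows) {
            flush();
        }
    }

    // Persist whatever is still buffered.
    void commit() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    int getBufferRowCount() {
        return buffer.size();
    }

    List<List<String>> getBatchFiles() {
        return batchFiles;
    }

    // Each flush produces one batch; the file name is made up for illustration.
    private void flush() {
        fileCounter++;
        batchFiles.add(List.of("bulk_data/" + fileCounter + ".parquet"));
        buffer.clear();
    }

    public static void main(String[] args) {
        BufferedWriterSketch writer = new BufferedWriterSketch(4);
        for (int i = 0; i < 10; i++) {
            writer.appendRow("row-" + i);
        }
        System.out.println("Rows still buffered: " + writer.getBufferRowCount());
        writer.commit();
        System.out.println("Batches: " + writer.getBatchFiles());
    }
}
```

The point of the model is that rows appended after the last automatic flush stay in the buffer until commit is called, which is why the batch list should be read only after committing.
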
RemoteBulkWriterParam
Use the RemoteBulkWriterParam.Builder to construct a RemoteBulkWriterParam object.
import io.milvus.bulkwriter.RemoteBulkWriterParam;
RemoteBulkWriterParam.Builder builder = RemoteBulkWriterParam.newBuilder();
Methods of RemoteBulkWriterParam.Builder:
| Method | Description | Parameters | 
|---|---|---|
| withCollectionSchema(CollectionSchemaParam collectionSchema) | Sets the collection schema. See the CollectionSchemaParam description in the Collection.createCollection() section. | collectionSchema: collection schema | 
| withConnectParam(StorageConnectParam connectParam) | Sets the connection parameters for the remote storage service. Currently, two options are available: S3ConnectParam and AzureConnectParam. | connectParam: Connection parameters for the remote storage service. | 
| withRemotePath(String remotePath) | Sets the path on the remote storage service to which the data files are uploaded. | remotePath: A path on the remote storage service. | 
| withChunkSize(int chunkSize) | Sets the maximum size of a data chunk. | chunkSize: the maximum size of a data chunk. | 
| withFileType(BulkFileType fileType) | Sets the type of the output file. Currently, only PARQUET is available. | fileType: The output file type. | 
| build() | Constructs a RemoteBulkWriterParam object. | N/A | 
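
As a rough way to reason about a value for withChunkSize, the sketch below estimates how many rows fit into one chunk. It assumes the chunk size is measured in bytes (suggested by the 512 * 1024 * 1024 value used in the Example section) and counts only raw field widths for a schema with one Int64 field and one FloatVector field. The class name and the dimension of 128 are hypothetical, and real Parquet files will differ because of encoding and compression.

```java
// Back-of-the-envelope estimate only; not part of the SDK.
final class ChunkSizeEstimate {
    // Raw bytes for one row: an Int64 key plus a float32 vector of `dim` elements.
    static long rawBytesPerRow(int dim) {
        return Long.BYTES + (long) dim * Float.BYTES;
    }

    // Number of whole rows whose raw bytes fit into one chunk.
    static long rowsPerChunk(long chunkSizeBytes, int dim) {
        return chunkSizeBytes / rawBytesPerRow(dim);
    }

    public static void main(String[] args) {
        long chunkSizeBytes = 512L * 1024 * 1024; // same value as in the Example section
        int dim = 128;                            // hypothetical vector dimension
        System.out.printf("Raw bytes per row: %d%n", rawBytesPerRow(dim));
        System.out.printf("Approximate rows per chunk: %d%n", rowsPerChunk(chunkSizeBytes, dim));
    }
}
```
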
AzureConnectParam
Use the AzureConnectParam.Builder to construct an AzureConnectParam object.
import io.milvus.bulkwriter.connect.AzureConnectParam;
AzureConnectParam.Builder builder = AzureConnectParam.newBuilder();
Methods of AzureConnectParam.Builder:
| Method | Description | Parameters | 
|---|---|---|
| withContainerName(String containerName) | Sets the Azure container name. | containerName: The target container name. | 
| withConnStr(String connStr) | Sets the connection string. | connStr: A connection string to an Azure Storage account, which can be parsed to an account_url and a credential. To generate a connection string, read this link: https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string | 
| withAccountUrl(String accountUrl) | Sets the account URL. | accountUrl: A string in a format like https://<storage-account>.blob.core.windows.net. Read this link for more info: https://learn.microsoft.com/en-us/azure/storage/common/storage-account-overview | 
| withCredential(TokenCredential credential) | Sets the credential. | credential: Account access key for the account. Read this link for more info: https://learn.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal#view-account-access-keys | 
| build() | Constructs an AzureConnectParam object | N/A | 
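
The table notes that a connection string can be parsed to an account URL and a credential. The sketch below illustrates that relationship for the common `Key=Value;Key=Value` connection string layout. It is not the SDK's or Azure's parser: the class and method names are hypothetical, and connection strings that carry an explicit BlobEndpoint are not handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative parser only; not part of the Milvus or Azure SDKs.
final class AzureConnStrSketch {
    // Split the string into key/value pairs. Each pair is split on its first '='
    // only, because base64 values such as AccountKey can end with '=' padding.
    static Map<String, String> parse(String connStr) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String pair : connStr.split(";")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Malformed segment: " + pair);
            }
            parts.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return parts;
    }

    // Build a URL of the form https://<storage-account>.blob.core.windows.net.
    static String accountUrl(Map<String, String> parts) {
        String protocol = parts.getOrDefault("DefaultEndpointsProtocol", "https");
        String suffix = parts.getOrDefault("EndpointSuffix", "core.windows.net");
        return protocol + "://" + parts.get("AccountName") + ".blob." + suffix;
    }

    public static void main(String[] args) {
        // Placeholder values, not real credentials.
        String connStr = "DefaultEndpointsProtocol=https;AccountName=myaccount;"
                + "AccountKey=abc123==;EndpointSuffix=core.windows.net";
        Map<String, String> parts = parse(connStr);
        System.out.println(accountUrl(parts));
    }
}
```
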
S3ConnectParam
Use the S3ConnectParam.Builder to construct an S3ConnectParam object.
import io.milvus.bulkwriter.connect.S3ConnectParam;
S3ConnectParam.Builder builder = S3ConnectParam.newBuilder();
Methods of S3ConnectParam.Builder:
| Method | Description | Parameters | 
|---|---|---|
| withCloudName(String cloudName) | Sets the cloud name of the S3 service. | cloudName: The cloud name. | 
| withBucketName(String bucketName) | Sets the bucket name. | bucketName: The bucket name. | 
| withEndpoint(String endpoint) | Sets the endpoint. | endpoint: The endpoint. | 
| withAccessKey(String accessKey) | Sets the access key. | accessKey: The access key. | 
| withSecretKey(String secretKey) | Sets the secret key. | secretKey: The secret key. | 
| withSessionToken(String sessionToken) | Sets the session token. | sessionToken: The session token. | 
| withRegion(String region) | Sets the region name. | region: The region name. | 
| withHttpClient(OkHttpClient httpClient) | Sets the HTTP client if necessary. | httpClient: The HTTP client. | 
| build() | Constructs an S3ConnectParam object. | N/A | 
Example
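import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.milvus.bulkwriter.*;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;
import java.util.List;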
CollectionSchemaParam collectionSchema = CollectionSchemaParam.newBuilder()
        .addFieldType(FieldType.newBuilder()
                .withName("id")
                .withDataType(DataType.Int64)
                .withPrimaryKey(true)
                .withAutoID(false)
                .build())
        .addFieldType(FieldType.newBuilder()
                .withName("vector")
                .withDataType(DataType.FloatVector)
                .withDimension(DIM)
                .build())
        .build();
StorageConnectParam connectParam = S3ConnectParam.newBuilder()
        .withEndpoint(STORAGE_ENDPOINT)
        .withCloudName(CLOUD_NAME)
        .withBucketName(STORAGE_BUCKET)
        .withAccessKey(STORAGE_ACCESS_KEY)
        .withSecretKey(STORAGE_SECRET_KEY)
        .withRegion(STORAGE_REGION)
        .build();
        
RemoteBulkWriterParam bulkWriterParam = RemoteBulkWriterParam.newBuilder()
        .withCollectionSchema(collectionSchema)
        .withRemotePath("bulk_data")
        .withFileType(BulkFileType.PARQUET)
        .withChunkSize(512 * 1024 * 1024)
        .withConnectParam(connectParam)
        .build();
        
try (RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(bulkWriterParam)) {
    Gson gson = new Gson();
    for (int i = 0; i < 10000; ++i) {
        JsonObject row = new JsonObject();
        row.addProperty("id", i);
        row.add("vector", gson.toJsonTree(CommonUtils.generateFloatVector(DIM)));
        remoteBulkWriter.appendRow(row);
    }
    System.out.printf("%s rows appended%n", remoteBulkWriter.getTotalRowCount());
    System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());
    System.out.println("Generate data files...");
    remoteBulkWriter.commit(false);
    List<List<String>> batchFiles = remoteBulkWriter.getBatchFiles();
    System.out.printf("Data files have been uploaded: %s%n", batchFiles);
    
    for (List<String> files : batchFiles) {
        R<ImportResponse> response = milvusClient.bulkInsert(BulkInsertParam.newBuilder()
            .withCollectionName(COLLECTION_NAME)
            .withFiles(files)
            .build());
    }
} catch (Exception e) {
    System.out.println("RemoteBulkWriter caught an exception: " + e);
    throw e;
}