
Insert Entities from Files

Milvus 2.2 now supports inserting a batch of entities from a file. Compared to the insert() method, this feature reduces network transmission across the Milvus client, proxy, Pulsar, and data nodes. You can now import a batch of entities in one file or multiple files into a collection with just a few lines of code.

Prepare the data file

Organize the data to be inserted into a Milvus collection in a row-based JSON file or multiple NumPy files.

Row-based JSON file

You can name the file whatever makes sense, but the root key must be rows. In the file, each entity is organized in a dictionary. The keys in the dictionary are field names, and the values are field values in the corresponding entity.

The following is an example of a row-based JSON file. You can include fields not defined in the collection schema as dynamic fields. For details, refer to Dynamic Schema.

{
  "rows":[
    {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]},
    {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]},
    {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]},
    {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]},
    {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
  ]
}

To include dynamic fields, organize the file as follows:

{
  "rows":[
    {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2], "book_props": {"year": 2015, "price": 23.43}},
    {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2], "book_props": {"year": 2018, "price": 15.05}},
    {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2], "book_props": {"year": 2020, "price": 36.68}},
    {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2] , "book_props": {"year": 2019, "price": 20.14}},
    {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2] , "book_props": {"year": 2021, "price": 9.36}}
  ]
}
  • Do not add any field that does not exist in the target collection, and do not omit any field defined in the schema of the target collection.
  • To add fields that are not predefined in the schema, you should enable dynamic schema for the collection. In this case, Milvus automatically adds these fields to an internal JSON field. For details, refer to Dynamic Schema.
  • Use the correct types of values in each field. For example, use integers in integer fields, floats in float fields, strings in varchar fields, and float arrays in vector fields.
  • Do not include an auto-generated primary key in the JSON file.
  • For binary vectors, use uint8 arrays. Each uint8 value represents 8 dimensions, and the value must be between 0 and 255. For example, [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1] is a 16-dimensional binary vector and should be written as [128, 7] in the JSON file.
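If you generate the file programmatically, the following is a minimal sketch that writes a test.json file in the format shown above. The field names and values come from the earlier example and are illustrative; adjust them to your own schema.

import json

# Build the rows as plain dictionaries; keys must match the collection's field names.
rows = [
    {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]},
    {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]},
]

# The root key of the file must be "rows".
with open("test.json", "w") as f:
    json.dump({"rows": rows}, f)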

Column-based NumPy files

As an alternative to the row-based JSON file mentioned above, you can also use NumPy arrays to organize each column of a dataset in a separate file. In this case, use the field name of each column to name the NumPy file.

import json
import numpy

numpy.save('book_id.npy', numpy.array([101, 102, 103, 104, 105]))
numpy.save('word_count.npy', numpy.array([13, 25, 7, 12, 34]))
arr = numpy.array([[1.1, 1.2],
                   [2.1, 2.2],
                   [3.1, 3.2],
                   [4.1, 4.2],
                   [5.1, 5.2]])
numpy.save('book_intro.npy', arr)
arr = numpy.array([json.dumps({"year": 2015, "price": 23.43}),
                   json.dumps({"year": 2018, "price": 15.05}),
                   json.dumps({"year": 2020, "price": 36.68}),
                   json.dumps({"year": 2019, "price": 20.14}),
                   json.dumps({"year": 2021, "price": 9.36})])
numpy.save('book_props.npy', arr)

You can also add dynamic fields using NumPy files as follows. For details on dynamic fields, refer to Dynamic Schema.

numpy.save('$meta.npy', numpy.array([json.dumps({"x": 2}), json.dumps({"y": 8, "z": 2})]))
  • Use the field name of each column to name the NumPy file. Do not add files named after a field that does not exist in the target collection. There should be one NumPy file for each field.
  • Use the correct value type when creating NumPy arrays. For details, refer to the examples in the Reference section below, and see the quick check after this list.
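Before uploading, it can help to confirm that each saved file has the dtype and shape your schema expects. A quick check, assuming the files created in the snippets above sit in the working directory:

import numpy as np

# Print the dtype and shape of each column file; vector fields should be 2-D,
# scalar fields 1-D, and the dtypes should match the collection schema.
for name in ["book_id", "word_count", "book_intro", "book_props"]:
    arr = np.load(f"{name}.npy")
    print(name, arr.dtype, arr.shape)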

Insert entities from files

1. Upload data files

You can use either MinIO or the local hard disk for storage in Milvus.

Using the local hard disk for storage is only available in Milvus Standalone.
  • To use MinIO for storage, upload data files to the bucket defined by minio.bucketName in the milvus.yml configuration file (see the upload sketch after this list).
  • For local storage, copy the data files into a directory of the local disk.
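For example, you might upload the prepared files to MinIO with the MinIO Python SDK. The endpoint, credentials, bucket name, and data/ prefix below are illustrative placeholders; use the values from the minio section of your milvus.yml.

from minio import Minio

# Connect to the MinIO instance used by Milvus (credentials are placeholders).
client = Minio("localhost:9000", access_key="minioadmin", secret_key="minioadmin", secure=False)

# Upload each data file under a common prefix inside the configured bucket.
for name in ["book_id.npy", "word_count.npy", "book_intro.npy", "book_props.npy"]:
    client.fput_object("a-bucket", f"data/{name}", name)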

2. Insert entities

To facilitate data import from files, Milvus offers a bulk-insert API in several SDK flavors. In PyMilvus, use the do_bulk_insert() method. In the Java SDK, use the bulkInsert() method.

In this method, set the name of the target collection as collection_name and the list of files prepared in the previous step as files. Optionally, you can specify the name of a partition in the target collection as partition_name so that Milvus imports the data from the listed files into that partition only.

  • For a row-based JSON file, the files parameter should be a one-member list containing the path to the JSON file.

    from pymilvus import utility
    task_id = utility.do_bulk_insert(
        collection_name="book",
        partition_name="2022",
        files=["test.json"]
    )
    
    import io.milvus.param.bulkinsert.BulkInsertParam;
    import io.milvus.response.BulkInsertResponseWrapper;
    import io.milvus.grpc.ImportResponse;
    import io.milvus.param.R;
    
    BulkInsertParam param = BulkInsertParam.newBuilder()
            .withCollectionName("book")
            .withPartitionName("2022")
            .addFile("test.json")
            .build();
    R<ImportResponse> response = milvusClient.bulkInsert(param);
    BulkInsertResponseWrapper wrapper = new BulkInsertResponseWrapper(response.getData());
    long task_id = wrapper.getTaskID();
    
  • For a set of column-based NumPy files, the files parameter should be a multi-member list containing the paths to the NumPy files.

    from pymilvus import utility
    task_id = utility.do_bulk_insert(
        collection_name="book",
        partition_name="2022",
        files=["book_id.npy", "word_count.npy", "book_intro.npy", "book_props.npy"]
    )
    
    import io.milvus.param.bulkinsert.BulkInsertParam;
    import io.milvus.response.BulkInsertResponseWrapper;
    import io.milvus.grpc.ImportResponse;
    import io.milvus.param.R;
    
    BulkInsertParam param = BulkInsertParam.newBuilder()
            .withCollectionName("book")
            .withPartitionName("2022")
            .addFile("book_id.npy")
            .addFile("word_count.npy")
            .addFile("book_intro.npy")
            .addFile("book_props.npy")
            .build();
    R<ImportResponse> response = milvusClient.bulkInsert(param);
    BulkInsertResponseWrapper wrapper = new BulkInsertResponseWrapper(response.getData());
    long task_id = wrapper.getTaskID();
    

    Each call to the bulk-insert API returns immediately. The return value is the ID of a data-import task running in the background. Milvus maintains a queue of such tasks to be dispatched in parallel to idle data nodes.

    When setting the file paths, note the following:

    • If you upload the data files to a MinIO instance, a valid file path is relative to the root of the bucket defined by minio.bucketName in milvus.yml, such as "data/book_id.npy".
    • If you upload the data file to the local hard drive, a valid file path should be an absolute path such as "/tmp/data/book_id.npy".

    If you have a lot of files to process, consider creating multiple data-import tasks and having them run in parallel.

After inserting entities into a collection that has previously been indexed, you do not need to re-index the collection, as Milvus will automatically create an index for the newly inserted data. For more information, refer to Can indexes be created after inserting vectors?

List tasks

Check task state

Since the bulk-insert API is asynchronous, you might need to check whether a data-import task is complete. Milvus provides a BulkInsertState object to hold the details of a data-import task and you can use the get-bulk-insert-state API to retrieve this object using the programming language of your choice.

In PyMilvus, use get_bulk_insert_state(). In the Java SDK, use getBulkInsertState().

from pymilvus import utility, BulkInsertState
task = utility.get_bulk_insert_state(task_id=task_id)
print("Task state:", task.state_name)
print("Imported files:", task.files)
print("Collection name:", task.collection_name)
print("Partition name:", task.partition_name)
print("Start time:", task.create_time_str)
print("Imported row count:", task.row_count)
print("Entities ID array generated by this task:", task.ids)

if task.state == BulkInsertState.ImportFailed:
    print("Failed reason:", task.failed_reason)
import io.milvus.param.bulkinsert.GetBulkInsertStateParam;
import io.milvus.response.GetBulkInsertStateWrapper;
import io.milvus.grpc.GetImportStateResponse;
import io.milvus.grpc.ImportState;
import io.milvus.param.R;

GetBulkInsertStateParam param = GetBulkInsertStateParam.newBuilder()
        .withTask(task_id)
        .build();
R<GetImportStateResponse> response = milvusClient.getBulkInsertState(param);
GetBulkInsertStateWrapper wrapper = new GetBulkInsertStateWrapper(response.getData());
ImportState state = wrapper.getState();
long row_count = wrapper.getImportedCount();
String create_ts = wrapper.getCreateTimeStr();
String failed_reason = wrapper.getFailedReason();
String files = wrapper.getFiles();
int progress = wrapper.getProgress();

The following table lists the possible states of a data-import task.

State               Code  Description
Pending             0     The task is waiting to be executed.
Failed              1     The task failed. Use task.failed_reason to find out why.
Started             2     The task has been dispatched to a data node and will be executed soon.
Persisted           5     New data segments have been generated and persisted.
Completed           6     The metadata has been updated for the new segments.
Failed and cleaned  7     The task failed, and all temporary data it generated has been cleared.
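Since a task moves through these states asynchronously, a common pattern is to poll get_bulk_insert_state() until a terminal state is reached. A minimal PyMilvus sketch, assuming the BulkInsertState constants mirror the codes in the table above:

import time
from pymilvus import utility, BulkInsertState

def wait_for_bulk_insert(task_id, interval=2.0):
    # States 1, 6, and 7 in the table above are terminal.
    terminal = {
        BulkInsertState.ImportFailed,
        BulkInsertState.ImportCompleted,
        BulkInsertState.ImportFailedAndCleaned,
    }
    while True:
        task = utility.get_bulk_insert_state(task_id=task_id)
        if task.state in terminal:
            return task
        time.sleep(interval)

task = wait_for_bulk_insert(task_id)
print("Final state:", task.state_name)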

List all tasks

Milvus also offers a list-bulk-insert-tasks API for you to list all data-import tasks. In this method, you need to specify a collection name so that Milvus lists all tasks that import data into this collection. Optionally, you can specify a limit for the maximum number of tasks to return.

tasks = utility.list_bulk_insert_tasks(collection_name="book", limit=10)
for task in tasks:
    print(task)

import java.util.List;

import io.milvus.param.bulkinsert.ListBulkInsertTasksParam;
import io.milvus.response.GetBulkInsertStateWrapper;
import io.milvus.grpc.ListImportTasksResponse;
import io.milvus.grpc.GetImportStateResponse;
import io.milvus.grpc.ImportState;
import io.milvus.param.R;

ListBulkInsertTasksParam param = ListBulkInsertTasksParam.newBuilder()
    .withCollectionName("book")
    .build();
R<ListImportTasksResponse> response = milvusClient.listBulkInsertTasks(param);
List<GetImportStateResponse> tasks = response.getData().getTasksList();
for (GetImportStateResponse task : tasks) {
    GetBulkInsertStateWrapper wrapper = new GetBulkInsertStateWrapper(task);
    ImportState state = wrapper.getState();
    long row_count = wrapper.getImportedCount();
    String create_ts = wrapper.getCreateTimeStr();
    String failed_reason = wrapper.getFailedReason();
    String files = wrapper.getFiles();
}
Parameter        Description
collection_name  (Optional) The target collection whose import tasks you want to list. Leave the value empty to list all tasks recorded by the Milvus root coord.
limit            (Optional) The maximum number of tasks to return.
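For a quick overview, you can group the listed tasks by state. A small sketch, assuming each returned task exposes state_name like the BulkInsertState object shown earlier:

from collections import Counter
from pymilvus import utility

# Count how many import tasks for the collection are in each state.
tasks = utility.list_bulk_insert_tasks(collection_name="book", limit=100)
print(Counter(task.state_name for task in tasks))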

See System Configurations for more information about import task configurations.

Limits

Feature                         Maximum limit
Max. size of task pending list  65536
Max. size of a data file        16 GB

Reference

Configure Milvus for data import

To have Milvus remove failed or old data-import tasks automatically, you can specify a timeout duration and retention period for data-import tasks in the Milvus configuration file.

rootCoord:
  # (in seconds) Duration after which an import task will expire (be killed). Default 900 seconds (15 minutes).
  # Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
  importTaskExpiration: 900
  # (in seconds) Milvus will keep the record of import tasks for at least `importTaskRetention` seconds. Default 86400
  # seconds (24 hours).
  # Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
  importTaskRetention: 86400

Create NumPy files

The following examples demonstrate how to create NumPy files for columns of data types that Milvus supports.

  • Create a NumPy file from a boolean array

    import numpy as np
    data = [True, False, True, False]
    dt = np.dtype('bool')
    arr = np.array(data, dtype=dt)
    np.save(file_path, arr)
    
  • Create a NumPy file from an int8 array

    import numpy as np
    data = [1, 2, 3, 4]
    dt = np.dtype('int8')
    arr = np.array(data, dtype=dt)
    np.save(file_path, arr)
    
  • Create a NumPy file from an int16 array

    import numpy as np
    data = [1, 2, 3, 4]
    dt = np.dtype('int16')
    arr = np.array(data, dtype=dt)
    np.save(file_path, arr)
    
  • Create a NumPy file from an int32 array

    import numpy as np
    data = [1, 2, 3, 4]
    dt = np.dtype('int32')
    arr = np.array(data, dtype=dt)
    np.save(file_path, arr)
    
  • Create a NumPy file from an int64 array

    import numpy as np
    data = [1, 2, 3, 4]
    dt = np.dtype('int64')
    arr = np.array(data, dtype=dt)
    np.save(file_path, arr)
    
  • Create a NumPy file from a float array

    import numpy as np
    data = [0.1, 0.2, 0.3, 0.4]
    dt = np.dtype('float32')
    arr = np.array(data, dtype=dt)
    np.save(file_path, arr)
    
  • Create a NumPy file from a double float array

    import numpy as np
    data = [0.1, 0.2, 0.3, 0.4]
    dt = np.dtype('float64')
    arr = np.array(data, dtype=dt)
    np.save(file_path, arr)
    
  • Create a NumPy file from a VARCHAR array

    import numpy as np
    data = ["a", "b", "c", "d"]
    arr = np.array(data)
    np.save(file_path, arr)
    
  • Create a NumPy file from a binary vector array

    For binary vectors, use uint8 as the NumPy data type. Each uint8 value represents 8 dimensions, so a 32-dimensional binary vector is stored as four uint8 values (see the packing sketch after this list).

    import numpy as np
    data = [
        [43, 35, 124, 90],
        [65, 212, 12, 57],
        [6, 126, 232, 78],
        [87, 189, 38, 22],
    ]
    dt = np.dtype('uint8')
    arr = np.array(data, dtype=dt)
    np.save(file_path, arr)
    
  • Create a NumPy file from a float vector array

    In Milvus, you can use either float32 or float64 values to form a float vector.

    The following snippet creates a NumPy file from an 8-dimensional vector array formed using float32 values.

    import numpy as np
    data = [
        [1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8],
        [2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7, 2.8],
        [3.1, 3.2, 3.3, 3.4, 3.5, 3.6, 3.7, 3.8],
        [4.1, 4.2, 4.3, 4.4, 4.5, 4.6, 4.7, 4.8],
    ]
    dt = np.dtype('float32')
    arr = np.array(data, dtype=dt)
    np.save(file_path, arr)
    
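As noted in the binary vector example above, each uint8 value packs 8 dimensions. If your binary vectors start out as arrays of 0/1 bits, numpy.packbits can produce the packed representation. A sketch; the field name binary_vector is illustrative:

import numpy as np

# Two 32-dimensional binary vectors expressed as raw bits (one 0/1 per dimension).
bits = np.array([
    [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1] * 2,
    [0, 1, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1] * 2,
], dtype='uint8')

# packbits collapses every 8 bits (big-endian bit order) into one uint8 value,
# yielding 4 uint8 values per 32-dimensional vector.
arr = np.packbits(bits, axis=-1)
print(arr)  # shape (2, 4), dtype uint8
np.save('binary_vector.npy', arr)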

Import multiple NumPy files in parallel

You can upload NumPy files into different subdirectories, create multiple import tasks, and execute them in parallel.

Assume the data structure is as follows:

├── task_1
│    └── book_id.npy
│    └── word_count.npy
│    └── book_intro.npy
│    └── book_props.npy
├── task_2
│    └── book_id.npy
│    └── word_count.npy
│    └── book_intro.npy
│    └── book_props.npy

You can create multiple data-import tasks as follows:

task_1 = utility.do_bulk_insert(
    collection_name="book",
    files=["task_1/book_id.npy", "task_1/word_count.npy", "task_1/book_intro.npy", "task_1/book_props.npy"]
)
task_2 = utility.do_bulk_insert(
    collection_name="book",
    files=["task_2/book_id.npy", "task_2/word_count.npy", "task_2/book_intro.npy", "task_2/book_props.npy"]
)
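If your data starts out as single large arrays, one way to produce the per-task layout above is to split the arrays and save each chunk into its own subdirectory. A sketch with illustrative data; book_props is omitted but follows the same pattern:

import os
import numpy as np

# Illustrative columns for ten entities.
book_id = np.arange(101, 111, dtype='int64')
word_count = np.random.randint(1, 100, size=10).astype('int64')
book_intro = np.random.random((10, 2)).astype('float32')

# Split the row indices into two chunks and save one subdirectory per task.
for i, chunk in enumerate(np.array_split(np.arange(10), 2), start=1):
    os.makedirs(f"task_{i}", exist_ok=True)
    np.save(f"task_{i}/book_id.npy", book_id[chunk])
    np.save(f"task_{i}/word_count.npy", word_count[chunk])
    np.save(f"task_{i}/book_intro.npy", book_intro[chunk])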

Check data searchability

After a data-import task is complete, Milvus persists the imported data into segments and sends these segments to the index nodes for index-building. During the index-building process, these segments are unavailable for searches. Once such a process is complete, you need to call the load API again to load these segments into the query nodes. These segments will then be ready for searches.

  1. Check the index-building progress

PyMilvus provides a utility method to wait for the index-building process to complete.

utility.wait_for_index_building_complete(collection_name)

In other SDKs, you can use the describe-index API to check the index-building progress.

import io.milvus.param.index.DescribeIndexParam;
import io.milvus.grpc.DescribeIndexResponse;
import io.milvus.grpc.IndexDescription;
import io.milvus.param.R;

while (true) {
    R<DescribeIndexResponse> response = milvusClient.describeIndex(
        DescribeIndexParam.newBuilder()
            .withCollectionName(collection_name)
            .withIndexName(index_name)
            .build());
    IndexDescription desc = response.getData().getIndexDescriptions(0);
    if (desc.getIndexedRows() == desc.getTotalRows()) {
        break;
    }
}
  2. Load new segments into query nodes

Newly indexed segments need to be loaded manually as follows:

collection.load(_refresh=True)

R<RpcStatus> response = milvusClient.loadCollection(
    LoadCollectionParam.newBuilder()
        .withCollectionName(collection_name)
        .withRefresh(Boolean.TRUE)
        .build());

The _refresh parameter is False by default. Do not set it to True when you load a collection for the first time.

The withRefresh() method is optional. Do not call it with Boolean.TRUE when you load a collection for the first time.
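Once the collection is loaded, a quick search confirms that the imported entities are reachable. The metric type and search parameters below are assumptions for illustration; use the ones your index was built with.

from pymilvus import Collection

collection = Collection("book")
results = collection.search(
    data=[[1.0, 2.0]],  # one 2-dimensional query vector, matching book_intro
    anns_field="book_intro",
    param={"metric_type": "L2", "params": {"nprobe": 10}},
    limit=3,
    output_fields=["book_id", "word_count"],
)
for hit in results[0]:
    print(hit.id, hit.distance)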

What's next

Learn more about the basic operations of Milvus:
