🚀 Попробуйте Zilliz Cloud, полностью управляемый Milvus, бесплатно — ощутите 10-кратное увеличение производительности! Попробовать сейчас>

milvus-logo
LFAI
Главная
  • Импорт данных
  • Home
  • Docs
  • Импорт данных

  • Импорт данных

Импорт данных

На этой странице показана процедура импорта подготовленных данных.

Прежде чем начать

  • Вы уже подготовили данные и поместили их в ведро Milvus.

    Если нет, то сначала следует использовать RemoteBulkWriter для подготовки данных и убедиться, что подготовленные данные уже переданы в ведро Milvus на экземпляре MinIO, запущенном вместе с вашим экземпляром Milvus. Подробности см. в разделе Подготовка исходных данных.

  • Вы уже создали коллекцию со схемой, которую вы используете для подготовки данных. Если нет, обратитесь к разделу Управление коллекциями.

Следующий фрагмент кода создает простую коллекцию с заданной схемой. Для получения дополнительной информации о параметрах см. create_schema() и create_collection() в справочнике SDK.

Следующий фрагмент кода создает простую коллекцию с заданной схемой. Для получения дополнительной информации о параметрах см. createCollection() в справочнике SDK.

Импорт данных

Чтобы импортировать подготовленные данные, необходимо создать задание импорта следующим образом:

from pymilvus.bulk_writer import bulk_import

url = f"http://127.0.0.1:19530"

# Bulk-insert data from a set of JSON files already uploaded to the MinIO server
resp = bulk_import(
    url=url,
    collection_name="quick_setup",
    files=[['a1e18323-a658-4d1b-95a7-9907a4391bcf/1.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/2.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/3.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/4.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/5.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/6.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/7.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/8.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/9.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/10.parquet']],
)

job_id = resp.json()['data']['jobId']
print(job_id)
private static String bulkImport(List<List<String>> batchFiles) throws InterruptedException {
    MilvusImportRequest milvusImportRequest = MilvusImportRequest.builder()
            .collectionName("quick_setup")
            .files(batchFiles)
            .build();
    String bulkImportResult = BulkImport.bulkImport("http://localhost:19530", milvusImportRequest);
    System.out.println(bulkImportResult);

    JsonObject bulkImportObject = new Gson().fromJson(bulkImportResult, JsonObject.class);
    String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
    System.out.println("Create a bulkInert task, job id: " + jobId);
    return jobId;
}

public static void main(String[] args) throws Exception {
    List<List<String>> batchFiles = uploadData();
    String jobId = bulkImport(batchFiles);
}
export MILVUS_URI="localhost:19530"

curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/create" \
--header "Content-Type: application/json" \
--data-raw '{
    "files": [
        [
            "/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet"
        ],
        [
            "/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet"
        ]
    ],
    "collectionName": "quick_setup"
}'

Тело запроса содержит два поля:

  • collectionName

    Имя целевой коллекции.

  • files

    Список списков путей к файлам относительно корневого пути ведра Milvus на экземпляре MioIO, запущенном вместе с вашим экземпляром Milvus. Возможные вложенные списки следующие:

    • Файлы JSON

      Если подготовленный файл имеет формат JSON, каждый подсписок должен содержать путь к одному подготовленному JSON-файлу.

      [
          "/d1782fa1-6b65-4ff3-b05a-43a436342445/1.json"
      ],
      
    • Паркетные файлы

      Если подготовленный файл имеет формат Parquet, каждый вложенный список должен содержать путь к одному подготовленному файлу parquet.

      [
          "/a6fb2d1c-7b1b-427c-a8a3-178944e3b66d/1.parquet"
      ]
      
      

Возможные варианты возврата следующие:

{
    "code": 200,
    "data": {
        "jobId": "448707763884413158"
    }
}

Проверить ход импорта

Получив идентификатор задания импорта, вы можете проверить ход импорта следующим образом:

import json
from pymilvus.bulk_writer import get_import_progress

url = f"http://127.0.0.1:19530"

# Get bulk-insert job progress
resp = get_import_progress(
    url=url,
    job_id="453265736269038336",
)

print(json.dumps(resp.json(), indent=4))
private static void getImportProgress(String jobId) {
    while (true) {
        System.out.println("Wait 5 second to check bulkInsert job state...");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            break;
        }

        MilvusDescribeImportRequest request = MilvusDescribeImportRequest.builder()
                .jobId(jobId)
                .build();
        String getImportProgressResult = BulkImport.getImportProgress("http://localhost:19530", request);

        JsonObject getImportProgressObject = new Gson().fromJson(getImportProgressResult, JsonObject.class);
        String state = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
        String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
        if ("Failed".equals(state)) {
            String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
            System.out.printf("The job %s failed, reason: %s%n", jobId, reason);
            break;
        } else if ("Completed".equals(state)) {
            System.out.printf("The job %s completed%n", jobId);
            break;
        } else {
            System.out.printf("The job %s is running, state:%s progress:%s%n", jobId, state, progress);
        }
    }
}

public static void main(String[] args) throws Exception {
    List<List<String>> batchFiles = uploadData();
    String jobId = bulkImport(batchFiles);
    getImportProgress(jobId);
}
export MILVUS_URI="localhost:19530"

curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/describe" \
--header "Content-Type: application/json" \
--data-raw '{
    "jobId": "449839014328146739"
}'

Возможный ответ выглядит следующим образом:

{
    "code": 200,
    "data": {
        "collectionName": "quick_setup",
        "completeTime": "2024-05-18T02:57:13Z",
        "details": [
            {
                "completeTime": "2024-05-18T02:57:11Z",
                "fileName": "id:449839014328146740 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet\" ",
                "fileSize": 31567874,
                "importedRows": 100000,
                "progress": 100,
                "state": "Completed",
                "totalRows": 100000
            },
            {
                "completeTime": "2024-05-18T02:57:11Z",
                "fileName": "id:449839014328146741 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet\" ",
                "fileSize": 31517224,
                "importedRows": 100000,
                "progress": 100,
                "state": "Completed",
                "totalRows": 200000            
            }
        ],
        "fileSize": 63085098,
        "importedRows": 200000,
        "jobId": "449839014328146739",
        "progress": 100,
        "state": "Completed",
        "totalRows": 200000
    }
}

Список заданий импорта

Вы можете перечислить все задания импорта относительно определенной коллекции следующим образом:

import json
from pymilvus.bulk_writer import list_import_jobs

url = f"http://127.0.0.1:19530"

# List bulk-insert jobs
resp = list_import_jobs(
    url=url,
    collection_name="quick_setup",
)

print(json.dumps(resp.json(), indent=4))
private static void listImportJobs() {
    MilvusListImportJobsRequest listImportJobsRequest = MilvusListImportJobsRequest.builder().collectionName("quick_setup").build();
    String listImportJobsResult = BulkImport.listImportJobs("http://localhost:19530", listImportJobsRequest);
    System.out.println(listImportJobsResult);
}

public static void main(String[] args) throws Exception {
    listImportJobs();
}
export MILVUS_URI="localhost:19530"

curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/list" \
--header "Content-Type: application/json" \
--data-raw '{
    "collectionName": "quick_setup"
}'

Возможные значения следующие:

{
    "code": 200,
    "data": {
        "records": [
            {
                "collectionName": "quick_setup",
                "jobId": "448761313698322011",
                "progress": 50,
                "state": "Importing"
            }
        ]
    }
}

Ограничения

  • Размер каждого файла импорта не должен превышать 16 ГБ.

  • Максимальное количество запросов на импорт ограничено 1024.

  • Максимальное количество файлов в одном запросе на импорт не должно превышать 1024.

  • В запросе на импорт можно указать только одно имя раздела. Если имя раздела не указано, данные будут вставлены в раздел по умолчанию. Кроме того, в запросе на импорт нельзя задать имя раздела, если в целевой коллекции задан ключ раздела.

Ограничения

Перед импортом данных убедитесь, что вы приняли к сведению ограничения в отношении следующих поведений Milvus:

  • Ограничения, касающиеся поведения Load:

    • Если коллекция уже была загружена до импорта, вы можете использовать функцию refresh_load для загрузки новых импортированных данных после завершения импорта.
  • Ограничения, касающиеся поведения запросов и поиска:

    • До перехода задания импорта в состояние Completed новые импортированные данные гарантированно будут невидимы для запросов и поиска.

    • После завершения задания,

      • Если коллекция не загружена, можно использовать функцию load для загрузки новых импортированных данных.

      • Если коллекция уже загружена, можно вызвать функцию load(is_refresh=True) для загрузки импортированных данных.

  • Ограничения, касающиеся поведения при удалении:

    • До того как статус задания импорта станет Завершен, удаление не гарантируется и может быть успешным, а может и не быть.

    • Удаление после завершения задания гарантированно успешно.

Рекомендации

Мы настоятельно рекомендуем использовать функцию многофайлового импорта, которая позволяет загружать несколько файлов в одном запросе. Этот метод не только упрощает процесс импорта, но и значительно повышает его производительность. Кроме того, консолидация загружаемых файлов позволяет сократить время на управление данными и повысить эффективность рабочего процесса.

Попробуйте Managed Milvus бесплатно

Zilliz Cloud работает без проблем, поддерживается Milvus и в 10 раз быстрее.

Начать
Обратная связь

Была ли эта страница полезной?