milvus-logo
LFAI
Home
  • Guía del usuario

Importar datos

Esta página muestra el procedimiento para importar los datos preparados.

Antes de empezar

  • Ya ha preparado sus datos y los ha colocado en el cubo de Milvus.

    Si no es así, debería utilizar RemoteBulkWriter para preparar sus datos primero y asegurarse de que los datos preparados ya se han transferido al cubo de Milvus en la instancia MinIO iniciada junto con su instancia de Milvus. Para más detalles, consulte Preparar datos de origen.

  • Ya ha creado una colección con el esquema que utiliza para preparar sus datos. Si no es así, consulte Administrar colecciones.

El siguiente fragmento de código crea una colección simple con el esquema dado. Para obtener más información sobre los parámetros, consulte create_schema() y create_collection() en la referencia del SDK.

El siguiente fragmento de código crea una colección simple con el esquema dado. Para obtener más información sobre los parámetros, consulte createCollection() en la referencia del SDK.

Importar datos

Para importar los datos preparados, debe crear una tarea de importación como se indica a continuación:

from pymilvus.bulk_writer import bulk_import

url = f"http://127.0.0.1:19530"

# Bulk-insert data from a set of JSON files already uploaded to the MinIO server
resp = bulk_import(
    url=url,
    collection_name="quick_setup",
    files=[['a1e18323-a658-4d1b-95a7-9907a4391bcf/1.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/2.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/3.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/4.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/5.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/6.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/7.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/8.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/9.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/10.parquet']],
)

job_id = resp.json()['data']['jobId']
print(job_id)
private static String bulkImport(List<List<String>> batchFiles) throws InterruptedException {
    MilvusImportRequest milvusImportRequest = MilvusImportRequest.builder()
            .collectionName("quick_setup")
            .files(batchFiles)
            .build();
    String bulkImportResult = BulkImport.bulkImport("http://localhost:19530", milvusImportRequest);
    System.out.println(bulkImportResult);

    JsonObject bulkImportObject = new Gson().fromJson(bulkImportResult, JsonObject.class);
    String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
    System.out.println("Create a bulkInert task, job id: " + jobId);
    return jobId;
}

public static void main(String[] args) throws Exception {
    List<List<String>> batchFiles = uploadData();
    String jobId = bulkImport(batchFiles);
}
export MILVUS_URI="localhost:19530"

curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/create" \
--header "Content-Type: application/json" \
--data-raw '{
    "files": [
        [
            "/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet"
        ],
        [
            "/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet"
        ]
    ],
    "collectionName": "quick_setup"
}'

El cuerpo de la solicitud contiene dos campos:

  • collectionName

    El nombre de la colección de destino.

  • files

    Una lista de rutas de archivos relativa a la ruta raíz del cubo de Milvus en la instancia de MioIO iniciada junto con su instancia de Milvus. Las posibles sublistas son las siguientes

    • Archivos JSON

      Si el archivo preparado está en formato JSON, cada sublista debe contener la ruta a un único archivo JSON preparado.

      [
          "/d1782fa1-6b65-4ff3-b05a-43a436342445/1.json"
      ],
      
    • Archivos Parquet

      Si el archivo preparado está en formato Parquet, cada sublista debe contener la ruta a un único archivo Parquet preparado.

      [
          "/a6fb2d1c-7b1b-427c-a8a3-178944e3b66d/1.parquet"
      ]
      
      

El posible retorno es el siguiente

{
    "code": 200,
    "data": {
        "jobId": "448707763884413158"
    }
}

Comprobar el progreso de la importación

Una vez que obtenga un ID de trabajo de importación, puede comprobar el progreso de la importación de la siguiente manera:

import json
from pymilvus.bulk_writer import get_import_progress

url = f"http://127.0.0.1:19530"

# Get bulk-insert job progress
resp = get_import_progress(
    url=url,
    job_id="453265736269038336",
)

print(json.dumps(resp.json(), indent=4))
private static void getImportProgress(String jobId) {
    while (true) {
        System.out.println("Wait 5 second to check bulkInsert job state...");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            break;
        }

        MilvusDescribeImportRequest request = MilvusDescribeImportRequest.builder()
                .jobId(jobId)
                .build();
        String getImportProgressResult = BulkImport.getImportProgress("http://localhost:19530", request);

        JsonObject getImportProgressObject = new Gson().fromJson(getImportProgressResult, JsonObject.class);
        String state = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
        String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
        if ("Failed".equals(state)) {
            String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
            System.out.printf("The job %s failed, reason: %s%n", jobId, reason);
            break;
        } else if ("Completed".equals(state)) {
            System.out.printf("The job %s completed%n", jobId);
            break;
        } else {
            System.out.printf("The job %s is running, state:%s progress:%s%n", jobId, state, progress);
        }
    }
}

public static void main(String[] args) throws Exception {
    List<List<String>> batchFiles = uploadData();
    String jobId = bulkImport(batchFiles);
    getImportProgress(jobId);
}
export MILVUS_URI="localhost:19530"

curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/describe" \
--header "Content-Type: application/json" \
--data-raw '{
    "jobId": "449839014328146739"
}'

La posible respuesta es la siguiente

{
    "code": 200,
    "data": {
        "collectionName": "quick_setup",
        "completeTime": "2024-05-18T02:57:13Z",
        "details": [
            {
                "completeTime": "2024-05-18T02:57:11Z",
                "fileName": "id:449839014328146740 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet\" ",
                "fileSize": 31567874,
                "importedRows": 100000,
                "progress": 100,
                "state": "Completed",
                "totalRows": 100000
            },
            {
                "completeTime": "2024-05-18T02:57:11Z",
                "fileName": "id:449839014328146741 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet\" ",
                "fileSize": 31517224,
                "importedRows": 100000,
                "progress": 100,
                "state": "Completed",
                "totalRows": 200000            
            }
        ],
        "fileSize": 63085098,
        "importedRows": 200000,
        "jobId": "449839014328146739",
        "progress": 100,
        "state": "Completed",
        "totalRows": 200000
    }
}

Listar trabajos de importación

Puede listar todos los trabajos de importación relativos a una colección específica de la siguiente manera:

import json
from pymilvus.bulk_writer import list_import_jobs

url = f"http://127.0.0.1:19530"

# List bulk-insert jobs
resp = list_import_jobs(
    url=url,
    collection_name="quick_setup",
)

print(json.dumps(resp.json(), indent=4))
private static void listImportJobs() {
    MilvusListImportJobsRequest listImportJobsRequest = MilvusListImportJobsRequest.builder().collectionName("quick_setup").build();
    String listImportJobsResult = BulkImport.listImportJobs("http://localhost:19530", listImportJobsRequest);
    System.out.println(listImportJobsResult);
}

public static void main(String[] args) throws Exception {
    listImportJobs();
}
export MILVUS_URI="localhost:19530"

curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/list" \
--header "Content-Type: application/json" \
--data-raw '{
    "collectionName": "quick_setup"
}'

Los valores posibles son los siguientes:

{
    "code": 200,
    "data": {
        "records": [
            {
                "collectionName": "quick_setup",
                "jobId": "448761313698322011",
                "progress": 50,
                "state": "Importing"
            }
        ]
    }
}

Limitaciones

  • El tamaño de cada archivo de importación no debe superar los 16 GB.

  • El número máximo de solicitudes de importación está limitado a 1024.

  • El número máximo de ficheros por solicitud de importación no debe exceder de 1024.

  • Sólo se puede especificar un nombre de partición en una petición de importación. Si no se especifica ningún nombre de partición, los datos se insertarán en la partición por defecto. Además, no se puede establecer un nombre de partición en la petición de importación si se ha establecido la Clave de Partición en la colección destino.

Restricciones

Antes de importar datos, asegúrese de que ha reconocido las restricciones relativas a los siguientes comportamientos de Milvus:

  • Restricciones relativas al comportamiento Cargar:

    • Si una colección ya se ha cargado antes de una importación, puede utilizar la función refresh_load para cargar los datos recién importados una vez finalizada la importación.
  • Restricciones relativas a los comportamientos de consulta y búsqueda:

    • Antes de que el estado del trabajo de importación sea Completado, se garantiza que los datos recién importados sean invisibles para las consultas y búsquedas.

    • Una vez completado el trabajo,

      • Si la colección no está cargada, puede utilizar la función load para cargar los datos recién importados.

      • Si la colección ya está cargada, puede llamar a load(is_refresh=True) para cargar los datos importados.

  • Restricciones relativas al comportamiento de eliminación:

    • Antes de que el estado del trabajo de importación sea Completado, el borrado no está garantizado y puede o no tener éxito.

    • El borrado después de que el estado del trabajo sea Completado está garantizado.

Recomendaciones

Le recomendamos encarecidamente que utilice la función de importación de varios archivos, que le permite cargar varios archivos en una única solicitud. Este método no sólo simplifica el proceso de importación, sino que también aumenta significativamente el rendimiento de la importación. Mientras tanto, al consolidar sus cargas, puede reducir el tiempo dedicado a la gestión de datos y hacer que su flujo de trabajo sea más eficiente.

Traducido porDeepLogo

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

¿Fue útil esta página?