milvus-logo
LFAI
Home
  • Guia do utilizador

Importar dados

Esta página demonstra o procedimento para importar os dados preparados.

Antes de começar

  • Já preparou os seus dados e colocou-os no balde do Milvus.

    Caso contrário, deve utilizar RemoteBulkWriter para preparar os seus dados primeiro e garantir que os dados preparados já foram transferidos para o bucket Milvus na instância MinIO iniciada juntamente com a sua instância Milvus. Para obter detalhes, consulte Preparar dados de origem.

  • Já criou uma coleção com o esquema que utiliza para preparar os seus dados. Caso contrário, consulte Gerenciar coleções.

O seguinte trecho de código cria uma coleção simples com o esquema fornecido. Para obter mais informações sobre parâmetros, consulte create_schema() e create_collection() na referência do SDK.

O seguinte trecho de código cria uma coleção simples com o esquema fornecido. Para obter mais informações sobre os parâmetros, consulte createCollection() na referência do SDK.

Importar dados

Para importar os dados preparados, tem de criar um trabalho de importação da seguinte forma:

from pymilvus.bulk_writer import bulk_import

url = f"http://127.0.0.1:19530"

# Bulk-insert data from a set of JSON files already uploaded to the MinIO server
resp = bulk_import(
    url=url,
    collection_name="quick_setup",
    files=[['a1e18323-a658-4d1b-95a7-9907a4391bcf/1.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/2.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/3.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/4.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/5.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/6.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/7.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/8.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/9.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/10.parquet']],
)

job_id = resp.json()['data']['jobId']
print(job_id)
private static String bulkImport(List<List<String>> batchFiles) throws InterruptedException {
    MilvusImportRequest milvusImportRequest = MilvusImportRequest.builder()
            .collectionName("quick_setup")
            .files(batchFiles)
            .build();
    String bulkImportResult = BulkImport.bulkImport("http://localhost:19530", milvusImportRequest);
    System.out.println(bulkImportResult);

    JsonObject bulkImportObject = new Gson().fromJson(bulkImportResult, JsonObject.class);
    String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
    System.out.println("Create a bulkInert task, job id: " + jobId);
    return jobId;
}

public static void main(String[] args) throws Exception {
    List<List<String>> batchFiles = uploadData();
    String jobId = bulkImport(batchFiles);
}
export MILVUS_URI="localhost:19530"

curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/create" \
--header "Content-Type: application/json" \
--data-raw '{
    "files": [
        [
            "/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet"
        ],
        [
            "/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet"
        ]
    ],
    "collectionName": "quick_setup"
}'

O corpo do pedido contém dois campos:

  • collectionName

    O nome da coleção de destino.

  • files

    Uma lista de listas de caminhos de ficheiros relativos ao caminho de raiz do balde Milvus na instância MioIO iniciada juntamente com a sua instância Milvus. As sub-listas possíveis são as seguintes:

    • Ficheiros JSON

      Se o ficheiro preparado estiver no formato JSON, cada sub-lista deve conter o caminho para um único ficheiro JSON preparado.

      [
          "/d1782fa1-6b65-4ff3-b05a-43a436342445/1.json"
      ],
      
    • Ficheiros Parquet

      Se o ficheiro preparado estiver no formato Parquet, cada sub-lista deve conter o caminho para um único ficheiro Parquet preparado.

      [
          "/a6fb2d1c-7b1b-427c-a8a3-178944e3b66d/1.parquet"
      ]
      
      

O retorno possível é o seguinte:

{
    "code": 200,
    "data": {
        "jobId": "448707763884413158"
    }
}

Verificar o progresso da importação

Depois de obter um ID de tarefa de importação, pode verificar o progresso da importação da seguinte forma:

import json
from pymilvus.bulk_writer import get_import_progress

url = f"http://127.0.0.1:19530"

# Get bulk-insert job progress
resp = get_import_progress(
    url=url,
    job_id="453265736269038336",
)

print(json.dumps(resp.json(), indent=4))
private static void getImportProgress(String jobId) {
    while (true) {
        System.out.println("Wait 5 second to check bulkInsert job state...");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            break;
        }

        MilvusDescribeImportRequest request = MilvusDescribeImportRequest.builder()
                .jobId(jobId)
                .build();
        String getImportProgressResult = BulkImport.getImportProgress("http://localhost:19530", request);

        JsonObject getImportProgressObject = new Gson().fromJson(getImportProgressResult, JsonObject.class);
        String state = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
        String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
        if ("Failed".equals(state)) {
            String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
            System.out.printf("The job %s failed, reason: %s%n", jobId, reason);
            break;
        } else if ("Completed".equals(state)) {
            System.out.printf("The job %s completed%n", jobId);
            break;
        } else {
            System.out.printf("The job %s is running, state:%s progress:%s%n", jobId, state, progress);
        }
    }
}

public static void main(String[] args) throws Exception {
    List<List<String>> batchFiles = uploadData();
    String jobId = bulkImport(batchFiles);
    getImportProgress(jobId);
}
export MILVUS_URI="localhost:19530"

curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/describe" \
--header "Content-Type: application/json" \
--data-raw '{
    "jobId": "449839014328146739"
}'

A resposta possível é a seguinte:

{
    "code": 200,
    "data": {
        "collectionName": "quick_setup",
        "completeTime": "2024-05-18T02:57:13Z",
        "details": [
            {
                "completeTime": "2024-05-18T02:57:11Z",
                "fileName": "id:449839014328146740 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet\" ",
                "fileSize": 31567874,
                "importedRows": 100000,
                "progress": 100,
                "state": "Completed",
                "totalRows": 100000
            },
            {
                "completeTime": "2024-05-18T02:57:11Z",
                "fileName": "id:449839014328146741 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet\" ",
                "fileSize": 31517224,
                "importedRows": 100000,
                "progress": 100,
                "state": "Completed",
                "totalRows": 200000            
            }
        ],
        "fileSize": 63085098,
        "importedRows": 200000,
        "jobId": "449839014328146739",
        "progress": 100,
        "state": "Completed",
        "totalRows": 200000
    }
}

Listar tarefas de importação

Pode listar todas as tarefas de importação relativas a uma coleção específica da seguinte forma:

import json
from pymilvus.bulk_writer import list_import_jobs

url = f"http://127.0.0.1:19530"

# List bulk-insert jobs
resp = list_import_jobs(
    url=url,
    collection_name="quick_setup",
)

print(json.dumps(resp.json(), indent=4))
private static void listImportJobs() {
    MilvusListImportJobsRequest listImportJobsRequest = MilvusListImportJobsRequest.builder().collectionName("quick_setup").build();
    String listImportJobsResult = BulkImport.listImportJobs("http://localhost:19530", listImportJobsRequest);
    System.out.println(listImportJobsResult);
}

public static void main(String[] args) throws Exception {
    listImportJobs();
}
export MILVUS_URI="localhost:19530"

curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/list" \
--header "Content-Type: application/json" \
--data-raw '{
    "collectionName": "quick_setup"
}'

Os valores possíveis são os seguintes:

{
    "code": 200,
    "data": {
        "records": [
            {
                "collectionName": "quick_setup",
                "jobId": "448761313698322011",
                "progress": 50,
                "state": "Importing"
            }
        ]
    }
}

Limitações

  • O tamanho de cada ficheiro de importação não deve exceder 16 GB.

  • O número máximo de pedidos de importação está limitado a 1024.

  • O número máximo de ficheiros por pedido de importação não deve exceder 1024.

  • Só pode ser especificado um nome de partição num pedido de importação. Se não for especificado nenhum nome de partição, os dados serão inseridos na partição predefinida. Além disso, não pode definir um nome de partição no pedido de importação se tiver definido a Chave de partição na coleção de destino.

Restrições

Antes de importar os dados, certifique-se de que reconheceu as restrições em termos dos seguintes comportamentos do Milvus:

  • Restrições relativas ao comportamento Load:

    • Se uma coleção já tiver sido carregada antes de uma importação, pode utilizar a função refresh_load para carregar os dados recém-importados após a conclusão da importação.
  • Restrições relativas aos comportamentos de consulta e pesquisa:

    • Antes de o estado da tarefa de importação ser Concluído, é garantido que os dados recentemente importados são invisíveis para consultas e pesquisas.

    • Quando o estado da tarefa for Concluído,

      • Se a coleção não estiver carregada, pode utilizar a função load para carregar os dados recentemente importados.

      • Se a coleção já estiver carregada, pode chamar load(is_refresh=True) para carregar os dados importados.

  • Restrições relativas ao comportamento de eliminação:

    • Antes de o estado da tarefa de importação ser Concluído, a eliminação não é garantida e pode ou não ser bem sucedida.

    • A eliminação depois de o estado da tarefa ser Concluído é garantida.

Recomendações

Recomendamos vivamente a utilização da funcionalidade de importação de vários ficheiros, que lhe permite carregar vários ficheiros num único pedido. Este método não só simplifica o processo de importação, como também aumenta significativamente o desempenho da importação. Entretanto, ao consolidar os seus carregamentos, pode reduzir o tempo gasto na gestão de dados e tornar o seu fluxo de trabalho mais eficiente.

Traduzido porDeepLogo

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

Esta página foi útil?