milvus-logo
LFAI
Casa
  • Importazione dei dati

Importazione dei dati

Questa pagina illustra la procedura per importare i dati preparati.

Prima di iniziare

  • I dati sono già stati preparati e inseriti nel bucket Milvus.

    In caso contrario, è necessario utilizzare RemoteBulkWriter per preparare i dati e assicurarsi che i dati preparati siano già stati trasferiti al bucket Milvus sull'istanza MinIO avviata insieme all'istanza Milvus. Per maggiori dettagli, consultare la sezione Preparare i dati di origine.

  • È già stata creata una raccolta con lo schema utilizzato per preparare i dati. In caso contrario, consultare Gestione delle raccolte.

Il seguente frammento di codice crea una semplice raccolta con lo schema dato. Per ulteriori informazioni sui parametri, fare riferimento a create_schema() e create_collection() nel riferimento dell'SDK.

Il seguente frammento di codice crea una collezione semplice con lo schema dato. Per ulteriori informazioni sui parametri, fare riferimento a createCollection() nel riferimento all'SDK.

Importare i dati

Per importare i dati preparati, è necessario creare un lavoro di importazione come segue:

from pymilvus.bulk_writer import bulk_import

url = f"http://127.0.0.1:19530"

# Bulk-insert data from a set of JSON files already uploaded to the MinIO server
resp = bulk_import(
    url=url,
    collection_name="quick_setup",
    files=[['a1e18323-a658-4d1b-95a7-9907a4391bcf/1.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/2.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/3.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/4.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/5.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/6.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/7.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/8.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/9.parquet'],
           ['a1e18323-a658-4d1b-95a7-9907a4391bcf/10.parquet']],
)

job_id = resp.json()['data']['jobId']
print(job_id)
private static String bulkImport(List<List<String>> batchFiles) throws InterruptedException {
    MilvusImportRequest milvusImportRequest = MilvusImportRequest.builder()
            .collectionName("quick_setup")
            .files(batchFiles)
            .build();
    String bulkImportResult = BulkImport.bulkImport("http://localhost:19530", milvusImportRequest);
    System.out.println(bulkImportResult);

    JsonObject bulkImportObject = new Gson().fromJson(bulkImportResult, JsonObject.class);
    String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
    System.out.println("Create a bulkInert task, job id: " + jobId);
    return jobId;
}

public static void main(String[] args) throws Exception {
    List<List<String>> batchFiles = uploadData();
    String jobId = bulkImport(batchFiles);
}
export MILVUS_URI="localhost:19530"

curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/create" \
--header "Content-Type: application/json" \
--data-raw '{
    "files": [
        [
            "/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet"
        ],
        [
            "/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet"
        ]
    ],
    "collectionName": "quick_setup"
}'

Il corpo della richiesta contiene due campi:

  • collectionName

    Il nome della collezione di destinazione.

  • files

    Un elenco di percorsi di file relativi al percorso principale del bucket Milvus sull'istanza MioIO avviata insieme all'istanza Milvus. I possibili sottoelenchi sono i seguenti:

    • File JSON

      Se il file preparato è in formato JSON, ogni sottoelenco deve contenere il percorso di un singolo file JSON preparato.

      [
          "/d1782fa1-6b65-4ff3-b05a-43a436342445/1.json"
      ],
      
    • File Parquet

      Se il file preparato è in formato Parquet, ogni sottoelenco deve contenere il percorso di un singolo file Parquet preparato.

      [
          "/a6fb2d1c-7b1b-427c-a8a3-178944e3b66d/1.parquet"
      ]
      
      

Il possibile ritorno è il seguente:

{
    "code": 200,
    "data": {
        "jobId": "448707763884413158"
    }
}

Controllare l'avanzamento dell'importazione

Una volta ottenuto l'ID di un lavoro di importazione, è possibile verificare l'avanzamento dell'importazione come segue:

import json
from pymilvus.bulk_writer import get_import_progress

url = f"http://127.0.0.1:19530"

# Get bulk-insert job progress
resp = get_import_progress(
    url=url,
    job_id="453265736269038336",
)

print(json.dumps(resp.json(), indent=4))
private static void getImportProgress(String jobId) {
    while (true) {
        System.out.println("Wait 5 second to check bulkInsert job state...");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            break;
        }

        MilvusDescribeImportRequest request = MilvusDescribeImportRequest.builder()
                .jobId(jobId)
                .build();
        String getImportProgressResult = BulkImport.getImportProgress("http://localhost:19530", request);

        JsonObject getImportProgressObject = new Gson().fromJson(getImportProgressResult, JsonObject.class);
        String state = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
        String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
        if ("Failed".equals(state)) {
            String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
            System.out.printf("The job %s failed, reason: %s%n", jobId, reason);
            break;
        } else if ("Completed".equals(state)) {
            System.out.printf("The job %s completed%n", jobId);
            break;
        } else {
            System.out.printf("The job %s is running, state:%s progress:%s%n", jobId, state, progress);
        }
    }
}

public static void main(String[] args) throws Exception {
    List<List<String>> batchFiles = uploadData();
    String jobId = bulkImport(batchFiles);
    getImportProgress(jobId);
}
export MILVUS_URI="localhost:19530"

curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/describe" \
--header "Content-Type: application/json" \
--data-raw '{
    "jobId": "449839014328146739"
}'

La possibile risposta è la seguente:

{
    "code": 200,
    "data": {
        "collectionName": "quick_setup",
        "completeTime": "2024-05-18T02:57:13Z",
        "details": [
            {
                "completeTime": "2024-05-18T02:57:11Z",
                "fileName": "id:449839014328146740 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet\" ",
                "fileSize": 31567874,
                "importedRows": 100000,
                "progress": 100,
                "state": "Completed",
                "totalRows": 100000
            },
            {
                "completeTime": "2024-05-18T02:57:11Z",
                "fileName": "id:449839014328146741 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet\" ",
                "fileSize": 31517224,
                "importedRows": 100000,
                "progress": 100,
                "state": "Completed",
                "totalRows": 200000            
            }
        ],
        "fileSize": 63085098,
        "importedRows": 200000,
        "jobId": "449839014328146739",
        "progress": 100,
        "state": "Completed",
        "totalRows": 200000
    }
}

Elenco dei lavori di importazione

È possibile elencare tutti i lavori di importazione relativi a una collezione specifica come segue:

import json
from pymilvus.bulk_writer import list_import_jobs

url = f"http://127.0.0.1:19530"

# List bulk-insert jobs
resp = list_import_jobs(
    url=url,
    collection_name="quick_setup",
)

print(json.dumps(resp.json(), indent=4))
private static void listImportJobs() {
    MilvusListImportJobsRequest listImportJobsRequest = MilvusListImportJobsRequest.builder().collectionName("quick_setup").build();
    String listImportJobsResult = BulkImport.listImportJobs("http://localhost:19530", listImportJobsRequest);
    System.out.println(listImportJobsResult);
}

public static void main(String[] args) throws Exception {
    listImportJobs();
}
export MILVUS_URI="localhost:19530"

curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/list" \
--header "Content-Type: application/json" \
--data-raw '{
    "collectionName": "quick_setup"
}'

I valori possibili sono i seguenti:

{
    "code": 200,
    "data": {
        "records": [
            {
                "collectionName": "quick_setup",
                "jobId": "448761313698322011",
                "progress": 50,
                "state": "Importing"
            }
        ]
    }
}

Limitazioni

  • La dimensione di ciascun file di importazione non deve superare i 16 GB.

  • Il numero massimo di richieste di importazione è limitato a 1024.

  • Il numero massimo di file per richiesta di importazione non deve superare i 1024.

  • In una richiesta di importazione è possibile specificare un solo nome di partizione. Se non viene specificato alcun nome di partizione, i dati verranno inseriti nella partizione predefinita. Inoltre, non è possibile impostare un nome di partizione nella richiesta di importazione se è stata impostata la chiave di partizione nella raccolta di destinazione.

Vincoli

Prima di importare i dati, assicurarsi di aver preso atto dei vincoli relativi ai seguenti comportamenti di Milvus:

  • Vincoli relativi al comportamento Load:

    • Se una collezione è già stata caricata prima di un'importazione, è possibile utilizzare la funzione refresh_load per caricare i dati appena importati al termine dell'importazione.
  • Vincoli relativi ai comportamenti di interrogazione e ricerca:

    • Prima che lo stato del lavoro di importazione sia Completato, i dati appena importati sono garantiti come invisibili alle query e alle ricerche.

    • Una volta che lo stato del lavoro è Completato,

      • Se la raccolta non è caricata, è possibile utilizzare la funzione load per caricare i dati appena importati.

      • Se la collezione è già caricata, si può chiamare load(is_refresh=True) per caricare i dati importati.

  • Vincoli relativi al comportamento di cancellazione:

    • Prima che lo stato del lavoro di importazione sia Completato, l'eliminazione non è garantita e può avere o meno successo.

    • L'eliminazione dopo che lo stato del lavoro è Completato è garantita.

Raccomandazioni

Si consiglia vivamente di utilizzare la funzione di importazione di più file, che consente di caricare più file in un'unica richiesta. Questo metodo non solo semplifica il processo di importazione, ma aumenta anche in modo significativo le prestazioni dell'importazione. Inoltre, consolidando i caricamenti, è possibile ridurre il tempo dedicato alla gestione dei dati e rendere più efficiente il flusso di lavoro.

Tradotto daDeepL

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

Questa pagina è stata utile?