Importar dados
Esta página demonstra o procedimento para importar os dados preparados.
Antes de começar
Já preparou os seus dados e colocou-os no balde do Milvus.
Caso contrário, deve utilizar RemoteBulkWriter para preparar os seus dados primeiro e garantir que os dados preparados já foram transferidos para o bucket Milvus na instância MinIO iniciada juntamente com a sua instância Milvus. Para obter detalhes, consulte Preparar dados de origem.
Já criou uma coleção com o esquema que utiliza para preparar os seus dados. Caso contrário, consulte Gerenciar coleções.
O seguinte trecho de código cria uma coleção simples com o esquema fornecido. Para obter mais informações sobre parâmetros, consulte create_schema()
e create_collection()
na referência do SDK.
O seguinte trecho de código cria uma coleção simples com o esquema fornecido. Para obter mais informações sobre os parâmetros, consulte createCollection()
na referência do SDK.
Importar dados
Para importar os dados preparados, tem de criar um trabalho de importação da seguinte forma:
from pymilvus.bulk_writer import bulk_import
url = f"http://127.0.0.1:19530"
# Bulk-insert data from a set of JSON files already uploaded to the MinIO server
resp = bulk_import(
url=url,
collection_name="quick_setup",
files=[['a1e18323-a658-4d1b-95a7-9907a4391bcf/1.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/2.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/3.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/4.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/5.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/6.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/7.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/8.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/9.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/10.parquet']],
)
job_id = resp.json()['data']['jobId']
print(job_id)
private static String bulkImport(List<List<String>> batchFiles) throws InterruptedException {
MilvusImportRequest milvusImportRequest = MilvusImportRequest.builder()
.collectionName("quick_setup")
.files(batchFiles)
.build();
String bulkImportResult = BulkImport.bulkImport("http://localhost:19530", milvusImportRequest);
System.out.println(bulkImportResult);
JsonObject bulkImportObject = new Gson().fromJson(bulkImportResult, JsonObject.class);
String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
System.out.println("Create a bulkInert task, job id: " + jobId);
return jobId;
}
public static void main(String[] args) throws Exception {
List<List<String>> batchFiles = uploadData();
String jobId = bulkImport(batchFiles);
}
export MILVUS_URI="localhost:19530"
curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/create" \
--header "Content-Type: application/json" \
--data-raw '{
"files": [
[
"/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet"
],
[
"/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet"
]
],
"collectionName": "quick_setup"
}'
O corpo do pedido contém dois campos:
collectionName
O nome da coleção de destino.
files
Uma lista de listas de caminhos de ficheiros relativos ao caminho de raiz do balde Milvus na instância MioIO iniciada juntamente com a sua instância Milvus. As sub-listas possíveis são as seguintes:
Ficheiros JSON
Se o ficheiro preparado estiver no formato JSON, cada sub-lista deve conter o caminho para um único ficheiro JSON preparado.
[ "/d1782fa1-6b65-4ff3-b05a-43a436342445/1.json" ],
Ficheiros Parquet
Se o ficheiro preparado estiver no formato Parquet, cada sub-lista deve conter o caminho para um único ficheiro Parquet preparado.
[ "/a6fb2d1c-7b1b-427c-a8a3-178944e3b66d/1.parquet" ]
O retorno possível é o seguinte:
{
"code": 200,
"data": {
"jobId": "448707763884413158"
}
}
Verificar o progresso da importação
Depois de obter um ID de tarefa de importação, pode verificar o progresso da importação da seguinte forma:
import json
from pymilvus.bulk_writer import get_import_progress
url = f"http://127.0.0.1:19530"
# Get bulk-insert job progress
resp = get_import_progress(
url=url,
job_id="453265736269038336",
)
print(json.dumps(resp.json(), indent=4))
private static void getImportProgress(String jobId) {
while (true) {
System.out.println("Wait 5 second to check bulkInsert job state...");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
break;
}
MilvusDescribeImportRequest request = MilvusDescribeImportRequest.builder()
.jobId(jobId)
.build();
String getImportProgressResult = BulkImport.getImportProgress("http://localhost:19530", request);
JsonObject getImportProgressObject = new Gson().fromJson(getImportProgressResult, JsonObject.class);
String state = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
if ("Failed".equals(state)) {
String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
System.out.printf("The job %s failed, reason: %s%n", jobId, reason);
break;
} else if ("Completed".equals(state)) {
System.out.printf("The job %s completed%n", jobId);
break;
} else {
System.out.printf("The job %s is running, state:%s progress:%s%n", jobId, state, progress);
}
}
}
public static void main(String[] args) throws Exception {
List<List<String>> batchFiles = uploadData();
String jobId = bulkImport(batchFiles);
getImportProgress(jobId);
}
export MILVUS_URI="localhost:19530"
curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/describe" \
--header "Content-Type: application/json" \
--data-raw '{
"jobId": "449839014328146739"
}'
A resposta possível é a seguinte:
{
"code": 200,
"data": {
"collectionName": "quick_setup",
"completeTime": "2024-05-18T02:57:13Z",
"details": [
{
"completeTime": "2024-05-18T02:57:11Z",
"fileName": "id:449839014328146740 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet\" ",
"fileSize": 31567874,
"importedRows": 100000,
"progress": 100,
"state": "Completed",
"totalRows": 100000
},
{
"completeTime": "2024-05-18T02:57:11Z",
"fileName": "id:449839014328146741 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet\" ",
"fileSize": 31517224,
"importedRows": 100000,
"progress": 100,
"state": "Completed",
"totalRows": 200000
}
],
"fileSize": 63085098,
"importedRows": 200000,
"jobId": "449839014328146739",
"progress": 100,
"state": "Completed",
"totalRows": 200000
}
}
Listar tarefas de importação
Pode listar todas as tarefas de importação relativas a uma coleção específica da seguinte forma:
import json
from pymilvus.bulk_writer import list_import_jobs
url = f"http://127.0.0.1:19530"
# List bulk-insert jobs
resp = list_import_jobs(
url=url,
collection_name="quick_setup",
)
print(json.dumps(resp.json(), indent=4))
private static void listImportJobs() {
MilvusListImportJobsRequest listImportJobsRequest = MilvusListImportJobsRequest.builder().collectionName("quick_setup").build();
String listImportJobsResult = BulkImport.listImportJobs("http://localhost:19530", listImportJobsRequest);
System.out.println(listImportJobsResult);
}
public static void main(String[] args) throws Exception {
listImportJobs();
}
export MILVUS_URI="localhost:19530"
curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/list" \
--header "Content-Type: application/json" \
--data-raw '{
"collectionName": "quick_setup"
}'
Os valores possíveis são os seguintes:
{
"code": 200,
"data": {
"records": [
{
"collectionName": "quick_setup",
"jobId": "448761313698322011",
"progress": 50,
"state": "Importing"
}
]
}
}
Limitações
O tamanho de cada ficheiro de importação não deve exceder 16 GB.
O número máximo de pedidos de importação está limitado a 1024.
O número máximo de ficheiros por pedido de importação não deve exceder 1024.
Só pode ser especificado um nome de partição num pedido de importação. Se não for especificado nenhum nome de partição, os dados serão inseridos na partição predefinida. Além disso, não pode definir um nome de partição no pedido de importação se tiver definido a Chave de partição na coleção de destino.
Restrições
Antes de importar os dados, certifique-se de que reconheceu as restrições em termos dos seguintes comportamentos do Milvus:
Restrições relativas ao comportamento Load:
- Se uma coleção já tiver sido carregada antes de uma importação, pode utilizar a função
refresh_load
para carregar os dados recém-importados após a conclusão da importação.
- Se uma coleção já tiver sido carregada antes de uma importação, pode utilizar a função
Restrições relativas aos comportamentos de consulta e pesquisa:
Antes de o estado da tarefa de importação ser Concluído, é garantido que os dados recentemente importados são invisíveis para consultas e pesquisas.
Quando o estado da tarefa for Concluído,
Se a coleção não estiver carregada, pode utilizar a função
load
para carregar os dados recentemente importados.Se a coleção já estiver carregada, pode chamar
load(is_refresh=True)
para carregar os dados importados.
Restrições relativas ao comportamento de eliminação:
Antes de o estado da tarefa de importação ser Concluído, a eliminação não é garantida e pode ou não ser bem sucedida.
A eliminação depois de o estado da tarefa ser Concluído é garantida.
Recomendações
Recomendamos vivamente a utilização da funcionalidade de importação de vários ficheiros, que lhe permite carregar vários ficheiros num único pedido. Este método não só simplifica o processo de importação, como também aumenta significativamente o desempenho da importação. Entretanto, ao consolidar os seus carregamentos, pode reduzir o tempo gasto na gestão de dados e tornar o seu fluxo de trabalho mais eficiente.