Importer des données
Cette page présente la procédure d'importation des données préparées.
Avant de commencer
Vous avez déjà préparé vos données et les avez placées dans le seau Milvus.
Si ce n'est pas le cas, vous devez d'abord utiliser RemoteBulkWriter pour préparer vos données et vous assurer que les données préparées ont déjà été transférées dans le seau Milvus sur l'instance MinIO démarrée en même temps que votre instance Milvus. Pour plus d'informations, reportez-vous à la section Préparation des données source.
Vous avez déjà créé une collection avec le schéma que vous utilisez pour préparer vos données. Si ce n'est pas le cas, reportez-vous à la section Gérer les collections.
L'extrait de code suivant crée une collection simple avec le schéma donné. Pour plus d'informations sur les paramètres, voir create_schema()
et create_collection()
dans la référence SDK.
L'extrait de code suivant crée une collection simple avec le schéma donné. Pour plus d'informations sur les paramètres, voir createCollection()
dans la référence du SDK.
Importer des données
Pour importer les données préparées, vous devez créer une tâche d'importation comme suit :
from pymilvus.bulk_writer import bulk_import
url = f"http://127.0.0.1:19530"
# Bulk-insert data from a set of JSON files already uploaded to the MinIO server
resp = bulk_import(
url=url,
collection_name="quick_setup",
files=[['a1e18323-a658-4d1b-95a7-9907a4391bcf/1.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/2.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/3.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/4.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/5.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/6.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/7.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/8.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/9.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/10.parquet']],
)
job_id = resp.json()['data']['jobId']
print(job_id)
private static String bulkImport(List<List<String>> batchFiles) throws InterruptedException {
MilvusImportRequest milvusImportRequest = MilvusImportRequest.builder()
.collectionName("quick_setup")
.files(batchFiles)
.build();
String bulkImportResult = BulkImport.bulkImport("http://localhost:19530", milvusImportRequest);
System.out.println(bulkImportResult);
JsonObject bulkImportObject = new Gson().fromJson(bulkImportResult, JsonObject.class);
String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
System.out.println("Create a bulkInert task, job id: " + jobId);
return jobId;
}
public static void main(String[] args) throws Exception {
List<List<String>> batchFiles = uploadData();
String jobId = bulkImport(batchFiles);
}
export MILVUS_URI="localhost:19530"
curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/create" \
--header "Content-Type: application/json" \
--data-raw '{
"files": [
[
"/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet"
],
[
"/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet"
]
],
"collectionName": "quick_setup"
}'
Le corps de la requête contient deux champs :
collectionName
Le nom de la collection cible.
files
Une liste de listes de chemins d'accès aux fichiers par rapport au chemin d'accès à la base de données Milvus sur l'instance MioIO démarrée en même temps que votre instance Milvus. Les sous-listes possibles sont les suivantes
Fichiers JSON
Si le fichier préparé est au format JSON, chaque sous-liste doit contenir le chemin d'accès à un seul fichier JSON préparé.
[ "/d1782fa1-6b65-4ff3-b05a-43a436342445/1.json" ],
Fichiers Parquet
Si le fichier préparé est au format Parquet, chaque sous-liste doit contenir le chemin d'accès à un seul fichier Parquet préparé.
[ "/a6fb2d1c-7b1b-427c-a8a3-178944e3b66d/1.parquet" ]
Les résultats possibles sont les suivants :
{
"code": 200,
"data": {
"jobId": "448707763884413158"
}
}
Vérifier la progression de l'importation
Une fois que vous avez obtenu l'identifiant d'une tâche d'importation, vous pouvez vérifier la progression de l'importation de la manière suivante :
import json
from pymilvus.bulk_writer import get_import_progress
url = f"http://127.0.0.1:19530"
# Get bulk-insert job progress
resp = get_import_progress(
url=url,
job_id="453265736269038336",
)
print(json.dumps(resp.json(), indent=4))
private static void getImportProgress(String jobId) {
while (true) {
System.out.println("Wait 5 second to check bulkInsert job state...");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
break;
}
MilvusDescribeImportRequest request = MilvusDescribeImportRequest.builder()
.jobId(jobId)
.build();
String getImportProgressResult = BulkImport.getImportProgress("http://localhost:19530", request);
JsonObject getImportProgressObject = new Gson().fromJson(getImportProgressResult, JsonObject.class);
String state = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
if ("Failed".equals(state)) {
String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
System.out.printf("The job %s failed, reason: %s%n", jobId, reason);
break;
} else if ("Completed".equals(state)) {
System.out.printf("The job %s completed%n", jobId);
break;
} else {
System.out.printf("The job %s is running, state:%s progress:%s%n", jobId, state, progress);
}
}
}
public static void main(String[] args) throws Exception {
List<List<String>> batchFiles = uploadData();
String jobId = bulkImport(batchFiles);
getImportProgress(jobId);
}
export MILVUS_URI="localhost:19530"
curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/describe" \
--header "Content-Type: application/json" \
--data-raw '{
"jobId": "449839014328146739"
}'
La réponse possible est la suivante :
{
"code": 200,
"data": {
"collectionName": "quick_setup",
"completeTime": "2024-05-18T02:57:13Z",
"details": [
{
"completeTime": "2024-05-18T02:57:11Z",
"fileName": "id:449839014328146740 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet\" ",
"fileSize": 31567874,
"importedRows": 100000,
"progress": 100,
"state": "Completed",
"totalRows": 100000
},
{
"completeTime": "2024-05-18T02:57:11Z",
"fileName": "id:449839014328146741 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet\" ",
"fileSize": 31517224,
"importedRows": 100000,
"progress": 100,
"state": "Completed",
"totalRows": 200000
}
],
"fileSize": 63085098,
"importedRows": 200000,
"jobId": "449839014328146739",
"progress": 100,
"state": "Completed",
"totalRows": 200000
}
}
Lister les tâches d'importation
Vous pouvez dresser la liste de toutes les tâches d'importation relatives à une collection spécifique en procédant comme suit :
import json
from pymilvus.bulk_writer import list_import_jobs
url = f"http://127.0.0.1:19530"
# List bulk-insert jobs
resp = list_import_jobs(
url=url,
collection_name="quick_setup",
)
print(json.dumps(resp.json(), indent=4))
private static void listImportJobs() {
MilvusListImportJobsRequest listImportJobsRequest = MilvusListImportJobsRequest.builder().collectionName("quick_setup").build();
String listImportJobsResult = BulkImport.listImportJobs("http://localhost:19530", listImportJobsRequest);
System.out.println(listImportJobsResult);
}
public static void main(String[] args) throws Exception {
listImportJobs();
}
export MILVUS_URI="localhost:19530"
curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/list" \
--header "Content-Type: application/json" \
--data-raw '{
"collectionName": "quick_setup"
}'
Les valeurs possibles sont les suivantes :
{
"code": 200,
"data": {
"records": [
{
"collectionName": "quick_setup",
"jobId": "448761313698322011",
"progress": 50,
"state": "Importing"
}
]
}
}
Limitations
La taille de chaque fichier d'importation ne doit pas dépasser 16 Go.
Le nombre maximum de demandes d'importation est limité à 1024.
Le nombre maximum de fichiers par demande d'importation ne doit pas dépasser 1024.
Un seul nom de partition peut être spécifié dans une demande d'importation. Si aucun nom de partition n'est spécifié, les données seront insérées dans la partition par défaut. En outre, vous ne pouvez pas définir un nom de partition dans la demande d'importation si vous avez défini la clé de partition dans la collection cible.
Contraintes
Avant d'importer des données, assurez-vous d'avoir pris connaissance des contraintes relatives aux comportements Milvus suivants :
Contraintes concernant le comportement Charger :
- Si une collection a déjà été chargée avant une importation, vous pouvez utiliser la fonction
refresh_load
pour charger les données nouvellement importées une fois l'importation terminée.
- Si une collection a déjà été chargée avant une importation, vous pouvez utiliser la fonction
Contraintes concernant les comportements d'interrogation et de recherche :
Avant que l'importation ne soit terminée, les données nouvellement importées sont invisibles pour les requêtes et les recherches.
Une fois que l'importation est terminée,
Si la collection n'est pas chargée, vous pouvez utiliser la fonction
load
pour charger les données nouvellement importées.Si la collection est déjà chargée, vous pouvez appeler
load(is_refresh=True)
pour charger les données importées.
Contraintes concernant le comportement de suppression :
Avant que le statut de la tâche d'importation ne soit Terminé, la suppression n'est pas garantie et peut réussir ou échouer.
La suppression après l'état de la tâche est garantie.
Recommandations
Nous vous recommandons vivement d'utiliser la fonction d'importation de fichiers multiples, qui vous permet de télécharger plusieurs fichiers en une seule demande. Cette méthode permet non seulement de simplifier le processus d'importation, mais aussi d'améliorer considérablement les performances de l'importation. Par ailleurs, en consolidant vos téléchargements, vous pouvez réduire le temps consacré à la gestion des données et rendre votre flux de travail plus efficace.