Daten importieren
Auf dieser Seite wird das Verfahren zum Importieren der vorbereiteten Daten beschrieben.
Bevor Sie beginnen
Sie haben Ihre Daten bereits vorbereitet und in den Milvus-Bucket gelegt.
Falls nicht, sollten Sie zuerst RemoteBulkWriter verwenden, um Ihre Daten vorzubereiten, und sicherstellen, dass die vorbereiteten Daten bereits in den Milvus-Bucket der MinIO-Instanz übertragen wurden, die zusammen mit Ihrer Milvus-Instanz gestartet wurde. Details hierzu finden Sie unter Quelldaten vorbereiten.
Sie haben bereits eine Sammlung mit dem Schema erstellt, das Sie zur Vorbereitung Ihrer Daten verwenden. Falls nicht, lesen Sie bitte den Abschnitt Verwalten von Sammlungen.
Das folgende Code-Snippet erstellt eine einfache Sammlung mit dem angegebenen Schema. Weitere Informationen zu Parametern finden Sie unter create_schema()
und create_collection()
in der SDK-Referenz.
Mit dem folgenden Codeausschnitt wird eine einfache Sammlung mit dem angegebenen Schema erstellt. Weitere Informationen zu den Parametern finden Sie unter createCollection()
in der SDK-Referenz.
Daten importieren
Um die vorbereiteten Daten zu importieren, müssen Sie einen Importauftrag wie folgt erstellen:
from pymilvus.bulk_writer import bulk_import
url = f"http://127.0.0.1:19530"
# Bulk-insert data from a set of JSON files already uploaded to the MinIO server
resp = bulk_import(
url=url,
collection_name="quick_setup",
files=[['a1e18323-a658-4d1b-95a7-9907a4391bcf/1.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/2.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/3.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/4.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/5.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/6.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/7.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/8.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/9.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/10.parquet']],
)
job_id = resp.json()['data']['jobId']
print(job_id)
private static String bulkImport(List<List<String>> batchFiles) throws InterruptedException {
MilvusImportRequest milvusImportRequest = MilvusImportRequest.builder()
.collectionName("quick_setup")
.files(batchFiles)
.build();
String bulkImportResult = BulkImport.bulkImport("http://localhost:19530", milvusImportRequest);
System.out.println(bulkImportResult);
JsonObject bulkImportObject = new Gson().fromJson(bulkImportResult, JsonObject.class);
String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
System.out.println("Create a bulkInert task, job id: " + jobId);
return jobId;
}
public static void main(String[] args) throws Exception {
List<List<String>> batchFiles = uploadData();
String jobId = bulkImport(batchFiles);
}
export MILVUS_URI="localhost:19530"
curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/create" \
--header "Content-Type: application/json" \
--data-raw '{
"files": [
[
"/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet"
],
[
"/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet"
]
],
"collectionName": "quick_setup"
}'
Der Anfragekörper enthält zwei Felder:
collectionName
Den Namen der Zielsammlung.
files
Eine Liste mit Listen von Dateipfaden relativ zum Stammverzeichnis des Milvus-Buckets auf der MioIO-Instanz, die zusammen mit Ihrer Milvus-Instanz gestartet wurde. Mögliche Unterlisten sind folgende:
JSON-Dateien
Wenn die vorbereitete Datei im JSON-Format vorliegt, sollte jede Unterliste den Pfad zu einer einzelnen vorbereiteten JSON-Datei enthalten.
[ "/d1782fa1-6b65-4ff3-b05a-43a436342445/1.json" ],
Parquet-Dateien
Wenn die vorbereitete Datei im Parquet-Format vorliegt, sollte jede Teilliste den Pfad zu einer einzelnen vorbereiteten Parquet-Datei enthalten.
[ "/a6fb2d1c-7b1b-427c-a8a3-178944e3b66d/1.parquet" ]
Die mögliche Rückgabe ist wie folgt:
{
"code": 200,
"data": {
"jobId": "448707763884413158"
}
}
Importfortschritt prüfen
Sobald Sie eine Importauftrags-ID erhalten haben, können Sie den Importfortschritt wie folgt überprüfen:
import json
from pymilvus.bulk_writer import get_import_progress
url = f"http://127.0.0.1:19530"
# Get bulk-insert job progress
resp = get_import_progress(
url=url,
job_id="453265736269038336",
)
print(json.dumps(resp.json(), indent=4))
private static void getImportProgress(String jobId) {
while (true) {
System.out.println("Wait 5 second to check bulkInsert job state...");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
break;
}
MilvusDescribeImportRequest request = MilvusDescribeImportRequest.builder()
.jobId(jobId)
.build();
String getImportProgressResult = BulkImport.getImportProgress("http://localhost:19530", request);
JsonObject getImportProgressObject = new Gson().fromJson(getImportProgressResult, JsonObject.class);
String state = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
if ("Failed".equals(state)) {
String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
System.out.printf("The job %s failed, reason: %s%n", jobId, reason);
break;
} else if ("Completed".equals(state)) {
System.out.printf("The job %s completed%n", jobId);
break;
} else {
System.out.printf("The job %s is running, state:%s progress:%s%n", jobId, state, progress);
}
}
}
public static void main(String[] args) throws Exception {
List<List<String>> batchFiles = uploadData();
String jobId = bulkImport(batchFiles);
getImportProgress(jobId);
}
export MILVUS_URI="localhost:19530"
curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/describe" \
--header "Content-Type: application/json" \
--data-raw '{
"jobId": "449839014328146739"
}'
Die mögliche Antwort lautet wie folgt:
{
"code": 200,
"data": {
"collectionName": "quick_setup",
"completeTime": "2024-05-18T02:57:13Z",
"details": [
{
"completeTime": "2024-05-18T02:57:11Z",
"fileName": "id:449839014328146740 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet\" ",
"fileSize": 31567874,
"importedRows": 100000,
"progress": 100,
"state": "Completed",
"totalRows": 100000
},
{
"completeTime": "2024-05-18T02:57:11Z",
"fileName": "id:449839014328146741 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet\" ",
"fileSize": 31517224,
"importedRows": 100000,
"progress": 100,
"state": "Completed",
"totalRows": 200000
}
],
"fileSize": 63085098,
"importedRows": 200000,
"jobId": "449839014328146739",
"progress": 100,
"state": "Completed",
"totalRows": 200000
}
}
Importaufträge auflisten
Sie können alle Importaufträge in Bezug auf eine bestimmte Sammlung wie folgt auflisten:
import json
from pymilvus.bulk_writer import list_import_jobs
url = f"http://127.0.0.1:19530"
# List bulk-insert jobs
resp = list_import_jobs(
url=url,
collection_name="quick_setup",
)
print(json.dumps(resp.json(), indent=4))
private static void listImportJobs() {
MilvusListImportJobsRequest listImportJobsRequest = MilvusListImportJobsRequest.builder().collectionName("quick_setup").build();
String listImportJobsResult = BulkImport.listImportJobs("http://localhost:19530", listImportJobsRequest);
System.out.println(listImportJobsResult);
}
public static void main(String[] args) throws Exception {
listImportJobs();
}
export MILVUS_URI="localhost:19530"
curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/list" \
--header "Content-Type: application/json" \
--data-raw '{
"collectionName": "quick_setup"
}'
Die möglichen Werte sind wie folgt:
{
"code": 200,
"data": {
"records": [
{
"collectionName": "quick_setup",
"jobId": "448761313698322011",
"progress": 50,
"state": "Importing"
}
]
}
}
Beschränkungen
Die Größe jeder Importdatei sollte 16 GB nicht überschreiten.
Die maximale Anzahl der Importaufträge ist auf 1024 begrenzt.
Die maximale Anzahl von Dateien pro Importauftrag sollte 1024 nicht überschreiten.
In einem Importauftrag kann nur ein Partitionsname angegeben werden. Wenn kein Partitionsname angegeben wird, werden die Daten in die Standardpartition eingefügt. Außerdem können Sie keinen Partitionsnamen im Importauftrag angeben, wenn Sie den Partitionsschlüssel in der Zielsammlung festgelegt haben.
Constraints
Bevor Sie Daten importieren, vergewissern Sie sich, dass Sie die Beschränkungen in Bezug auf die folgenden Milvus-Verhaltensweisen beachtet haben:
Einschränkungen bezüglich des Ladeverhaltens:
- Wenn eine Sammlung bereits vor dem Import geladen wurde, können Sie die Funktion
refresh_load
verwenden, um die neu importierten Daten nach Abschluss des Imports zu laden.
- Wenn eine Sammlung bereits vor dem Import geladen wurde, können Sie die Funktion
Einschränkungen bezüglich des Abfrage- und Suchverhaltens:
Bevor der Importauftrag den Status Abgeschlossen hat, sind die neu importierten Daten für Abfragen und Suchen garantiert unsichtbar.
Sobald der Auftragsstatus Abgeschlossen ist,
Wenn die Sammlung nicht geladen ist, können Sie die Funktion
load
verwenden, um die neu importierten Daten zu laden.Wenn die Sammlung bereits geladen ist, können Sie
load(is_refresh=True)
aufrufen, um die importierten Daten zu laden.
Einschränkungen bezüglich des Löschverhaltens:
Bevor der Status des Importauftrags " Abgeschlossen" lautet, ist das Löschen nicht garantiert und kann erfolgreich sein oder nicht.
Das Löschen nach Beendigung des Auftrags ist garantiert erfolgreich.
Empfehlungen
Wir empfehlen dringend, die Funktion für den Import mehrerer Dateien zu verwenden, mit der Sie mehrere Dateien in einer einzigen Anfrage hochladen können. Diese Methode vereinfacht nicht nur den Importprozess, sondern steigert auch die Importleistung erheblich. Indem Sie Ihre Uploads konsolidieren, können Sie den Zeitaufwand für die Datenverwaltung verringern und Ihren Arbeitsablauf effizienter gestalten.