データのインポート
このページでは、準備したデータをインポートする手順を説明します。
始める前に
既にデータを準備し、Milvusバケットに入れている。
そうでない場合は、まずRemoteBulkWriterを使用してデータを準備し、準備したデータがMilvusインスタンスと共に起動したMinIOインスタンス上のMilvusバケットに転送済みであることを確認してください。詳細はソースデータの準備を参照してください。
データの準備に使用するスキーマでコレクションを作成済みである。そうでない場合は、「コレクションの管理」を参照してください。
以下のコード・スニペットは、指定されたスキーマで単純なコレクションを作成します。パラメータの詳細については create_schema()
および create_collection()
を参照してください。
次のコード・スニペットは、指定されたスキーマで単純なコレクションを作成します。パラメータの詳細については、SDKリファレンスの createCollection()
を参照してください。
データのインポート
準備したデータをインポートするには、以下のようにインポートジョブを作成する必要があります:
from pymilvus.bulk_writer import bulk_import
url = f"http://127.0.0.1:19530"
# Bulk-insert data from a set of JSON files already uploaded to the MinIO server
resp = bulk_import(
url=url,
collection_name="quick_setup",
files=[['a1e18323-a658-4d1b-95a7-9907a4391bcf/1.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/2.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/3.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/4.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/5.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/6.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/7.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/8.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/9.parquet'],
['a1e18323-a658-4d1b-95a7-9907a4391bcf/10.parquet']],
)
job_id = resp.json()['data']['jobId']
print(job_id)
private static String bulkImport(List<List<String>> batchFiles) throws InterruptedException {
MilvusImportRequest milvusImportRequest = MilvusImportRequest.builder()
.collectionName("quick_setup")
.files(batchFiles)
.build();
String bulkImportResult = BulkImport.bulkImport("http://localhost:19530", milvusImportRequest);
System.out.println(bulkImportResult);
JsonObject bulkImportObject = new Gson().fromJson(bulkImportResult, JsonObject.class);
String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
System.out.println("Create a bulkInert task, job id: " + jobId);
return jobId;
}
public static void main(String[] args) throws Exception {
List<List<String>> batchFiles = uploadData();
String jobId = bulkImport(batchFiles);
}
export MILVUS_URI="localhost:19530"
curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/create" \
--header "Content-Type: application/json" \
--data-raw '{
"files": [
[
"/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet"
],
[
"/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet"
]
],
"collectionName": "quick_setup"
}'
リクエストボディには2つのフィールドがあります:
collectionName
ターゲットコレクションの名前。
files
Milvusインスタンスと共に起動されたMioIOインスタンス上のMilvusバケットのルートパスからの相対的なファイルパスのリスト。可能なサブリストは以下の通りです:
JSONファイル
準備するファイルがJSON形式である場合、各サブリストには準備するJSONファイル1つのパスが含まれる。
[ "/d1782fa1-6b65-4ff3-b05a-43a436342445/1.json" ],
Parquetファイル
準備されたファイルがParquet形式の場合、各サブリストは準備された1つのParquetファイルへのパスを含む必要があります。
[ "/a6fb2d1c-7b1b-427c-a8a3-178944e3b66d/1.parquet" ]
返り値は以下のようになります:
{
"code": 200,
"data": {
"jobId": "448707763884413158"
}
}
インポートの進行状況の確認
インポートジョブIDを取得したら、以下のようにインポートの進捗状況を確認できます:
import json
from pymilvus.bulk_writer import get_import_progress
url = f"http://127.0.0.1:19530"
# Get bulk-insert job progress
resp = get_import_progress(
url=url,
job_id="453265736269038336",
)
print(json.dumps(resp.json(), indent=4))
private static void getImportProgress(String jobId) {
while (true) {
System.out.println("Wait 5 second to check bulkInsert job state...");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
break;
}
MilvusDescribeImportRequest request = MilvusDescribeImportRequest.builder()
.jobId(jobId)
.build();
String getImportProgressResult = BulkImport.getImportProgress("http://localhost:19530", request);
JsonObject getImportProgressObject = new Gson().fromJson(getImportProgressResult, JsonObject.class);
String state = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
if ("Failed".equals(state)) {
String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
System.out.printf("The job %s failed, reason: %s%n", jobId, reason);
break;
} else if ("Completed".equals(state)) {
System.out.printf("The job %s completed%n", jobId);
break;
} else {
System.out.printf("The job %s is running, state:%s progress:%s%n", jobId, state, progress);
}
}
}
public static void main(String[] args) throws Exception {
List<List<String>> batchFiles = uploadData();
String jobId = bulkImport(batchFiles);
getImportProgress(jobId);
}
export MILVUS_URI="localhost:19530"
curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/describe" \
--header "Content-Type: application/json" \
--data-raw '{
"jobId": "449839014328146739"
}'
可能な応答は以下のとおりである:
{
"code": 200,
"data": {
"collectionName": "quick_setup",
"completeTime": "2024-05-18T02:57:13Z",
"details": [
{
"completeTime": "2024-05-18T02:57:11Z",
"fileName": "id:449839014328146740 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/1.parquet\" ",
"fileSize": 31567874,
"importedRows": 100000,
"progress": 100,
"state": "Completed",
"totalRows": 100000
},
{
"completeTime": "2024-05-18T02:57:11Z",
"fileName": "id:449839014328146741 paths:\"/8ca44f28-47f7-40ba-9604-98918afe26d1/2.parquet\" ",
"fileSize": 31517224,
"importedRows": 100000,
"progress": 100,
"state": "Completed",
"totalRows": 200000
}
],
"fileSize": 63085098,
"importedRows": 200000,
"jobId": "449839014328146739",
"progress": 100,
"state": "Completed",
"totalRows": 200000
}
}
インポートジョブの一覧表示
特定のコレクションに関連するすべてのインポートジョブを一覧表示するには、次のようにします:
import json
from pymilvus.bulk_writer import list_import_jobs
url = f"http://127.0.0.1:19530"
# List bulk-insert jobs
resp = list_import_jobs(
url=url,
collection_name="quick_setup",
)
print(json.dumps(resp.json(), indent=4))
private static void listImportJobs() {
MilvusListImportJobsRequest listImportJobsRequest = MilvusListImportJobsRequest.builder().collectionName("quick_setup").build();
String listImportJobsResult = BulkImport.listImportJobs("http://localhost:19530", listImportJobsRequest);
System.out.println(listImportJobsResult);
}
public static void main(String[] args) throws Exception {
listImportJobs();
}
export MILVUS_URI="localhost:19530"
curl --request POST "http://${MILVUS_URI}/v2/vectordb/jobs/import/list" \
--header "Content-Type: application/json" \
--data-raw '{
"collectionName": "quick_setup"
}'
可能な値は以下の通り:
{
"code": 200,
"data": {
"records": [
{
"collectionName": "quick_setup",
"jobId": "448761313698322011",
"progress": 50,
"state": "Importing"
}
]
}
}
制限事項
各インポートファイルのサイズは16GBを超えてはなりません。
インポート要求の最大数は1024に制限されています。
インポートリクエストあたりの最大ファイル数は1024を超えてはならない。
インポート要求で指定できるパーティション名は1つだけです。パーティション名が指定されていない場合、データはデフォルトのパーティションに挿入されます。また、ターゲットコレクションでパーティションキーを設定している場合、インポートリクエストでパーティション名を設定することはできません。
制約
データをインポートする前に、以下のMilvusビヘイビアに関する制約を確認してください:
ロード動作に関する制約:
- ロード動作に関する制約: インポート前にコレクションが既にロードされている場合、
refresh_load
関数を使用して、インポート完了後に新しくインポートされたデータをロードできます。
- ロード動作に関する制約: インポート前にコレクションが既にロードされている場合、
クエリおよび検索動作に関する制約:
インポート・ジョブ・ステータスが "Completed "になる前は、新しくインポートされたデータはクエリや検索から不可視であることが保証されます。
ジョブ・ステータスがCompletedになると、
コレクションがロードされていない場合、
load
関数を使用して新しくインポートされたデータをロードできます。コレクションが既にロードされている場合は、
load(is_refresh=True)
を呼び出して、インポートされたデータをロードできます。
削除動作に関する制約:
インポート・ジョブ・ステータスがCompletedになる前の削除は保証されず、成功する場合も成功しない場合もあります。
ジョブ・ステータスがCompletedになった後の削除は成功が保証されます。
推奨事項
1回のリクエストで複数のファイルをアップロードできるマルチファイルインポート機能の利用を強くお勧めします。この方法は、インポート処理を簡素化するだけでなく、インポートのパフォーマンスを大幅に向上させます。一方、アップロードを統合することで、データ管理に費やす時間を短縮し、ワークフローを効率化することができます。