milvus-logo
LFAI
フロントページへ
  • ユーザーガイド

イテレータ

Milvusは大量のエンティティの結果を反復処理するための検索およびクエリのイテレータを提供します。MilvusはTopKを16384に制限しているため、ユーザはイテレータを使用してバッチモードでコレクション内の大量の、あるいはエンティティ全体を返すことができます。

概要

イテレータは、主キー値とブール式を使用して、コレクション内の大量のデータまたはすべてのデータを反復処理するのに役立つ強力なツールです。これにより、データの取得方法を大幅に改善できます。時間の経過とともに効率が低下する可能性のある、従来のoffsetパラメータやlimitパラメータの使用とは異なり、イテレータはよりスケーラブルなソリューションを提供します。

イテレータを使用する利点

  • シンプルさ:複雑なオフセットや リミットの設定が不要になります。

  • 効率性:必要なデータのみをフェッチすることで、スケーラブルなデータ検索を実現。

  • 一貫性:ブーリアンフィルターにより、一貫したデータセットサイズを保証します。

注釈

  • この機能はMilvus 2.3.x以降で利用可能です。

準備

以下のステップでは、Milvusに接続し、コレクションを素早くセットアップし、10,000以上のランダムに生成されたエンティティをコレクションに挿入するためのコードを再利用する。

ステップ1: コレクションの作成

コレクションを作成するには MilvusClientを使用してMilvusサーバに接続し create_collection()コレクションを作成します。

使用方法 MilvusClientV2を使ってMilvusサーバに接続し createCollection()コレクションを作成する。

from pymilvus import MilvusClient

# 1. Set up a Milvus client
client = MilvusClient(
    uri="http://localhost:19530"
)

# 2. Create a collection
client.create_collection(
    collection_name="quick_setup",
    dimension=5,
)
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.ConnectParam;
import io.milvus.param.highlevel.collection.CreateSimpleCollectionParam;

String CLUSTER_ENDPOINT = "http://localhost:19530";

// 1. Connect to Milvus server
ConnectParam connectParam = ConnectParam.newBuilder()
        .withUri(CLUSTER_ENDPOINT)
        .build();

MilvusServiceClient client  = new MilvusServiceClient(connectParam);

// 2. Create a collection
CreateSimpleCollectionParam createCollectionParam = CreateSimpleCollectionParam.newBuilder()
        .withCollectionName("quick_setup")
        .withDimension(5)
        .build();

client.createCollection(createCollectionParam);

ステップ2: ランダムに生成されたエンティティの挿入

以下を使用する。 insert()を使ってエンティティをコレクションに挿入する。

コレクションにエンティティを挿入するには insert()を使って、エンティティをコレクションに挿入する。

# 3. Insert randomly generated vectors 
colors = ["green", "blue", "yellow", "red", "black", "white", "purple", "pink", "orange", "brown", "grey"]
data = []

for i in range(10000):
    current_color = random.choice(colors)
    current_tag = random.randint(1000, 9999)
    data.append({
        "id": i,
        "vector": [ random.uniform(-1, 1) for _ in range(5) ],
        "color": current_color,
        "tag": current_tag,
        "color_tag": f"{current_color}_{str(current_tag)}"
    })

print(data[0])

# Output
#
# {
#     "id": 0,
#     "vector": [
#         -0.5705990742218152,
#         0.39844925120642083,
#         -0.8791287928610869,
#         0.024163154953680932,
#         0.6837669917169638
#     ],
#     "color": "purple",
#     "tag": 7774,
#     "color_tag": "purple_7774"
# }

res = client.insert(
    collection_name="quick_setup",
    data=data,
)

print(res)

# Output
#
# {
#     "insert_count": 10000,
#     "ids": [
#         0,
#         1,
#         2,
#         3,
#         4,
#         5,
#         6,
#         7,
#         8,
#         9,
#         "(9990 more items hidden)"
#     ]
# }
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

import com.alibaba.fastjson.JSONObject;

import io.milvus.param.R;
import io.milvus.param.dml.InsertParam;
import io.milvus.response.MutationResultWrapper;
import io.milvus.grpc.MutationResult;


// 3. Insert randomly generated vectors into the collection
List<String> colors = Arrays.asList("green", "blue", "yellow", "red", "black", "white", "purple", "pink", "orange", "brown", "grey");
List<JSONObject> data = new ArrayList<>();

for (int i=0; i<10000; i++) {
    Random rand = new Random();
    String current_color = colors.get(rand.nextInt(colors.size()-1));
    JSONObject row = new JSONObject();
    row.put("id", Long.valueOf(i));
    row.put("vector", Arrays.asList(rand.nextFloat(), rand.nextFloat(), rand.nextFloat(), rand.nextFloat(), rand.nextFloat()));
    row.put("color_tag", current_color + "_" + String.valueOf(rand.nextInt(8999) + 1000));
    data.add(row);
}

InsertParam insertParam = InsertParam.newBuilder()
    .withCollectionName("quick_setup")
    .withRows(data)
    .build();

R<MutationResult> insertRes = client.insert(insertParam);

if (insertRes.getStatus() != R.Status.Success.getCode()) {
    System.err.println(insertRes.getMessage());
}

MutationResultWrapper wrapper = new MutationResultWrapper(insertRes.getData());
System.out.println(wrapper.getInsertCount());

イテレータを使った検索

イテレータは類似検索をよりスケーラブルにします。

イテレータで検索するにはsearch_iterator()メソッドを呼び出します:

イテレータで検索するには、searchIterator()メソッドを呼び出します:

  1. 検索イテレータを初期化して、検索パラメータと出力フィールドを定義します。

  2. 検索結果をページ分割するには、ループ内でnext()メソッドを使用します。

    • メソッドが空の配列を返した場合はループが終了し、それ以降のページは使用できなくなります。

    • すべての結果は、指定した出力フィールドを保持します。

  3. すべてのデータが取得されたら、手動でclose()メソッドを呼び出してイテレータを閉じます。

from pymilvus import Collection

# 4. Search with iterator
connections.connect(host="127.0.0.1", port=19530)
collection = Collection("quick_setup")

query_vectors = [[0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, 0.9029438446296592]]
search_params = {
    "metric_type": "IP",
    "params": {"nprobe": 10}
}

iterator = collection.search_iterator(
    data=query_vectors,
    anns_field="vector",
    batch_size=10,
    param=search_params,
    output_fields=["color_tag"],
    limit=3
)

results = []

while True:
    result = iterator.next()
    if not result:
        iterator.close()
        break
        
    results.extend(result)
    
    for hit in result:
        results.append(hit.to_dict())

print(results)

# Output
#
# [
#     {
#         "id": 1756,
#         "distance": 2.0642056465148926,
#         "entity": {
#             "color_tag": "black_9109"
#         }
#     },
#     {
#         "id": 6488,
#         "distance": 1.9437453746795654,
#         "entity": {
#             "color_tag": "purple_8164"
#         }
#     },
#     {
#         "id": 3338,
#         "distance": 1.9107104539871216,
#         "entity": {
#             "color_tag": "brown_8121"
#         }
#     }
# ]
import io.milvus.param.dml.QueryIteratorParam;
import io.milvus.param.dml.SearchIteratorParam;
import io.milvus.response.QueryResultsWrapper;
import io.milvus.orm.iterator.SearchIterator;

// 4. Search with iterators
SearchIteratorParam iteratorParam = SearchIteratorParam.newBuilder()
    .withCollectionName("quick_setup")
    .withVectorFieldName("vector")
    // Use withFloatVectors() in clusters compatible with Milvus 2.4.x
    .withVectors(Arrays.asList(0.3580376395471989f, -0.6023495712049978f, 0.18414012509913835f, -0.26286205330961354f, 0.9029438446296592f))
    .withBatchSize(10L)
    .withParams("{\"metric_type\": \"COSINE\", \"params\": {\"level\": 1}}")
    .build();
        

R<SearchIterator> searchIteratorRes = client.searchIterator(iteratorParam);

if (searchIteratorRes.getStatus() != R.Status.Success.getCode()) {
    System.err.println(searchIteratorRes.getMessage());
}

SearchIterator searchIterator = searchIteratorRes.getData();
List<QueryResultsWrapper.RowRecord> results = new ArrayList<>();

while (true) {
    List<QueryResultsWrapper.RowRecord> batchResults = searchIterator.next();
    if (batchResults.isEmpty()) {
        searchIterator.close();
        break;
    }
    for (QueryResultsWrapper.RowRecord rowRecord : batchResults) {
        results.add(rowRecord);
    }
}

System.out.println(results.size());
パラメータ 説明
data
Milvus は指定されたものに最も類似したベクトル埋め込みを検索します。
anns_field 現在のコレクション内のベクトルフィールドの名前。
batch_size next()
デフォルト値は1000です。適切な値に設定して、反復ごとに返すエンティティの数を制御します。
param この操作に固有のパラメータ設定。
  • metric_type:この操作に適用されるメトリック・タイプ。これは、上記で指定したベクトル・フィールドにインデックスを付けるときに使用するものと同じでなければならない。指定可能な値は、L2IPCOSINEJACCARDHAMMINGである。
  • params:追加パラメータ。詳細はsearch_iterator() を参照。
output_fields
デフォルト値はNone。指定しない場合は、プライマリ・フィールドのみが含まれます。
limit
デフォルト値は-1 で、一致するすべてのエンティティが返されます。
パラメータ 説明
withCollectionName コレクション名を設定します。コレクション名は空または NULL にはできません。
withVectorFieldName 対象のベクトル・フィールドを名前で設定します。フィールド名は空または NULL にはできません。
withVectors 対象ベクターを設定します。最大 16384 ベクトルまで指定可能。
withBatchSize next()
デフォルト値は1000 です。適切な値に設定して、反復ごとに返すエンティティの数を制御します。
withParams 検索のパラメータを JSON 形式で指定します。詳細については、searchIterator() を参照してください。

イテレータを使用したクエリ

イテレータを使用してクエリを実行するには、query_iterator()メソッドを呼び出します:

イテレータで検索するには、queryIterator()メソッドをコールします:

# 6. Query with iterator
iterator = collection.query_iterator(
    batch_size=10, # Controls the size of the return each time you call next()
    expr="color_tag like \"brown_8\"",
    output_fields=["color_tag"]
)

results = []

while True:
    result = iterator.next()
    if not result:
        iterator.close()
        break
        
    results.extend(result)
    
# 8. Check the search results
print(len(results))

print(results[:3])

# Output
#
# [
#     {
#         "color_tag": "brown_8785",
#         "id": 94
#     },
#     {
#         "color_tag": "brown_8568",
#         "id": 176
#     },
#     {
#         "color_tag": "brown_8721",
#         "id": 289
#     }
# ]
import io.milvus.param.dml.QueryIteratorParam;
import io.milvus.orm.iterator.QueryIterator;

// 5. Query with iterators

try {
    Files.write(Path.of("results.json"), JSON.toJSONString(new ArrayList<>()).getBytes(), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
} catch (Exception e) {
    // TODO: handle exception
    e.printStackTrace();
}

QueryIteratorParam queryIteratorParam = QueryIteratorParam.newBuilder()
    .withCollectionName("quick_setup")
    .withExpr("color_tag like \"brown_8%\"")
    .withBatchSize(50L)
    .addOutField("vector")
    .addOutField("color_tag")
    .build();

R<QueryIterator> queryIteratRes = client.queryIterator(queryIteratorParam);

if (queryIteratRes.getStatus() != R.Status.Success.getCode()) {
    System.err.println(queryIteratRes.getMessage());
}

QueryIterator queryIterator = queryIteratRes.getData();

while (true) {
    List<QueryResultsWrapper.RowRecord> batchResults = queryIterator.next();
    if (batchResults.isEmpty()) {
        queryIterator.close();
        break;
    }

    String jsonString = "";
    List<JSONObject> jsonObject = new ArrayList<>();
    try {
        jsonString = Files.readString(Path.of("results.json"));
        jsonObject = JSON.parseArray(jsonString).toJavaList(null);
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    for (QueryResultsWrapper.RowRecord queryResult : batchResults) {
        JSONObject row = new JSONObject();
        row.put("id", queryResult.get("id"));
        row.put("vector", queryResult.get("vector"));
        row.put("color_tag", queryResult.get("color_tag"));
        jsonObject.add(row);
    }

    try {
        Files.write(Path.of("results.json"), JSON.toJSONString(jsonObject).getBytes(), StandardOpenOption.WRITE);
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
パラメータ 説明
batch_size next()
デフォルト値は1000です。適切な値に設定して、反復ごとに返すエンティティの数を制御します。
expr
デフォルト値はNone で、スカラー・フィルタリングが無視されることを示す。スカラー・フィルタリング条件を構築するには、「Boolean Expression Rules」を参照してください。
output_fields
デフォルト値はNone です。指定しないままにすると、プライマリ・フィールドのみが含まれます。
limit
値の既定値は-1 で、一致するすべてのエンティティが返されます。
パラメータ 説明
withCollectionName コレクション名を設定します。コレクション名は空または NULL にはできません。
withExpr エンティティをクエリする式を設定します。スカラー・フィルタリング条件を構築するには、"Boolean Expression Rules" を参照してください。
withBatchSize next()
デフォルト値は1000 です。適切な値に設定して、反復ごとに返すエンティティの数を制御します。
addOutField 出力スカラー・フィールドを指定します(オプション)。

翻訳DeepLogo

フィードバック

このページは役に立ちましたか ?