milvus-logo
LFAI
首页
  • 用户指南

使用迭代器

Milvus 提供搜索和查询迭代器,用于迭代大量实体。由于 Milvus 将 TopK 限制在 16384,用户可以使用迭代器以批处理模式返回大量甚至整个 Collections 中的实体。

概述

迭代器是扫描整个 Collections 或通过指定主键值或过滤表达式迭代大量实体的有效工具。与带有偏移限制参数的搜索或查询调用相比,使用迭代器更高效、更可扩展。

使用迭代器的好处

  • 简单:消除了复杂的偏移限制设置。

  • 高效:只获取需要的数据,提供可扩展的数据检索。

  • 一致性通过布尔筛选器确保数据集大小一致。

注释

  • 此功能适用于 Milvus 2.3.x 或更高版本。

准备工作

以下准备步骤连接 Milvus 并将随机生成的实体插入 Collections。

步骤 1:创建 Collections

使用 MilvusClient连接到 Milvus 服务器,并使用 create_collection()来创建 Collections。

使用 MilvusClientV2连接到 Milvus 服务器,并使用 createCollection()创建 Collections。

from pymilvus import MilvusClient

# 1. Set up a Milvus client
client = MilvusClient(
    uri="http://localhost:19530"
)

# 2. Create a collection
client.create_collection(
    collection_name="quick_setup",
    dimension=5,
)
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.milvus.orm.iterator.QueryIterator;
import io.milvus.orm.iterator.SearchIterator;
import io.milvus.response.QueryResultsWrapper;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.common.ConsistencyLevel;
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import io.milvus.v2.service.collection.request.DropCollectionReq;
import io.milvus.v2.service.vector.request.*;
import io.milvus.v2.service.vector.request.data.FloatVec;
import io.milvus.v2.service.vector.response.InsertResp;
import io.milvus.v2.service.vector.response.QueryResp;

import java.util.*;

String CLUSTER_ENDPOINT = "http://localhost:19530";

// 1. Connect to Milvus server
ConnectParam connectParam = ConnectParam.newBuilder()
        .withUri(CLUSTER_ENDPOINT)
        .build();

MilvusServiceClient client  = new MilvusServiceClient(connectParam);

// 2. Create a collection
CreateCollectionReq quickSetupReq = CreateCollectionReq.builder()
        .collectionName("quick_setup")
        .dimension(5)
        .build();
client.createCollection(quickSetupReq);

第二步:插入随机生成的实体

使用 insert()将实体插入 Collections。

使用 insert()将实体插入 Collections。

# 3. Insert randomly generated vectors 
colors = ["green", "blue", "yellow", "red", "black", "white", "purple", "pink", "orange", "brown", "grey"]
data = []

for i in range(10000):
    current_color = random.choice(colors)
    current_tag = random.randint(1000, 9999)
    data.append({
        "id": i,
        "vector": [ random.uniform(-1, 1) for _ in range(5) ],
        "color": current_color,
        "tag": current_tag,
        "color_tag": f"{current_color}_{str(current_tag)}"
    })

print(data[0])

# Output
#
# {
#     "id": 0,
#     "vector": [
#         -0.5705990742218152,
#         0.39844925120642083,
#         -0.8791287928610869,
#         0.024163154953680932,
#         0.6837669917169638
#     ],
#     "color": "purple",
#     "tag": 7774,
#     "color_tag": "purple_7774"
# }

res = client.insert(
    collection_name="quick_setup",
    data=data,
)

print(res)

# Output
#
# {
#     "insert_count": 10000,
#     "ids": [
#         0,
#         1,
#         2,
#         3,
#         4,
#         5,
#         6,
#         7,
#         8,
#         9,
#         "(9990 more items hidden)"
#     ]
# }
// 3. Insert randomly generated vectors into the collection
List<String> colors = Arrays.asList("green", "blue", "yellow", "red", "black", "white", "purple", "pink", "orange", "brown", "grey");
List<JsonObject> data = new ArrayList<>();
Gson gson = new Gson();
for (int i=0; i<10000; i++) {
    Random rand = new Random();
    String current_color = colors.get(rand.nextInt(colors.size()-1));
    JsonObject row = new JsonObject();
    row.addProperty("id", (long) i);
    row.add("vector", gson.toJsonTree(Arrays.asList(rand.nextFloat(), rand.nextFloat(), rand.nextFloat(), rand.nextFloat(), rand.nextFloat())));
    row.addProperty("color_tag", current_color + "_" + (rand.nextInt(8999) + 1000));
    data.add(row);
}

InsertResp insertR = client.insert(InsertReq.builder()
        .collectionName("quick_setup")
        .data(data)
        .build());
System.out.println(insertR.getInsertCnt());

// Output
// 10000

使用迭代器搜索

迭代器使相似性搜索更具可扩展性。

要使用迭代器搜索,请调用search_iterator()方法:

要使用迭代器搜索,请调用searchIterator()方法:

  1. 初始化搜索迭代器,定义搜索参数和输出字段。

  2. 在循环中使用next()方法对搜索结果进行分页。

    • 如果该方法返回一个空数组,则循环结束,不再提供更多页面。

    • 所有结果都包含指定的输出字段。

  3. 检索完所有数据后,手动调用close()方法关闭迭代器。

from pymilvus import Collection,connections

# 4. Search with iterator
connections.connect(host="127.0.0.1", port=19530)
collection = Collection("quick_setup")

query_vectors = [[0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, 0.9029438446296592]]
search_params = {
    "metric_type": "IP",
    "params": {"nprobe": 10}
}

iterator = collection.search_iterator(
    data=query_vectors,
    anns_field="vector",
    batch_size=10,
    param=search_params,
    output_fields=["color_tag"],
    limit=300
)
# search 300 entities totally with 10 entities per page

results = []

while True:
    result = iterator.next()
    if not result:
        iterator.close()
        break
        
    results.extend(result)
    
    for hit in result:
        results.append(hit.to_dict())

print(results)

# Output
#
# [
#     {
#         "id": 1756,
#         "distance": 2.0642056465148926,
#         "entity": {
#             "color_tag": "black_9109"
#         }
#     },
#     {
#         "id": 6488,
#         "distance": 1.9437453746795654,
#         "entity": {
#             "color_tag": "purple_8164"
#         }
#     },
#     {
#         "id": 3338,
#         "distance": 1.9107104539871216,
#         "entity": {
#             "color_tag": "brown_8121"
#         }
#     }
# ]
// 4. Search with iterators
SearchIteratorReq iteratorReq = SearchIteratorReq.builder()
        .collectionName("quick_setup")
        .vectorFieldName("vector")
        .batchSize(10L)
        .vectors(Collections.singletonList(new FloatVec(Arrays.asList(0.3580376395471989f, -0.6023495712049978f, 0.18414012509913835f, -0.26286205330961354f, 0.9029438446296592f))))
        .params("{\"level\": 1}")
        .metricType(IndexParam.MetricType.COSINE)
        .outputFields(Collections.singletonList("color_tag"))
        .topK(300)
        .build();

SearchIterator searchIterator = client.searchIterator(iteratorReq);

List<QueryResultsWrapper.RowRecord> results = new ArrayList<>();
while (true) {
    List<QueryResultsWrapper.RowRecord> batchResults = searchIterator.next();
    if (batchResults.isEmpty()) {
        searchIterator.close();
        break;
    }

    results.addAll(batchResults);
}
System.out.println(results.size());

// Output
// 300
参数 描述
data 一个向量嵌入列表。
Milvus 会搜索与指定向量嵌入最相似的向量嵌入。
anns_field 当前 Collections 中的向量字段名称。
batch_size 每次在当前迭代器上调用next() 时要返回的实体数量。
默认值为1000。将其设置为适当的值,以控制每次迭代返回的实体数量。
param 该操作符特有的参数设置。
  • metric_type:应用于此操作的度量类型。应与上面指定的向量场索引时使用的类型相同。可能的值有L2IPCOSINEJACCARDHAMMING
  • params:附加参数。详情请参阅search_iterator()
output_fields 要包含在返回的每个实体中的字段名列表。
默认值为 "无"。如果未指定,则只包含主字段。
limit 要返回的实体总数。
默认值为-1,表示将返回所有匹配实体。
参数 说明
withCollectionName 设置 Collections 名称。Collection 名称不能为空或 null。
withVectorFieldName 按名称设置目标向量字段。字段名称不能为空或空。
withVectors 设置目标向量。最多允许 16384 个向量。
withBatchSize 每次在当前迭代器上调用next() 时返回的实体数量。
默认值为1000。将其设置为适当的值,以控制每次迭代返回的实体数量。
withParams 以 JSON 格式指定搜索参数。更多信息,请参阅searchIterator()

使用迭代器查询

要使用迭代器查询,请调用query_iterator()方法:

要使用迭代器搜索,请调用queryIterator()方法:

# 6. Query with iterator
iterator = collection.query_iterator(
    batch_size=10, # Controls the size of the return each time you call next()
    expr="color_tag like \"brown_8\"",
    output_fields=["color_tag"]
)

results = []

while True:
    result = iterator.next()
    if not result:
        iterator.close()
        break
        
    results.extend(result)
    
# 8. Check the search results
print(len(results))

print(results[:3])

# Output
#
# [
#     {
#         "color_tag": "brown_8785",
#         "id": 94
#     },
#     {
#         "color_tag": "brown_8568",
#         "id": 176
#     },
#     {
#         "color_tag": "brown_8721",
#         "id": 289
#     }
# ]
// 5. Query with iterators
QueryIterator queryIterator = client.queryIterator(QueryIteratorReq.builder()
        .collectionName("quick_setup")
        .expr("color_tag like \"brown_8%\"")
        .batchSize(50L)
        .outputFields(Arrays.asList("vector", "color_tag"))
        .build());

results.clear();
while (true) {
    List<QueryResultsWrapper.RowRecord> batchResults = queryIterator.next();
    if (batchResults.isEmpty()) {
        queryIterator.close();
        break;
    }

    results.addAll(batchResults);
}

System.out.println(results.subList(0, 3));

// Output
// [
//  [color_tag:brown_8975, vector:[0.93425006, 0.42161798, 0.1603949, 0.86406225, 0.30063087], id:104],
//  [color_tag:brown_8292, vector:[0.075261295, 0.51725155, 0.13842249, 0.13178307, 0.90713704], id:793],
//  [color_tag:brown_8763, vector:[0.80366623, 0.6534371, 0.6446101, 0.094082, 0.1318503], id:1157]
// ]

参数 说明
batch_size 每次在当前迭代器上调用next() 时要返回的实体数量。
该值默认为1000。将其设置为合适的值,以控制每次迭代返回的实体数。
expr 用于过滤匹配实体的标量过滤条件。
该值默认为 "无",表示忽略标量过滤。要创建标量过滤条件,请参阅布尔表达式规则
output_fields 要包含在返回的每个实体中的字段名列表。
该值默认为 "无"。如果未指定,则只包含主字段。
limit 要返回的实体总数。
默认值为-1,表示将返回所有匹配实体。
参数 说明
withCollectionName 设置 Collections 名称。Collection 名称不能为空或 null。
withExpr 设置查询实体的表达式。要建立标量过滤条件,请参阅布尔表达式规则
withBatchSize 每次在当前迭代器上调用next() 时要返回的实体数量。
默认值为1000。将其设置为适当的值,以控制每次迭代返回的实体数。
addOutField 指定输出标量字段(可选)。

翻译自DeepL

想要更快、更简单、更好用的 Milvus SaaS服务 ?

Zilliz Cloud是基于Milvus的全托管向量数据库,拥有更高性能,更易扩展,以及卓越性价比

免费试用 Zilliz Cloud
反馈

此页对您是否有帮助?