管理资源组
在 Milvus 中,您可以使用资源组将某些查询节点与其他节点物理隔离。本指南将向您介绍如何创建和管理自定义资源组,以及如何在资源组之间传输节点。
什么是资源组
一个资源组可以容纳 Milvus 集群中的多个或全部查询节点。如何在资源组之间分配查询节点,由您根据最合理的方式来决定。例如,在多集合场景中,可以为每个资源组分配适当数量的查询节点,并将集合加载到不同的资源组中,这样每个集合中的操作与其他集合中的操作在物理上是独立的。
请注意,Milvus 实例会维护一个默认资源组,用于在启动时容纳所有查询节点,并将其命名为__default_resource_group 。
从 2.4.1 版开始,Milvus 提供了声明式资源组 API,而旧的资源组 API 已被弃用。新的声明式 API 使用户能够实现惰性,从而更轻松地在云原生环境中进行二次开发。
资源组的概念
资源组由资源组 config 描述:
{
"requests": { "nodeNum": 1 },
"limits": { "nodeNum": 1 },
"transfer_from": [{ "resource_group": "rg1" }],
"transfer_to": [{ "resource_group": "rg2" }]
}
- requests属性指定了资源组必须满足的条件。
- limits属性指定资源组的最大限制。
- transfer_from和transfer_to属性分别描述了资源组应优先从哪些资源组获取资源,以及应向哪些资源组转移资源。
一旦资源组的配置发生变化,Milvus 将根据新的配置尽可能调整当前查询节点的资源,确保所有资源组最终满足以下条件:
.requests.nodeNum < nodeNumOfResourceGroup < .limits.nodeNum.
以下情况除外:
- 当 Milvus 集群中的查询节点数量不足(即
NumOfQueryNode < sum(.requests.nodeNum)
)时,总会有资源组没有足够的查询节点。 - 当 Milvus 集群中的 QueryNodes 数量过多时(即
NumOfQueryNode > sum(.limits.nodeNum)
),多余的 QueryNodes 总是会首先被放置在__default_resource_group中。
当然,如果集群中的 QueryNodes 数量发生变化,Milvus 会不断尝试调整以满足最终条件。因此,可以先应用资源组配置更改,然后再执行 QueryNode 扩展。
使用声明式 api 管理资源组
本页面上的所有代码示例都在 PyMilvus 2.4.5 中。运行这些示例之前,请升级您的 PyMilvus 安装。
创建资源组。
要创建资源组,请在连接 Milvus 实例后运行以下代码。以下代码段假定
default
是 Milvus 连接的别名。import pymilvus # A resource group name should be a string of 1 to 255 characters, starting with a letter or an underscore (_) and containing only numbers, letters, and underscores (_). name = "rg" node_num = 0 # create a resource group that exactly hold no query node. try: utility.create_resource_group(name, config=utility.ResourceGroupConfig( requests={"node_num": node_num}, limits={"node_num": node_num}, ), using='default') print(f"Succeeded in creating resource group {name}.") except Exception: print("Failed to create the resource group.")
列出资源组。
创建资源组后,就可以在资源组列表中看到它。
要查看 Milvus 实例中的资源组列表,请执行以下操作:
rgs = utility.list_resource_groups(using='default') print(f"Resource group list: {rgs}") # Resource group list: ['__default_resource_group', 'rg']
描述资源组。
您可以让 Milvus 以如下方式描述一个资源组:
info = utility.describe_resource_group(name, using="default") print(f"Resource group description: {info}") # Resource group description: # <name:"rg">, // string, rg name # <capacity:1>, // int, num_node which has been transfer to this rg # <num_available_node:0>, // int, available node_num, some node may shutdown # <num_loaded_replica:{}>, // map[string]int, from collection_name to loaded replica of each collecion in this rg # <num_outgoing_node:{}>, // map[string]int, from collection_name to outgoging accessed node num by replica loaded in this rg # <num_incoming_node:{}>. // map[string]int, from collection_name to incoming accessed node num by replica loaded in other rg
在资源组之间转移节点。
您可能会注意到,所描述的资源组还没有任何查询节点。将一些节点从默认资源组转移到您创建的资源组,如下所示:假设集群的__default_resource_group 中目前有 1 个查询节点,我们要将一个节点转移到创建的rg 中。
update_resource_groups
,确保多个配置更改的原子性,因此 Milvus 不会看到中间状态。source = '__default_resource_group' target = 'rg' expected_num_nodes_in_default = 0 expected_num_nodes_in_rg = 1 try: utility.update_resource_groups({ source: ResourceGroupConfig( requests={"node_num": expected_num_nodes_in_default}, limits={"node_num": expected_num_nodes_in_default}, ), target: ResourceGroupConfig( requests={"node_num": expected_num_nodes_in_rg}, limits={"node_num": expected_num_nodes_in_rg}, ) }, using="default") print(f"Succeeded in move 1 node(s) from {source} to {target}.") except Exception: print("Something went wrong while moving nodes.") # After a while, succeeded in moving 1 node(s) from __default_resource_group to rg.
将集合和分区加载到资源组。
一旦资源组中有了查询节点,就可以将集合加载到该资源组。下面的代码段假定已经存在名为
demo
的集合。from pymilvus import Collection collection = Collection('demo') # Milvus loads the collection to the default resource group. collection.load(replica_number=2) # Or, you can ask Milvus load the collection to the desired resource group. # make sure that query nodes num should be greater or equal to replica_number resource_groups = ['rg'] collection.load(replica_number=2, _resource_groups=resource_groups)
此外,您还可以将一个分区加载到一个资源组中,并将其副本分布到多个资源组中。以下代码假定已存在名为
Books
的集合,且该集合中有一个名为Novels
的分区。collection = Collection("Books") # Use the load method of a collection to load one of its partition collection.load(["Novels"], replica_number=2, _resource_groups=resource_groups) # Or, you can use the load method of a partition directly partition = Partition(collection, "Novels") partition.load(replica_number=2, _resource_groups=resource_groups)
请注意,
_resource_groups
是一个可选参数,如果不指定,Milvus 将把副本加载到默认资源组中的查询节点上。要让 Milus 在单独的资源组中加载集合的每个副本,请确保资源组的数量等于副本的数量。
在资源组之间传输副本。
Milvus 使用副本来实现分布在多个查询节点上的数据段之间的负载平衡。您可以按以下方式将某个资源组中的某些副本从一个集合转移到另一个集合:
source = '__default_resource_group' target = 'rg' collection_name = 'c' num_replicas = 1 try: utility.transfer_replica(source, target, collection_name, num_replicas, using="default") print(f"Succeeded in moving {num_node} replica(s) of {collection_name} from {source} to {target}.") except Exception: print("Something went wrong while moving replicas.") # Succeeded in moving 1 replica(s) of c from __default_resource_group to rg.
删除一个资源组。
您可以随时放弃一个没有查询节点的资源组 (
limits.node_num = 0
)。在本指南中,资源组rg
现在有一个查询节点。您需要先将资源组limits.node_num
的配置更改为零。try: utility.update_resource_groups({ "rg": utility.ResourceGroupConfig( requests={"node_num": 0}, limits={"node_num": 0}, ), }, using="default") utility.drop_resource_group("rg", using="default") print(f"Succeeded in dropping {source}.") except Exception: print(f"Something went wrong while dropping {source}.")
有关详细信息,请参阅pymilvus 中的相关示例。
管理集群扩展的良好做法
目前,Milvus 无法在云原生环境中独立扩展。不过,通过将声明式资源组 API与容器协调结合使用,Milvus 可以轻松实现 QueryNodes 的资源隔离和管理。 以下是在云环境中管理 QueryNodes 的良好实践:
默认情况下,Milvus 会创建一个__default_resource_group。该资源组不能删除,也是所有集合的默认加载资源组,多余的 QueryNodes 总是分配给它。因此,我们可以创建一个待定资源组来保存未使用的 QueryNode 资源,防止 QueryNode 资源被__default_resource_group 占用。
此外,如果我们严格执行
sum(.requests.nodeNum) <= queryNodeNum
这一约束,就能精确控制集群中 QueryNode 的分配。下面是一个设置示例:from pymilvus import utility from pymilvus.client.types import ResourceGroupConfig _PENDING_NODES_RESOURCE_GROUP="__pending_nodes" def init_cluster(node_num: int): print(f"Init cluster with {node_num} nodes, all nodes will be put in default resource group") # create a pending resource group, which can used to hold the pending nodes that do not hold any data. utility.create_resource_group(name=_PENDING_NODES_RESOURCE_GROUP, config=ResourceGroupConfig( requests={"node_num": 0}, # this resource group can hold 0 nodes, no data will be load on it. limits={"node_num": 10000}, # this resource group can hold at most 10000 nodes )) # update default resource group, which can used to hold the nodes that all initial node in it. utility.update_resource_groups({ "__default_resource_group": ResourceGroupConfig( requests={"node_num": node_num}, limits={"node_num": node_num}, transfer_from=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], # recover missing node from pending resource group at high priority. transfer_to=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], # recover redundant node to pending resource group at low priority. )}) utility.create_resource_group(name="rg1", config=ResourceGroupConfig( requests={"node_num": 0}, limits={"node_num": 0}, transfer_from=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], transfer_to=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], )) utility.create_resource_group(name="rg2", config=ResourceGroupConfig( requests={"node_num": 0}, limits={"node_num": 0}, transfer_from=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], transfer_to=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], )) init_cluster(1)
使用上面的示例代码,我们创建了一个名为__pending_nodes的资源组,用于容纳更多的 QueryNodes。我们还创建了名为rg1和rg2 的两个特定于用户的资源组。此外,我们还确保另一个资源组优先恢复__pending_nodes 中丢失或多余的 QueryNodes。
集群扩展
假设我们有以下缩放功能:
def scale_to(node_num: int): # scale the querynode number in Milvus into node_num. pass
我们可以使用 API 将特定资源组扩展到指定数量的 QueryNodes,而不会影响任何其他资源组。
# scale rg1 into 3 nodes, rg2 into 1 nodes utility.update_resource_groups({ "rg1": ResourceGroupConfig( requests={"node_num": 3}, limits={"node_num": 3}, transfer_from=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], transfer_to=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], ), "rg2": ResourceGroupConfig( requests={"node_num": 1}, limits={"node_num": 1}, transfer_from=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], transfer_to=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], ), }) scale_to(5) # rg1 has 3 nodes, rg2 has 1 node, __default_resource_group has 1 node.
群集向内扩展
同样,我们也可以建立缩放规则,优先从__pending_nodes资源组中选择 QueryNodes。这一信息可通过
describe_resource_group
API 获取。实现扩展到指定资源组的目标。# scale rg1 from 3 nodes into 2 nodes utility.update_resource_groups({ "rg1": ResourceGroupConfig( requests={"node_num": 2}, limits={"node_num": 2}, transfer_from=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], transfer_to=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], ), }) # rg1 has 2 nodes, rg2 has 1 node, __default_resource_group has 1 node, __pending_nodes has 1 node. scale_to(4) # scale the node in __pending_nodes
资源组如何与多个副本交互
- 单个集合的副本与资源组之间是 N 对 N 的关系。
- 当单个集合的多个副本加载到一个资源组时,该资源组的 QueryNodes 会平均分配给各个副本,确保每个副本的 QueryNodes 数量之差不超过 1。
下一步
要部署多租户 Milvus 实例,请阅读以下内容: