🚀 免费试用 Zilliz Cloud,完全托管的 Milvus,体验 10 倍的性能提升!立即试用>

milvus-logo
LFAI

使用方法

  • Engineering
February 07, 2022
Lichen Wang

Milvus 2.0 具有统一的批处理和流处理功能以及云原生架构,在开发 DELETE 功能的过程中面临着比其前身更大的挑战。得益于其先进的存储-计算分解设计和灵活的发布/订阅机制,我们可以自豪地宣布,我们实现了这一目标。在 Milvus 2.0 中,你可以用主键删除给定 Collections 中的一个实体,这样被删除的实体就不会再出现在搜索或查询的结果中。

请注意,Milvus 中的 DELETE 操作指的是逻辑删除,而物理数据清理则发生在数据压缩过程中。逻辑删除不仅能大大提高受 I/O 速度限制的搜索性能,还有利于数据恢复。在时间旅行功能的帮助下,逻辑删除的数据仍然可以恢复。

使用方法

让我们先试试 Milvus 2.0 中的 DELETE 函数。(以下示例在 Milvus 2.0.0 上使用 PyMilvus 2.0.0)。

from pymilvus import connections, utility, Collection, DataType, FieldSchema, CollectionSchema
# Connect to Milvus
connections.connect(
    alias="default", 
    host='x.x.x.x', 
    port='19530'
)
# Create a collection with Strong Consistency level
pk_field = FieldSchema(
    name="id", 
    dtype=DataType.INT64, 
    is_primary=True, 
)
vector_field = FieldSchema(
    name="vector", 
    dtype=DataType.FLOAT_VECTOR, 
    dim=2
)
schema = CollectionSchema(
    fields=[pk_field, vector_field], 
    description="Test delete"
)
collection_name = "test_delete"
collection = Collection(
    name=collection_name, 
    schema=schema, 
    using='default', 
    shards_num=2,
    consistency_level="Strong"
)
# Insert randomly generated vectors
import random
data = [
    [i for i in range(100)],
    [[random.random() for _ in range(2)] for _ in range(100)],
]
collection.insert(data)
# Query to make sure the entities to delete exist
collection.load()
expr = "id in [2,4,6,8,10]"
pre_del_res = collection.query(
    expr,
    output_fields = ["id", "vector"]
)
print(pre_del_res)
# Delete the entities with the previous expression
collection.delete(expr)
# Query again to check if the deleted entities exist
post_del_res = collection.query(
    expr,
    output_fields = ["id", "vector"]
)
print(post_del_res)

执行

在 Milvus 实例中,数据节点主要负责将流数据(日志代理中的日志)打包为历史数据(日志快照)并自动刷新到对象存储中。查询节点在完整数据(即流数据和历史数据)上执行搜索请求。

为了充分利用集群中并行节点的数据写入能力,Milvus 采用了基于主键散列的分片策略,将写入操作平均分配给不同的工作节点。也就是说,代理会将实体的数据处理语言(DML)消息(即请求)路由到相同的数据节点和查询节点。这些信息通过 DML 通道发布,并由数据节点和查询节点分别消费,从而共同提供搜索和查询服务。

数据节点

收到数据 INSERT 消息后,数据节点会将数据插入一个不断增长的段,这是为接收内存中的流数据而创建的新段。如果数据行数或增长段的持续时间达到阈值,数据节点就会将其封存,以防止任何数据进入。然后,数据节点会将包含历史数据的密封段刷新到对象存储中。与此同时,数据节点会根据新数据的主键生成一个 Bloom 过滤器,并将其与密封段一起冲入对象存储区,同时将 Bloom 过滤器作为包含段统计信息的统计二进制日志(binlog)的一部分保存起来。

bloom 过滤器是一种概率数据结构,由一个长的二进制向量和一系列随机映射函数组成。它可用于测试某个元素是否是某个集合的成员,但可能会返回假阳性匹配。 --维基百科

当收到数据 DELETE 消息时,数据节点会缓冲相应分片中的所有 Bloom 过滤器,并将它们与消息中提供的主键进行匹配,以检索可能包含要删除的实体的所有分段(从增长的分段和封存的分段中)。在精确定位了相应的分段后,数据节点会将其缓冲到内存中,以生成记录删除操作的 Delta binlog,然后将这些 binlog 和分段一起刷新到对象存储中。

Data Node 数据节点

由于一个分区只分配一个 DML 通道,因此群集中添加的其他查询节点将无法订阅该 DML 通道。为确保所有查询节点都能收到 DELETE 消息,数据节点会过滤 DML-Channel 中的 DELETE 消息,并将其转发到 Delta-Channel 以通知所有查询节点删除操作符。

查询节点

从对象存储加载 Collections 时,查询节点首先会获取每个分片的检查点,该检查点标记了自上次刷新操作以来的 DML 操作。在检查点的基础上,查询节点会加载所有密封分段及其 Delta binlog 和 Bloom 过滤器。加载所有数据后,查询节点会订阅 DML 通道、Delta 通道和查询通道。

如果在 Collections 加载到内存后收到更多的数据 INSERT 消息,查询节点会首先根据消息精确定位增长的数据段,并更新内存中相应的 Bloom 过滤器,仅供查询使用。查询结束后,这些查询专用的 bloom 过滤器不会被刷新到对象存储中。

Query Node 查询节点

如上所述,只有一定数量的查询节点可以从 DML 通道接收 DELETE 信息,这意味着只有它们可以执行不断增长的 DELETE 请求。对于那些订阅了 DML-Channel 的查询节点来说,它们首先会过滤成长段中的 DELETE 消息,通过将提供的主键与成长段中的查询专用 bloom 过滤器进行匹配来定位实体,然后在相应的段中记录删除操作。

不能订阅 DML 通道的查询节点只能处理密封网段上的搜索或查询请求,因为它们只能订阅 Delta 通道,并接收数据节点转发的 DELETE 消息。查询节点从 Delta-Channel 收集到密封网段中的所有 DELETE 消息后,通过将提供的主键与密封网段的 Bloom 过滤器进行匹配来定位实体,然后在相应的网段中记录删除操作。

最终,在搜索或查询时,查询节点会根据删除记录生成一个比特集,以省略已删除的实体,并在所有段中搜索剩余的实体,而不管段的状态如何。最后但并非最不重要的一点是,一致性级别会影响已删除数据的可见性。在强一致性级别下(如前面的代码示例所示),删除实体后立即不可见。如果采用有界一致性级别,则会有几秒钟的延迟,删除的实体才会不可见。

下一步是什么?

在 2.0 新功能系列博客中,我们将介绍新功能的设计。阅读本系列博客的更多内容!

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started

Like the article? Spread the word

扩展阅读