🚀 免费试用 Zilliz Cloud,完全托管的 Milvus,体验 10 倍的性能提升!立即试用>

milvus-logo
LFAI
  • Home
  • Blog
  • 向量数据库如何处理数据?

向量数据库如何处理数据?

  • Engineering
March 28, 2022
Zhenshan Cao

Cover image 封面图片

本文由曹振山撰写,倪安琪翻译。

在本系列博客的前两篇文章中,我们已经介绍了全球最先进的向量数据库 Milvus 的系统架构及其Python SDK 和 API

本篇博文主要通过深入 Milvus 系统,研究数据处理组件之间的交互,帮助大家了解 Milvus 是如何处理数据的。

下面列出了开始之前的一些有用资源。我们建议您先阅读这些资源,以便更好地理解本篇文章的主题。

MsgStream 接口

MsgStream 接口对 Milvus 中的数据处理至关重要。调用Start() 时,后台的例行程序将数据写入日志代理或从日志代理读取数据。调用Close() 时,例行程序停止。

MsgStream interface MsgStream 接口

MsgStream 既可以作为生产者,也可以作为消费者。AsProducer(channels []string) 接口将 MsgStream 定义为生产者,而AsConsumer(channels []string, subNamestring)则将其定义为消费者。channels 参数在两个接口中是共享的,用于定义向哪个(物理)通道写入数据或从哪个(物理)通道读取数据。

创建 Collections 时,可以指定集合中分片的数量。每个分片对应一个虚拟通道(vchannel)。因此,一个 Collection 可以有多个 vchannel。Milvus 会为日志代理中的每个 v 通道分配一个物理通道(pchannel)

Each virtual channel/shard corresponds to a physical channel. 每个虚拟通道/分区对应一个物理通道

Produce() 在 MsgStream 接口中,负责将数据写入日志代理中的 pchannel。数据写入有两种方式:

  • 单次写入:根据主键的哈希值将实体写入不同的分片(vchannel)。然后,这些实体流入日志代理中相应的 pchannel。
  • 广播式写入:实体被写入由参数channels 指定的所有 pchannels。

Consume() 这是一种阻塞式 API。如果指定的 pchannel 中没有可用数据,那么在 MsgStream 接口中调用 时,coroutine 将被阻塞。另一方面, 是一种非阻塞式 API,这意味着只有在指定的 pchannel 中存在现有数据时,coroutine 才会读取和处理数据。否则,coroutine 可以处理其他任务,不会在没有可用数据时被阻塞。Consume() Chan()

Seek() 是故障恢复方法。当启动一个新节点时,可以获取数据消耗记录,并通过调用 从中断的位置恢复数据消耗。Seek()

写入数据

写入不同 v 通道(碎片)的数据可以是插入信息或删除信息。这些 vchannels 也可称为 DmChannels(数据操作通道)。

不同的 Collections 可以在日志代理中共享相同的 pchannels。一个 Collections 可以有多个分片,因此也可以有多个相应的 vchannels。因此,同一 Collections 中的实体会流入日志代理中多个相应的 pchannels。因此,共享 pchannels 的好处是通过日志代理的高并发性提高吞吐量。

创建 Collections 时,不仅要指定分片的数量,还要决定日志代理中 vchannels 和 pchannels 之间的映射。

Write path in Milvus Milvus 中的写入路径

如上图所示,在写入路径中,代理通过 MsgStream 的AsProducer() 接口将数据写入日志代理。然后,数据节点消耗数据,再将消耗的数据转换并存储到对象存储中。存储路径是一种元信息,将由数据协调器记录在 etcd 中。

流程图

由于不同的 Collections 可能会在日志代理中共享相同的 pchannel,因此在消费数据时,数据节点或查询节点需要判断 pchannel 中的数据属于哪个 Collections。为了解决这个问题,我们在 Milvus 中引入了 flowgraph。它主要负责按照 Collection ID 过滤共享 pchannel 中的数据。因此,我们可以说,每个 flowgraph 处理的都是集合中相应分片(vchannel)中的数据流。

Flowgraph in write path 写入路径中的流程图

创建 MsgStream

写入数据时,会在以下两种情况下创建 MsgStream 对象:

  • 当代理收到数据插入请求时,它首先会尝试通过根协调器(root coordinator)获取 vchannel 和 pchannel 之间的映射。然后,代理创建一个 MsgStream 对象。

Scenario 1 场景 1

  • 数据节点启动并在 etcd 中读取通道的元信息时,创建 MsgStream 对象。

Scenario 2 场景 2

读取数据

Read path in Milvus Milvus 中的读取路径

读取数据的一般工作流程如上图所示。查询请求通过 DqRequestChannel 广播到查询节点。查询节点并行执行查询任务。来自查询节点的查询结果通过 gRPC 和代理汇总后返回给客户端。

仔细观察数据读取过程,我们可以看到代理将查询请求写入 DqRequestChannel。然后,查询节点通过订阅 DqRequestChannel 来消费消息。DqRequestChannel 中的每条信息都会被广播,以便所有订阅的查询节点都能接收到该信息。

当查询节点收到查询请求时,它们会对存储在密封段中的批量数据和动态插入 Milvus 并存储在增长段中的流数据进行本地查询。之后,查询节点需要汇总密封段和增长段中的查询结果。这些汇总结果通过 gRPC 传递给代理。

代理收集来自多个查询节点的所有结果,然后对它们进行聚合,得到最终结果。然后,代理将最终查询结果返回给客户端。由于每个查询请求及其对应的查询结果都有相同的唯一请求 ID,因此代理可以找出哪个查询结果对应哪个查询请求。

流程图

Flowgraph in read path 读取路径中的流程图

与写入路径类似,读取路径中也引入了流程图。Milvus 实现了统一的 Lambda 架构,集成了对增量数据和历史数据的处理。因此,查询节点也需要获取实时流数据。同样,读取路径中的 flowgraph 也会过滤和区分来自不同 Collections 的数据。

创建 MsgStream

Creating MsgStream object in read path 在读取路径中创建 MsgStream 对象

读取数据时,MsgStream 对象是在以下情况下创建的:

  • 在 Milvus 中,除非加载数据,否则无法读取数据。当代理接收到数据加载请求时,它会将请求发送给查询协调器,由查询协调器决定将分片分配给不同查询节点的方式。分配信息(即 v 通道名称以及 v 通道与相应 p 通道之间的映射)通过方法调用或 RPC(远程过程调用)发送到查询节点。随后,查询节点会创建相应的 MsgStream 对象来消耗数据。

DDL 操作符

DDL 是数据定义语言的缩写。对元数据的 DDL 操作可分为写入请求和读取请求。不过,在元数据处理过程中,这两类请求会被同等对待。

对元数据的读取请求包括

  • 查询 Collections Schema
  • 查询索引信息

写请求包括

  • 创建 Collections
  • 删除 Collections
  • 建立索引
  • 删除索引 更多

DDL 请求从客户端发送到代理,代理再按接收顺序将这些请求传递给根协调器,根协调器为每个 DDL 请求分配一个时间戳,并对请求进行动态检查。代理以串行方式处理每个请求,即一次处理一个 DDL 请求。代理在处理完前一个请求并从根协调器收到结果后,才会处理下一个请求。

DDL operations. DDL 操作符。

如上图所示,根协调器任务队列中有K 个 DDL 请求。任务队列中的 DDL 请求按根协调器收到的顺序排列。因此,ddl1 是发送给根协调器的第一个请求,而ddlK 是这批请求中的最后一个。根协调器按照时间顺序逐一处理这些请求。

在分布式系统中,代理与根协调器之间的通信是通过 gRPC 实现的。根协调器会记录已执行任务的最大时间戳值,以确保所有 DDL 请求都按时间顺序处理。

假设有两个独立的代理,即代理 1 和代理 2。它们都向同一个根协调器发送 DDL 请求。然而,一个问题是,较早的请求并不一定会在另一个代理随后收到的请求之前发送到根坐标。例如,在上图中,当DDL_K-1 从代理 1 发送到根协调器时,来自代理 2 的DDL_K 已经被根协调器接受并执行。根据根协调器的记录,此时已执行任务的最大时间戳值为K 。因此,为了不打断时间顺序,根协调器的任务队列将拒绝DDL_K-1 请求。但是,如果代理 2 在此时向根协调器发送请求DDL_K+5 ,则该请求将被任务队列接受,并在稍后根据其时间戳值执行。

建立索引

建立索引

收到客户端的索引建立请求后,代理首先会对请求进行静态检查,并将其发送给根协调器。然后,根协调器将这些索引构建请求持久化到元存储(etcd)中,并将请求发送给索引协调器(索引协调器)。

Building an index. 建立索引

如上图所示,当索引协调器从根协调器接收到建立索引的请求时,它首先会将任务持久化到元存储(etcd)中。索引构建任务的初始状态是Unissued 。索引协调器会记录每个索引节点的任务负载,并将入站任务发送到负载较低的索引节点。任务完成后,索引节点会将任务状态(FinishedFailed )写入元存储,即 Milvus 中的 etcd。然后,索引协调器将通过在 etcd 中查找来了解索引构建任务是成功还是失败。如果由于系统资源有限或索引节点掉线导致任务失败,索引协调员将重新触发整个流程,并将相同的任务分配给另一个索引节点。

放弃索引

此外,索引协调员还负责删除索引的请求。

Dropping an index. 丢弃索引

当根协调器收到来自客户端的丢弃索引请求时,它会首先将索引标记为 "丢弃",并将结果返回给客户端,同时通知索引协调器。然后,索引协调程序会使用IndexID 过滤所有索引任务,符合条件的任务会被丢弃。

索引协调程序的后台程序将逐步从对象存储(MinIO 和 S3)中删除所有标记为 "已丢弃 "的索引任务。这一过程涉及 recycleIndexFiles 接口。当所有相应的索引文件被删除后,被删除索引任务的元信息也会从元存储(etcd)中删除。

关于深入研究系列

随着 Milvus 2.0正式宣布全面上市,我们精心策划了本期 Milvus 深度剖析系列博客,对 Milvus 架构和源代码进行深入解读。本系列博客涉及的主题包括

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started

Like the article? Spread the word

扩展阅读