Comment les données sont-elles traitées dans une base de données vectorielle ?
Image de couverture
Cet article a été rédigé par Zhenshan Cao et transcrit par Angela Ni.
Dans les deux précédents articles de cette série de blogs, nous avons déjà abordé l'architecture du système Milvus, la base de données vectorielles la plus avancée au monde, ainsi que son SDK et son API Python.
Ce billet vise principalement à vous aider à comprendre comment les données sont traitées dans Milvus en allant en profondeur dans le système Milvus et en examinant l'interaction entre les composants de traitement des données.
Quelques ressources utiles avant de commencer sont listées ci-dessous. Nous vous recommandons de les lire d'abord pour mieux comprendre le sujet de ce billet.
- Plongée dans l'architecture de Milvus
- Modèle de données Milvus
- Le rôle et la fonction de chaque composant Milvus
- Traitement des données dans Milvus
Interface MsgStream
L'interface MsgStream est cruciale pour le traitement des données dans Milvus. Lorsque Start()
est appelé, la coroutine en arrière-plan écrit des données dans le log broker ou lit des données à partir de celui-ci. Lorsque Close()
est appelé, la coroutine s'arrête.
Interface MsgStream
Le MsgStream peut servir de producteur et de consommateur. L'interface AsProducer(channels []string)
définit le MsgStream en tant que producteur, tandis que l'interface AsConsumer(channels []string, subNamestring)
le définit en tant que consommateur. Le paramètre channels
est commun aux deux interfaces et sert à définir les canaux (physiques) dans lesquels les données doivent être écrites ou lues.
Le nombre d'unités dans une collection peut être spécifié lors de la création d'une collection. Chaque shard correspond à un canal virtuel (vchannel). Par conséquent, une collection peut avoir plusieurs canaux virtuels. Milvus attribue à chaque canal virtuel du courtier en journaux un canal physique (pchannel).
Chaque canal virtuel/shard correspond à un canal physique.
Produce()
L'interface MsgStream est chargée d'écrire des données dans les pchannels du log broker. Les données peuvent être écrites de deux manières :
- Écriture unique : les entités sont écrites dans différents shards (vchannel) par les valeurs de hachage des clés primaires. Ces entités sont ensuite transférées dans les canaux correspondants du courtier en journaux.
- Écriture par diffusion : les entités sont écrites dans tous les pchannels spécifiés par le paramètre
channels
.
Consume()
est un type d'API bloquante. S'il n'y a pas de données disponibles dans le canal p spécifié, la coroutine sera bloquée lorsque Consume()
est appelé dans l'interface MsgStream. En revanche, Chan()
est une API non bloquante, ce qui signifie que la coroutine lit et traite les données uniquement s'il existe des données dans le pchannel spécifié. Dans le cas contraire, la coroutine peut traiter d'autres tâches et ne sera pas bloquée lorsqu'il n'y a pas de données disponibles.
Seek()
est une méthode de reprise sur panne. Lorsqu'un nouveau nœud est démarré, l'enregistrement de la consommation de données peut être obtenu et la consommation de données peut reprendre là où elle a été interrompue en appelant Seek()
.
Écriture de données
Les données écrites dans différents canaux virtuels (shards) peuvent être des messages d'insertion ou de suppression. Ces canaux virtuels peuvent également être appelés canaux Dm (canaux de manipulation des données).
Différentes collections peuvent partager les mêmes pchannels dans le log broker. Une même collection peut avoir plusieurs unités de stockage (shards) et donc plusieurs canaux virtuels correspondants. Les entités d'une même collection s'écoulent donc dans plusieurs canaux p correspondants dans le gestionnaire de journaux. Par conséquent, le partage des canaux p présente l'avantage d'augmenter le volume du débit grâce à une forte concurrence dans le courtier en journaux.
Lors de la création d'une collection, non seulement le nombre de tessons est spécifié, mais le mappage entre les vchannels et les pchannels dans le log broker est également décidé.
Chemin d'écriture dans Milvus
Comme le montre l'illustration ci-dessus, dans le chemin d'écriture, les mandataires écrivent des données dans le courtier en journaux via l'interface AsProducer()
du MsgStream. Les nœuds de données consomment ensuite les données, puis convertissent et stockent les données consommées dans le stockage d'objets. Le chemin de stockage est un type de méta-information qui sera enregistré dans etcd par les coordinateurs de données.
Diagramme de flux
Étant donné que différentes collections peuvent partager les mêmes pchannels dans le log broker, lors de la consommation de données, les nœuds de données ou les nœuds de requête doivent juger à quelle collection les données d'un pchannel appartiennent. Pour résoudre ce problème, nous avons introduit le flowgraph dans Milvus. Il est principalement chargé de filtrer les données d'un canal partagé par ID de collection. Nous pouvons donc dire que chaque graphe de flux gère le flux de données dans un tesson correspondant (canal virtuel) d'une collection.
Graphe de flux dans le chemin d'écriture
Création de MsgStream
Lors de l'écriture de données, l'objet MsgStream est créé dans les deux scénarios suivants :
- Lorsque le proxy reçoit une demande d'insertion de données, il tente d'abord d'obtenir la correspondance entre les vchannels et les pchannels via le coordinateur racine (root coord). Il crée ensuite un objet MsgStream.
Scénario 1
- Lorsque le nœud de données démarre et lit les méta-informations des canaux dans etcd, l'objet MsgStream est créé.
Scénario 2
Lecture des données
Chemin de lecture dans Milvus
Le flux de travail général de la lecture des données est illustré dans l'image ci-dessus. Les requêtes sont diffusées via le canal DqRequestChannel aux nœuds de requête. Les nœuds d'interrogation exécutent les tâches d'interrogation en parallèle. Les résultats des requêtes des nœuds de requête passent par gRPC et le proxy agrège les résultats et les renvoie au client.
Pour examiner de plus près le processus de lecture des données, nous pouvons voir que le proxy écrit les demandes de requête dans le canal DqRequestChannel. Les nœuds de requête consomment ensuite les messages en s'abonnant au canal DqRequestChannel. Chaque message du canal DqRequestChannel est diffusé de manière à ce que tous les nœuds de requête abonnés puissent le recevoir.
Lorsque les nœuds d'interrogation reçoivent des demandes de renseignements, ils effectuent une interrogation locale sur les données par lots stockées dans des segments scellés et sur les données en continu qui sont insérées dynamiquement dans Milvus et stockées dans des segments croissants. Ensuite, les nœuds d'interrogation doivent agréger les résultats de l'interrogation dans les segments scellés et croissants. Ces résultats agrégés sont transmis au proxy via gRPC.
Le proxy collecte tous les résultats de plusieurs nœuds d'interrogation et les agrège pour obtenir les résultats finaux. Le proxy renvoie ensuite les résultats finaux de la requête au client. Étant donné que chaque demande de requête et ses résultats correspondants sont étiquetés par le même identifiant de requête unique, le proxy peut déterminer quels résultats de requête correspondent à quelle demande de requête.
Diagramme de flux
Diagramme de flux dans le chemin de lecture
Comme pour le chemin d'écriture, les diagrammes de flux sont également introduits dans le chemin de lecture. Milvus met en œuvre l'architecture Lambda unifiée, qui intègre le traitement des données incrémentielles et historiques. Par conséquent, les nœuds de requête doivent également obtenir des données de flux en temps réel. De même, les diagrammes de flux dans le chemin de lecture filtrent et différencient les données provenant de différentes collections.
Création d'un MsgStream
Création d'un objet MsgStream dans le chemin de lecture
Lors de la lecture des données, l'objet MsgStream est créé dans le scénario suivant :
- Dans Milvus, les données ne peuvent être lues que si elles sont chargées. Lorsque le proxy reçoit une demande de chargement de données, il l'envoie au coordinateur de requêtes, qui décide de la manière d'attribuer les fichiers à différents nœuds de requêtes. Les informations relatives à l'affectation (c'est-à-dire les noms des canaux virtuels et la correspondance entre les canaux virtuels et les canaux virtuels correspondants) sont envoyées aux nœuds d'interrogation par le biais d'un appel de méthode ou d'un appel de procédure à distance (RPC). Ensuite, les nœuds d'interrogation créent les objets MsgStream correspondants pour consommer les données.
Opérations DDL
DDL est l'abréviation de Data Definition Language (langage de définition des données). Les opérations DDL sur les métadonnées peuvent être classées en deux catégories : les demandes d'écriture et les demandes de lecture. Toutefois, ces deux types de demandes sont traités de la même manière lors du traitement des métadonnées.
Les demandes de lecture sur les métadonnées comprennent
- le schéma de collecte des requêtes
- Informations sur l'indexation des requêtes Et plus encore
Les demandes d'écriture comprennent
- Créer une collection
- Déposer une collection
- Créer un index
- Supprimer un index Et plus encore
Les requêtes DDL sont envoyées au proxy par le client, et le proxy transmet ensuite ces requêtes dans l'ordre reçu au coordinateur racine qui attribue un horodatage à chaque requête DDL et effectue des contrôles dynamiques sur les requêtes. Le proxy traite chaque demande en série, c'est-à-dire une demande DDL à la fois. Le proxy ne traitera pas la demande suivante avant d'avoir terminé le traitement de la demande précédente et d'avoir reçu les résultats de la coordonnée racine.
Opérations DDL.
Comme le montre l'illustration ci-dessus, il y a K
demandes DDL dans la file d'attente des tâches de la coordonnée racine. Les demandes DDL dans la file d'attente des tâches sont classées dans l'ordre dans lequel elles sont reçues par la coordonnée racine. Ainsi, ddl1
est la première envoyée à la coordination racine et ddlK
est la dernière de ce lot. Le coordinateur racine traite les demandes une par une dans l'ordre chronologique.
Dans un système distribué, la communication entre les mandataires et le coordinateur racine est assurée par gRPC. Le coordinateur racine conserve un enregistrement de la valeur maximale de l'horodatage des tâches exécutées afin de s'assurer que toutes les demandes DDL sont traitées dans l'ordre chronologique.
Supposons qu'il y ait deux mandataires indépendants, le mandataire 1 et le mandataire 2. Ils envoient tous deux des demandes DDL à la même coordonnée racine. Cependant, un problème se pose : les demandes antérieures ne sont pas nécessairement envoyées à la coordonnée racine avant les demandes reçues ultérieurement par un autre proxy. Par exemple, dans l'image ci-dessus, lorsque DDL_K-1
est envoyé au coordonnateur racine par le proxy 1, DDL_K
du proxy 2 a déjà été accepté et exécuté par le coordonnateur racine. Comme l'a enregistré le coordonnateur racine, la valeur maximale de l'horodatage des tâches exécutées à ce stade est K
. Ainsi, pour ne pas interrompre l'ordre temporel, la demande DDL_K-1
sera rejetée par la file d'attente des tâches du coordonnateur racine. Toutefois, si le proxy 2 envoie la demande DDL_K+5
au coordonnateur racine à ce stade, la demande sera acceptée dans la file d'attente des tâches et sera exécutée ultérieurement en fonction de sa valeur d'horodatage.
Indexation
Création d'un index
Lorsqu'il reçoit des demandes de création d'index de la part du client, le proxy effectue d'abord des vérifications statiques sur les demandes et les envoie à la coordonnée racine. Ensuite, le coordonnateur racine persiste ces demandes de construction d'index dans le méta stockage (etcd) et envoie les demandes au coordinateur d'index (coordonnateur d'index).
Construction d'un index.
Comme illustré ci-dessus, lorsque le coordinateur d'index reçoit des demandes de construction d'index de la part du coordinateur racine, il persiste d'abord la tâche dans etcd pour le méta-magasin. L'état initial de la tâche de construction d'index est Unissued
. La coordination de l'index maintient un registre de la charge de travail de chaque nœud d'index et envoie les tâches entrantes à un nœud d'index moins chargé. Une fois la tâche terminée, le nœud d'index écrit l'état de la tâche, soit Finished
ou Failed
, dans le méta-magasin, qui est etcd dans Milvus. Le coordonnateur de l'index comprendra alors si la tâche de construction de l'index réussit ou échoue en consultant etcd. Si la tâche échoue en raison de ressources système limitées ou de l'abandon du nœud d'index, le coordinateur d'index redéclenchera l'ensemble du processus et attribuera la même tâche à un autre nœud d'index.
Abandon d'un index
En outre, le coordinateur d'index est également chargé des demandes d'abandon d'index.
Abandon d'un index.
Lorsque le coordonnateur de la racine reçoit une demande de suppression d'index de la part du client, il marque d'abord l'index comme "supprimé" et renvoie le résultat au client tout en notifiant le coordonnateur de l'index. Ensuite, la coordination d'indexation filtre toutes les tâches d'indexation à l'aide de IndexID
et les tâches correspondant à la condition sont supprimées.
La coroutine d'arrière-plan de la coordination d'indexation supprime progressivement toutes les tâches d'indexation marquées comme "abandonnées" du stockage d'objets (MinIO et S3). Ce processus implique l'interface recycleIndexFiles. Lorsque tous les fichiers d'index correspondants sont supprimés, les méta-informations des tâches d'indexation supprimées sont supprimées du méta-stockage (etcd).
À propos de la série Deep Dive
Avec l'annonce officielle de la disponibilité générale de Milvus 2.0, nous avons orchestré cette série de blogs Milvus Deep Dive afin de fournir une interprétation approfondie de l'architecture et du code source de Milvus. Les sujets abordés dans cette série de blogs sont les suivants
- Interface MsgStream
- Écriture de données
- Lecture des données
- Opérations DDL
- Indexation
- À propos de la série Deep Dive
On This Page
Try Managed Milvus for Free
Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.
Get StartedLike the article? Spread the word