🚀 Testen Sie Zilliz Cloud, die vollständig verwaltete Milvus, kostenlos – erleben Sie 10x schnellere Leistung! Jetzt testen>>

milvus-logo
LFAI
  • Home
  • Blog
  • Wie werden die Daten in einer Vektordatenbank verarbeitet?

Wie werden die Daten in einer Vektordatenbank verarbeitet?

  • Engineering
March 28, 2022
Zhenshan Cao

Cover image Titelbild

Dieser Artikel wurde von Zhenshan Cao geschrieben und von Angela Ni umgesetzt.

In den beiden vorangegangenen Beiträgen dieser Blogserie haben wir bereits die Systemarchitektur von Milvus, der weltweit fortschrittlichsten Vektordatenbank, sowie ihr Python-SDK und ihre API behandelt.

Dieser Beitrag zielt hauptsächlich darauf ab, Ihnen dabei zu helfen, zu verstehen, wie Daten in Milvus verarbeitet werden, indem wir tief in das Milvus-System eindringen und die Interaktion zwischen den Datenverarbeitungskomponenten untersuchen.

Im Folgenden finden Sie einige nützliche Ressourcen, bevor Sie beginnen. Wir empfehlen, diese zuerst zu lesen, um das Thema in diesem Beitrag besser zu verstehen.

MsgStream-Schnittstelle

DieMsgStream-Schnittstelle ist entscheidend für die Datenverarbeitung in Milvus. Wenn Start() aufgerufen wird, schreibt die Coroutine im Hintergrund Daten in den Log-Broker oder liest Daten von dort. Wenn Close() aufgerufen wird, stoppt die Coroutine.

MsgStream interface MsgStream-Schnittstelle

Der MsgStream kann sowohl als Produzent als auch als Konsument dienen. Die Schnittstelle AsProducer(channels []string) definiert MsgStream als Produzent, während die Schnittstelle AsConsumer(channels []string, subNamestring)ihn als Konsument definiert. Der Parameter channels wird von beiden Schnittstellen gemeinsam genutzt, um festzulegen, in welche (physischen) Kanäle Daten geschrieben bzw. aus denen Daten gelesen werden sollen.

Die Anzahl der Shards in einer Sammlung kann bei der Erstellung einer Sammlung angegeben werden. Jeder Shard entspricht einem virtuellen Kanal (vchannel). Daher kann eine Sammlung mehrere V-Kanäle haben. Milvus weist jedem vchannel im Protokollbroker einen physischen Kanal (pchannel) zu.

Each virtual channel/shard corresponds to a physical channel. Jeder virtuelle Kanal/Shaard entspricht einem physischen Kanal.

Produce() in der MsgStream-Schnittstelle, die für das Schreiben von Daten in die pchannels im Log-Broker zuständig ist. Die Daten können auf zwei Arten geschrieben werden:

  • Einfaches Schreiben: Entitäten werden anhand der Hash-Werte der Primärschlüssel in verschiedene Shards (vchannel) geschrieben. Dann fließen diese Entitäten in die entsprechenden pchannels im Log-Broker.
  • Broadcast write: Entitäten werden in alle pchannels geschrieben, die durch den Parameter channels angegeben sind.

Consume() ist eine Art von blockierender API. Wenn in dem angegebenen pchannel keine Daten verfügbar sind, wird die Coroutine blockiert, wenn Consume() in der MsgStream-Schnittstelle aufgerufen wird. Andererseits ist Chan() eine nicht blockierende API, was bedeutet, dass die Coroutine nur dann Daten liest und verarbeitet, wenn im angegebenen pchannel Daten vorhanden sind. Andernfalls kann die Coroutine andere Aufgaben bearbeiten und wird nicht blockiert, wenn keine Daten verfügbar sind.

Seek() ist eine Methode zur Fehlerbehebung. Wenn ein neuer Knoten gestartet wird, kann der Datenverbrauchsdatensatz abgerufen werden und der Datenverbrauch kann durch den Aufruf von Seek() an der Stelle fortgesetzt werden, an der er unterbrochen wurde.

Daten schreiben

Die Daten, die in verschiedene V-Kanäle (Shards) geschrieben werden, können entweder eine Einfüge- oder eine Löschnachricht sein. Diese vchannels können auch als DmChannels (data manipulation channels) bezeichnet werden.

Verschiedene Sammlungen können sich dieselben pchannels im Log-Broker teilen. Eine Sammlung kann mehrere Shards und damit mehrere entsprechende V-Kanäle haben. Die Entitäten in derselben Sammlung fließen folglich in mehrere entsprechende pchannels im Logbroker. Der Vorteil der gemeinsamen Nutzung von P-Kanälen ist daher ein erhöhter Durchsatz, der durch die hohe Gleichzeitigkeit des Log-Brokers ermöglicht wird.

Bei der Erstellung einer Sammlung wird nicht nur die Anzahl der Shards angegeben, sondern auch die Zuordnung zwischen vchannels und pchannels im Log-Broker festgelegt.

Write path in Milvus Schreibpfad in Milvus

Wie in der obigen Abbildung gezeigt, schreiben Proxies im Schreibpfad Daten in den Log-Broker über die Schnittstelle AsProducer() des MsgStream. Dann konsumieren Datenknoten die Daten, konvertieren und speichern die konsumierten Daten im Objektspeicher. Der Speicherpfad ist eine Art von Metainformation, die von Datenkoordinatoren in etcd aufgezeichnet wird.

Flussdiagramm

Da sich verschiedene Sammlungen dieselben pchannels im Log-Broker teilen können, müssen Datenknoten oder Abfrageknoten beim Konsumieren von Daten beurteilen, zu welcher Sammlung die Daten in einem pchannel gehören. Um dieses Problem zu lösen, haben wir den Flowgraph in Milvus eingeführt. Er ist hauptsächlich für die Filterung von Daten in einem gemeinsamen pchannel nach Sammlungs-IDs zuständig. Wir können also sagen, dass jeder Flowgraph den Datenstrom in einem entsprechenden Shard (vchannel) in einer Sammlung verarbeitet.

Flowgraph in write path Flowgraph im Schreibpfad

MsgStream-Erstellung

Beim Schreiben von Daten wird das MsgStream-Objekt in den folgenden beiden Szenarien erstellt:

  • Wenn der Proxy eine Dateneinfügeanforderung erhält, versucht er zunächst, die Zuordnung zwischen vchannels und pchannels über den Stammkoordinator (root coord) zu erhalten. Dann erstellt der Proxy ein MsgStream-Objekt.

Scenario 1 Szenario 1

  • Wenn der Datenknoten startet und die Metainformationen der Kanäle in etcd liest, wird das MsgStream-Objekt erstellt.

Scenario 2 Szenario 2

Daten lesen

Read path in Milvus Lesepfad in Milvus

Der allgemeine Arbeitsablauf beim Lesen von Daten ist in der obigen Abbildung dargestellt. Abfrageanfragen werden über DqRequestChannel an Abfrageknoten gesendet. Die Abfrageknoten führen die Abfrageaufgaben parallel aus. Die Abfrageergebnisse von den Abfrageknoten gehen durch gRPC und der Proxy aggregiert die Ergebnisse und gibt sie an den Client zurück.

Um einen genaueren Blick auf den Datenleseprozess zu werfen, können wir sehen, dass der Proxy Abfrageanforderungen in den DqRequestChannel schreibt. Die Abfrageknoten konsumieren dann die Nachricht, indem sie den DqRequestChannel abonnieren. Jede Nachricht im DqRequestChannel wird verbreitet, so dass alle abonnierten Abfrageknoten die Nachricht empfangen können.

Wenn Abfrageknoten Abfrageanfragen erhalten, führen sie eine lokale Abfrage sowohl von Batch-Daten, die in versiegelten Segmenten gespeichert sind, als auch von Streaming-Daten durch, die dynamisch in Milvus eingefügt und in wachsenden Segmenten gespeichert werden. Anschließend müssen die Abfrageknoten die Abfrageergebnisse sowohl in versiegelten als auch in wachsenden Segmenten aggregieren. Diese aggregierten Ergebnisse werden über gRPC an den Proxy weitergeleitet.

Der Proxy sammelt alle Ergebnisse von mehreren Abfrageknoten und aggregiert sie dann, um die endgültigen Ergebnisse zu erhalten. Anschließend gibt der Proxy die endgültigen Abfrageergebnisse an den Client zurück. Da jede Abfrage und die entsprechenden Abfrageergebnisse durch dieselbe eindeutige requestID gekennzeichnet sind, kann der Proxy herausfinden, welche Abfrageergebnisse welcher Abfrage entsprechen.

Flussdiagramm

Flowgraph in read path Flussdiagramm im Lesepfad

Ähnlich wie im Schreibpfad werden auch im Lesepfad Flussgraphen eingeführt. Milvus implementiert die einheitliche Lambda-Architektur, die die Verarbeitung von inkrementellen und historischen Daten integriert. Daher müssen die Abfrageknoten auch Echtzeit-Streaming-Daten erhalten. In ähnlicher Weise filtern und differenzieren Flowgraphs im Lesepfad Daten aus verschiedenen Sammlungen.

MsgStream-Erstellung

Creating MsgStream object in read path Erstellen des MsgStream-Objekts im Lesepfad

Beim Lesen von Daten wird das MsgStream-Objekt im folgenden Szenario erstellt:

  • In Milvus können die Daten erst gelesen werden, wenn sie geladen sind. Wenn der Proxy eine Anforderung zum Laden von Daten erhält, sendet er die Anforderung an den Abfragekoordinator, der über die Art der Zuweisung von Shards zu verschiedenen Abfrageknoten entscheidet. Die Zuweisungsinformationen (d. h. die Namen der V-Kanäle und die Zuordnung zwischen V-Kanälen und den entsprechenden P-Kanälen) werden per Methodenaufruf oder RPC (Remote Procedure Call) an die Abfrageknoten gesendet. Anschließend erstellen die Abfrageknoten die entsprechenden MsgStream-Objekte, um die Daten abzurufen.

DDL-Operationen

DDL steht für Data Definition Language. DDL-Operationen auf Metadaten können in Schreib- und Leseanfragen unterteilt werden. Diese beiden Arten von Anfragen werden jedoch bei der Verarbeitung von Metadaten gleich behandelt.

Zu den Leseanforderungen für Metadaten gehören:

  • Abfrage des Sammlungsschemas
  • Abfrage von Indizierungsinformationen und mehr

Schreibanfragen umfassen:

  • Erstellen einer Sammlung
  • Löschen einer Sammlung
  • Erstellen eines Index
  • Löschen eines Indexes Und mehr

DDL-Anforderungen werden vom Client an den Proxy gesendet. Der Proxy leitet diese Anforderungen in der empfangenen Reihenfolge an den Root-Koordinator weiter, der jeder DDL-Anforderung einen Zeitstempel zuweist und dynamische Prüfungen der Anforderungen durchführt. Der Proxy bearbeitet jede Anfrage seriell, d.h. eine DDL-Anfrage nach der anderen. Der Proxy bearbeitet die nächste Anforderung erst, wenn er die Verarbeitung der vorherigen Anforderung abgeschlossen und die Ergebnisse vom Root-Koordinator erhalten hat.

DDL operations. DDL-Operationen.

Wie in der obigen Abbildung zu sehen ist, befinden sich K DDL-Anforderungen in der Aufgabenwarteschlange des Root-Koordinators. Die DDL-Anforderungen in der Aufgabenwarteschlange sind in der Reihenfolge angeordnet, in der sie von der Root-Koordinate empfangen werden. So ist ddl1 die erste, die an Root Coord gesendet wurde, und ddlK ist die letzte in diesem Stapel. Der Root-Koordinator bearbeitet die Anfragen eine nach der anderen in der zeitlichen Reihenfolge.

In einem verteilten System wird die Kommunikation zwischen den Proxys und dem Root-Koordinator durch gRPC ermöglicht. Der Root-Koordinator hält den maximalen Zeitstempelwert der ausgeführten Aufgaben fest, um sicherzustellen, dass alle DDL-Anforderungen in zeitlicher Reihenfolge verarbeitet werden.

Angenommen, es gibt zwei unabhängige Proxys, Proxy 1 und Proxy 2. Beide senden DDL-Anforderungen an dieselbe Stammkoordinate. Ein Problem besteht jedoch darin, dass frühere Anfragen nicht unbedingt vor den Anfragen, die ein anderer Proxy später erhält, an die Root-Koordinate gesendet werden. Wenn beispielsweise in der obigen Abbildung DDL_K-1 von Proxy 1 an den Root-Koordinator gesendet wird, wurde DDL_K von Proxy 2 bereits akzeptiert und vom Root-Koordinator ausgeführt. Wie vom Root-Koordinator aufgezeichnet, beträgt der maximale Zeitstempelwert der ausgeführten Aufgaben zu diesem Zeitpunkt K. Um die zeitliche Abfolge nicht zu unterbrechen, wird die Anfrage DDL_K-1 von der Aufgabenwarteschlange des Stammkoordinators zurückgewiesen. Sendet Proxy 2 jedoch zu diesem Zeitpunkt die Anfrage DDL_K+5 an den Root-Koordinator, wird die Anfrage in die Aufgabenwarteschlange aufgenommen und später entsprechend ihrem Zeitstempelwert ausgeführt.

Indizierung

Aufbau eines Indexes

Wenn der Proxy Anfragen zur Indexerstellung vom Client erhält, führt er zunächst statische Prüfungen an den Anfragen durch und sendet sie an die Stammkoordinate. Der Root-Koordinator persistiert diese Indexaufbauanfragen im Metaspeicher (etcd) und sendet die Anfragen an den Indexkoordinator (Indexkoordinator).

Building an index. Aufbau eines Index.

Wenn der Indexkoordinator, wie oben dargestellt, Indexerstellungsanforderungen vom Stammkoordinator erhält, persistiert er die Aufgabe zunächst in etcd für den Metaspeicher. Der anfängliche Status des Indexaufbautasks ist Unissued. Der Indexkoordinator hält die Auslastung der einzelnen Indexknoten fest und leitet eingehende Tasks an einen weniger belasteten Indexknoten weiter. Nach Abschluss des Tasks schreibt der Indexknoten den Status des Tasks, entweder Finished oder Failed in den Metaspeicher, der in Milvus etcd ist. Dann kann der Index-Koordinator durch Nachschlagen in etcd feststellen, ob der Indexaufbau-Task erfolgreich war oder nicht. Wenn die Aufgabe aufgrund begrenzter Systemressourcen oder eines Ausfalls des Indexknotens fehlschlägt, löst der Indexkoordinator den gesamten Prozess erneut aus und weist die gleiche Aufgabe einem anderen Indexknoten zu.

Ablegen eines Index

Darüber hinaus ist der Indexkoordinator auch für die Aufgabe von Indizes zuständig.

Dropping an index. Fallenlassen eines Indexes.

Wenn der Root-Koordinator eine Anfrage zum Löschen eines Index vom Client erhält, markiert er den Index zunächst als "gelöscht" und gibt das Ergebnis an den Client zurück, während er den Index-Koordinator benachrichtigt. Dann filtert der Indexkoordinator alle Indizierungsaufgaben mit der IndexID und die Aufgaben, die die Bedingung erfüllen, werden fallen gelassen.

Die Hintergrund-Coroutine des Indexkoordinators löscht nach und nach alle als "fallen gelassen" markierten Indizierungsaufgaben aus dem Objektspeicher (MinIO und S3). Dieser Prozess erfolgt über die Schnittstelle recycleIndexFiles. Wenn alle entsprechenden Indexdateien gelöscht sind, werden die Metainformationen der gelöschten Indexierungsaufgaben aus dem Metaspeicher (etcd) entfernt.

Über die Deep Dive-Serie

Mit der offiziellen Ankündigung der allgemeinen Verfügbarkeit von Milvus 2.0 haben wir diese Milvus-Deep-Dive-Blogserie ins Leben gerufen, um eine tiefgehende Interpretation der Milvus-Architektur und des Quellcodes zu bieten. Die Themen dieser Blogserie umfassen:

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started

Like the article? Spread the word

Weiterlesen