Milvus Operatorでメッセージストレージを設定する
Milvusでは、RocksMQ、Pulsar、またはKafkaを使用して、最近の変更に関するログの管理、ストリームログの出力、およびログのサブスクリプションを提供します。このトピックでは、Milvus OperatorでMilvusをインストールする際に、メッセージストレージの依存関係を設定する方法を紹介します。詳細については、Milvus Operatorリポジトリの「Milvus Operatorでメッセージストレージを設定する」を参照してください。
このトピックでは、Milvus Operatorをデプロイ済みであることを前提としています。
Milvus Operatorを使用してMilvusクラスタを起動するには、設定ファイルを指定する必要があります。
kubectl apply -f https://raw.githubusercontent.com/zilliztech/milvus-operator/main/config/samples/milvus_cluster_default.yaml
サードパーティの依存関係を設定するには、milvus_cluster_default.yaml
のコードテンプレートを編集するだけです。以下のセクションでは、オブジェクト・ストレージ、etcd、Pulsarの設定方法をそれぞれ紹介します。
始める前に
RocksMQ、NATS、Pulsar、KafkaがMilvusのスタンドアロンおよびクラスタモードでサポートされているかどうかを以下の表に示します。
RocksMQ | NATS | Pulsar | Kafka | |
---|---|---|---|---|
スタンドアロンモード | ✔️ | ✔️ | ✔️ | ✔️ |
クラスターモード | ✖️ | ✖️ | ✔️ | ✔️ |
メッセージストレージの指定には他にも制限があります:
- 1つのMilvusインスタンスに対して1つのメッセージストレージのみがサポートされます。ただし、1つのインスタンスに複数のメッセージストレージを設定しても、後方互換性があります。優先順位は以下の通りです:
- スタンドアロンモード RocksMQ (デフォルト) > Pulsar > Kafka
- クラスタ・モード:Pulsar(デフォルト)> Kafka
- 後方互換性のため、2.3から導入されたNatsはこれらの優先ルールに参加しません。
- Milvusシステムの実行中にメッセージストレージを変更することはできません。
- Kafka 2.x または 3.x バージョンのみがサポートされています。
RocksMQの設定
RocksMQはMilvusスタンドアロンのデフォルトのメッセージストレージです。
現在、RocksMQをMilvusスタンドアロンのメッセージストレージとして設定できるのはMilvus Operatorのみです。
設定例
以下の例ではRocksMQサービスを設定しています。
apiVersion: milvus.io/v1alpha1
kind: Milvus
metadata:
name: milvus
spec:
dependencies: {}
components: {}
config: {}
NATSの設定
NATSの代替メッセージストレージです。
例
以下の例ではNATSサービスを設定します。
apiVersion: milvus.io/v1alpha1
kind: Milvus
metadata:
name: milvus
spec:
dependencies:
msgStreamType: 'natsmq'
natsmq:
# server side configuration for natsmq.
server:
# 4222 by default, Port for nats server listening.
port: 4222
# /var/lib/milvus/nats by default, directory to use for JetStream storage of nats.
storeDir: /var/lib/milvus/nats
# (B) 16GB by default, Maximum size of the 'file' storage.
maxFileStore: 17179869184
# (B) 8MB by default, Maximum number of bytes in a message payload.
maxPayload: 8388608
# (B) 64MB by default, Maximum number of bytes buffered for a connection applies to client connections.
maxPending: 67108864
# (√ms) 4s by default, waiting for initialization of natsmq finished.
initializeTimeout: 4000
monitor:
# false by default, If true enable debug log messages.
debug: false
# true by default, If set to false, log without timestamps.
logTime: true
# no log file by default, Log file path relative to.. .
logFile:
# (B) 0, unlimited by default, Size in bytes after the log file rolls over to a new one.
logSizeLimit: 0
retention:
# (min) 3 days by default, Maximum age of any message in the P-channel.
maxAge: 4320
# (B) None by default, How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size.
maxBytes:
# None by default, How many message the single P-channel may contain. Removing oldest messages if the P-channel exceeds this limit.
maxMsgs:
components: {}
config: {}
メッセージストレージをRocksMQからNATSに移行するには、以下の手順を実行します:
すべてのDDLオペレーションを停止する。
FlushAll APIを呼び出し、APIコールの実行が終了したらMilvusを停止する。
msgStreamType
をnatsmq
に変更し、spec.dependencies.natsmq
の NATS 設定に必要な変更を加える。Milvusを再度起動し、以下を確認してください:
- ログに
mqType=natsmq
というログエントリが存在する。 spec.dependencies.natsmq.server.storeDir
で指定したディレクトリにjetstream
という名前のディレクトリが存在する。
- ログに
(オプション) RocksMQストレージディレクトリのデータファイルをバックアップし、クリーンアップする。
RocksMQとNATSのどちらを選びますか?
RockMQはRocksDBとのやりとりにCGOを使用し、自身でメモリ管理を行いますが、Milvusに組み込まれている純粋なGo NATSはメモリ管理をGoのガベージコレクタ(GC)に委譲します。
データパケットが64kbより小さい場合、RocksDBはメモリ使用量、CPU使用量、レスポンスタイムの点で優れている。一方、データ・パケットが64kbより大きい場合は、十分なメモリと理想的なGCスケジューリングがあれば、NATSの方が応答時間の点で優れている。
現在のところ、NATSは実験にのみ使用することをお勧めします。
Pulsarの設定
Pulsarは最近の変更のログを管理し、ストリーム・ログを出力し、ログ購読を提供します。メッセージ・ストレージ用にPulsarを設定することは、MilvusスタンドアロンおよびMilvusクラスタの両方でサポートされています。ただしMilvus Operatorでは、Milvusクラスタのメッセージ・ストレージとしてのみPulsarを構成することができます。Pulsarを設定するには、spec.dependencies.pulsar
の下に必須フィールドを追加してください。
pulsar
external
および をサポートしています。inCluster
外部Pulsar
external
は外部Pulsarサービスの使用を示します。 外部Pulsarサービスの構成に使用されるフィールドには以下が含まれます:
external
:true
値は、Milvusが外部Pulsarサービスを使用していることを示します。endpoints
:Pulsarのエンドポイント。
例
以下の例では、外部Pulsarサービスを設定しています。
apiVersion: milvus.io/v1alpha1
kind: Milvus
metadata:
name: my-release
labels:
app: milvus
spec:
dependencies: # Optional
pulsar: # Optional
# Whether (=true) to use an existed external pulsar as specified in the field endpoints or
# (=false) create a new pulsar inside the same kubernetes cluster for milvus.
external: true # Optional default=false
# The external pulsar endpoints if external=true
endpoints:
- 192.168.1.1:6650
components: {}
config: {}
内部Pulsar
inCluster
は、Milvusクラスタが起動すると、クラスタ内で自動的にPulsarサービスが起動することを示します。
例
以下の例では、内部Pulsarサービスを構成します。
apiVersion: milvus.io/v1alpha1
kind: Milvus
metadata:
name: my-release
labels:
app: milvus
spec:
dependencies:
pulsar:
inCluster:
values:
components:
autorecovery: false
zookeeper:
replicaCount: 1
bookkeeper:
replicaCount: 1
resoureces:
limit:
cpu: '4'
memory: 8Gi
requests:
cpu: 200m
memory: 512Mi
broker:
replicaCount: 1
configData:
## Enable `autoSkipNonRecoverableData` since bookkeeper is running
## without persistence
autoSkipNonRecoverableData: "true"
managedLedgerDefaultEnsembleSize: "1"
managedLedgerDefaultWriteQuorum: "1"
managedLedgerDefaultAckQuorum: "1"
proxy:
replicaCount: 1
components: {}
config: {}
pulsar.inCluster.values
、必要に応じて構成項目を追加してください。構成ファイルの名前をmilvuscluster.yaml
と仮定し、以下のコマンドを実行して構成を適用します。
kubectl apply -f milvuscluster.yaml
Kafkaの構成
PulsarはMilvusクラスタにおけるデフォルトのメッセージ・ストレージです。Kafkaを使用する場合は、オプションのフィールドmsgStreamType
。
kafka
は および をサポートしています。external
inCluster
外部Kafka
external
は外部Kafkaサービスを使用することを示します。
外部Kafkaサービスの設定に使用されるフィールドは以下のとおりです:
external
:true
の値は、Milvusが外部Kafkaサービスを使用していることを示します。brokerList
:メッセージを送信するブローカーのリスト。
例
以下の例では、外部Kafkaサービスを設定しています。
apiVersion: milvus.io/v1alpha1
kind: Milvus
metadata:
name: my-release
labels:
app: milvus
spec:
config:
kafka:
# securityProtocol supports: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
securityProtocol: PLAINTEXT
# saslMechanisms supports: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
saslMechanisms: PLAIN
saslUsername: ""
saslPassword: ""
# Omit other fields ...
dependencies:
# Omit other fields ...
msgStreamType: "kafka"
kafka:
external: true
brokerList:
- "kafkaBrokerAddr1:9092"
- "kafkaBrokerAddr2:9092"
# ...
SASLの設定はoperator v0.8.5以上のバージョンでサポートされています。
内部Kafka
inCluster
は、Milvusクラスタが起動すると、クラスタ内でKafkaサービスが自動的に起動することを示します。
例
次の例では、内部Kafkaサービスを設定します。
apiVersion: milvus.io/v1alpha1
kind: Milvus
metadata:
name: my-release
labels:
app: milvus
spec:
dependencies:
msgStreamType: "kafka"
kafka:
inCluster:
values: {} # values can be found in https://artifacthub.io/packages/helm/bitnami/kafka
components: {}
config: {}
内部Kafkaサービスを設定するための完全な設定項目はこちらを参照してください。kafka.inCluster.values
の下に、必要に応じて設定項目を追加します。
設定ファイルの名前がmilvuscluster.yaml
の場合、次のコマンドを実行して設定を適用します。
kubectl apply -f milvuscluster.yaml
次のステップ
Milvus Operatorを使用して他のMilvus依存関係を設定する方法について説明します: