milvus-logo
LFAI
フロントページへ
  • 管理ガイド

Milvus Operatorでメッセージストレージを設定する

Milvusでは、RocksMQ、Pulsar、またはKafkaを使用して、最近の変更に関するログの管理、ストリームログの出力、およびログのサブスクリプションを提供します。このトピックでは、Milvus OperatorでMilvusをインストールする際に、メッセージストレージの依存関係を設定する方法を紹介します。詳細については、Milvus Operatorリポジトリの「Milvus Operatorでメッセージストレージを設定する」を参照してください。

このトピックでは、Milvus Operatorをデプロイ済みであることを前提としています。

詳細については、「Milvus Operatorのデプロイ」を参照してください。

Milvus Operatorを使用してMilvusクラスタを起動するには、設定ファイルを指定する必要があります。

kubectl apply -f https://raw.githubusercontent.com/zilliztech/milvus-operator/main/config/samples/milvus_cluster_default.yaml

サードパーティの依存関係を設定するには、milvus_cluster_default.yaml のコードテンプレートを編集するだけです。以下のセクションでは、オブジェクト・ストレージ、etcd、Pulsarの設定方法をそれぞれ紹介します。

始める前に

以下の表は、RocksMQ、NATS、Pulsar、KafkaがMilvusスタンドアロンおよびクラスタモードでサポートされているかどうかを示しています。

RocksMQNATSPulsarKafka
スタンドアロンモード✔️✔️✔️✔️
クラスターモード✖️✖️✔️✔️

メッセージストレージの指定には他にも制限があります:

  • 1つのMilvusインスタンスに対して1つのメッセージストレージのみがサポートされます。ただし、1つのインスタンスに複数のメッセージストレージを設定しても、後方互換性があります。優先順位は以下の通りです:
    • スタンドアロンモード RocksMQ (デフォルト) > Pulsar > Kafka
    • クラスタ・モード:Pulsar (デフォルト) > Kafka
    • 後方互換性のため、2.3から導入されたNatsはこれらの優先ルールに参加しません。
  • Milvusシステムの実行中は、メッセージストレージを変更することはできません。
  • Kafka 2.x または 3.x バージョンのみがサポートされています。

RocksMQの設定

RocksMQはMilvusスタンドアロンのデフォルトのメッセージストレージです。

現在、RocksMQをMilvusスタンドアロンのメッセージストレージとして設定できるのはMilvus Operatorのみです。

設定例

以下の例では、RocksMQサービスを設定しています。

apiVersion: milvus.io/v1alpha1
kind: Milvus
metadata:
  name: milvus
spec:
  dependencies: {}
  components: {}
  config: {}

NATSの設定

NATSの代替メッセージストレージです。

以下の例ではNATSサービスを設定します。

apiVersion: milvus.io/v1alpha1
kind: Milvus
metadata:
  name: milvus
spec:
  dependencies: 
    msgStreamType: 'natsmq'
    natsmq:
      # server side configuration for natsmq.
      server: 
        # 4222 by default, Port for nats server listening.
        port: 4222 
        # /var/lib/milvus/nats by default, directory to use for JetStream storage of nats.
        storeDir: /var/lib/milvus/nats 
        # (B) 16GB by default, Maximum size of the 'file' storage.
        maxFileStore: 17179869184 
        # (B) 8MB by default, Maximum number of bytes in a message payload.
        maxPayload: 8388608 
        # (B) 64MB by default, Maximum number of bytes buffered for a connection applies to client connections.
        maxPending: 67108864 
        # (√ms) 4s by default, waiting for initialization of natsmq finished.
        initializeTimeout: 4000 
        monitor:
          # false by default, If true enable debug log messages.
          debug: false 
          # true by default, If set to false, log without timestamps.
          logTime: true 
          # no log file by default, Log file path relative to.. .
          logFile: 
          # (B) 0, unlimited by default, Size in bytes after the log file rolls over to a new one.
          logSizeLimit: 0 
        retention:
          # (min) 3 days by default, Maximum age of any message in the P-channel.
          maxAge: 4320 
          # (B) None by default, How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size.
          maxBytes:
          # None by default, How many message the single P-channel may contain. Removing oldest messages if the P-channel exceeds this limit.    
          maxMsgs: 
  components: {}
  config: {}

メッセージストレージをRocksMQからNATSに移行するには、以下の手順を実行します:

  1. すべてのDDLオペレーションを停止する。

  2. FlushAll APIを呼び出し、APIコールの実行が終了したらMilvusを停止する。

  3. msgStreamTypenatsmq に変更し、spec.dependencies.natsmq の NATS 設定に必要な変更を加える。

  4. Milvusを再度起動し、以下を確認してください:

    • ログにmqType=natsmq というログエントリが存在する。
    • spec.dependencies.natsmq.server.storeDir で指定したディレクトリにjetstream という名前のディレクトリが存在する。
  5. (オプション) RocksMQストレージディレクトリのデータファイルをバックアップし、クリーンアップする。

RocksMQとNATSのどちらを選びますか?

RockMQはRocksDBとのやりとりにCGOを使用し、自身でメモリを管理しますが、Milvusに組み込まれている純粋なGO NATSはメモリ管理をGoのガベージコレクタ(GC)に委譲します。

データパケットが64kbより小さい場合、RocksDBはメモリ使用量、CPU使用量、レスポンスタイムの点で優れている。一方、データ・パケットが64kbより大きい場合は、十分なメモリと理想的なGCスケジューリングがあれば、NATSの方が応答時間の点で優れている。

現在のところ、NATSは実験にのみ使用することをお勧めします。

Pulsarの設定

Pulsarは最近の変更のログを管理し、ストリーム・ログを出力し、ログ購読を提供します。メッセージ・ストレージ用にPulsarを設定することは、MilvusスタンドアロンおよびMilvusクラスタの両方でサポートされています。ただしMilvus Operatorでは、PulsarをMilvusクラスタのメッセージ・ストレージとしてのみ構成することができます。Pulsarを設定するには、spec.dependencies.pulsar の下に必須フィールドを追加してください。

pulsar external および をサポートしています。inCluster

外部Pulsar

external は外部Pulsarサービスの使用を示します。 外部Pulsarサービスの構成に使用されるフィールドには以下が含まれます:

  • external: true 値は、Milvusが外部Pulsarサービスを使用していることを示します。
  • endpoints:Pulsarのエンドポイント。

以下の例では、外部Pulsarサービスを設定しています。

apiVersion: milvus.io/v1alpha1
kind: Milvus
metadata:
  name: my-release
  labels:
    app: milvus
spec:
  dependencies: # Optional
    pulsar: # Optional
      # Whether (=true) to use an existed external pulsar as specified in the field endpoints or 
      # (=false) create a new pulsar inside the same kubernetes cluster for milvus.
      external: true # Optional default=false
      # The external pulsar endpoints if external=true
      endpoints:
      - 192.168.1.1:6650
  components: {}
  config: {}           

内部Pulsar

inCluster は、Milvusクラスタが起動すると、クラスタ内で自動的にPulsarサービスが起動することを示します。

以下の例では、内部Pulsarサービスを構成します。

apiVersion: milvus.io/v1alpha1
kind: Milvus
metadata:
  name: my-release
  labels:
    app: milvus
spec:
  dependencies:
    pulsar:
      inCluster:
        values:
          components:
            autorecovery: false
          zookeeper:
            replicaCount: 1
          bookkeeper:
            replicaCount: 1
            resoureces:
              limit:
                cpu: '4'
              memory: 8Gi
            requests:
              cpu: 200m
              memory: 512Mi
          broker:
            replicaCount: 1
            configData:
              ## Enable `autoSkipNonRecoverableData` since bookkeeper is running
              ## without persistence
              autoSkipNonRecoverableData: "true"
              managedLedgerDefaultEnsembleSize: "1"
              managedLedgerDefaultWriteQuorum: "1"
              managedLedgerDefaultAckQuorum: "1"
          proxy:
            replicaCount: 1
  components: {}
  config: {}            
この例では、Pulsarの各コンポーネントのレプリカ数、Pulsar BookKeeperのコンピュート・リソース、その他の構成を指定します。
values.yamlで、内部Pulsarサービスを構成する完全な構成項目を見つけてください。先の例に示すように、pulsar.inCluster.values 、必要に応じて構成項目を追加してください。

構成ファイルの名前をmilvuscluster.yaml と仮定し、以下のコマンドを実行して構成を適用します。

kubectl apply -f milvuscluster.yaml

Kafkaの構成

PulsarはMilvusクラスタにおけるデフォルトのメッセージ・ストレージです。Kafkaを使用する場合は、オプションのフィールドmsgStreamType

kafka は および をサポートしています。external inCluster

外部Kafka

external は外部Kafkaサービスを使用することを示します。

外部Kafkaサービスの設定に使用されるフィールドは以下のとおりです:

  • external:true の値は、Milvusが外部Kafkaサービスを使用していることを示します。
  • brokerList:メッセージを送信するブローカーのリスト。

以下の例では、外部Kafkaサービスを設定しています。

apiVersion: milvus.io/v1alpha1
kind: Milvus
metadata:
  name: my-release
  labels:
    app: milvus
spec:
  config:
    kafka:
      # securityProtocol supports: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL 
      securityProtocol: PLAINTEXT
      # saslMechanisms supports: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
      saslMechanisms: PLAIN
      saslUsername: ""
      saslPassword: ""
  # Omit other fields ...
  dependencies:
    # Omit other fields ...
    msgStreamType: "kafka"
    kafka:
      external: true
      brokerList: 
        - "kafkaBrokerAddr1:9092"
        - "kafkaBrokerAddr2:9092"
        # ...

SASLの設定はoperator v0.8.5以上のバージョンでサポートされています。

内部Kafka

inCluster は、Milvusクラスタが起動すると、クラスタ内でKafkaサービスが自動的に起動することを示します。

次の例では、内部Kafkaサービスを設定します。

apiVersion: milvus.io/v1alpha1
kind: Milvus
metadata:
  name: my-release
  labels:
    app: milvus
spec: 
  dependencies:
    msgStreamType: "kafka"
    kafka:
      inCluster: 
        values: {} # values can be found in https://artifacthub.io/packages/helm/bitnami/kafka
  components: {}
  config: {}

内部Kafkaサービスを設定するための完全な設定項目はこちらを参照してください。kafka.inCluster.values の下に、必要に応じて設定項目を追加します。

設定ファイルの名前がmilvuscluster.yaml の場合、次のコマンドを実行して設定を適用します。

kubectl apply -f milvuscluster.yaml

次のステップ

Milvus Operatorを使用して他のMilvus依存関係を設定する方法について説明します:

翻訳DeepLogo

フィードバック

このページは役に立ちましたか ?