Apache Kafka®とMilvus/Zilliz Cloudを接続してリアルタイムでベクターデータを取り込む
このクイックスタートガイドでは、オープンソースカフカとZilliz Cloudをセットアップしてベクトルデータを取り込む方法を紹介します。
このチュートリアルでは、Apache Kafka®を使用してベクトルデータをMilvusベクトルデータベースとZilliz Cloud(フルマネージドMilvus)にストリーミングして取り込み、セマンティック検索、レコメンデーションシステム、AIを活用した分析などの高度なリアルタイムアプリケーションを実現する方法を説明します。
Apache Kafkaは、高スループット、低レイテンシーのパイプライン用に設計された分散イベントストリーミングプラットフォームです。データベース、IoTデバイス、モバイルアプリ、クラウドサービスなどのソースからリアルタイムのデータストリームを収集、保存、処理するために広く使用されている。Kafkaは大量のデータを扱うことができるため、MilvusやZilliz Cloudのようなベクトルデータベースの重要なデータソースとなっている。
例えば、Kafkaは、ユーザーとのインタラクションやセンサーの測定値などのリアルタイムのデータストリームを、機械学習モデルからのエンベッディングとともにキャプチャし、これらのストリームをMilvusやZilliz Cloudに直接パブリッシュすることができます。いったんベクトル・データベースに格納されると、このデータはインデックス化され、検索され、効率的に分析される。
KafkaとMilvusおよびZilliz Cloudとの統合は、非構造化データワークフローのための強力なパイプラインを構築するシームレスな方法を提供します。このコネクターは、オープンソースのKafkaデプロイメントと、Confluentや StreamNativeのようなホスト型サービスの両方で動作します。
このチュートリアルでは、Zilliz Cloudをデモとして使用する:
ステップ1:kafka-connect-milvusプラグインのダウンロード
以下の手順でkafka-connect-milvusプラグインをダウンロードする。
- ここから最新のプラグインZIPファイル
zilliz-kafka-connect-milvus-xxx.zipをダウンロードしてください。
ステップ2:Kafkaのダウンロード
- ここから最新のkafkaをダウンロードします。
- ダウンロードしたファイルを解凍し、kafkaディレクトリに移動します。
$ tar -xzf kafka_2.13-3.6.1.tgz
$ cd kafka_2.13-3.6.1
ステップ3:Kafka環境の起動
注:ローカル環境にはJava 8+がインストールされている必要があります。
すべてのサービスを正しい順序で開始するために、以下のコマンドを実行します:
ZooKeeperサービスを開始する。
$ bin/zookeeper-server-start.sh config/zookeeper.propertiesKafkaブローカーサービスを開始する。
別のターミナルセッションを開いて実行する:
$ bin/kafka-server-start.sh config/server.properties
すべてのサービスが正常に起動したら、基本的なKafka環境が起動し、使用できるようになります。
- 詳細については、公式のクイックスタートガイド(https://kafka.apache.org/quickstart)を参照してください。
ステップ4:KafkaとZilliz Cloudの設定
KafkaとZilliz Cloudがセットアップされ、適切に設定されていることを確認します。
Kafkaにまだトピックがない場合は、Kafkaにトピック(例:
topic_0)を作成します。$ bin/kafka-topics.sh --create --topic topic_0 --bootstrap-server localhost:9092Zilliz Cloudにまだコレクションがない場合は、ベクトルフィールドを持つコレクションを作成します(この例ではベクトルは
dimension=8)。Zilliz Cloudでは以下のスキーマ例を使用できます:
注意:双方のスキーマが一致していることを確認してください。このスキーマでは、ベクトルフィールドは正確に1つです。双方のフィールド名は全く同じです。
ステップ5:kafka-connect-milvusプラグインをKafkaインスタンスにロードする。
ステップ1でダウンロードした
zilliz-kafka-connect-milvus-xxx.zipファイルを解凍します。zilliz-kafka-connect-milvusディレクトリを Kafka インストールのlibsディレクトリにコピーします。Kafka インストールの
configディレクトリにあるconnect-standalone.propertiesファイルを修正します。key.converter.schemas.enable=false value.converter.schemas.enable=false plugin.path=libs/zilliz-kafka-connect-milvus-xxxKafka インストールの
configディレクトリにmilvus-sink-connector.propertiesファイルを作成し、設定します。name=zilliz-kafka-connect-milvus connector.class=com.milvus.io.kafka.MilvusSinkConnector public.endpoint=https://<public.endpoint>:port token=***************************************** collection.name=topic_0 topics=topic_0
ステップ6:コネクターを起動する
前の設定ファイルを使用してコネクタを起動します。
$ bin/connect-standalone.sh config/connect-standalone.properties config/milvus-sink-connector.propertiesKafkaで作成したKafkaトピックにメッセージを生成してみる
bin/kafka-console-producer.sh --topic topic_0 --bootstrap-server localhost:9092 >{"id": 0, "title": "The Reported Mortality Rate of Coronavirus Is Not Important", "title_vector": [0.041732933, 0.013779674, -0.027564144, -0.013061441, 0.009748648, 0.00082446384, -0.00071647146, 0.048612226], "link": "https://medium.com/swlh/the-reported-mortality-rate-of-coronavirus-is-not-important-369989c8d912"}エンティティがZilliz Cloudのコレクションに挿入されているか確認します。挿入に成功した場合のZilliz Cloud上の表示は以下のようになります:

サポート
Kafka Connect Milvus Connectorに関するご質問やサポートが必要な場合は、コネクタのメンテナまでお気軽にお問い合わせください:メール :support@zilliz.com