用 Milvus 连接 Kafka
在本快速入门指南中,我们将展示如何设置开源 kafka 和 Zilliz Cloud 以摄取向量数据。
步骤 1:下载 kafka-connect-milvus 插件
完成以下步骤下载 kafka-connect-milvus 插件。
- 从此处下载最新插件压缩文件
zilliz-kafka-connect-milvus-xxx.zip
。
第 2 步:下载 Kafka
- 从此处下载最新的 kafka。
- 解压缩下载的文件并转到 kafka 目录。
$ tar -xzf kafka_2.13-3.6.1.tgz
$ cd kafka_2.13-3.6.1
第 3 步:启动 Kafka 环境
注意:本地环境必须安装 Java 8 以上。
运行以下命令以按正确顺序启动所有服务:
启动 ZooKeeper 服务
$ bin/zookeeper-server-start.sh config/zookeeper.properties
启动 Kafka 代理服务
打开另一个终端会话并运行:
$ bin/kafka-server-start.sh config/server.properties
一旦所有服务都成功启动,你就拥有了一个基本的 Kafka 运行环境,可以随时使用了。
- 详情请查看 Kafka 官方快速入门指南:https://kafka.apache.org/quickstart
第 4 步:配置 Kafka 和 Zilliz Cloud
确保已设置并正确配置 Kafka 和 Zilliz Cloud。
如果 Kafka 中还没有主题,请在 Kafka 中创建一个主题(例如
topic_0
)。$ bin/kafka-topics.sh --create --topic topic_0 --bootstrap-server localhost:9092
如果 Zilliz Cloud 中还没有 Collections,请创建一个带有向量字段的 Collections(本例中向量为
dimension=8
)。您可以在 Zilliz Cloud 上使用以下示例 Schema:注意:确保双方的 Schema 相互匹配。在 Schema 中,正好有一个向量字段。双方每个字段的名称完全相同。
第 5 步:加载 kafka-connect-milvus 插件到 Kafka 实例
解压缩在步骤 1 中下载的
zilliz-kafka-connect-milvus-xxx.zip
文件。将
zilliz-kafka-connect-milvus
目录复制到 Kafka 安装的libs
目录中。修改 Kafka 安装
config
目录中的connect-standalone.properties
文件。key.converter.schemas.enable=false value.converter.schemas.enable=false plugin.path=libs/zilliz-kafka-connect-milvus-xxx
在 Kafka 安装的
config
目录中创建并配置milvus-sink-connector.properties
文件。name=zilliz-kafka-connect-milvus connector.class=com.milvus.io.kafka.MilvusSinkConnector public.endpoint=https://<public.endpoint>:port token=***************************************** collection.name=topic_0 topics=topic_0
第 6 步:启动连接器
使用之前的配置文件启动连接器
$ bin/connect-standalone.sh config/connect-standalone.properties config/milvus-sink-connector.properties
尝试向刚刚在 Kafka 中创建的 Kafka 主题发送消息
bin/kafka-console-producer.sh --topic topic_0 --bootstrap-server localhost:9092 >{"id": 0, "title": "The Reported Mortality Rate of Coronavirus Is Not Important", "title_vector": [0.041732933, 0.013779674, -0.027564144, -0.013061441, 0.009748648, 0.00082446384, -0.00071647146, 0.048612226], "link": "https://medium.com/swlh/the-reported-mortality-rate-of-coronavirus-is-not-important-369989c8d912"}
检查实体是否已插入 Zilliz Cloud 中的 Collections。下面是插入成功后在 Zilliz Cloud 上的显示效果:
支持
如果您需要任何帮助或对 Kafka Connect Milvus Connector 有任何疑问,请随时联系我们的支持团队:电子邮件: support@zilliz.com