milvus-logo
LFAI
首页
  • 工具

Spark-Milvus 连接器用户指南

Spark-Milvus Connector (https://github.com/zilliztech/spark-milvus) 提供 Apache Spark 和 Milvus 之间的无缝集成,将 Apache Spark 的数据处理和 ML 功能与 Milvus 的向量数据存储和搜索功能相结合。这种集成可实现各种有趣的应用,包括

  • 高效地将向量数据大批量加载到 Milvus 中、
  • 在 Milvus 和其他存储系统或数据库之间移动数据、
  • 利用 Spark MLlib 和其他人工智能工具分析 Milvus 中的数据。

快速启动

准备工作

Spark-Milvus 连接器支持 Scala 和 Python 编程语言。用户可以使用PysparkSpark-shell。要运行此演示,请按以下步骤设置包含 Spark-Milvus Connector 依赖关系的 Spark 环境:

  1. 安装 Apache Spark(版本 >= 3.3.0)

    您可以参考官方文档安装 Apache Spark。

  2. 下载spark-milvusjar 文件。

    wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
    
  3. spark-milvusjar 作为依赖项之一启动 Spark 运行时。

    要使用 Spark-Milvus 连接器启动 Spark 运行时,请将下载的spark-milvus作为依赖项添加到命令中。

    • pyspark

      ./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
      
    • spark-shell

      ./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
      

演示

在本演示中,我们将创建一个包含向量数据的示例 Spark DataFrame,并通过 Spark-Milvus Connector 将其写入 Milvus。Milvus 将根据模式和指定选项自动创建一个集合。

from pyspark.sql import SparkSession

columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
 .mode("append") \
 .option("milvus.host", "localhost") \
 .option("milvus.port", "19530") \
 .option("milvus.collection.name", "hello_spark_milvus") \
 .option("milvus.collection.vectorField", "vec") \
 .option("milvus.collection.vectorDim", "8") \
 .option("milvus.collection.primaryKeyField", "id") \
 .format("milvus") \
 .save()
import org.apache.spark.sql.{SaveMode, SparkSession}

object Hello extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("HelloSparkMilvus")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
    (2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
    (3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
    (4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
  ).toDF("id", "text", "vec")

  // set milvus options
  val milvusOptions = Map(
      "milvus.host" -> "localhost" -> uri,
      "milvus.port" -> "19530",
      "milvus.collection.name" -> "hello_spark_milvus",
      "milvus.collection.vectorField" -> "vec",
      "milvus.collection.vectorDim" -> "5",
      "milvus.collection.primaryKeyField", "id"
    )
    
  sampleDF.write.format("milvus")
    .options(milvusOptions)
    .mode(SaveMode.Append)
    .save()
}

执行上述代码后,你可以使用 SDK 或 Attu(Milvus 控制面板)在 Milvus 中查看插入的数据。您可以看到一个名为hello_spark_milvus 的集合,其中已插入了 4 个实体。

功能和概念

Milvus 选项

在 "快速入门 "部分,我们展示了 Milvus 操作过程中的选项设置。这些选项被抽象为 Milvus 选项。它们用于创建与 Milvus 的连接,并控制 Milvus 的其他行为。并非所有选项都是强制性的。

选项键默认值说明
milvus.hostlocalhostMilvus 服务器主机。详见管理 Milvus 连接
milvus.port19530Milvus 服务器端口。详见管理 Milvus 连接
milvus.usernamerootMilvus 服务器的用户名。详见管理 Milvus 连接
milvus.passwordMilvusMilvus 服务器密码。详见管理 Milvus 连接
milvus.uri--Milvus 服务器 URI。详见管理 Milvus 连接
milvus.token--Milvus 服务器令牌。详见管理 Milvus 连接
milvus.database.namedefault要读取或写入的 Milvus 数据库名称。
milvus.collection.namehello_milvus要读取或写入的 Milvus 集合名称。
milvus.collection.primaryKeyFieldNone集合中主键字段的名称。如果集合不存在,则为必填项。
milvus.collection.vectorFieldNone集合中向量字段的名称。如果集合不存在,则为必填项。
milvus.collection.vectorDimNone集合中向量字段的尺寸。如果集合不存在,则为必填项。
milvus.collection.autoIDfalse如果集合不存在,该选项指定是否自动为实体生成 ID。更多信息,请参阅create_collection
milvus.bucketa-bucketMilvus 存储中的存储桶名称。该名称应与milvus.yaml 中的minio.bucketName 相同。
milvus.rootpathfilesMilvus 存储的根路径。应与milvus.yaml 中的minio.rootpath 相同。
milvus.fss3a://Milvus 存储的文件系统。s3a:// 适用于开源 Spark。Databricks 使用s3://
milvus.storage.endpointlocalhost:9000Milvus 存储的端点。该值应与milvus.yaml 中的minio.address:minio.port 相同。
milvus.storage.userminioadminMilvus 存储的用户。应与milvus.yaml 中的minio.accessKeyID 相同。
milvus.storage.passwordminioadminMilvus 存储的密码。应与milvus.yaml 中的minio.secretAccessKey 相同。
milvus.storage.useSSLfalse是否为 Milvus 存储使用 SSL。应与milvus.yaml 中的minio.useSSL 相同。

Milvus 数据格式

Spark-Milvus 连接器支持以下列 Milvus 数据格式读写数据:

  • milvus:用于从 Spark DataFrame 无缝转换为 Milvus 实体的 Milvus 数据格式。
  • milvusbinlog:用于读取 Milvus 内置 binlog 数据的 Milvus 数据格式。
  • mjson:用于向 Milvus 批量插入数据的 Milvus JSON 格式。

Milvus

快速入门中,我们使用milvus格式将样本数据写入 Milvus 集群。milvus格式是一种新的数据格式,支持将 Spark DataFrame 数据无缝写入 Milvus 集合。这是通过批量调用 Milvus SDK 的插入 API 来实现的。如果 Milvus 中不存在集合,则会根据数据帧的模式创建新的集合。不过,自动创建的集合可能不支持Collection Schema 的所有功能。因此,建议先通过 SDK 创建一个集合,然后再使用 spark-milvus 进行编写。有关详细信息,请参阅演示

milvusbinlog

新数据格式milvusbinlog用于读取 Milvus 内置的 binlog 数据。Binlog 是 Milvus 基于 parquet 的内部数据存储格式。除非你熟悉Milvus内部存储的细节,否则不建议直接使用milvusbinlog。我们建议使用下一节将介绍的MilvusUtils函数。

val df = spark.read
  .format("milvusbinlog")
  .load(path)
  .withColumnRenamed("val", "embedding")

mjson

Milvus 提供Bulkinsert功能,以便在处理大型数据集时提高写入性能。然而,Milvus 使用的 JSON 格式与 Spark 的默认 JSON 输出格式略有不同。 为了解决这个问题,我们引入了mjson数据格式,以生成符合 Milvus 要求的数据。下面的示例展示了 JSON-lines 和mjson 之间的区别:

  • JSON-lines:

    {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]}
    {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]}
    {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]}
    {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]}
    {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
    
  • mjson(Milvus Bulkinsert 要求):

    {
        "rows":[
            {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]},
            {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]},
            {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]},
            {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]},
            {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
        ]
    }
    

未来将对此进行改进。如果您的 Milvus 版本是 v2.3.7 或更高,且支持使用 Parquet 格式的 Bulkinsert,我们建议您在 spark-milvus 集成中使用 parquet 格式。请参见 Github 上的演示

MilvusUtils

MilvusUtils 包含多个有用的 util 函数。目前仅支持 Scala 语言。更多使用示例请参见高级使用部分。

MilvusUtils.readMilvusCollection

MilvusUtils.readMilvusCollection是将整个Milvus数据集加载到 Spark 数据帧的简单接口。它封装了各种操作,包括调用 Milvus SDK、读取milvusbinlog和常见的联合/连接操作。

val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)

MilvusUtils.bulkInsertFromSpark

MilvusUtils.bulkInsertFromSpark提供了将 Spark 输出文件大批量导入 Milvus 的便捷方法。它封装了 Milvus SDK 的BullkinsertAPI。

df.write.format("parquet").save(outputPath)
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputPath, "parquet")

高级用法

在本节中,您将找到 Spark-Milvus 连接器用于数据分析和迁移的高级使用示例。更多演示,请参阅示例

MySQL -> 嵌入 -> Milvus

在本演示中,我们将

  1. 通过 Spark-MySQL 连接器从 MySQL 读取数据、
  2. 生成嵌入(以 Word2Vec 为例),以及
  3. 将嵌入数据写入 Milvus。

要启用 Spark-MySQL 连接器,需要在 Spark 环境中添加以下依赖项:

spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar,mysql-connector-j-x.x.x.jar
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{SaveMode, SparkSession}
import zilliztech.spark.milvus.MilvusOptions._

import org.apache.spark.ml.linalg.Vector

object Mysql2MilvusDemo extends App {

val spark = SparkSession.builder().master("local[*]")
.appName("Mysql2MilvusDemo")
.getOrCreate()

import spark.implicits.\_

// Create DataFrame
val sampleDF = Seq(
(1, "Milvus was created in 2019 with a singular goal: store, index, and manage massive embedding vectors generated by deep neural networks and other machine learning (ML) models."),
(2, "As a database specifically designed to handle queries over input vectors, it is capable of indexing vectors on a trillion scale. "),
(3, "Unlike existing relational databases which mainly deal with structured data following a pre-defined pattern, Milvus is designed from the bottom-up to handle embedding vectors converted from unstructured data."),
(4, "As the Internet grew and evolved, unstructured data became more and more common, including emails, papers, IoT sensor data, Facebook photos, protein structures, and much more.")
).toDF("id", "text")

// Write to MySQL Table
sampleDF.write
.mode(SaveMode.Append)
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "demo")
.option("user", "root")
.option("password", "123456")
.save()

// Read from MySQL Table
val dfMysql = spark.read
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "demo")
.option("user", "root")
.option("password", "123456")
.load()

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
val tokenizedDf = tokenizer.transform(dfMysql)

// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("tokens")
.setOutputCol("vectors")
.setVectorSize(128)
.setMinCount(0)
val model = word2Vec.fit(tokenizedDf)

val result = model.transform(tokenizedDf)

val vectorToArrayUDF = udf((v: Vector) => v.toArray)
// Apply the UDF to the DataFrame
val resultDF = result.withColumn("embedding", vectorToArrayUDF($"vectors"))
val milvusDf = resultDF.drop("tokens").drop("vectors")

milvusDf.write.format("milvus")
.option(MILVUS_HOST, "localhost")
.option(MILVUS_PORT, "19530")
.option(MILVUS_COLLECTION_NAME, "text_embedding")
.option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
.option(MILVUS_COLLECTION_VECTOR_DIM, "128")
.option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
.mode(SaveMode.Append)
.save()
}

Milvus -> 转换 -> Milvus

在本演示中,我们将

  1. 从 Milvus 数据集中读取数据、
  2. 应用转换(以 PCA 为例),以及
  3. 通过 Bulkinsert API 将转换后的数据写入另一个 Milvus。

PCA 模型是一种可降低嵌入向量维度的变换模型,是机器学习中的常见操作。 你可以在变换步骤中添加任何其他处理操作,如过滤、连接或归一化。

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}

import scala.collection.JavaConverters.\_

object TransformDemo extends App {
val sparkConf = new SparkConf().setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits.\_

val host = "localhost"
val port = 19530
val user = "root"
val password = "Milvus"
val fs = "s3a://"
val bucketName = "a-bucket"
val rootPath = "files"
val minioAK = "minioadmin"
val minioSK = "minioadmin"
val minioEndpoint = "localhost:9000"
val collectionName = "hello_spark_milvus1"
val targetCollectionName = "hello_spark_milvus2"

val properties = Map(
MilvusOptions.MILVUS_HOST -> host,
MilvusOptions.MILVUS_PORT -> port.toString,
MilvusOptions.MILVUS_COLLECTION_NAME -> collectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)

// 1, configurations
val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))

// 2, batch read milvus collection data to dataframe
// Schema: dim of `embeddings` is 8
// +-+------------+------------+------------------+
// | | field name | field type | other attributes |
// +-+------------+------------+------------------+
// |1| "pk" | Int64 | is_primary=True |
// | | | | auto_id=False |
// +-+------------+------------+------------------+
// |2| "random" | Double | |
// +-+------------+------------+------------------+
// |3|"embeddings"| FloatVector| dim=8 |
// +-+------------+------------+------------------+
val arrayToVectorUDF = udf((arr: Seq[Double]) => Vectors.dense(arr.toArray[Double]))
val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
.withColumn("embeddings_vec", arrayToVectorUDF($"embeddings"))
.drop("embeddings")

// 3. Use PCA to reduce dim of vector
val dim = 4
val pca = new PCA()
.setInputCol("embeddings_vec")
.setOutputCol("pca_vec")
.setK(dim)
.fit(collectionDF)
val vectorToArrayUDF = udf((v: Vector) => v.toArray)
// embeddings dim number reduce to 4
// +-+------------+------------+------------------+
// | | field name | field type | other attributes |
// +-+------------+------------+------------------+
// |1| "pk" | Int64 | is_primary=True |
// | | | | auto_id=False |
// +-+------------+------------+------------------+
// |2| "random" | Double | |
// +-+------------+------------+------------------+
// |3|"embeddings"| FloatVector| dim=4 |
// +-+------------+------------+------------------+
val pcaDf = pca.transform(collectionDF)
.withColumn("embeddings", vectorToArrayUDF($"pca_vec"))
.select("pk", "random", "embeddings")

// 4. Write PCAed data to S3
val outputPath = "s3a://a-bucket/result"
pcaDf.write
.mode("overwrite")
.format("parquet")
.save(outputPath)

// 5. Config MilvusOptions of target table 
val targetProperties = Map(
MilvusOptions.MILVUS*HOST -> host,
MilvusOptions.MILVUS_PORT -> port.toString,
MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))

// 6. Bulkinsert Spark output files into milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
}

Databricks -> Zilliz Cloud

如果您使用的是 Zilliz Cloud(Milvus 托管服务),您可以利用其便捷的数据导入 API。Zilliz Cloud 提供全面的工具和文档,帮助您高效地从各种数据源(包括 Spark 和 Databricks)移动数据。只需将 S3 存储桶设置为中介,并开放其对 Zilliz Cloud 账户的访问。Zilliz Cloud 的数据导入 API 会自动将 S3 存储桶中的整批数据加载到 Zilliz Cloud 集群。

准备工作

  1. 将 jar 文件添加到 Databricks 集群,加载 Spark 运行时。

    您可以通过不同方式安装库。本截图显示的是从本地向集群上传 jar 文件。更多信息,请参阅 Databricks 文档中的集群库

    Install Databricks Library 安装 Databricks 库

  2. 创建一个 S3 bucket,并将其配置为 Databricks 集群的外部存储位置。

    Bulkinsert 要求将数据存储在临时存储桶中,以便 Zilliz Cloud 能批量导入数据。您可以创建一个 S3 存储桶,并将其配置为 Databricks 的外部位置。详情请参阅外部位置

  3. 确保 Databricks 凭据的安全。

    有关详细信息,请参阅博客 "在 Databricks 中安全管理凭据 "中的说明。

演示

下面是展示批量数据迁移过程的代码片段。与上述 Milvus 示例类似,你只需替换凭证和 S3 存储桶地址。

// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write
  .mode("overwrite")
  .format("mjson")
  .save(outputPath)
// Specify Milvus options.
val targetProperties = Map(
  MilvusOptions.MILVUS_URI -> zilliz_uri,
  MilvusOptions.MILVUS_TOKEN -> zilliz_token,
  MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
  MilvusOptions.MILVUS_BUCKET -> bucketName,
  MilvusOptions.MILVUS_ROOTPATH -> rootPath,
  MilvusOptions.MILVUS_FS -> fs,
  MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
  MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
  MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "mjson")

实际操作

为了帮助您快速上手 Spark-Milvus Connector,我们准备了一本笔记本,指导您使用 Milvus 和 Zilliz Cloud 完成流式和批量数据传输过程。

翻译自DeepLogo

反馈

此页对您是否有帮助?