將 Apache Spark™ 與 Milvus/Zilliz Cloud 用於 AI 管線
Spark-Milvus Connector提供 Apache Spark 和 Databricks 與 Milvus 和 Zilliz Cloud 的整合。它將 Apache Spark 強大的大資料處理和機器學習 (ML) 功能與 Milvus 最先進的向量搜尋功能相結合。此整合可簡化工作流程,以進行人工智能驅動的搜尋、進階分析、ML 訓練,並有效管理大規模向量資料。
Apache Spark 是一個分散式資料處理平台,專為以高速運算處理大量資料集而設計。與 Milvus 或 Zilliz Cloud 搭配使用時,可為語意搜尋、推薦系統和 AI 驅動的資料分析等使用個案開啟新的可能性。
舉例來說,Spark 可以批量處理大型資料集,透過 ML 模型產生嵌入式資料,然後再使用 Spark-Milvus 連結器,將這些嵌入式資料直接儲存於 Milvus 或 Zilliz Cloud。一旦編入索引,就可以快速搜尋或分析這些資料,為 AI 和大資料工作流程建立強大的管道。
Spark-Milvus 連接器支援迭代與大量資料擷取至 Milvus、系統間資料同步,以及對儲存於 Milvus 的向量資料進行進階分析等任務。本指南將教您如何有效配置和使用連接器的步驟,以應用於下列使用個案:
- 有效率地將向量資料大量載入 Milvus、
- 在 Milvus 和其他儲存系統或資料庫之間移動資料、
- 利用 Spark MLlib 和其他 AI 工具分析 Milvus 中的資料。
快速啟動
準備工作
Spark-Milvus Connector 支援 Scala 和 Python 程式語言。使用者可搭配Pyspark或Spark-shell 使用。若要執行本範例,請依照下列步驟建立包含 Spark-Milvus Connector 相依性的 Spark 環境:
安裝 Apache Spark (版本 >= 3.3.0)
您可以參考官方文件安裝 Apache Spark。
下載spark-milvusjar 檔案。
wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
以spark-milvusjar 作為其中一個依賴項目,啟動 Spark 執行時間。
若要使用 Spark-Milvus Connector 來啟動 Spark runtime,請將下載的spark-milvus作為相依性加入指令中。
pyspark
./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
spark-shell
./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
示範
在這個示範中,我們會建立一個有向量資料的 Spark DataFrame 範例,並透過 Spark-Milvus Connector 將資料寫入 Milvus。一個集合會根據 schema 和指定的選項自動在 Milvus 中建立。
from pyspark.sql import SparkSession
columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
.mode("append") \
.option("milvus.host", "localhost") \
.option("milvus.port", "19530") \
.option("milvus.collection.name", "hello_spark_milvus") \
.option("milvus.collection.vectorField", "vec") \
.option("milvus.collection.vectorDim", "8") \
.option("milvus.collection.primaryKeyField", "id") \
.format("milvus") \
.save()
import org.apache.spark.sql.{SaveMode, SparkSession}
object Hello extends App {
val spark = SparkSession.builder().master("local[*]")
.appName("HelloSparkMilvus")
.getOrCreate()
import spark.implicits._
// Create DataFrame
val sampleDF = Seq(
(1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
(2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
(3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
(4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
).toDF("id", "text", "vec")
// set milvus options
val milvusOptions = Map(
"milvus.host" -> "localhost" -> uri,
"milvus.port" -> "19530",
"milvus.collection.name" -> "hello_spark_milvus",
"milvus.collection.vectorField" -> "vec",
"milvus.collection.vectorDim" -> "5",
"milvus.collection.primaryKeyField", "id"
)
sampleDF.write.format("milvus")
.options(milvusOptions)
.mode(SaveMode.Append)
.save()
}
執行上述程式碼後,您可以使用 SDK 或 Attu (A Milvus Dashboard) 在 Milvus 中檢視插入的資料。您可以發現一個名為hello_spark_milvus
的集合已經被插入了 4 個實體。
功能與概念
Milvus 選項
在快速入門部分,我們展示了使用 Milvus 操作時的設定選項。這些選項被抽象為 Milvus 選項。它們用來建立與 Milvus 的連線,並控制其他 Milvus 行為。不是所有的選項都是強制性的。
選項關鍵 | 預設值 | 說明 |
---|---|---|
milvus.host | localhost | Milvus 伺服器主機。詳細資訊請參閱管理 Milvus 連線。 |
milvus.port | 19530 | Milvus 伺服器連接埠。詳細資訊請參閱管理 Milvus 連線。 |
milvus.username | root | Milvus 伺服器的使用者名稱。詳細資訊請參閱管理 Milvus 連線。 |
milvus.password | Milvus | Milvus 伺服器密碼。詳細資訊請參閱管理 Milvus 連線。 |
milvus.uri | -- | Milvus 伺服器 URI。詳情請參閱管理 Milvus 連線。 |
milvus.token | -- | Milvus 伺服器代碼。詳情請參閱管理 Milvus 連線。 |
milvus.database.name | default | 要讀取或寫入的 Milvus 資料庫名稱。 |
milvus.collection.name | hello_milvus | 要讀取或寫入的 Milvus 資料庫名稱。 |
milvus.collection.primaryKeyField | None | 資料集中主索引欄位的名稱。如果集合不存在,則為必填字段。 |
milvus.collection.vectorField | None | 集合中向量欄位的名稱。若集合不存在,則必須填寫。 |
milvus.collection.vectorDim | None | 集合中向量欄位的尺寸。若集合不存在,則必須填寫。 |
milvus.collection.autoID | false | 如果集合不存在,此選項指定是否自動為實體產生 ID。更多資訊,請參閱create_collection |
milvus.bucket | a-bucket | Milvus 儲存中的 Bucket 名稱。這應該與milvus.yaml 中的minio.bucketName 相同。 |
milvus.rootpath | files | Milvus 儲存的根目錄。這應該與milvus.yaml 中的minio.rootpath 相同。 |
milvus.fs | s3a:// | Milvus 儲存的檔案系統。s3a:// 適用於開放原始碼 Spark。對於 Databricks,請使用s3:// 。 |
milvus.storage.endpoint | localhost:9000 | Milvus 儲存的端點。這應該與milvus.yaml 中的minio.address :minio.port 相同。 |
milvus.storage.user | minioadmin | Milvus 儲存的使用者。這應該與milvus.yaml 中的minio.accessKeyID 相同。 |
milvus.storage.password | minioadmin | Milvus 儲存的密碼。這應該與milvus.yaml 中的minio.secretAccessKey 相同。 |
milvus.storage.useSSL | false | 是否為 Milvus 儲存使用 SSL。這應該與milvus.yaml 中的minio.useSSL 相同。 |
Milvus 資料格式
Spark-Milvus Connector 支援以下列 Milvus 資料格式讀寫資料:
milvus
:Milvus 資料格式,可從 Spark DataFrame 無縫轉換為 Milvus 實體。milvusbinlog
:讀取 Milvus 內建 binlog 資料的 Milvus 資料格式。mjson
:Milvus JSON 格式,用於將大量資料插入 Milvus。
Milvus
在快速啟動中,我們使用milvus格式將範例資料寫入 Milvus 叢集。milvus格式是一種新的資料格式,支援將 Spark DataFrame 資料無縫寫入 Milvus 集合。這是透過批次呼叫 Milvus SDK 的 Insert API 來實現的。如果 Milvus 中不存在集合,則會根據 Dataframe 的模式創建一個新的集合。然而,自動建立的集合可能不支援集合模式的所有功能。因此,建議先透過 SDK 建立集合,然後再使用 spark-milvus 進行寫入。如需更多資訊,請參閱示範。
milvusbinlog
新的資料格式milvusbinlog是用來讀取 Milvus 內建的 binlog 資料。Binlog 是 Milvus 基於 parquet 的內部資料儲存格式。除非您熟悉 Milvus 內部儲存的細節,否則不建議直接使用milvusbinlog。我們建議使用將在下一節介紹的MilvusUtils函式。
val df = spark.read
.format("milvusbinlog")
.load(path)
.withColumnRenamed("val", "embedding")
mjson
Milvus 提供Bulkinsert功能,以便在操作大型資料集時有更好的寫入效能。然而,Milvus 使用的 JSON 格式與 Spark 預設的 JSON 輸出格式略有不同,為了解決這個問題,我們引入mjson資料格式來產生符合 Milvus 要求的資料。以下是一個範例,說明 JSON-lines 和mjson 的差異:
JSON-lines:
{"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]} {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]} {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]} {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]} {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
mjson (Milvus Bulkinsert 所需的):
{ "rows":[ {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]}, {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]}, {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]}, {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]}, {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]} ] }
這將在未來得到改進。如果您的 Milvus 版本是 v2.3.7+ 且支援使用 Parquet 格式的 bulkinsert,我們建議您在 spark-milvus 整合中使用 parquet 格式。請參閱 Github 上的示範。
MilvusUtils
MilvusUtils 包含數個有用的 util 函數。目前只支援 Scala。更多使用範例請參閱進階使用部分。
MilvusUtils.readMilvusCollection
MilvusUtils.readMilvusCollection是一個簡單的介面,用來載入整個 Milvus 套件到 Spark 資料框。它包裝了各種操作,包括呼叫 Milvus SDK、讀取milvusbinlog和一般的union/join操作。
val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
MilvusUtils.bulkInsertFromSpark
MilvusUtils.bulkInsertFromSpark提供了一個方便的方式,將 Spark 輸出檔案大量匯入 Milvus。它包裝了 Milvus SDK 的BullkinsertAPI。
df.write.format("parquet").save(outputPath)
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputPath, "parquet")
進階用法
在本節中,您將找到 Spark-Milvus Connector 用於資料分析和遷移的進階使用範例。如需更多示範,請參閱範例。
MySQL -> 嵌入 -> Milvus
在這個示範中,我們會
- 透過 Spark-MySQL Connector 從 MySQL 讀取資料、
- 產生 embedding (以 Word2Vec 為例),以及
- 將嵌入資料寫入 Milvus。
要啟用 Spark-MySQL Connector,您需要在 Spark 環境中加入下列依賴:
spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar,mysql-connector-j-x.x.x.jar
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{SaveMode, SparkSession}
import zilliztech.spark.milvus.MilvusOptions._
import org.apache.spark.ml.linalg.Vector
object Mysql2MilvusDemo extends App {
val spark = SparkSession.builder().master("local[*]")
.appName("Mysql2MilvusDemo")
.getOrCreate()
import spark.implicits._
// Create DataFrame
val sampleDF = Seq(
(1, "Milvus was created in 2019 with a singular goal: store, index, and manage massive embedding vectors generated by deep neural networks and other machine learning (ML) models."),
(2, "As a database specifically designed to handle queries over input vectors, it is capable of indexing vectors on a trillion scale. "),
(3, "Unlike existing relational databases which mainly deal with structured data following a pre-defined pattern, Milvus is designed from the bottom-up to handle embedding vectors converted from unstructured data."),
(4, "As the Internet grew and evolved, unstructured data became more and more common, including emails, papers, IoT sensor data, Facebook photos, protein structures, and much more.")
).toDF("id", "text")
// Write to MySQL Table
sampleDF.write
.mode(SaveMode.Append)
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "demo")
.option("user", "root")
.option("password", "123456")
.save()
// Read from MySQL Table
val dfMysql = spark.read
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "demo")
.option("user", "root")
.option("password", "123456")
.load()
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
val tokenizedDf = tokenizer.transform(dfMysql)
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("tokens")
.setOutputCol("vectors")
.setVectorSize(128)
.setMinCount(0)
val model = word2Vec.fit(tokenizedDf)
val result = model.transform(tokenizedDf)
val vectorToArrayUDF = udf((v: Vector) => v.toArray)
// Apply the UDF to the DataFrame
val resultDF = result.withColumn("embedding", vectorToArrayUDF($"vectors"))
val milvusDf = resultDF.drop("tokens").drop("vectors")
milvusDf.write.format("milvus")
.option(MILVUS_HOST, "localhost")
.option(MILVUS_PORT, "19530")
.option(MILVUS_COLLECTION_NAME, "text_embedding")
.option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
.option(MILVUS_COLLECTION_VECTOR_DIM, "128")
.option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
.mode(SaveMode.Append)
.save()
}
Milvus -> Transform -> Milvus
在這個示範中,我們會
- 從 Milvus 套件讀取資料、
- 套用轉換 (以 PCA 為例),以及
- 透過 Bulkinsert API 將轉換後的資料寫入另一個 Milvus。
PCA 模型是一種轉換模型,可以降低嵌入向量的維度,這是機器學習中常見的操作。 您可以在轉換步驟中加入任何其他處理操作,例如過濾、連結或歸一化。
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}
import scala.collection.JavaConverters._
object TransformDemo extends App {
val sparkConf = new SparkConf().setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val host = "localhost"
val port = 19530
val user = "root"
val password = "Milvus"
val fs = "s3a://"
val bucketName = "a-bucket"
val rootPath = "files"
val minioAK = "minioadmin"
val minioSK = "minioadmin"
val minioEndpoint = "localhost:9000"
val collectionName = "hello_spark_milvus1"
val targetCollectionName = "hello_spark_milvus2"
val properties = Map(
MilvusOptions.MILVUS_HOST -> host,
MilvusOptions.MILVUS_PORT -> port.toString,
MilvusOptions.MILVUS_COLLECTION_NAME -> collectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
// 1, configurations
val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))
// 2, batch read milvus collection data to dataframe
// Schema: dim of `embeddings` is 8
// +-+------------+------------+------------------+
// | | field name | field type | other attributes |
// +-+------------+------------+------------------+
// |1| "pk" | Int64 | is_primary=True |
// | | | | auto_id=False |
// +-+------------+------------+------------------+
// |2| "random" | Double | |
// +-+------------+------------+------------------+
// |3|"embeddings"| FloatVector| dim=8 |
// +-+------------+------------+------------------+
val arrayToVectorUDF = udf((arr: Seq[Double]) => Vectors.dense(arr.toArray[Double]))
val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
.withColumn("embeddings_vec", arrayToVectorUDF($"embeddings"))
.drop("embeddings")
// 3. Use PCA to reduce dim of vector
val dim = 4
val pca = new PCA()
.setInputCol("embeddings_vec")
.setOutputCol("pca_vec")
.setK(dim)
.fit(collectionDF)
val vectorToArrayUDF = udf((v: Vector) => v.toArray)
// embeddings dim number reduce to 4
// +-+------------+------------+------------------+
// | | field name | field type | other attributes |
// +-+------------+------------+------------------+
// |1| "pk" | Int64 | is_primary=True |
// | | | | auto_id=False |
// +-+------------+------------+------------------+
// |2| "random" | Double | |
// +-+------------+------------+------------------+
// |3|"embeddings"| FloatVector| dim=4 |
// +-+------------+------------+------------------+
val pcaDf = pca.transform(collectionDF)
.withColumn("embeddings", vectorToArrayUDF($"pca_vec"))
.select("pk", "random", "embeddings")
// 4. Write PCAed data to S3
val outputPath = "s3a://a-bucket/result"
pcaDf.write
.mode("overwrite")
.format("parquet")
.save(outputPath)
// 5. Config MilvusOptions of target table
val targetProperties = Map(
MilvusOptions.MILVUS_HOST -> host,
MilvusOptions.MILVUS_PORT -> port.toString,
MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
// 6. Bulkinsert Spark output files into milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
}
Databricks -> Zilliz Cloud
如果您使用 Zilliz Cloud(Milvus 的管理服務),您可以利用其便利的資料匯入 API。Zilliz Cloud 提供全面的工具和說明文件,協助您有效率地從各種資料來源移動資料,包括 Spark 和 Databricks。只需設定一個 S3 bucket 作為中介,並開放其存取至您的 Zilliz Cloud 帳戶。Zilliz Cloud 的資料匯入 API 會自動從 S3 資料桶載入整批資料到您的 Zilliz Cloud 叢集。
準備工作
將 jar 檔案加入您的 Databricks Cluster,載入 Spark 運行時間。
您可以用不同的方式安裝函式庫。此螢幕截圖顯示從本機上傳 jar 到叢集。如需詳細資訊,請參閱 Databricks 文件中的叢集函式庫。
安裝 Databricks 函式庫
建立一個 S3 bucket,並將其設定為 Databricks 叢集的外部儲存位置。
Bulkinsert 要求將資料儲存在臨時儲存桶中,以便 Zilliz Cloud 能夠批次匯入資料。您可以创建一个 S3 bucket 并将其配置为 databricks 的外部位置。詳情請參閱外部位置。
保護您的 Databricks 認證。
如需詳細資訊,請參閱部落格 Securely Managing Credentials in Databricks 的說明。
示範
以下是展示批次資料遷移過程的程式碼片段。類似於上述 Milvus 的範例,您只需要更換憑證和 S3 儲存桶位址。
// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write
.mode("overwrite")
.format("mjson")
.save(outputPath)
// Specify Milvus options.
val targetProperties = Map(
MilvusOptions.MILVUS_URI -> zilliz_uri,
MilvusOptions.MILVUS_TOKEN -> zilliz_token,
MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "mjson")
實作筆記
為了幫助您快速上手 Spark-Milvus Connector,您可以查看筆記型電腦,它可協助您完成 Spark 至 Milvus 和 Zilliz Cloud 的串流和批次資料擷取範例。