Guide de l'utilisateur du connecteur Spark-Milvus
Le connecteur Spark-Milvus (https://github.com/zilliztech/spark-milvus) permet une intégration transparente entre Apache Spark et Milvus, en combinant les fonctions de traitement des données et de ML d'Apache Spark avec les capacités de stockage et de recherche de données vectorielles de Milvus. Cette intégration permet diverses applications intéressantes, notamment
- Charger efficacement des données vectorielles dans Milvus par lots importants,
- Déplacer des données entre Milvus et d'autres systèmes de stockage ou bases de données,
- Analyser les données dans Milvus en exploitant Spark MLlib et d'autres outils d'IA.
Démarrage rapide
Préparation
Le connecteur Spark-Milvus prend en charge les langages de programmation Scala et Python. Les utilisateurs peuvent l'utiliser avec Pyspark ou Spark-shell. Pour exécuter cette démo, configurez un environnement Spark contenant la dépendance Spark-Milvus Connector en suivant les étapes suivantes :
Installer Apache Spark (version >= 3.3.0)
Vous pouvez installer Apache Spark en vous référant à la documentation officielle.
Téléchargez le fichier jar spark-milvus.
wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
Démarrer le runtime Spark avec spark-milvus jar comme l'une des dépendances.
Pour démarrer l'exécution de Spark avec le connecteur Spark-Milvus, ajoutez le fichier spark-milvus téléchargé comme dépendance à la commande.
pyspark
./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
spark-shell
./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
Démonstration
Dans cette démo, nous créons un exemple de DataFrame Spark avec des données vectorielles et nous l'écrivons dans Milvus via le connecteur Spark-Milvus. Une collection sera créée automatiquement dans Milvus en fonction du schéma et des options spécifiées.
from pyspark.sql import SparkSession
columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
.mode("append") \
.option("milvus.host", "localhost") \
.option("milvus.port", "19530") \
.option("milvus.collection.name", "hello_spark_milvus") \
.option("milvus.collection.vectorField", "vec") \
.option("milvus.collection.vectorDim", "8") \
.option("milvus.collection.primaryKeyField", "id") \
.format("milvus") \
.save()
import org.apache.spark.sql.{SaveMode, SparkSession}
object Hello extends App {
val spark = SparkSession.builder().master("local[*]")
.appName("HelloSparkMilvus")
.getOrCreate()
import spark.implicits._
// Create DataFrame
val sampleDF = Seq(
(1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
(2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
(3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
(4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
).toDF("id", "text", "vec")
// set milvus options
val milvusOptions = Map(
"milvus.host" -> "localhost" -> uri,
"milvus.port" -> "19530",
"milvus.collection.name" -> "hello_spark_milvus",
"milvus.collection.vectorField" -> "vec",
"milvus.collection.vectorDim" -> "5",
"milvus.collection.primaryKeyField", "id"
)
sampleDF.write.format("milvus")
.options(milvusOptions)
.mode(SaveMode.Append)
.save()
}
Après avoir exécuté le code ci-dessus, vous pouvez visualiser les données insérées dans Milvus à l'aide du SDK ou d'Attu (un tableau de bord Milvus). Vous pouvez trouver une collection nommée hello_spark_milvus
créée avec 4 entités déjà insérées.
Fonctionnalités et concepts
Options de Milvus
Dans la section Démarrage rapide, nous avons montré les options de paramétrage pendant les opérations avec Milvus. Ces options sont abstraites sous le nom d'options Milvus. Elles sont utilisées pour créer des connexions à Milvus et contrôler d'autres comportements de Milvus. Toutes les options ne sont pas obligatoires.
Option Clé | Valeur par défaut | Description de l'option |
---|---|---|
milvus.host | localhost | Hôte du serveur Milvus. Voir Gérer les connexions Milvus pour plus de détails. |
milvus.port | 19530 | Port du serveur Milvus. Voir Gérer les connexions Milvus pour plus de détails. |
milvus.username | root | Nom d'utilisateur pour le serveur Milvus. Voir Gérer les connexions Milvus pour plus de détails. |
milvus.password | Milvus | Mot de passe pour le serveur Milvus. Voir Gérer les connexions Milvus pour plus de détails. |
milvus.uri | -- | URI du serveur Milvus. Voir la section Gérer les connexions Milvus pour plus de détails. |
milvus.token | -- | Jeton du serveur Milvus. Voir Gérer les connexions Milvus pour plus de détails. |
milvus.database.name | default | Nom de la base de données Milvus à lire ou à écrire. |
milvus.collection.name | hello_milvus | Nom de la collection Milvus à lire ou à écrire. |
milvus.collection.primaryKeyField | None | Nom du champ de clé primaire dans la collection. Requis si la collection n'existe pas. |
milvus.collection.vectorField | None | Nom du champ vectoriel de la collection. Obligatoire si la collection n'existe pas. |
milvus.collection.vectorDim | None | Dimension du champ vectoriel de la collection. Obligatoire si la collection n'existe pas. |
milvus.collection.autoID | false | Si la collection n'existe pas, cette option indique s'il faut générer automatiquement des identifiants pour les entités. Pour plus d'informations, voir create_collection |
milvus.bucket | a-bucket | Nom de la base de données dans le stockage Milvus. Il doit être identique à minio.bucketName dans milvus.yaml. |
milvus.rootpath | files | Chemin racine du stockage Milvus. Il doit être identique à minio.rootpath dans milvus.yaml. |
milvus.fs | s3a:// | Système de fichiers du stockage Milvus. La valeur s3a:// s'applique à Spark open-source. Utilisez s3:// pour Databricks. |
milvus.storage.endpoint | localhost:9000 | Point de terminaison du stockage Milvus. Il doit être identique à minio.address :minio.port dans milvus.yaml. |
milvus.storage.user | minioadmin | Utilisateur du stockage Milvus. Il doit être identique à minio.accessKeyID dans milvus.yaml. |
milvus.storage.password | minioadmin | Mot de passe du stockage Milvus. Il doit être identique à minio.secretAccessKey dans milvus.yaml. |
milvus.storage.useSSL | false | Utilisation ou non de SSL pour le stockage Milvus. Il doit être identique à minio.useSSL dans milvus.yaml. |
Format des données Milvus
Le connecteur Spark-Milvus prend en charge la lecture et l'écriture de données dans les formats de données Milvus suivants :
milvus
: Le format de données Milvus pour une conversion transparente de Spark DataFrame en entités Milvus.milvusbinlog
: Le format de données Milvus pour la lecture des données binlog intégrées de Milvus.mjson
: Format JSON Milvus pour l'insertion en bloc de données dans Milvus.
milvus
Dans la section Démarrage rapide, nous utilisons le format milvus pour écrire des échantillons de données dans un cluster Milvus. Le format milvus est un nouveau format de données qui prend en charge l'écriture transparente de données Spark DataFrame dans Milvus Collections. Ceci est réalisé par des appels par lots à l'API Insert du SDK Milvus. Si une collection n'existe pas dans Milvus, une nouvelle collection sera créée sur la base du schéma du Dataframe. Toutefois, la collection créée automatiquement peut ne pas prendre en charge toutes les fonctionnalités du schéma de collection. Par conséquent, il est recommandé de créer d'abord une collection via le SDK, puis d'utiliser spark-milvus pour l'écriture. Pour plus d'informations, veuillez vous référer à la démo.
milvusbinlog
Le nouveau format de données milvusbinlog permet de lire les données binlog intégrées de Milvus. Binlog est le format de stockage de données interne de Milvus basé sur parquet. Malheureusement, il ne peut pas être lu par une bibliothèque parquet classique, c'est pourquoi nous avons implémenté ce nouveau format de données pour aider le travail Spark à le lire. Il n'est pas recommandé d'utiliser milvusbinlog directement à moins d'être familier avec les détails du stockage interne de milvus. Nous suggérons d'utiliser la fonction MilvusUtils qui sera présentée dans la section suivante.
val df = spark.read
.format("milvusbinlog")
.load(path)
.withColumnRenamed("val", "embedding")
mjson
Milvus fournit la fonctionnalité Bulkinsert pour améliorer les performances d'écriture lors de l'utilisation de grands ensembles de données. Cependant, le format JSON utilisé par Milvus est légèrement différent du format de sortie JSON par défaut de Spark. Pour résoudre ce problème, nous introduisons le format de données mjson pour générer des données qui répondent aux exigences de Milvus. Voici un exemple qui montre la différence entre JSON-lines et mjson:
JSON-lignes :
{"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]} {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]} {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]} {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]} {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
mjson (requis pour Milvus Bulkinsert) :
{ "rows":[ {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]}, {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]}, {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]}, {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]}, {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]} ] }
Ce point sera amélioré à l'avenir. Nous recommandons d'utiliser le format parquet dans l'intégration spark-milvus si votre version Milvus est v2.3.7+ qui supporte bulkinsert avec le format Parquet. Voir la démo sur Github.
MilvusUtils
MilvusUtils contient plusieurs fonctions util utiles. Actuellement, il n'est supporté qu'en Scala. Plus d'exemples d'utilisation se trouvent dans la section Utilisation avancée.
MilvusUtils.readMilvusCollection
MilvusUtils.readMilvusCollection est une interface simple pour charger une collection Milvus entière dans un Dataframe Spark. Elle englobe diverses opérations, notamment l'appel au SDK Milvus, la lecture du milvusbinlog et les opérations communes d'union/jointure.
val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
MilvusUtils.bulkInsertFromSpark
MilvusUtils.bulkInsertFromSpark fournit un moyen pratique d'importer des fichiers de sortie Spark dans Milvus dans un grand lot. Il intègre l'API Bullkinsert du SDK Milvus.
df.write.format("parquet").save(outputPath)
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputPath, "parquet")
Utilisation avancée
Dans cette section, vous trouverez des exemples d'utilisation avancée du connecteur Spark-Milvus pour l'analyse et la migration des données. Pour plus de démonstrations, voir exemples.
MySQL -> intégration -> Milvus
Dans cette démo, nous allons
- Lire les données de MySQL à travers le connecteur Spark-MySQL,
- Générer de l'embedding (en utilisant Word2Vec comme exemple), et
- écrire des données intégrées dans Milvus.
Pour activer le connecteur Spark-MySQL, vous devez ajouter la dépendance suivante à votre environnement Spark :
spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar,mysql-connector-j-x.x.x.jar
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{SaveMode, SparkSession}
import zilliztech.spark.milvus.MilvusOptions._
import org.apache.spark.ml.linalg.Vector
object Mysql2MilvusDemo extends App {
val spark = SparkSession.builder().master("local[*]")
.appName("Mysql2MilvusDemo")
.getOrCreate()
import spark.implicits._
// Create DataFrame
val sampleDF = Seq(
(1, "Milvus was created in 2019 with a singular goal: store, index, and manage massive embedding vectors generated by deep neural networks and other machine learning (ML) models."),
(2, "As a database specifically designed to handle queries over input vectors, it is capable of indexing vectors on a trillion scale. "),
(3, "Unlike existing relational databases which mainly deal with structured data following a pre-defined pattern, Milvus is designed from the bottom-up to handle embedding vectors converted from unstructured data."),
(4, "As the Internet grew and evolved, unstructured data became more and more common, including emails, papers, IoT sensor data, Facebook photos, protein structures, and much more.")
).toDF("id", "text")
// Write to MySQL Table
sampleDF.write
.mode(SaveMode.Append)
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "demo")
.option("user", "root")
.option("password", "123456")
.save()
// Read from MySQL Table
val dfMysql = spark.read
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "demo")
.option("user", "root")
.option("password", "123456")
.load()
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
val tokenizedDf = tokenizer.transform(dfMysql)
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("tokens")
.setOutputCol("vectors")
.setVectorSize(128)
.setMinCount(0)
val model = word2Vec.fit(tokenizedDf)
val result = model.transform(tokenizedDf)
val vectorToArrayUDF = udf((v: Vector) => v.toArray)
// Apply the UDF to the DataFrame
val resultDF = result.withColumn("embedding", vectorToArrayUDF($"vectors"))
val milvusDf = resultDF.drop("tokens").drop("vectors")
milvusDf.write.format("milvus")
.option(MILVUS_HOST, "localhost")
.option(MILVUS_PORT, "19530")
.option(MILVUS_COLLECTION_NAME, "text_embedding")
.option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
.option(MILVUS_COLLECTION_VECTOR_DIM, "128")
.option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
.mode(SaveMode.Append)
.save()
}
Milvus -> Transform -> Milvus
Dans cette démo, nous allons
- Lire les données d'une collection Milvus,
- Appliquer une transformation (en utilisant l'ACP comme exemple), et
- écrire les données transformées dans un autre Milvus via l'API Bulkinsert.
Le modèle PCA est un modèle de transformation qui réduit la dimensionnalité des vecteurs d'intégration, ce qui est une opération courante dans l'apprentissage automatique. Vous pouvez ajouter d'autres opérations de traitement, telles que le filtrage, la jonction ou la normalisation, à l'étape de transformation.
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}
import scala.collection.JavaConverters._
object TransformDemo extends App {
val sparkConf = new SparkConf().setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val host = "localhost"
val port = 19530
val user = "root"
val password = "Milvus"
val fs = "s3a://"
val bucketName = "a-bucket"
val rootPath = "files"
val minioAK = "minioadmin"
val minioSK = "minioadmin"
val minioEndpoint = "localhost:9000"
val collectionName = "hello_spark_milvus1"
val targetCollectionName = "hello_spark_milvus2"
val properties = Map(
MilvusOptions.MILVUS_HOST -> host,
MilvusOptions.MILVUS_PORT -> port.toString,
MilvusOptions.MILVUS_COLLECTION_NAME -> collectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
// 1, configurations
val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))
// 2, batch read milvus collection data to dataframe
// Schema: dim of `embeddings` is 8
// +-+------------+------------+------------------+
// | | field name | field type | other attributes |
// +-+------------+------------+------------------+
// |1| "pk" | Int64 | is_primary=True |
// | | | | auto_id=False |
// +-+------------+------------+------------------+
// |2| "random" | Double | |
// +-+------------+------------+------------------+
// |3|"embeddings"| FloatVector| dim=8 |
// +-+------------+------------+------------------+
val arrayToVectorUDF = udf((arr: Seq[Double]) => Vectors.dense(arr.toArray[Double]))
val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
.withColumn("embeddings_vec", arrayToVectorUDF($"embeddings"))
.drop("embeddings")
// 3. Use PCA to reduce dim of vector
val dim = 4
val pca = new PCA()
.setInputCol("embeddings_vec")
.setOutputCol("pca_vec")
.setK(dim)
.fit(collectionDF)
val vectorToArrayUDF = udf((v: Vector) => v.toArray)
// embeddings dim number reduce to 4
// +-+------------+------------+------------------+
// | | field name | field type | other attributes |
// +-+------------+------------+------------------+
// |1| "pk" | Int64 | is_primary=True |
// | | | | auto_id=False |
// +-+------------+------------+------------------+
// |2| "random" | Double | |
// +-+------------+------------+------------------+
// |3|"embeddings"| FloatVector| dim=4 |
// +-+------------+------------+------------------+
val pcaDf = pca.transform(collectionDF)
.withColumn("embeddings", vectorToArrayUDF($"pca_vec"))
.select("pk", "random", "embeddings")
// 4. Write PCAed data to S3
val outputPath = "s3a://a-bucket/result"
pcaDf.write
.mode("overwrite")
.format("parquet")
.save(outputPath)
// 5. Config MilvusOptions of target table
val targetProperties = Map(
MilvusOptions.MILVUS_HOST -> host,
MilvusOptions.MILVUS_PORT -> port.toString,
MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
// 6. Bulkinsert Spark output files into milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
}
Databricks -> Zilliz Cloud
Si vous utilisez Zilliz Cloud (le service géré de Milvus), vous pouvez tirer parti de son API d'importation de données très pratique. Zilliz Cloud fournit des outils et une documentation complets pour vous aider à déplacer efficacement vos données à partir de diverses sources de données, y compris Spark et Databricks. Il vous suffit de configurer un bucket S3 en tant qu'intermédiaire et d'ouvrir son accès à votre compte Zilliz Cloud. L'API d'importation de données de Zilliz Cloud chargera automatiquement le lot complet de données depuis le bucket S3 vers votre cluster Zilliz Cloud.
Préparations
Chargez le runtime Spark en ajoutant un fichier jar à votre cluster Databricks.
Vous pouvez installer une bibliothèque de différentes manières. Cette capture d'écran montre le téléchargement d'un fichier jar du local vers le cluster. Pour plus d'informations, voir Cluster Libraries dans la documentation Databricks.
Installer la bibliothèque Databricks
Créez un bucket S3 et configurez-le comme emplacement de stockage externe pour votre cluster Databricks.
Bulkinsert a exigé que les données soient stockées dans un seau temporaire afin que Zilliz Cloud puisse importer les données par lots. Vous pouvez créer un seau S3 et le configurer en tant qu'emplacement externe de Databricks. Veuillez vous référer à la section Emplacements externes pour plus de détails.
Sécurisez vos identifiants Databricks.
Pour plus de détails, consultez les instructions sur le blog Securely Managing Credentials in Databricks.
Démonstration
Voici un extrait de code illustrant le processus de migration de données par lots. Comme dans l'exemple Milvus ci-dessus, il suffit de remplacer l'identifiant et l'adresse du seau S3.
// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write
.mode("overwrite")
.format("mjson")
.save(outputPath)
// Specify Milvus options.
val targetProperties = Map(
MilvusOptions.MILVUS_URI -> zilliz_uri,
MilvusOptions.MILVUS_TOKEN -> zilliz_token,
MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "mjson")
Travaux pratiques
Pour vous aider à démarrer rapidement avec le connecteur Spark-Milvus, nous avons préparé un carnet de notes qui vous guide à travers les processus de transfert de données en continu et par lots, avec Milvus et Zilliz Cloud.