milvus-logo
LFAI
Home
  • Outils

Guide de l'utilisateur du connecteur Spark-Milvus

Le connecteur Spark-Milvus (https://github.com/zilliztech/spark-milvus) permet une intégration transparente entre Apache Spark et Milvus, en combinant les fonctions de traitement de données et de ML d'Apache Spark avec les capacités de stockage et de recherche de données vectorielles de Milvus. Cette intégration permet diverses applications intéressantes, notamment

  • Charger efficacement des données vectorielles dans Milvus par lots importants,
  • Déplacer des données entre Milvus et d'autres systèmes de stockage ou bases de données,
  • Analyser les données dans Milvus en exploitant Spark MLlib et d'autres outils d'intelligence artificielle.

Démarrage rapide

Préparation

Le connecteur Spark-Milvus prend en charge les langages de programmation Scala et Python. Les utilisateurs peuvent l'utiliser avec Pyspark ou Spark-shell. Pour exécuter cette démo, configurez un environnement Spark contenant la dépendance Spark-Milvus Connector en suivant les étapes suivantes :

  1. Installer Apache Spark (version >= 3.3.0)

    Vous pouvez installer Apache Spark en vous référant à la documentation officielle.

  2. Téléchargez le fichier jar spark-milvus.

    wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
    
  3. Démarrer le runtime Spark avec spark-milvus jar comme l'une des dépendances.

    Pour démarrer l'exécution de Spark avec le connecteur Spark-Milvus, ajoutez le fichier spark-milvus téléchargé comme dépendance à la commande.

    • pyspark

      ./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
      
    • spark-shell

      ./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
      

Démonstration

Dans cette démo, nous créons un exemple de DataFrame Spark avec des données vectorielles et nous l'écrivons dans Milvus via le connecteur Spark-Milvus. Une collection sera créée automatiquement dans Milvus en fonction du schéma et des options spécifiées.

from pyspark.sql import SparkSession

columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
    .mode("append") \
    .option("milvus.host", "localhost") \
    .option("milvus.port", "19530") \
    .option("milvus.collection.name", "hello_spark_milvus") \
    .option("milvus.collection.vectorField", "vec") \
    .option("milvus.collection.vectorDim", "8") \
    .option("milvus.collection.primaryKeyField", "id") \
    .format("milvus") \
    .save()
import org.apache.spark.sql.{SaveMode, SparkSession}

object Hello extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("HelloSparkMilvus")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
    (2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
    (3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
    (4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
  ).toDF("id", "text", "vec")

  // set milvus options
  val milvusOptions = Map(
      "milvus.host" -> "localhost" -> uri,
      "milvus.port" -> "19530",
      "milvus.collection.name" -> "hello_spark_milvus",
      "milvus.collection.vectorField" -> "vec",
      "milvus.collection.vectorDim" -> "5",
      "milvus.collection.primaryKeyField", "id"
    )
    
  sampleDF.write.format("milvus")
    .options(milvusOptions)
    .mode(SaveMode.Append)
    .save()
}

Après avoir exécuté le code ci-dessus, vous pouvez visualiser les données insérées dans Milvus à l'aide du SDK ou d'Attu (un tableau de bord Milvus). Vous pouvez trouver une collection nommée hello_spark_milvus créée avec 4 entités déjà insérées.

Fonctionnalités et concepts

Options de Milvus

Dans la section Démarrage rapide, nous avons montré les options de paramétrage pendant les opérations avec Milvus. Ces options sont abstraites sous le nom d'options Milvus. Elles sont utilisées pour créer des connexions à Milvus et contrôler d'autres comportements de Milvus. Toutes les options ne sont pas obligatoires.

Option CléValeur par défautDescription de l'option
milvus.hostlocalhostHôte du serveur Milvus. Voir Gérer les connexions Milvus pour plus de détails.
milvus.port19530Port du serveur Milvus. Voir Gérer les connexions Milvus pour plus de détails.
milvus.usernamerootNom d'utilisateur pour le serveur Milvus. Voir Gérer les connexions Milvus pour plus de détails.
milvus.passwordMilvusMot de passe pour le serveur Milvus. Voir Gérer les connexions Milvus pour plus de détails.
milvus.uri--URI du serveur Milvus. Voir la section Gérer les connexions Milvus pour plus de détails.
milvus.token--Jeton du serveur Milvus. Voir Gérer les connexions Milvus pour plus de détails.
milvus.database.namedefaultNom de la base de données Milvus à lire ou à écrire.
milvus.collection.namehello_milvusNom de la collection Milvus à lire ou à écrire.
milvus.collection.primaryKeyFieldNoneNom du champ de clé primaire dans la collection. Requis si la collection n'existe pas.
milvus.collection.vectorFieldNoneNom du champ vectoriel de la collection. Obligatoire si la collection n'existe pas.
milvus.collection.vectorDimNoneDimension du champ vectoriel de la collection. Obligatoire si la collection n'existe pas.
milvus.collection.autoIDfalseSi la collection n'existe pas, cette option indique s'il faut générer automatiquement des identifiants pour les entités. Pour plus d'informations, voir create_collection
milvus.bucketa-bucketNom de la base de données dans le stockage Milvus. Il doit être identique à minio.bucketName dans milvus.yaml.
milvus.rootpathfilesChemin racine du stockage Milvus. Il doit être identique à minio.rootpath dans milvus.yaml.
milvus.fss3a://Système de fichiers du stockage Milvus. La valeur s3a:// s'applique à Spark open-source. Utilisez s3:// pour Databricks.
milvus.storage.endpointlocalhost:9000Point final du stockage Milvus. Il doit être identique à minio.address:minio.port dans milvus.yaml.
milvus.storage.userminioadminUtilisateur du stockage Milvus. Il doit être identique à minio.accessKeyID dans milvus.yaml.
milvus.storage.passwordminioadminMot de passe du stockage Milvus. Il doit être identique à minio.secretAccessKey dans milvus.yaml.
milvus.storage.useSSLfalseUtilisation ou non de SSL pour le stockage Milvus. Il doit être identique à minio.useSSL dans milvus.yaml.

Format des données Milvus

Le connecteur Spark-Milvus prend en charge la lecture et l'écriture de données dans les formats de données Milvus suivants :

  • milvus: Le format de données Milvus pour une conversion transparente de Spark DataFrame en entités Milvus.
  • milvusbinlog: Le format de données Milvus pour la lecture des données binlog intégrées de Milvus.
  • mjson: Format JSON Milvus pour l'insertion en bloc de données dans Milvus.

milvus

Dans la section Démarrage rapide, nous utilisons le format milvus pour écrire des échantillons de données dans un cluster Milvus. Le format milvus est un nouveau format de données qui prend en charge l'écriture transparente de données Spark DataFrame dans Milvus Collections. Ceci est réalisé par des appels par lots à l'API Insert du SDK Milvus. Si une collection n'existe pas dans Milvus, une nouvelle collection sera créée sur la base du schéma du Dataframe. Toutefois, la collection créée automatiquement peut ne pas prendre en charge toutes les fonctionnalités du schéma de collection. Par conséquent, il est recommandé de créer d'abord une collection via le SDK, puis d'utiliser spark-milvus pour l'écriture. Pour plus d'informations, veuillez vous référer à la démo.

milvusbinlog

Le nouveau format de données milvusbinlog permet de lire les données binlog intégrées de Milvus. Binlog est le format de stockage de données interne de Milvus basé sur parquet. Malheureusement, il ne peut pas être lu par une bibliothèque parquet classique, c'est pourquoi nous avons implémenté ce nouveau format de données pour aider le travail Spark à le lire. Il n'est pas recommandé d'utiliser milvusbinlog directement à moins d'être familier avec les détails du stockage interne de milvus. Nous suggérons d'utiliser la fonction MilvusUtils qui sera présentée dans la section suivante.

val df = spark.read
  .format("milvusbinlog")
  .load(path)
  .withColumnRenamed("val", "embedding")

mjson

Milvus fournit la fonctionnalité Bulkinsert pour améliorer les performances d'écriture lors de l'utilisation de grands ensembles de données. Cependant, le format JSON utilisé par Milvus est légèrement différent du format de sortie JSON par défaut de Spark. Pour résoudre ce problème, nous introduisons le format de données mjson pour générer des données qui répondent aux exigences de Milvus. Voici un exemple qui montre la différence entre JSON-lines et mjson:

  • JSON-lignes :

    {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]}
    {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]}
    {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]}
    {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]}
    {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
    
  • mjson (requis pour Milvus Bulkinsert) :

    {
        "rows":[
            {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]},
            {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]},
            {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]},
            {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]},
            {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
        ]
    }
    

Ce point sera amélioré à l'avenir. Nous recommandons d'utiliser le format parquet dans l'intégration spark-milvus si votre version Milvus est v2.3.7+ qui supporte bulkinsert avec le format Parquet. Voir la démo sur Github.

MilvusUtils

MilvusUtils contient plusieurs fonctions util utiles. Actuellement, il n'est supporté qu'en Scala. Plus d'exemples d'utilisation se trouvent dans la section Utilisation avancée.

MilvusUtils.readMilvusCollection

MilvusUtils.readMilvusCollection est une interface simple pour charger une collection Milvus entière dans un Dataframe Spark. Elle englobe diverses opérations, notamment l'appel au SDK Milvus, la lecture du milvusbinlog et les opérations communes d'union/jointure.

val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)

MilvusUtils.bulkInsertFromSpark

MilvusUtils.bulkInsertFromSpark fournit un moyen pratique d'importer des fichiers de sortie Spark dans Milvus dans un grand lot. Il englobe l'API Bullkinsert du SDK Milvus.

df.write.format("parquet").save(outputPath)
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputPath, "parquet")

Utilisation avancée

Dans cette section, vous trouverez des exemples d'utilisation avancée du connecteur Spark-Milvus pour l'analyse et la migration des données. Pour plus de démonstrations, voir exemples.

MySQL -> intégration -> Milvus

Dans cette démo, nous allons

  1. Lire les données de MySQL à travers le connecteur Spark-MySQL,
  2. Générer de l'embedding (en utilisant Word2Vec comme exemple), et
  3. écrire des données intégrées dans Milvus.

Pour activer le connecteur Spark-MySQL, vous devez ajouter la dépendance suivante à votre environnement Spark :

spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar,mysql-connector-j-x.x.x.jar
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{SaveMode, SparkSession}
import zilliztech.spark.milvus.MilvusOptions._

import org.apache.spark.ml.linalg.Vector

object Mysql2MilvusDemo  extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("Mysql2MilvusDemo")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "Milvus was created in 2019 with a singular goal: store, index, and manage massive embedding vectors generated by deep neural networks and other machine learning (ML) models."),
    (2, "As a database specifically designed to handle queries over input vectors, it is capable of indexing vectors on a trillion scale. "),
    (3, "Unlike existing relational databases which mainly deal with structured data following a pre-defined pattern, Milvus is designed from the bottom-up to handle embedding vectors converted from unstructured data."),
    (4, "As the Internet grew and evolved, unstructured data became more and more common, including emails, papers, IoT sensor data, Facebook photos, protein structures, and much more.")
  ).toDF("id", "text")

  // Write to MySQL Table
  sampleDF.write
    .mode(SaveMode.Append)
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "demo")
    .option("user", "root")
    .option("password", "123456")
    .save()

  // Read from MySQL Table
  val dfMysql = spark.read
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "demo")
    .option("user", "root")
    .option("password", "123456")
    .load()

  val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
  val tokenizedDf = tokenizer.transform(dfMysql)

  // Learn a mapping from words to Vectors.
  val word2Vec = new Word2Vec()
    .setInputCol("tokens")
    .setOutputCol("vectors")
    .setVectorSize(128)
    .setMinCount(0)
  val model = word2Vec.fit(tokenizedDf)

  val result = model.transform(tokenizedDf)

  val vectorToArrayUDF = udf((v: Vector) => v.toArray)
  // Apply the UDF to the DataFrame
  val resultDF = result.withColumn("embedding", vectorToArrayUDF($"vectors"))
  val milvusDf = resultDF.drop("tokens").drop("vectors")

  milvusDf.write.format("milvus")
    .option(MILVUS_HOST, "localhost")
    .option(MILVUS_PORT, "19530")
    .option(MILVUS_COLLECTION_NAME, "text_embedding")
    .option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
    .option(MILVUS_COLLECTION_VECTOR_DIM, "128")
    .option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
    .mode(SaveMode.Append)
    .save()
}

Milvus -> Transform -> Milvus

Dans cette démo, nous allons

  1. Lire les données d'une collection Milvus,
  2. Appliquer une transformation (en utilisant l'ACP comme exemple), et
  3. écrire les données transformées dans un autre Milvus via l'API Bulkinsert.

Le modèle PCA est un modèle de transformation qui réduit la dimensionnalité des vecteurs d'intégration, ce qui est une opération courante dans l'apprentissage automatique. Vous pouvez ajouter d'autres opérations de traitement, telles que le filtrage, la jonction ou la normalisation, à l'étape de transformation.

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}

import scala.collection.JavaConverters._

object TransformDemo extends App {
  val sparkConf = new SparkConf().setMaster("local")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()

  import spark.implicits._

  val host = "localhost"
  val port = 19530
  val user = "root"
  val password = "Milvus"
  val fs = "s3a://"
  val bucketName = "a-bucket"
  val rootPath = "files"
  val minioAK = "minioadmin"
  val minioSK = "minioadmin"
  val minioEndpoint = "localhost:9000"
  val collectionName = "hello_spark_milvus1"
  val targetCollectionName = "hello_spark_milvus2"

  val properties = Map(
    MilvusOptions.MILVUS_HOST -> host,
    MilvusOptions.MILVUS_PORT -> port.toString,
    MilvusOptions.MILVUS_COLLECTION_NAME -> collectionName,
    MilvusOptions.MILVUS_BUCKET -> bucketName,
    MilvusOptions.MILVUS_ROOTPATH -> rootPath,
    MilvusOptions.MILVUS_FS -> fs,
    MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
    MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
    MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
  )

  // 1, configurations
  val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))

  // 2, batch read milvus collection data to dataframe
  //  Schema: dim of `embeddings` is 8
  // +-+------------+------------+------------------+
  // | | field name | field type | other attributes |
  // +-+------------+------------+------------------+
  // |1|    "pk"    |    Int64   |  is_primary=True |
  // | |            |            |   auto_id=False  |
  // +-+------------+------------+------------------+
  // |2|  "random"  |    Double  |                  |
  // +-+------------+------------+------------------+
  // |3|"embeddings"| FloatVector|     dim=8        |
  // +-+------------+------------+------------------+
  val arrayToVectorUDF = udf((arr: Seq[Double]) => Vectors.dense(arr.toArray[Double]))
  val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
    .withColumn("embeddings_vec", arrayToVectorUDF($"embeddings"))
    .drop("embeddings")
  
  // 3. Use PCA to reduce dim of vector
  val dim = 4
  val pca = new PCA()
    .setInputCol("embeddings_vec")
    .setOutputCol("pca_vec")
    .setK(dim)
    .fit(collectionDF)
  val vectorToArrayUDF = udf((v: Vector) => v.toArray)
  // embeddings dim number reduce to 4
  // +-+------------+------------+------------------+
  // | | field name | field type | other attributes |
  // +-+------------+------------+------------------+
  // |1|    "pk"    |    Int64   |  is_primary=True |
  // | |            |            |   auto_id=False  |
  // +-+------------+------------+------------------+
  // |2|  "random"  |    Double  |                  |
  // +-+------------+------------+------------------+
  // |3|"embeddings"| FloatVector|     dim=4        |
  // +-+------------+------------+------------------+
  val pcaDf = pca.transform(collectionDF)
    .withColumn("embeddings", vectorToArrayUDF($"pca_vec"))
    .select("pk", "random", "embeddings")

  // 4. Write PCAed data to S3
  val outputPath = "s3a://a-bucket/result"
  pcaDf.write
    .mode("overwrite")
    .format("parquet")
    .save(outputPath)

  // 5. Config MilvusOptions of target table  
  val targetProperties = Map(
    MilvusOptions.MILVUS_HOST -> host,
    MilvusOptions.MILVUS_PORT -> port.toString,
    MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
    MilvusOptions.MILVUS_BUCKET -> bucketName,
    MilvusOptions.MILVUS_ROOTPATH -> rootPath,
    MilvusOptions.MILVUS_FS -> fs,
    MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
    MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
    MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
  )
  val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
  // 6. Bulkinsert Spark output files into milvus
  MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
}

Databricks -> Zilliz Cloud

Si vous utilisez Zilliz Cloud (le service géré de Milvus), vous pouvez tirer parti de son API d'importation de données très pratique. Zilliz Cloud fournit des outils et une documentation complets pour vous aider à déplacer efficacement vos données à partir de diverses sources de données, y compris Spark et Databricks. Il vous suffit de configurer un bucket S3 en tant qu'intermédiaire et d'ouvrir son accès à votre compte Zilliz Cloud. L'API d'importation de données de Zilliz Cloud chargera automatiquement le lot complet de données depuis le bucket S3 vers votre cluster Zilliz Cloud.

Préparations

  1. Chargez le runtime Spark en ajoutant un fichier jar à votre cluster Databricks.

    Vous pouvez installer une bibliothèque de différentes manières. Cette capture d'écran montre le téléchargement d'un fichier jar du local vers le cluster. Pour plus d'informations, voir Cluster Libraries dans la documentation Databricks.

    Install Databricks Library Installer la bibliothèque Databricks

  2. Créez un bucket S3 et configurez-le comme emplacement de stockage externe pour votre cluster Databricks.

    Bulkinsert a exigé que les données soient stockées dans un seau temporaire afin que Zilliz Cloud puisse importer les données par lots. Vous pouvez créer un seau S3 et le configurer en tant qu'emplacement externe de Databricks. Veuillez vous référer à la section Emplacements externes pour plus de détails.

  3. Sécurisez vos identifiants Databricks.

    Pour plus de détails, consultez les instructions sur le blog Securely Managing Credentials in Databricks.

Démonstration

Voici un extrait de code illustrant le processus de migration de données par lots. Comme dans l'exemple Milvus ci-dessus, il suffit de remplacer l'identifiant et l'adresse du seau S3.

// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write
  .mode("overwrite")
  .format("mjson")
  .save(outputPath)
// Specify Milvus options.
val targetProperties = Map(
  MilvusOptions.MILVUS_URI -> zilliz_uri,
  MilvusOptions.MILVUS_TOKEN -> zilliz_token,
  MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
  MilvusOptions.MILVUS_BUCKET -> bucketName,
  MilvusOptions.MILVUS_ROOTPATH -> rootPath,
  MilvusOptions.MILVUS_FS -> fs,
  MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
  MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
  MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "mjson")

Travaux pratiques

Pour vous aider à démarrer rapidement avec le connecteur Spark-Milvus, nous avons préparé un carnet de notes qui vous guide à travers les processus de transfert de données en continu et par lots, avec Milvus et Zilliz Cloud.

Traduit parDeepLogo

Feedback

Cette page a-t - elle été utile ?