milvus-logo
LFAI
Home
  • Herramientas

Guía del usuario del conector Spark-Milvus

El Spark-Milvus Connector (https://github.com/zilliztech/spark-milvus) proporciona una integración perfecta entre Apache Spark y Milvus, combinando las características de procesamiento de datos y ML de Apache Spark con las capacidades de almacenamiento y búsqueda de datos vectoriales de Milvus. Esta integración permite varias aplicaciones interesantes, entre ellas

  • Cargar eficientemente datos vectoriales en Milvus en grandes lotes,
  • Mover datos entre Milvus y otros sistemas de almacenamiento o bases de datos,
  • Analizar los datos en Milvus aprovechando Spark MLlib y otras herramientas de IA.

Inicio rápido

Preparación

El conector Spark-Milvus es compatible con los lenguajes de programación Scala y Python. Los usuarios pueden utilizarlo con Pyspark o Spark-shell. Para ejecutar esta demo, configure un entorno Spark que contenga la dependencia Spark-Milvus Connector siguiendo los siguientes pasos:

  1. Instalar Apache Spark (versión >= 3.3.0)

    Puedes instalar Apache Spark consultando la documentación oficial.

  2. Descargue el archivo jar spark-milvus.

    wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
    
  3. Inicie el tiempo de ejecución de Spark con spark-milvus jar como una de las dependencias.

    Para iniciar el tiempo de ejecución de Spark con el conector Spark-Milvus, añade el spark-milvus descargado como dependencia al comando

    • pyspark

      ./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
      
    • spark-shell

      ./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
      

Demo

En esta demostración, creamos un Spark DataFrame de ejemplo con datos vectoriales y lo escribimos en Milvus a través del Conector Spark-Milvus. Se creará automáticamente una colección en Milvus basada en el esquema y las opciones especificadas.

from pyspark.sql import SparkSession

columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
    .mode("append") \
    .option("milvus.host", "localhost") \
    .option("milvus.port", "19530") \
    .option("milvus.collection.name", "hello_spark_milvus") \
    .option("milvus.collection.vectorField", "vec") \
    .option("milvus.collection.vectorDim", "8") \
    .option("milvus.collection.primaryKeyField", "id") \
    .format("milvus") \
    .save()
import org.apache.spark.sql.{SaveMode, SparkSession}

object Hello extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("HelloSparkMilvus")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
    (2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
    (3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
    (4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
  ).toDF("id", "text", "vec")

  // set milvus options
  val milvusOptions = Map(
      "milvus.host" -> "localhost" -> uri,
      "milvus.port" -> "19530",
      "milvus.collection.name" -> "hello_spark_milvus",
      "milvus.collection.vectorField" -> "vec",
      "milvus.collection.vectorDim" -> "5",
      "milvus.collection.primaryKeyField", "id"
    )
    
  sampleDF.write.format("milvus")
    .options(milvusOptions)
    .mode(SaveMode.Append)
    .save()
}

Después de ejecutar el código anterior, puede ver los datos insertados en Milvus utilizando SDK o Attu (un tablero de Milvus). Puede encontrar una colección llamada hello_spark_milvus creada con 4 entidades ya insertadas en ella.

Características y conceptos

Opciones de Milvus

En la sección de Inicio Rápido, hemos mostrado las opciones de configuración durante las operaciones con Milvus. Estas opciones se abstraen como Opciones de Milvus. Se utilizan para crear conexiones con Milvus y controlar otros comportamientos de Milvus. No todas las opciones son obligatorias.

Opción ClaveValor por defectoDescripción
milvus.hostlocalhostHost del servidor Milvus. Vea Gestionar Conexiones Milvus para más detalles.
milvus.port19530Puerto del servidor Milvus. Consulte Gestionar conexiones Milvus para obtener más información.
milvus.usernamerootNombre de usuario del servidor Milvus. Consulte Gestionar conexiones Milvus para obtener más información.
milvus.passwordMilvusContraseña del servidor Milvus. Consulte Gestionar conexiones Milvus para obtener más información.
milvus.uri--URI del servidor Milvus. Consulte Gestionar conexiones Milvus para obtener más información.
milvus.token--Token del servidor Milvus. Consulte Gestionar conexiones Milvus para obtener más información.
milvus.database.namedefaultNombre de la base de datos Milvus a leer o escribir.
milvus.collection.namehello_milvusNombre de la colección Milvus para leer o escribir.
milvus.collection.primaryKeyFieldNoneNombre del campo de clave primaria de la colección. Obligatorio si la colección no existe.
milvus.collection.vectorFieldNoneNombre del campo vectorial de la colección. Obligatorio si la colección no existe.
milvus.collection.vectorDimNoneDimensión del campo vectorial de la colección. Obligatorio si la colección no existe.
milvus.collection.autoIDfalseSi la colección no existe, esta opción especifica si se deben generar automáticamente identificadores para las entidades. Para más información, véase create_collection
milvus.bucketa-bucketNombre del cubo en el almacenamiento Milvus. Debe ser el mismo que minio.bucketName en milvus.yaml.
milvus.rootpathfilesRuta raíz del almacenamiento Milvus. Debe ser la misma que minio.rootpath en milvus.yaml.
milvus.fss3a://Sistema de archivos del almacenamiento Milvus. El valor s3a:// se aplica a Spark de código abierto. Utilice s3:// para Databricks.
milvus.storage.endpointlocalhost:9000Endpoint del almacenamiento Milvus. Debe ser el mismo que minio.address:minio.port en milvus.yaml.
milvus.storage.userminioadminUsuario del almacenamiento Milvus. Debe ser el mismo que minio.accessKeyID en milvus.yaml.
milvus.storage.passwordminioadminContraseña del almacenamiento Milvus. Debe ser la misma que minio.secretAccessKey en milvus.yaml.
milvus.storage.useSSLfalseSi desea utilizar SSL para el almacenamiento Milvus. Debe ser el mismo que minio.useSSL en milvus.yaml.

Formato de datos Milvus

El conector Spark-Milvus soporta la lectura y escritura de datos en los siguientes formatos de datos Milvus:

  • milvus: Formato de datos Milvus para una conversión perfecta de Spark DataFrame a entidades Milvus.
  • milvusbinlog: Formato de datos Milvus para la lectura de datos binlog integrados en Milvus.
  • mjson: Formato Milvus JSON para la inserción masiva de datos en Milvus.

milvus

En Inicio rápido, utilizamos el formato milvus para escribir datos de muestra en un cluster Milvus. El formato milvus es un nuevo formato de datos que permite escribir sin problemas datos Spark DataFrame en Milvus Collections. Esto se consigue mediante llamadas por lotes a la API de inserción del SDK de Milvus. Si una colección no existe en Milvus, se creará una nueva colección basada en el esquema del Dataframe. Sin embargo, es posible que la colección creada automáticamente no admita todas las características del esquema de la colección. Por lo tanto, se recomienda crear primero una colección a través del SDK y luego utilizar spark-milvus para escribir. Para más información, consulte la demo.

milvusbinlog

El nuevo formato de datos milvusbinlog sirve para leer los datos binlog integrados en Milvus. Binlog es el formato de almacenamiento de datos interno de Milvus basado en parquet. Desafortunadamente, no puede ser leído por una biblioteca de parquet normal, por lo que hemos implementado este nuevo formato de datos para ayudar a Spark a leerlo. No se recomienda utilizar milvusbinlog directamente a menos que esté familiarizado con los detalles de almacenamiento interno de Milvus. Sugerimos utilizar la función MilvusUtils que se introducirá en la siguiente sección.

val df = spark.read
  .format("milvusbinlog")
  .load(path)
  .withColumnRenamed("val", "embedding")

mjson

Milvus proporciona la funcionalidad Bulkinsert para un mejor rendimiento de escritura cuando se trabaja con grandes conjuntos de datos. Sin embargo, el formato JSON utilizado por Milvus es ligeramente diferente del formato de salida JSON por defecto de Spark. Para resolver esto, introducimos el formato de datos mjson para generar datos que cumplan los requisitos de Milvus. He aquí un ejemplo que muestra la diferencia entre JSON-lines y mjson:

  • JSON-lines:

    {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]}
    {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]}
    {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]}
    {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]}
    {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
    
  • mjson (Requerido para Milvus Bulkinsert):

    {
        "rows":[
            {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]},
            {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]},
            {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]},
            {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]},
            {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
        ]
    }
    

Esto se mejorará en el futuro. Recomendamos usar el formato parquet en la intergración spark-milvus si su versión de Milvus es v2.3.7+ que soporta bulkinsert con formato Parquet. Ver Demo en Github.

MilvusUtils

MilvusUtils contiene varias funciones útiles. Actualmente sólo está soportado en Scala. Más ejemplos de uso en la sección Uso Avanzado.

MilvusUtils.readMilvusCollection

MilvusUtils.readMilvusCollection es una interfaz sencilla para cargar una colección Milvus completa en un marco de datos Spark. Envuelve varias operaciones, incluyendo la llamada a Milvus SDK, la lectura de milvusbinlog y operaciones comunes de unión/join.

val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)

MilvusUtils.bulkInsertFromSpark

MilvusUtils.bulkInsertFromSpark proporciona una manera conveniente de importar archivos de salida Spark a Milvus en un lote grande. Envuelve la API Bullkinsert del SDK de Milvus.

df.write.format("parquet").save(outputPath)
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputPath, "parquet")

Uso avanzado

En esta sección, encontrará ejemplos de uso avanzado del Conector Spark-Milvus para el análisis y la migración de datos. Para más demos, ver ejemplos.

MySQL -> incrustación -> Milvus

En esta demostración

  1. Leer datos de MySQL a través del Conector Spark-MySQL,
  2. Generar incrustaciones (utilizando Word2Vec como ejemplo), y
  3. Escribir datos incrustados en Milvus.

Para habilitar el Conector Spark-MySQL, necesita agregar la siguiente dependencia a su entorno Spark:

spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar,mysql-connector-j-x.x.x.jar
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{SaveMode, SparkSession}
import zilliztech.spark.milvus.MilvusOptions._

import org.apache.spark.ml.linalg.Vector

object Mysql2MilvusDemo  extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("Mysql2MilvusDemo")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "Milvus was created in 2019 with a singular goal: store, index, and manage massive embedding vectors generated by deep neural networks and other machine learning (ML) models."),
    (2, "As a database specifically designed to handle queries over input vectors, it is capable of indexing vectors on a trillion scale. "),
    (3, "Unlike existing relational databases which mainly deal with structured data following a pre-defined pattern, Milvus is designed from the bottom-up to handle embedding vectors converted from unstructured data."),
    (4, "As the Internet grew and evolved, unstructured data became more and more common, including emails, papers, IoT sensor data, Facebook photos, protein structures, and much more.")
  ).toDF("id", "text")

  // Write to MySQL Table
  sampleDF.write
    .mode(SaveMode.Append)
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "demo")
    .option("user", "root")
    .option("password", "123456")
    .save()

  // Read from MySQL Table
  val dfMysql = spark.read
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "demo")
    .option("user", "root")
    .option("password", "123456")
    .load()

  val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
  val tokenizedDf = tokenizer.transform(dfMysql)

  // Learn a mapping from words to Vectors.
  val word2Vec = new Word2Vec()
    .setInputCol("tokens")
    .setOutputCol("vectors")
    .setVectorSize(128)
    .setMinCount(0)
  val model = word2Vec.fit(tokenizedDf)

  val result = model.transform(tokenizedDf)

  val vectorToArrayUDF = udf((v: Vector) => v.toArray)
  // Apply the UDF to the DataFrame
  val resultDF = result.withColumn("embedding", vectorToArrayUDF($"vectors"))
  val milvusDf = resultDF.drop("tokens").drop("vectors")

  milvusDf.write.format("milvus")
    .option(MILVUS_HOST, "localhost")
    .option(MILVUS_PORT, "19530")
    .option(MILVUS_COLLECTION_NAME, "text_embedding")
    .option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
    .option(MILVUS_COLLECTION_VECTOR_DIM, "128")
    .option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
    .mode(SaveMode.Append)
    .save()
}

Milvus -> Transformar -> Milvus

En esta demostración, vamos a

  1. Leer datos de una colección Milvus,
  2. Aplicar una transformación (utilizando PCA como ejemplo), y
  3. Escribiremos los datos transformados en otro Milvus a través de la API Bulkinsert.

El modelo PCA es un modelo de transformación que reduce la dimensionalidad de los vectores incrustados, que es una operación común en el aprendizaje automático. Puede añadir cualquier otra operación de procesamiento, como filtrar, unir o normalizar, al paso de transformación.

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}

import scala.collection.JavaConverters._

object TransformDemo extends App {
  val sparkConf = new SparkConf().setMaster("local")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()

  import spark.implicits._

  val host = "localhost"
  val port = 19530
  val user = "root"
  val password = "Milvus"
  val fs = "s3a://"
  val bucketName = "a-bucket"
  val rootPath = "files"
  val minioAK = "minioadmin"
  val minioSK = "minioadmin"
  val minioEndpoint = "localhost:9000"
  val collectionName = "hello_spark_milvus1"
  val targetCollectionName = "hello_spark_milvus2"

  val properties = Map(
    MilvusOptions.MILVUS_HOST -> host,
    MilvusOptions.MILVUS_PORT -> port.toString,
    MilvusOptions.MILVUS_COLLECTION_NAME -> collectionName,
    MilvusOptions.MILVUS_BUCKET -> bucketName,
    MilvusOptions.MILVUS_ROOTPATH -> rootPath,
    MilvusOptions.MILVUS_FS -> fs,
    MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
    MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
    MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
  )

  // 1, configurations
  val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))

  // 2, batch read milvus collection data to dataframe
  //  Schema: dim of `embeddings` is 8
  // +-+------------+------------+------------------+
  // | | field name | field type | other attributes |
  // +-+------------+------------+------------------+
  // |1|    "pk"    |    Int64   |  is_primary=True |
  // | |            |            |   auto_id=False  |
  // +-+------------+------------+------------------+
  // |2|  "random"  |    Double  |                  |
  // +-+------------+------------+------------------+
  // |3|"embeddings"| FloatVector|     dim=8        |
  // +-+------------+------------+------------------+
  val arrayToVectorUDF = udf((arr: Seq[Double]) => Vectors.dense(arr.toArray[Double]))
  val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
    .withColumn("embeddings_vec", arrayToVectorUDF($"embeddings"))
    .drop("embeddings")
  
  // 3. Use PCA to reduce dim of vector
  val dim = 4
  val pca = new PCA()
    .setInputCol("embeddings_vec")
    .setOutputCol("pca_vec")
    .setK(dim)
    .fit(collectionDF)
  val vectorToArrayUDF = udf((v: Vector) => v.toArray)
  // embeddings dim number reduce to 4
  // +-+------------+------------+------------------+
  // | | field name | field type | other attributes |
  // +-+------------+------------+------------------+
  // |1|    "pk"    |    Int64   |  is_primary=True |
  // | |            |            |   auto_id=False  |
  // +-+------------+------------+------------------+
  // |2|  "random"  |    Double  |                  |
  // +-+------------+------------+------------------+
  // |3|"embeddings"| FloatVector|     dim=4        |
  // +-+------------+------------+------------------+
  val pcaDf = pca.transform(collectionDF)
    .withColumn("embeddings", vectorToArrayUDF($"pca_vec"))
    .select("pk", "random", "embeddings")

  // 4. Write PCAed data to S3
  val outputPath = "s3a://a-bucket/result"
  pcaDf.write
    .mode("overwrite")
    .format("parquet")
    .save(outputPath)

  // 5. Config MilvusOptions of target table  
  val targetProperties = Map(
    MilvusOptions.MILVUS_HOST -> host,
    MilvusOptions.MILVUS_PORT -> port.toString,
    MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
    MilvusOptions.MILVUS_BUCKET -> bucketName,
    MilvusOptions.MILVUS_ROOTPATH -> rootPath,
    MilvusOptions.MILVUS_FS -> fs,
    MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
    MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
    MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
  )
  val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
  // 6. Bulkinsert Spark output files into milvus
  MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
}

Databricks -> Zilliz Cloud

Si utiliza Zilliz Cloud (el servicio Milvus gestionado), puede aprovechar su práctica API de importación de datos. Zilliz Cloud proporciona herramientas y documentación completas para ayudarte a mover eficientemente tus datos desde varias fuentes de datos, incluyendo Spark y Databricks. Solo tienes que configurar un bucket S3 como intermediario y abrir su acceso a tu cuenta de Zilliz Cloud. La API de importación de datos de Zilliz Cloud cargará automáticamente el lote completo de datos desde el bucket de S3 a tu clúster de Zilliz Cloud.

Preparativos

  1. Carga el tiempo de ejecución de Spark añadiendo un archivo jar a tu Clúster Databricks.

    Puedes instalar una librería de diferentes maneras. Esta captura de pantalla muestra la carga de un jar desde local al clúster. Para obtener más información, consulta Bibliotecas de clúster en la documentación de Databricks.

    Install Databricks Library Instalar la biblioteca de Databricks

  2. Crea un bucket S3 y configúralo como ubicación de almacenamiento externo para tu clúster Databricks.

    Bulkinsert requiere que los datos se almacenen en un bucket temporal para que Zilliz Cloud pueda importar los datos en un lote. Puedes crear un bucket S3 y configurarlo como ubicación externa de databricks. Consulta Ubicaciones externas para más detalles.

  3. Asegura tus credenciales de Databricks.

    Para más detalles, consulta las instrucciones del blog Gestión segura de credenciales en Databricks.

Demo

Aquí tienes un fragmento de código que muestra el proceso de migración de datos por lotes. Similar al ejemplo anterior de Milvus, solo necesitas reemplazar la credencial y la dirección del bucket S3.

// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write
  .mode("overwrite")
  .format("mjson")
  .save(outputPath)
// Specify Milvus options.
val targetProperties = Map(
  MilvusOptions.MILVUS_URI -> zilliz_uri,
  MilvusOptions.MILVUS_TOKEN -> zilliz_token,
  MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
  MilvusOptions.MILVUS_BUCKET -> bucketName,
  MilvusOptions.MILVUS_ROOTPATH -> rootPath,
  MilvusOptions.MILVUS_FS -> fs,
  MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
  MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
  MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "mjson")

Práctico

Para ayudarle a empezar rápidamente con el Conector Spark-Milvus, hemos preparado un cuaderno que le guía a través de los procesos de transferencia de datos en flujo y por lotes, con Milvus y Zilliz Cloud.

Traducido porDeepLogo

Feedback

¿Fue útil esta página?