Guía del usuario del conector Spark-Milvus
El Spark-Milvus Connector (https://github.com/zilliztech/spark-milvus) proporciona una integración perfecta entre Apache Spark y Milvus, combinando las características de procesamiento de datos y ML de Apache Spark con las capacidades de almacenamiento y búsqueda de datos vectoriales de Milvus. Esta integración permite varias aplicaciones interesantes, entre ellas
- Cargar eficientemente datos vectoriales en Milvus en grandes lotes,
- Mover datos entre Milvus y otros sistemas de almacenamiento o bases de datos,
- Analizar los datos en Milvus aprovechando Spark MLlib y otras herramientas de IA.
Inicio rápido
Preparación
El conector Spark-Milvus es compatible con los lenguajes de programación Scala y Python. Los usuarios pueden utilizarlo con Pyspark o Spark-shell. Para ejecutar esta demo, configure un entorno Spark que contenga la dependencia Spark-Milvus Connector siguiendo los siguientes pasos:
Instalar Apache Spark (versión >= 3.3.0)
Puedes instalar Apache Spark consultando la documentación oficial.
Descargue el archivo jar spark-milvus.
wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
Inicie el tiempo de ejecución de Spark con spark-milvus jar como una de las dependencias.
Para iniciar el tiempo de ejecución de Spark con el conector Spark-Milvus, añade el spark-milvus descargado como dependencia al comando
pyspark
./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
spark-shell
./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
Demo
En esta demostración, creamos un Spark DataFrame de ejemplo con datos vectoriales y lo escribimos en Milvus a través del Conector Spark-Milvus. Se creará automáticamente una colección en Milvus basada en el esquema y las opciones especificadas.
from pyspark.sql import SparkSession
columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
.mode("append") \
.option("milvus.host", "localhost") \
.option("milvus.port", "19530") \
.option("milvus.collection.name", "hello_spark_milvus") \
.option("milvus.collection.vectorField", "vec") \
.option("milvus.collection.vectorDim", "8") \
.option("milvus.collection.primaryKeyField", "id") \
.format("milvus") \
.save()
import org.apache.spark.sql.{SaveMode, SparkSession}
object Hello extends App {
val spark = SparkSession.builder().master("local[*]")
.appName("HelloSparkMilvus")
.getOrCreate()
import spark.implicits._
// Create DataFrame
val sampleDF = Seq(
(1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
(2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
(3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
(4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
).toDF("id", "text", "vec")
// set milvus options
val milvusOptions = Map(
"milvus.host" -> "localhost" -> uri,
"milvus.port" -> "19530",
"milvus.collection.name" -> "hello_spark_milvus",
"milvus.collection.vectorField" -> "vec",
"milvus.collection.vectorDim" -> "5",
"milvus.collection.primaryKeyField", "id"
)
sampleDF.write.format("milvus")
.options(milvusOptions)
.mode(SaveMode.Append)
.save()
}
Después de ejecutar el código anterior, puede ver los datos insertados en Milvus utilizando SDK o Attu (un tablero de Milvus). Puede encontrar una colección llamada hello_spark_milvus
creada con 4 entidades ya insertadas en ella.
Características y conceptos
Opciones de Milvus
En la sección de Inicio Rápido, hemos mostrado las opciones de configuración durante las operaciones con Milvus. Estas opciones se abstraen como Opciones de Milvus. Se utilizan para crear conexiones con Milvus y controlar otros comportamientos de Milvus. No todas las opciones son obligatorias.
Opción Clave | Valor por defecto | Descripción |
---|---|---|
milvus.host | localhost | Host del servidor Milvus. Vea Gestionar Conexiones Mil vus para más detalles. |
milvus.port | 19530 | Puerto del servidor Milvus. Consulte Gestionar conexiones Milvus para obtener más información. |
milvus.username | root | Nombre de usuario del servidor Milvus. Consulte Gestionar conexiones Milvus para obtener más información. |
milvus.password | Milvus | Contraseña del servidor Milvus. Consulte Gestionar conexiones Milvus para obtener más información. |
milvus.uri | -- | URI del servidor Milvus. Consulte Gestionar conexiones Milvus para obtener más información. |
milvus.token | -- | Token del servidor Milvus. Consulte Gestionar conexiones Milvus para obtener más información. |
milvus.database.name | default | Nombre de la base de datos Milvus a leer o escribir. |
milvus.collection.name | hello_milvus | Nombre de la colección Milvus para leer o escribir. |
milvus.collection.primaryKeyField | None | Nombre del campo de clave primaria de la colección. Obligatorio si la colección no existe. |
milvus.collection.vectorField | None | Nombre del campo vectorial de la colección. Obligatorio si la colección no existe. |
milvus.collection.vectorDim | None | Dimensión del campo vectorial de la colección. Obligatorio si la colección no existe. |
milvus.collection.autoID | false | Si la colección no existe, esta opción especifica si se deben generar automáticamente identificadores para las entidades. Para más información, véase create_collection |
milvus.bucket | a-bucket | Nombre del cubo en el almacenamiento Milvus. Debe ser el mismo que minio.bucketName en milvus.yaml. |
milvus.rootpath | files | Ruta raíz del almacenamiento Milvus. Debe ser la misma que minio.rootpath en milvus.yaml. |
milvus.fs | s3a:// | Sistema de archivos del almacenamiento Milvus. El valor s3a:// se aplica a Spark de código abierto. Utilice s3:// para Databricks. |
milvus.storage.endpoint | localhost:9000 | Endpoint del almacenamiento Milvus. Debe ser el mismo que minio.address :minio.port en milvus.yaml. |
milvus.storage.user | minioadmin | Usuario del almacenamiento Milvus. Debe ser el mismo que minio.accessKeyID en milvus.yaml. |
milvus.storage.password | minioadmin | Contraseña del almacenamiento Milvus. Debe ser la misma que minio.secretAccessKey en milvus.yaml. |
milvus.storage.useSSL | false | Si desea utilizar SSL para el almacenamiento Milvus. Debe ser el mismo que minio.useSSL en milvus.yaml. |
Formato de datos Milvus
El conector Spark-Milvus soporta la lectura y escritura de datos en los siguientes formatos de datos Milvus:
milvus
: Formato de datos Milvus para una conversión perfecta de Spark DataFrame a entidades Milvus.milvusbinlog
: Formato de datos Milvus para la lectura de datos binlog integrados en Milvus.mjson
: Formato Milvus JSON para la inserción masiva de datos en Milvus.
milvus
En Inicio rápido, utilizamos el formato milvus para escribir datos de muestra en un cluster Milvus. El formato milvus es un nuevo formato de datos que permite escribir sin problemas datos Spark DataFrame en Milvus Collections. Esto se consigue mediante llamadas por lotes a la API de inserción del SDK de Milvus. Si una colección no existe en Milvus, se creará una nueva colección basada en el esquema del Dataframe. Sin embargo, es posible que la colección creada automáticamente no admita todas las características del esquema de la colección. Por lo tanto, se recomienda crear primero una colección a través del SDK y luego utilizar spark-milvus para escribir. Para más información, consulte la demo.
milvusbinlog
El nuevo formato de datos milvusbinlog sirve para leer los datos binlog integrados en Milvus. Binlog es el formato de almacenamiento de datos interno de Milvus basado en parquet. Desafortunadamente, no puede ser leído por una biblioteca de parquet normal, por lo que hemos implementado este nuevo formato de datos para ayudar a Spark a leerlo. No se recomienda utilizar milvusbinlog directamente a menos que esté familiarizado con los detalles de almacenamiento interno de Milvus. Sugerimos utilizar la función MilvusUtils que se introducirá en la siguiente sección.
val df = spark.read
.format("milvusbinlog")
.load(path)
.withColumnRenamed("val", "embedding")
mjson
Milvus proporciona la funcionalidad Bulkinsert para un mejor rendimiento de escritura cuando se trabaja con grandes conjuntos de datos. Sin embargo, el formato JSON utilizado por Milvus es ligeramente diferente del formato de salida JSON por defecto de Spark. Para resolver esto, introducimos el formato de datos mjson para generar datos que cumplan los requisitos de Milvus. He aquí un ejemplo que muestra la diferencia entre JSON-lines y mjson:
JSON-lines:
{"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]} {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]} {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]} {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]} {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
mjson (Requerido para Milvus Bulkinsert):
{ "rows":[ {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]}, {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]}, {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]}, {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]}, {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]} ] }
Esto se mejorará en el futuro. Recomendamos usar el formato parquet en la intergración spark-milvus si su versión de Milvus es v2.3.7+ que soporta bulkinsert con formato Parquet. Ver Demo en Github.
MilvusUtils
MilvusUtils contiene varias funciones útiles. Actualmente sólo está soportado en Scala. Más ejemplos de uso en la sección Uso Avanzado.
MilvusUtils.readMilvusCollection
MilvusUtils.readMilvusCollection es una interfaz sencilla para cargar una colección Milvus completa en un marco de datos Spark. Envuelve varias operaciones, incluyendo la llamada a Milvus SDK, la lectura de milvusbinlog y operaciones comunes de unión/join.
val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
MilvusUtils.bulkInsertFromSpark
MilvusUtils.bulkInsertFromSpark proporciona una manera conveniente de importar archivos de salida Spark a Milvus en un lote grande. Envuelve la API Bullkinsert del SDK de Milvus.
df.write.format("parquet").save(outputPath)
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputPath, "parquet")
Uso avanzado
En esta sección, encontrará ejemplos de uso avanzado del Conector Spark-Milvus para el análisis y la migración de datos. Para más demos, ver ejemplos.
MySQL -> incrustación -> Milvus
En esta demostración
- Leer datos de MySQL a través del Conector Spark-MySQL,
- Generar incrustaciones (utilizando Word2Vec como ejemplo), y
- Escribir datos incrustados en Milvus.
Para habilitar el Conector Spark-MySQL, necesita agregar la siguiente dependencia a su entorno Spark:
spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar,mysql-connector-j-x.x.x.jar
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{SaveMode, SparkSession}
import zilliztech.spark.milvus.MilvusOptions._
import org.apache.spark.ml.linalg.Vector
object Mysql2MilvusDemo extends App {
val spark = SparkSession.builder().master("local[*]")
.appName("Mysql2MilvusDemo")
.getOrCreate()
import spark.implicits._
// Create DataFrame
val sampleDF = Seq(
(1, "Milvus was created in 2019 with a singular goal: store, index, and manage massive embedding vectors generated by deep neural networks and other machine learning (ML) models."),
(2, "As a database specifically designed to handle queries over input vectors, it is capable of indexing vectors on a trillion scale. "),
(3, "Unlike existing relational databases which mainly deal with structured data following a pre-defined pattern, Milvus is designed from the bottom-up to handle embedding vectors converted from unstructured data."),
(4, "As the Internet grew and evolved, unstructured data became more and more common, including emails, papers, IoT sensor data, Facebook photos, protein structures, and much more.")
).toDF("id", "text")
// Write to MySQL Table
sampleDF.write
.mode(SaveMode.Append)
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "demo")
.option("user", "root")
.option("password", "123456")
.save()
// Read from MySQL Table
val dfMysql = spark.read
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "demo")
.option("user", "root")
.option("password", "123456")
.load()
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
val tokenizedDf = tokenizer.transform(dfMysql)
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("tokens")
.setOutputCol("vectors")
.setVectorSize(128)
.setMinCount(0)
val model = word2Vec.fit(tokenizedDf)
val result = model.transform(tokenizedDf)
val vectorToArrayUDF = udf((v: Vector) => v.toArray)
// Apply the UDF to the DataFrame
val resultDF = result.withColumn("embedding", vectorToArrayUDF($"vectors"))
val milvusDf = resultDF.drop("tokens").drop("vectors")
milvusDf.write.format("milvus")
.option(MILVUS_HOST, "localhost")
.option(MILVUS_PORT, "19530")
.option(MILVUS_COLLECTION_NAME, "text_embedding")
.option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
.option(MILVUS_COLLECTION_VECTOR_DIM, "128")
.option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
.mode(SaveMode.Append)
.save()
}
Milvus -> Transformar -> Milvus
En esta demostración, vamos a
- Leer datos de una colección Milvus,
- Aplicaremos una transformación (usando PCA como ejemplo), y
- Escribiremos los datos transformados en otro Milvus a través de la API Bulkinsert.
El modelo PCA es un modelo de transformación que reduce la dimensionalidad de los vectores incrustados, que es una operación común en el aprendizaje automático. Puede añadir cualquier otra operación de procesamiento, como filtrar, unir o normalizar, al paso de transformación.
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}
import scala.collection.JavaConverters._
object TransformDemo extends App {
val sparkConf = new SparkConf().setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val host = "localhost"
val port = 19530
val user = "root"
val password = "Milvus"
val fs = "s3a://"
val bucketName = "a-bucket"
val rootPath = "files"
val minioAK = "minioadmin"
val minioSK = "minioadmin"
val minioEndpoint = "localhost:9000"
val collectionName = "hello_spark_milvus1"
val targetCollectionName = "hello_spark_milvus2"
val properties = Map(
MilvusOptions.MILVUS_HOST -> host,
MilvusOptions.MILVUS_PORT -> port.toString,
MilvusOptions.MILVUS_COLLECTION_NAME -> collectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
// 1, configurations
val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))
// 2, batch read milvus collection data to dataframe
// Schema: dim of `embeddings` is 8
// +-+------------+------------+------------------+
// | | field name | field type | other attributes |
// +-+------------+------------+------------------+
// |1| "pk" | Int64 | is_primary=True |
// | | | | auto_id=False |
// +-+------------+------------+------------------+
// |2| "random" | Double | |
// +-+------------+------------+------------------+
// |3|"embeddings"| FloatVector| dim=8 |
// +-+------------+------------+------------------+
val arrayToVectorUDF = udf((arr: Seq[Double]) => Vectors.dense(arr.toArray[Double]))
val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
.withColumn("embeddings_vec", arrayToVectorUDF($"embeddings"))
.drop("embeddings")
// 3. Use PCA to reduce dim of vector
val dim = 4
val pca = new PCA()
.setInputCol("embeddings_vec")
.setOutputCol("pca_vec")
.setK(dim)
.fit(collectionDF)
val vectorToArrayUDF = udf((v: Vector) => v.toArray)
// embeddings dim number reduce to 4
// +-+------------+------------+------------------+
// | | field name | field type | other attributes |
// +-+------------+------------+------------------+
// |1| "pk" | Int64 | is_primary=True |
// | | | | auto_id=False |
// +-+------------+------------+------------------+
// |2| "random" | Double | |
// +-+------------+------------+------------------+
// |3|"embeddings"| FloatVector| dim=4 |
// +-+------------+------------+------------------+
val pcaDf = pca.transform(collectionDF)
.withColumn("embeddings", vectorToArrayUDF($"pca_vec"))
.select("pk", "random", "embeddings")
// 4. Write PCAed data to S3
val outputPath = "s3a://a-bucket/result"
pcaDf.write
.mode("overwrite")
.format("parquet")
.save(outputPath)
// 5. Config MilvusOptions of target table
val targetProperties = Map(
MilvusOptions.MILVUS_HOST -> host,
MilvusOptions.MILVUS_PORT -> port.toString,
MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
// 6. Bulkinsert Spark output files into milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
}
Databricks -> Zilliz Cloud
Si utiliza Zilliz Cloud (el servicio Milvus gestionado), puede aprovechar su práctica API de importación de datos. Zilliz Cloud proporciona herramientas y documentación completas para ayudarte a mover eficientemente tus datos desde varias fuentes de datos, incluyendo Spark y Databricks. Solo tienes que configurar un bucket S3 como intermediario y abrir su acceso a tu cuenta de Zilliz Cloud. La API de importación de datos de Zilliz Cloud cargará automáticamente el lote completo de datos desde el bucket de S3 a tu clúster de Zilliz Cloud.
Preparativos
Carga el tiempo de ejecución de Spark añadiendo un archivo jar a tu Clúster Databricks.
Puedes instalar una librería de diferentes maneras. Esta captura de pantalla muestra la carga de un jar desde local al clúster. Para obtener más información, consulta Bibliotecas de clúster en la documentación de Databricks.
Instalar la biblioteca de Databricks
Crea un bucket S3 y configúralo como ubicación de almacenamiento externo para tu clúster Databricks.
Bulkinsert requiere que los datos se almacenen en un bucket temporal para que Zilliz Cloud pueda importar los datos en un lote. Puedes crear un bucket S3 y configurarlo como ubicación externa de databricks. Consulta Ubicaciones externas para más detalles.
Asegura tus credenciales de Databricks.
Para más detalles, consulta las instrucciones del blog Gestión segura de credenciales en Databricks.
Demo
Aquí tienes un fragmento de código que muestra el proceso de migración de datos por lotes. Similar al ejemplo anterior de Milvus, solo necesitas reemplazar la credencial y la dirección del bucket S3.
// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write
.mode("overwrite")
.format("mjson")
.save(outputPath)
// Specify Milvus options.
val targetProperties = Map(
MilvusOptions.MILVUS_URI -> zilliz_uri,
MilvusOptions.MILVUS_TOKEN -> zilliz_token,
MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "mjson")
Práctico
Para ayudarle a empezar rápidamente con el Conector Spark-Milvus, hemos preparado un cuaderno que le guía a través de los procesos de transferencia de datos en flujo y por lotes, con Milvus y Zilliz Cloud.