Guida all'uso del connettore Spark-Milvus
Spark-Milvus Connector (https://github.com/zilliztech/spark-milvus) fornisce un'integrazione perfetta tra Apache Spark e Milvus, combinando l'elaborazione dei dati e le funzionalità di ML di Apache Spark con le capacità di archiviazione e ricerca dei dati vettoriali di Milvus. Questa integrazione consente di realizzare diverse applicazioni interessanti, tra cui:
- Caricare in modo efficiente i dati vettoriali in Milvus in grandi lotti,
- Spostare i dati tra Milvus e altri sistemi di archiviazione o database,
- analizzare i dati in Milvus sfruttando Spark MLlib e altri strumenti di AI.
Avvio rapido
Preparazione
Il connettore Spark-Milvus supporta i linguaggi di programmazione Scala e Python. Gli utenti possono utilizzarlo con Pyspark o Spark-shell. Per eseguire questa demo, configurare un ambiente Spark contenente la dipendenza di Spark-Milvus Connector seguendo i seguenti passaggi:
Installare Apache Spark (versione >= 3.3.0)
Potete installare Apache Spark facendo riferimento alla documentazione ufficiale.
Scaricare il file jar spark-milvus.
wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
Avviare il runtime di Spark con il jar spark-milvus come una delle dipendenze.
Per avviare il runtime di Spark con Spark-Milvus Connector, aggiungete spark-milvus scaricato come dipendenza al comando.
pyspark
./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
spark-shell
./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
Demo
In questa dimostrazione, creiamo un esempio di Spark DataFrame con dati vettoriali e lo scriviamo in Milvus attraverso lo Spark-Milvus Connector. Una collezione verrà creata automaticamente in Milvus in base allo schema e alle opzioni specificate.
from pyspark.sql import SparkSession
columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
.mode("append") \
.option("milvus.host", "localhost") \
.option("milvus.port", "19530") \
.option("milvus.collection.name", "hello_spark_milvus") \
.option("milvus.collection.vectorField", "vec") \
.option("milvus.collection.vectorDim", "8") \
.option("milvus.collection.primaryKeyField", "id") \
.format("milvus") \
.save()
import org.apache.spark.sql.{SaveMode, SparkSession}
object Hello extends App {
val spark = SparkSession.builder().master("local[*]")
.appName("HelloSparkMilvus")
.getOrCreate()
import spark.implicits._
// Create DataFrame
val sampleDF = Seq(
(1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
(2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
(3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
(4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
).toDF("id", "text", "vec")
// set milvus options
val milvusOptions = Map(
"milvus.host" -> "localhost" -> uri,
"milvus.port" -> "19530",
"milvus.collection.name" -> "hello_spark_milvus",
"milvus.collection.vectorField" -> "vec",
"milvus.collection.vectorDim" -> "5",
"milvus.collection.primaryKeyField", "id"
)
sampleDF.write.format("milvus")
.options(milvusOptions)
.mode(SaveMode.Append)
.save()
}
Dopo aver eseguito il codice di cui sopra, è possibile visualizzare i dati inseriti in Milvus utilizzando l'SDK o Attu (una dashboard di Milvus). È possibile trovare una raccolta denominata hello_spark_milvus
creata con 4 entità già inserite.
Caratteristiche e concetti
Opzioni di Milvus
Nella sezione Avvio rapido, abbiamo mostrato l'impostazione delle opzioni durante le operazioni con Milvus. Queste opzioni sono astratte come Opzioni Milvus. Vengono utilizzate per creare connessioni a Milvus e controllare altri comportamenti di Milvus. Non tutte le opzioni sono obbligatorie.
Opzione Chiave | Valore predefinito | Descrizione |
---|---|---|
milvus.host | localhost | Host del server Milvus. Per maggiori dettagli, vedere Gestione delle connessioni Milvus. |
milvus.port | 19530 | Porta del server Milvus. Vedere Gestione delle connessioni Milvus per maggiori dettagli. |
milvus.username | root | Nome utente del server Milvus. Per maggiori dettagli, vedere Gestione delle connessioni Milvus. |
milvus.password | Milvus | Password per il server Milvus. Per maggiori dettagli, vedere Gestione delle connessioni Milvus. |
milvus.uri | -- | URI del server Milvus. Vedere Gestione delle connessioni Milvus per maggiori dettagli. |
milvus.token | -- | Token del server Milvus. Per maggiori dettagli, vedere Gestione delle connessioni Milvus. |
milvus.database.name | default | Nome del database Milvus da leggere o scrivere. |
milvus.collection.name | hello_milvus | Nome della collezione Milvus da leggere o scrivere. |
milvus.collection.primaryKeyField | None | Nome del campo chiave primaria della collezione. Richiesto se la collezione non esiste. |
milvus.collection.vectorField | None | Nome del campo vettore della collezione. Richiesto se l'insieme non esiste. |
milvus.collection.vectorDim | None | Dimensione del campo vettoriale della collezione. Richiesto se l'insieme non esiste. |
milvus.collection.autoID | false | Se l'insieme non esiste, questa opzione specifica se generare automaticamente gli ID per le entità. Per ulteriori informazioni, vedere create_collection. |
milvus.bucket | a-bucket | Nome del secchio nello storage Milvus. Dovrebbe essere lo stesso di minio.bucketName in milvus.yaml. |
milvus.rootpath | files | Percorso principale del deposito Milvus. Deve essere uguale a minio.rootpath in milvus.yaml. |
milvus.fs | s3a:// | File system del deposito Milvus. Il valore s3a:// si applica a Spark open-source. Utilizzare s3:// per Databricks. |
milvus.storage.endpoint | localhost:9000 | Endpoint dello storage Milvus. Dovrebbe essere uguale a minio.address :minio.port in milvus.yaml. |
milvus.storage.user | minioadmin | Utente dello storage Milvus. Deve essere uguale a minio.accessKeyID in milvus.yaml. |
milvus.storage.password | minioadmin | Password del deposito Milvus. Dovrebbe essere la stessa di minio.secretAccessKey in milvus.yaml. |
milvus.storage.useSSL | false | Se utilizzare o meno l'SSL per il deposito Milvus. Deve essere uguale a minio.useSSL in milvus.yaml. |
Formato dei dati Milvus
Il connettore Spark-Milvus supporta la lettura e la scrittura dei dati nei seguenti formati Milvus:
milvus
: Formato dati Milvus per la conversione senza problemi da Spark DataFrame a entità Milvus.milvusbinlog
: Formato dati Milvus per la lettura dei dati binlog incorporati in Milvus.mjson
: Formato JSON di Milvus per l'inserimento di dati in massa in Milvus.
milvus
In Quick start, utilizziamo il formato milvus per scrivere dati di esempio in un cluster Milvus. Il formato milvus è un nuovo formato di dati che supporta la scrittura di dati Spark DataFrame senza soluzione di continuità nelle collezioni Milvus. Ciò si ottiene tramite chiamate batch all'API Insert dell'SDK Milvus. Se una raccolta non esiste in Milvus, verrà creata una nuova raccolta basata sullo schema del Dataframe. Tuttavia, la collezione creata automaticamente potrebbe non supportare tutte le caratteristiche dello schema della collezione. Pertanto, si consiglia di creare prima una raccolta tramite SDK e poi di utilizzare spark-milvus per la scrittura. Per ulteriori informazioni, consultare la demo.
milvusbinlog
Il nuovo formato di dati milvusbinlog serve a leggere i dati binlog integrati in Milvus. Binlog è il formato di archiviazione dati interno di Milvus basato sul parquet. Sfortunatamente, non può essere letto da una normale libreria di parquet, quindi abbiamo implementato questo nuovo formato di dati per aiutare Spark a leggerlo. Non è consigliabile utilizzare direttamente milvusbinlog a meno che non si abbia familiarità con i dettagli della memorizzazione interna di Milvus. Si consiglia di utilizzare la funzione MilvusUtils che verrà introdotta nella prossima sezione.
val df = spark.read
.format("milvusbinlog")
.load(path)
.withColumnRenamed("val", "embedding")
mjson
Milvus fornisce la funzionalità Bulkinsert per migliorare le prestazioni di scrittura quando si opera con grandi insiemi di dati. Tuttavia, il formato JSON utilizzato da Milvus è leggermente diverso dal formato di output JSON predefinito di Spark. Per risolvere questo problema, abbiamo introdotto il formato di dati mjson per generare dati che soddisfino i requisiti di Milvus. Ecco un esempio che mostra la differenza tra JSON-lines e mjson:
Linee JSON:
{"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]} {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]} {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]} {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]} {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
mjson (richiesto per Milvus Bulkinsert):
{ "rows":[ {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]}, {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]}, {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]}, {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]}, {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]} ] }
Questo aspetto sarà migliorato in futuro. Si consiglia di utilizzare il formato parquet nell'integrazione spark-milvus se la versione di Milvus è la v2.3.7+ che supporta bulkinsert con il formato Parquet. Vedere la demo su Github.
MilvusUtils
MilvusUtils contiene diverse utili funzioni util. Attualmente è supportato solo in Scala. Altri esempi di utilizzo sono nella sezione Uso avanzato.
MilvusUtils.readMilvusCollection
MilvusUtils.readMilvusCollection è una semplice interfaccia per caricare un'intera collezione Milvus in un Dataframe Spark. Comprende varie operazioni, tra cui la chiamata all'SDK Milvus, la lettura di milvusbinlog e le comuni operazioni di unione/congiunzione.
val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
MilvusUtils.bulkInsertFromSpark
MilvusUtils.bulkInsertFromSpark fornisce un modo pratico per importare i file di output di Spark in Milvus in un grande lotto. Si basa sull'API Bullkinsert dell'SDK Milvus.
df.write.format("parquet").save(outputPath)
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputPath, "parquet")
Uso avanzato
In questa sezione troverete esempi di utilizzo avanzato del connettore Spark-Milvus per l'analisi e la migrazione dei dati. Per ulteriori dimostrazioni, vedere gli esempi.
MySQL -> incorporazione -> Milvus
In questa demo
- Leggere i dati da MySQL attraverso il connettore Spark-MySQL,
- generare embedding (utilizzando Word2Vec come esempio) e
- scrivere i dati incorporati in Milvus.
Per abilitare il connettore Spark-MySQL, è necessario aggiungere la seguente dipendenza all'ambiente Spark:
spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar,mysql-connector-j-x.x.x.jar
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{SaveMode, SparkSession}
import zilliztech.spark.milvus.MilvusOptions._
import org.apache.spark.ml.linalg.Vector
object Mysql2MilvusDemo extends App {
val spark = SparkSession.builder().master("local[*]")
.appName("Mysql2MilvusDemo")
.getOrCreate()
import spark.implicits._
// Create DataFrame
val sampleDF = Seq(
(1, "Milvus was created in 2019 with a singular goal: store, index, and manage massive embedding vectors generated by deep neural networks and other machine learning (ML) models."),
(2, "As a database specifically designed to handle queries over input vectors, it is capable of indexing vectors on a trillion scale. "),
(3, "Unlike existing relational databases which mainly deal with structured data following a pre-defined pattern, Milvus is designed from the bottom-up to handle embedding vectors converted from unstructured data."),
(4, "As the Internet grew and evolved, unstructured data became more and more common, including emails, papers, IoT sensor data, Facebook photos, protein structures, and much more.")
).toDF("id", "text")
// Write to MySQL Table
sampleDF.write
.mode(SaveMode.Append)
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "demo")
.option("user", "root")
.option("password", "123456")
.save()
// Read from MySQL Table
val dfMysql = spark.read
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "demo")
.option("user", "root")
.option("password", "123456")
.load()
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
val tokenizedDf = tokenizer.transform(dfMysql)
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("tokens")
.setOutputCol("vectors")
.setVectorSize(128)
.setMinCount(0)
val model = word2Vec.fit(tokenizedDf)
val result = model.transform(tokenizedDf)
val vectorToArrayUDF = udf((v: Vector) => v.toArray)
// Apply the UDF to the DataFrame
val resultDF = result.withColumn("embedding", vectorToArrayUDF($"vectors"))
val milvusDf = resultDF.drop("tokens").drop("vectors")
milvusDf.write.format("milvus")
.option(MILVUS_HOST, "localhost")
.option(MILVUS_PORT, "19530")
.option(MILVUS_COLLECTION_NAME, "text_embedding")
.option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
.option(MILVUS_COLLECTION_VECTOR_DIM, "128")
.option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
.mode(SaveMode.Append)
.save()
}
Milvus -> Transform -> Milvus
In questa dimostrazione
- Leggere i dati da una raccolta Milvus,
- Applicheremo una trasformazione (usando PCA come esempio) e
- scrivere i dati trasformati in un altro Milvus tramite l'API Bulkinsert.
Il modello PCA è un modello di trasformazione che riduce la dimensionalità dei vettori incorporati, un'operazione comune nell'apprendimento automatico. È possibile aggiungere alla fase di trasformazione qualsiasi altra operazione di elaborazione, come filtrare, unire o normalizzare.
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}
import scala.collection.JavaConverters._
object TransformDemo extends App {
val sparkConf = new SparkConf().setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val host = "localhost"
val port = 19530
val user = "root"
val password = "Milvus"
val fs = "s3a://"
val bucketName = "a-bucket"
val rootPath = "files"
val minioAK = "minioadmin"
val minioSK = "minioadmin"
val minioEndpoint = "localhost:9000"
val collectionName = "hello_spark_milvus1"
val targetCollectionName = "hello_spark_milvus2"
val properties = Map(
MilvusOptions.MILVUS_HOST -> host,
MilvusOptions.MILVUS_PORT -> port.toString,
MilvusOptions.MILVUS_COLLECTION_NAME -> collectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
// 1, configurations
val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))
// 2, batch read milvus collection data to dataframe
// Schema: dim of `embeddings` is 8
// +-+------------+------------+------------------+
// | | field name | field type | other attributes |
// +-+------------+------------+------------------+
// |1| "pk" | Int64 | is_primary=True |
// | | | | auto_id=False |
// +-+------------+------------+------------------+
// |2| "random" | Double | |
// +-+------------+------------+------------------+
// |3|"embeddings"| FloatVector| dim=8 |
// +-+------------+------------+------------------+
val arrayToVectorUDF = udf((arr: Seq[Double]) => Vectors.dense(arr.toArray[Double]))
val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
.withColumn("embeddings_vec", arrayToVectorUDF($"embeddings"))
.drop("embeddings")
// 3. Use PCA to reduce dim of vector
val dim = 4
val pca = new PCA()
.setInputCol("embeddings_vec")
.setOutputCol("pca_vec")
.setK(dim)
.fit(collectionDF)
val vectorToArrayUDF = udf((v: Vector) => v.toArray)
// embeddings dim number reduce to 4
// +-+------------+------------+------------------+
// | | field name | field type | other attributes |
// +-+------------+------------+------------------+
// |1| "pk" | Int64 | is_primary=True |
// | | | | auto_id=False |
// +-+------------+------------+------------------+
// |2| "random" | Double | |
// +-+------------+------------+------------------+
// |3|"embeddings"| FloatVector| dim=4 |
// +-+------------+------------+------------------+
val pcaDf = pca.transform(collectionDF)
.withColumn("embeddings", vectorToArrayUDF($"pca_vec"))
.select("pk", "random", "embeddings")
// 4. Write PCAed data to S3
val outputPath = "s3a://a-bucket/result"
pcaDf.write
.mode("overwrite")
.format("parquet")
.save(outputPath)
// 5. Config MilvusOptions of target table
val targetProperties = Map(
MilvusOptions.MILVUS_HOST -> host,
MilvusOptions.MILVUS_PORT -> port.toString,
MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
// 6. Bulkinsert Spark output files into milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
}
Databricks -> Zilliz Cloud
Se utilizzate Zilliz Cloud (il servizio gestito di Milvus), potete sfruttare la sua comoda API di importazione dei dati. Zilliz Cloud fornisce strumenti e documentazione completi per aiutarvi a spostare in modo efficiente i dati da varie fonti di dati, tra cui Spark e Databricks. È sufficiente impostare un bucket S3 come intermediario e aprirne l'accesso al vostro account Zilliz Cloud. L'API di importazione dei dati di Zilliz Cloud caricherà automaticamente l'intero batch di dati dal bucket S3 al vostro cluster Zilliz Cloud.
Preparazione
Caricate il runtime Spark aggiungendo un file jar al vostro cluster Databricks.
È possibile installare una libreria in diversi modi. Questa schermata mostra il caricamento di un jar da locale al cluster. Per ulteriori informazioni, vedere Librerie del cluster nella documentazione di Databricks.
Installare la libreria Databricks
Creare un bucket S3 e configurarlo come spazio di archiviazione esterno per il cluster Databricks.
Bulkinsert richiede che i dati siano memorizzati in un bucket temporaneo, in modo che Zilliz Cloud possa importare i dati in un batch. È possibile creare un bucket S3 e configurarlo come posizione esterna di Databricks. Per maggiori dettagli, consultare la sezione Posizioni esterne.
Proteggere le credenziali di Databricks.
Per maggiori dettagli, consultare le istruzioni sul blog Gestione sicura delle credenziali in Databricks.
Demo
Ecco un frammento di codice che illustra il processo di migrazione dei dati in batch. Come nell'esempio di Milvus, è sufficiente sostituire le credenziali e l'indirizzo del bucket S3.
// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write
.mode("overwrite")
.format("mjson")
.save(outputPath)
// Specify Milvus options.
val targetProperties = Map(
MilvusOptions.MILVUS_URI -> zilliz_uri,
MilvusOptions.MILVUS_TOKEN -> zilliz_token,
MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "mjson")
Istruzioni per l'uso
Per aiutarvi a iniziare rapidamente a utilizzare il connettore Spark-Milvus, abbiamo preparato un quaderno che vi guida attraverso i processi di trasferimento dei dati in streaming e in batch, con Milvus e Zilliz Cloud.