milvus-logo
LFAI
Casa
  • Strumenti

Guida all'uso del connettore Spark-Milvus

Spark-Milvus Connector (https://github.com/zilliztech/spark-milvus) fornisce un'integrazione perfetta tra Apache Spark e Milvus, combinando l'elaborazione dei dati e le funzionalità di ML di Apache Spark con le capacità di archiviazione e ricerca dei dati vettoriali di Milvus. Questa integrazione consente di realizzare diverse applicazioni interessanti, tra cui:

  • Caricare in modo efficiente i dati vettoriali in Milvus in grandi lotti,
  • Spostare i dati tra Milvus e altri sistemi di archiviazione o database,
  • analizzare i dati in Milvus sfruttando Spark MLlib e altri strumenti di AI.

Avvio rapido

Preparazione

Il connettore Spark-Milvus supporta i linguaggi di programmazione Scala e Python. Gli utenti possono utilizzarlo con Pyspark o Spark-shell. Per eseguire questa demo, configurare un ambiente Spark contenente la dipendenza di Spark-Milvus Connector seguendo i seguenti passaggi:

  1. Installare Apache Spark (versione >= 3.3.0)

    Potete installare Apache Spark facendo riferimento alla documentazione ufficiale.

  2. Scaricare il file jar spark-milvus.

    wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
    
  3. Avviare il runtime di Spark con il jar spark-milvus come una delle dipendenze.

    Per avviare il runtime di Spark con Spark-Milvus Connector, aggiungete spark-milvus scaricato come dipendenza al comando.

    • pyspark

      ./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
      
    • spark-shell

      ./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
      

Demo

In questa dimostrazione, creiamo un esempio di Spark DataFrame con dati vettoriali e lo scriviamo in Milvus attraverso lo Spark-Milvus Connector. Una collezione verrà creata automaticamente in Milvus in base allo schema e alle opzioni specificate.

from pyspark.sql import SparkSession

columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
    .mode("append") \
    .option("milvus.host", "localhost") \
    .option("milvus.port", "19530") \
    .option("milvus.collection.name", "hello_spark_milvus") \
    .option("milvus.collection.vectorField", "vec") \
    .option("milvus.collection.vectorDim", "8") \
    .option("milvus.collection.primaryKeyField", "id") \
    .format("milvus") \
    .save()
import org.apache.spark.sql.{SaveMode, SparkSession}

object Hello extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("HelloSparkMilvus")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
    (2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
    (3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
    (4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
  ).toDF("id", "text", "vec")

  // set milvus options
  val milvusOptions = Map(
      "milvus.host" -> "localhost" -> uri,
      "milvus.port" -> "19530",
      "milvus.collection.name" -> "hello_spark_milvus",
      "milvus.collection.vectorField" -> "vec",
      "milvus.collection.vectorDim" -> "5",
      "milvus.collection.primaryKeyField", "id"
    )
    
  sampleDF.write.format("milvus")
    .options(milvusOptions)
    .mode(SaveMode.Append)
    .save()
}

Dopo aver eseguito il codice di cui sopra, è possibile visualizzare i dati inseriti in Milvus utilizzando l'SDK o Attu (una dashboard di Milvus). È possibile trovare una raccolta denominata hello_spark_milvus creata con 4 entità già inserite.

Caratteristiche e concetti

Opzioni di Milvus

Nella sezione Avvio rapido, abbiamo mostrato l'impostazione delle opzioni durante le operazioni con Milvus. Queste opzioni sono astratte come Opzioni Milvus. Vengono utilizzate per creare connessioni a Milvus e controllare altri comportamenti di Milvus. Non tutte le opzioni sono obbligatorie.

Opzione ChiaveValore predefinitoDescrizione
milvus.hostlocalhostHost del server Milvus. Per maggiori dettagli, vedere Gestione delle connessioni Milvus.
milvus.port19530Porta del server Milvus. Vedere Gestione delle connessioni Milvus per maggiori dettagli.
milvus.usernamerootNome utente del server Milvus. Per maggiori dettagli, vedere Gestione delle connessioni Milvus.
milvus.passwordMilvusPassword per il server Milvus. Per maggiori dettagli, vedere Gestione delle connessioni Milvus.
milvus.uri--URI del server Milvus. Vedere Gestione delle connessioni Milvus per maggiori dettagli.
milvus.token--Token del server Milvus. Per maggiori dettagli, vedere Gestione delle connessioni Milvus.
milvus.database.namedefaultNome del database Milvus da leggere o scrivere.
milvus.collection.namehello_milvusNome della collezione Milvus da leggere o scrivere.
milvus.collection.primaryKeyFieldNoneNome del campo chiave primaria della collezione. Richiesto se la collezione non esiste.
milvus.collection.vectorFieldNoneNome del campo vettore della collezione. Richiesto se l'insieme non esiste.
milvus.collection.vectorDimNoneDimensione del campo vettoriale della collezione. Richiesto se l'insieme non esiste.
milvus.collection.autoIDfalseSe l'insieme non esiste, questa opzione specifica se generare automaticamente gli ID per le entità. Per ulteriori informazioni, vedere create_collection.
milvus.bucketa-bucketNome del secchio nello storage Milvus. Dovrebbe essere lo stesso di minio.bucketName in milvus.yaml.
milvus.rootpathfilesPercorso principale del deposito Milvus. Deve essere uguale a minio.rootpath in milvus.yaml.
milvus.fss3a://File system del deposito Milvus. Il valore s3a:// si applica a Spark open-source. Utilizzare s3:// per Databricks.
milvus.storage.endpointlocalhost:9000Endpoint dello storage Milvus. Dovrebbe essere uguale a minio.address:minio.port in milvus.yaml.
milvus.storage.userminioadminUtente dello storage Milvus. Deve essere uguale a minio.accessKeyID in milvus.yaml.
milvus.storage.passwordminioadminPassword del deposito Milvus. Dovrebbe essere la stessa di minio.secretAccessKey in milvus.yaml.
milvus.storage.useSSLfalseSe utilizzare o meno l'SSL per il deposito Milvus. Deve essere uguale a minio.useSSL in milvus.yaml.

Formato dei dati Milvus

Il connettore Spark-Milvus supporta la lettura e la scrittura dei dati nei seguenti formati Milvus:

  • milvus: Formato dati Milvus per la conversione senza problemi da Spark DataFrame a entità Milvus.
  • milvusbinlog: Formato dati Milvus per la lettura dei dati binlog incorporati in Milvus.
  • mjson: Formato JSON di Milvus per l'inserimento di dati in massa in Milvus.

milvus

In Quick start, utilizziamo il formato milvus per scrivere dati di esempio in un cluster Milvus. Il formato milvus è un nuovo formato di dati che supporta la scrittura di dati Spark DataFrame senza soluzione di continuità nelle collezioni Milvus. Ciò si ottiene tramite chiamate batch all'API Insert dell'SDK Milvus. Se una raccolta non esiste in Milvus, verrà creata una nuova raccolta basata sullo schema del Dataframe. Tuttavia, la collezione creata automaticamente potrebbe non supportare tutte le caratteristiche dello schema della collezione. Pertanto, si consiglia di creare prima una raccolta tramite SDK e poi di utilizzare spark-milvus per la scrittura. Per ulteriori informazioni, consultare la demo.

milvusbinlog

Il nuovo formato di dati milvusbinlog serve a leggere i dati binlog integrati in Milvus. Binlog è il formato di archiviazione dati interno di Milvus basato sul parquet. Sfortunatamente, non può essere letto da una normale libreria di parquet, quindi abbiamo implementato questo nuovo formato di dati per aiutare Spark a leggerlo. Non è consigliabile utilizzare direttamente milvusbinlog a meno che non si abbia familiarità con i dettagli della memorizzazione interna di Milvus. Si consiglia di utilizzare la funzione MilvusUtils che verrà introdotta nella prossima sezione.

val df = spark.read
  .format("milvusbinlog")
  .load(path)
  .withColumnRenamed("val", "embedding")

mjson

Milvus fornisce la funzionalità Bulkinsert per migliorare le prestazioni di scrittura quando si opera con grandi insiemi di dati. Tuttavia, il formato JSON utilizzato da Milvus è leggermente diverso dal formato di output JSON predefinito di Spark. Per risolvere questo problema, abbiamo introdotto il formato di dati mjson per generare dati che soddisfino i requisiti di Milvus. Ecco un esempio che mostra la differenza tra JSON-lines e mjson:

  • Linee JSON:

    {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]}
    {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]}
    {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]}
    {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]}
    {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
    
  • mjson (richiesto per Milvus Bulkinsert):

    {
        "rows":[
            {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]},
            {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]},
            {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]},
            {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]},
            {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
        ]
    }
    

Questo aspetto sarà migliorato in futuro. Si consiglia di utilizzare il formato parquet nell'integrazione spark-milvus se la versione di Milvus è la v2.3.7+ che supporta bulkinsert con il formato Parquet. Vedere la demo su Github.

MilvusUtils

MilvusUtils contiene diverse utili funzioni util. Attualmente è supportato solo in Scala. Altri esempi di utilizzo sono nella sezione Uso avanzato.

MilvusUtils.readMilvusCollection

MilvusUtils.readMilvusCollection è una semplice interfaccia per caricare un'intera collezione Milvus in un Dataframe Spark. Comprende varie operazioni, tra cui la chiamata all'SDK Milvus, la lettura di milvusbinlog e le comuni operazioni di unione/congiunzione.

val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)

MilvusUtils.bulkInsertFromSpark

MilvusUtils.bulkInsertFromSpark fornisce un modo pratico per importare i file di output di Spark in Milvus in un grande lotto. Si basa sull'API Bullkinsert dell'SDK Milvus.

df.write.format("parquet").save(outputPath)
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputPath, "parquet")

Uso avanzato

In questa sezione troverete esempi di utilizzo avanzato del connettore Spark-Milvus per l'analisi e la migrazione dei dati. Per ulteriori dimostrazioni, vedere gli esempi.

MySQL -> incorporazione -> Milvus

In questa demo

  1. Leggere i dati da MySQL attraverso il connettore Spark-MySQL,
  2. generare embedding (utilizzando Word2Vec come esempio) e
  3. scrivere i dati incorporati in Milvus.

Per abilitare il connettore Spark-MySQL, è necessario aggiungere la seguente dipendenza all'ambiente Spark:

spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar,mysql-connector-j-x.x.x.jar
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{SaveMode, SparkSession}
import zilliztech.spark.milvus.MilvusOptions._

import org.apache.spark.ml.linalg.Vector

object Mysql2MilvusDemo  extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("Mysql2MilvusDemo")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "Milvus was created in 2019 with a singular goal: store, index, and manage massive embedding vectors generated by deep neural networks and other machine learning (ML) models."),
    (2, "As a database specifically designed to handle queries over input vectors, it is capable of indexing vectors on a trillion scale. "),
    (3, "Unlike existing relational databases which mainly deal with structured data following a pre-defined pattern, Milvus is designed from the bottom-up to handle embedding vectors converted from unstructured data."),
    (4, "As the Internet grew and evolved, unstructured data became more and more common, including emails, papers, IoT sensor data, Facebook photos, protein structures, and much more.")
  ).toDF("id", "text")

  // Write to MySQL Table
  sampleDF.write
    .mode(SaveMode.Append)
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "demo")
    .option("user", "root")
    .option("password", "123456")
    .save()

  // Read from MySQL Table
  val dfMysql = spark.read
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "demo")
    .option("user", "root")
    .option("password", "123456")
    .load()

  val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
  val tokenizedDf = tokenizer.transform(dfMysql)

  // Learn a mapping from words to Vectors.
  val word2Vec = new Word2Vec()
    .setInputCol("tokens")
    .setOutputCol("vectors")
    .setVectorSize(128)
    .setMinCount(0)
  val model = word2Vec.fit(tokenizedDf)

  val result = model.transform(tokenizedDf)

  val vectorToArrayUDF = udf((v: Vector) => v.toArray)
  // Apply the UDF to the DataFrame
  val resultDF = result.withColumn("embedding", vectorToArrayUDF($"vectors"))
  val milvusDf = resultDF.drop("tokens").drop("vectors")

  milvusDf.write.format("milvus")
    .option(MILVUS_HOST, "localhost")
    .option(MILVUS_PORT, "19530")
    .option(MILVUS_COLLECTION_NAME, "text_embedding")
    .option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
    .option(MILVUS_COLLECTION_VECTOR_DIM, "128")
    .option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
    .mode(SaveMode.Append)
    .save()
}

Milvus -> Transform -> Milvus

In questa dimostrazione

  1. Leggere i dati da una raccolta Milvus,
  2. Applicheremo una trasformazione (usando PCA come esempio) e
  3. scrivere i dati trasformati in un altro Milvus tramite l'API Bulkinsert.

Il modello PCA è un modello di trasformazione che riduce la dimensionalità dei vettori incorporati, un'operazione comune nell'apprendimento automatico. È possibile aggiungere alla fase di trasformazione qualsiasi altra operazione di elaborazione, come filtrare, unire o normalizzare.

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}

import scala.collection.JavaConverters._

object TransformDemo extends App {
  val sparkConf = new SparkConf().setMaster("local")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()

  import spark.implicits._

  val host = "localhost"
  val port = 19530
  val user = "root"
  val password = "Milvus"
  val fs = "s3a://"
  val bucketName = "a-bucket"
  val rootPath = "files"
  val minioAK = "minioadmin"
  val minioSK = "minioadmin"
  val minioEndpoint = "localhost:9000"
  val collectionName = "hello_spark_milvus1"
  val targetCollectionName = "hello_spark_milvus2"

  val properties = Map(
    MilvusOptions.MILVUS_HOST -> host,
    MilvusOptions.MILVUS_PORT -> port.toString,
    MilvusOptions.MILVUS_COLLECTION_NAME -> collectionName,
    MilvusOptions.MILVUS_BUCKET -> bucketName,
    MilvusOptions.MILVUS_ROOTPATH -> rootPath,
    MilvusOptions.MILVUS_FS -> fs,
    MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
    MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
    MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
  )

  // 1, configurations
  val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))

  // 2, batch read milvus collection data to dataframe
  //  Schema: dim of `embeddings` is 8
  // +-+------------+------------+------------------+
  // | | field name | field type | other attributes |
  // +-+------------+------------+------------------+
  // |1|    "pk"    |    Int64   |  is_primary=True |
  // | |            |            |   auto_id=False  |
  // +-+------------+------------+------------------+
  // |2|  "random"  |    Double  |                  |
  // +-+------------+------------+------------------+
  // |3|"embeddings"| FloatVector|     dim=8        |
  // +-+------------+------------+------------------+
  val arrayToVectorUDF = udf((arr: Seq[Double]) => Vectors.dense(arr.toArray[Double]))
  val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
    .withColumn("embeddings_vec", arrayToVectorUDF($"embeddings"))
    .drop("embeddings")
  
  // 3. Use PCA to reduce dim of vector
  val dim = 4
  val pca = new PCA()
    .setInputCol("embeddings_vec")
    .setOutputCol("pca_vec")
    .setK(dim)
    .fit(collectionDF)
  val vectorToArrayUDF = udf((v: Vector) => v.toArray)
  // embeddings dim number reduce to 4
  // +-+------------+------------+------------------+
  // | | field name | field type | other attributes |
  // +-+------------+------------+------------------+
  // |1|    "pk"    |    Int64   |  is_primary=True |
  // | |            |            |   auto_id=False  |
  // +-+------------+------------+------------------+
  // |2|  "random"  |    Double  |                  |
  // +-+------------+------------+------------------+
  // |3|"embeddings"| FloatVector|     dim=4        |
  // +-+------------+------------+------------------+
  val pcaDf = pca.transform(collectionDF)
    .withColumn("embeddings", vectorToArrayUDF($"pca_vec"))
    .select("pk", "random", "embeddings")

  // 4. Write PCAed data to S3
  val outputPath = "s3a://a-bucket/result"
  pcaDf.write
    .mode("overwrite")
    .format("parquet")
    .save(outputPath)

  // 5. Config MilvusOptions of target table  
  val targetProperties = Map(
    MilvusOptions.MILVUS_HOST -> host,
    MilvusOptions.MILVUS_PORT -> port.toString,
    MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
    MilvusOptions.MILVUS_BUCKET -> bucketName,
    MilvusOptions.MILVUS_ROOTPATH -> rootPath,
    MilvusOptions.MILVUS_FS -> fs,
    MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
    MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
    MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
  )
  val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
  // 6. Bulkinsert Spark output files into milvus
  MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
}

Databricks -> Zilliz Cloud

Se utilizzate Zilliz Cloud (il servizio gestito di Milvus), potete sfruttare la sua comoda API di importazione dei dati. Zilliz Cloud fornisce strumenti e documentazione completi per aiutarvi a spostare in modo efficiente i dati da varie fonti di dati, tra cui Spark e Databricks. È sufficiente impostare un bucket S3 come intermediario e aprirne l'accesso al vostro account Zilliz Cloud. L'API di importazione dei dati di Zilliz Cloud caricherà automaticamente l'intero batch di dati dal bucket S3 al vostro cluster Zilliz Cloud.

Preparazione

  1. Caricate il runtime Spark aggiungendo un file jar al vostro cluster Databricks.

    È possibile installare una libreria in diversi modi. Questa schermata mostra il caricamento di un jar da locale al cluster. Per ulteriori informazioni, vedere Librerie del cluster nella documentazione di Databricks.

    Install Databricks Library Installare la libreria Databricks

  2. Creare un bucket S3 e configurarlo come spazio di archiviazione esterno per il cluster Databricks.

    Bulkinsert richiede che i dati siano memorizzati in un bucket temporaneo, in modo che Zilliz Cloud possa importare i dati in un batch. È possibile creare un bucket S3 e configurarlo come posizione esterna di Databricks. Per maggiori dettagli, consultare la sezione Posizioni esterne.

  3. Proteggere le credenziali di Databricks.

    Per maggiori dettagli, consultare le istruzioni sul blog Gestione sicura delle credenziali in Databricks.

Demo

Ecco un frammento di codice che illustra il processo di migrazione dei dati in batch. Come nell'esempio di Milvus, è sufficiente sostituire le credenziali e l'indirizzo del bucket S3.

// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write
  .mode("overwrite")
  .format("mjson")
  .save(outputPath)
// Specify Milvus options.
val targetProperties = Map(
  MilvusOptions.MILVUS_URI -> zilliz_uri,
  MilvusOptions.MILVUS_TOKEN -> zilliz_token,
  MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
  MilvusOptions.MILVUS_BUCKET -> bucketName,
  MilvusOptions.MILVUS_ROOTPATH -> rootPath,
  MilvusOptions.MILVUS_FS -> fs,
  MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
  MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
  MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "mjson")

Istruzioni per l'uso

Per aiutarvi a iniziare rapidamente a utilizzare il connettore Spark-Milvus, abbiamo preparato un quaderno che vi guida attraverso i processi di trasferimento dei dati in streaming e in batch, con Milvus e Zilliz Cloud.

Tradotto daDeepLogo

Feedback

Questa pagina è stata utile?