milvus-logo
LFAI
Home
  • Ferramentas

Guia do utilizador do conetor Spark-Milvus

O Conector Spark-Milvus (https://github.com/zilliztech/spark-milvus) fornece uma integração perfeita entre o Apache Spark e o Milvus, combinando o processamento de dados e os recursos de ML do Apache Spark com o armazenamento de dados vetoriais e os recursos de pesquisa do Milvus. Esta integração permite várias aplicações interessantes, incluindo:

  • Carregar eficientemente dados vetoriais no Milvus em grandes lotes,
  • Mover dados entre o Milvus e outros sistemas de armazenamento ou bases de dados,
  • Analisar os dados no Milvus tirando partido do Spark MLlib e de outras ferramentas de IA.

Início rápido

Preparação

O Conector Spark-Milvus suporta as linguagens de programação Scala e Python. Os utilizadores podem utilizá-lo com o Pyspark ou o Spark-shell. Para executar esta demonstração, configure um ambiente Spark contendo a dependência do Conector Spark-Milvus nas etapas a seguir:

  1. Instalar o Apache Spark (versão >= 3.3.0)

    Você pode instalar o Apache Spark consultando a documentação oficial.

  2. Baixe o arquivo jar spark-milvus.

    wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
    
  3. Inicie o tempo de execução do Spark com o jar spark-milvus como uma das dependências.

    Para iniciar o tempo de execução do Spark com o Conector Spark-Milvus, adicione o spark-milvus baixado como dependência ao comando.

    • pyspark

      ./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
      
    • spark-shell

      ./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
      

Demonstração

Nesta demonstração, criamos um exemplo de Spark DataFrame com dados vetoriais e o escrevemos no Milvus através do Conector Spark-Milvus. Uma coleção será criada automaticamente no Milvus com base no esquema e nas opções especificadas.

from pyspark.sql import SparkSession

columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
    .mode("append") \
    .option("milvus.host", "localhost") \
    .option("milvus.port", "19530") \
    .option("milvus.collection.name", "hello_spark_milvus") \
    .option("milvus.collection.vectorField", "vec") \
    .option("milvus.collection.vectorDim", "8") \
    .option("milvus.collection.primaryKeyField", "id") \
    .format("milvus") \
    .save()
import org.apache.spark.sql.{SaveMode, SparkSession}

object Hello extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("HelloSparkMilvus")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
    (2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
    (3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
    (4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
  ).toDF("id", "text", "vec")

  // set milvus options
  val milvusOptions = Map(
      "milvus.host" -> "localhost" -> uri,
      "milvus.port" -> "19530",
      "milvus.collection.name" -> "hello_spark_milvus",
      "milvus.collection.vectorField" -> "vec",
      "milvus.collection.vectorDim" -> "5",
      "milvus.collection.primaryKeyField", "id"
    )
    
  sampleDF.write.format("milvus")
    .options(milvusOptions)
    .mode(SaveMode.Append)
    .save()
}

Depois de executar o código acima, pode ver os dados inseridos no Milvus utilizando o SDK ou o Attu (um painel de controlo do Milvus). Pode encontrar uma coleção chamada hello_spark_milvus criada com 4 entidades já inseridas.

Caraterísticas e conceitos

Opções do Milvus

Na secção Início rápido, mostrámos as opções de configuração durante as operações com o Milvus. Estas opções são abstraídas como Milvus Options. São utilizadas para criar ligações ao Milvus e controlar outros comportamentos do Milvus. Nem todas as opções são obrigatórias.

Opção ChaveValor por defeitoDescrição
milvus.hostlocalhostAnfitrião do servidor Milvus. Para mais informações, consulte Gerir ligações Milvus.
milvus.port19530Porta do servidor Milvus. Para mais pormenores, consulte Gerir ligações Milvus.
milvus.usernamerootNome de utilizador do servidor Milvus. Para mais informações, consulte Gerir ligações Milvus.
milvus.passwordMilvusPalavra-passe para o servidor Milvus. Para mais informações, consulte Gerir ligações Milvus.
milvus.uri--URI do servidor Milvus. Para mais informações, consulte Gerir ligações Milvus.
milvus.token--Token do servidor Milvus. Para mais pormenores, consulte Gerir ligações Milvus.
milvus.database.namedefaultNome da base de dados do Milvus a ler ou escrever.
milvus.collection.namehello_milvusNome da coleção Milvus a ler ou a escrever.
milvus.collection.primaryKeyFieldNoneNome do campo da chave primária na coleção. Obrigatório se a coleção não existir.
milvus.collection.vectorFieldNoneNome do campo vetorial da coleção. Obrigatório se a coleção não existir.
milvus.collection.vectorDimNoneDimensão do campo vetorial na coleção. Obrigatório se a coleção não existir.
milvus.collection.autoIDfalseSe a coleção não existir, esta opção especifica se deve gerar automaticamente IDs para as entidades. Para obter mais informações, consulte create_collection
milvus.bucketa-bucketNome do bucket no armazenamento Milvus. Deve ser o mesmo que minio.bucketName em milvus.yaml.
milvus.rootpathfilesCaminho da raiz do armazenamento Milvus. Deve ser o mesmo que minio.rootpath em milvus.yaml.
milvus.fss3a://Sistema de ficheiros do armazenamento Milvus. O valor s3a:// aplica-se ao Spark de código aberto. Utilize s3:// para Databricks.
milvus.storage.endpointlocalhost:9000Ponto final do armazenamento Milvus. Deve ser o mesmo que minio.address:minio.port em milvus.yaml.
milvus.storage.userminioadminUtilizador do armazenamento Milvus. Deve ser o mesmo que minio.accessKeyID em milvus.yaml.
milvus.storage.passwordminioadminPalavra-passe do armazenamento Milvus. Deve ser a mesma que minio.secretAccessKey em milvus.yaml.
milvus.storage.useSSLfalseSe deve utilizar SSL para o armazenamento Milvus. Deve ser o mesmo que minio.useSSL em milvus.yaml.

Formato de dados do Milvus

O conetor Spark-Milvus suporta a leitura e a gravação de dados nos seguintes formatos de dados do Milvus:

  • milvus: Formato de dados do Milvus para conversão contínua do Spark DataFrame para entidades do Milvus.
  • milvusbinlog: Formato de dados Milvus para leitura de dados binlog incorporados no Milvus.
  • mjson: Formato JSON do Milvus para inserir dados em massa no Milvus.

Milvus

No Quick start, usamos o formato milvus para escrever dados de amostra num cluster Milvus. O formato milvus é um novo formato de dados que suporta a gravação perfeita de dados Spark DataFrame em Milvus Collections. Isso é obtido por meio de chamadas em lote para a API Insert do SDK do Milvus. Se uma coleção não existir no Milvus, uma nova coleção será criada com base no esquema do Dataframe. No entanto, a coleção criada automaticamente pode não suportar todas as funcionalidades do esquema da coleção. Por conseguinte, recomenda-se que crie primeiro uma coleção através do SDK e, em seguida, utilize o spark-milvus para escrever. Para obter mais informações, consulte a demonstração.

milvusbinlog

O novo formato de dados milvusbinlog destina-se à leitura de dados binlog incorporados no Milvus. Binlog é o formato de armazenamento de dados interno do Milvus baseado em parquet. Infelizmente, ele não pode ser lido por uma biblioteca parquet comum, então implementamos este novo formato de dados para ajudar o Spark a lê-lo. Não é recomendado usar o milvusbinlog diretamente a menos que você esteja familiarizado com os detalhes do armazenamento interno do Milvus. Sugerimos a utilização da função MilvusUtils que será introduzida na próxima secção.

val df = spark.read
  .format("milvusbinlog")
  .load(path)
  .withColumnRenamed("val", "embedding")

mjson

O Milvus fornece a funcionalidade Bulkinsert para um melhor desempenho de escrita ao operar com grandes conjuntos de dados. No entanto, o formato JSON usado pelo Milvus é ligeiramente diferente do formato de saída JSON padrão do Spark. Para resolver isso, introduzimos o formato de dados mjson para gerar dados que atendam aos requisitos do Milvus. Aqui está um exemplo que mostra a diferença entre JSON-lines e mjson:

  • JSON-lines:

    {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]}
    {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]}
    {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]}
    {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]}
    {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
    
  • mjson (Necessário para o Milvus Bulkinsert):

    {
        "rows":[
            {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]},
            {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]},
            {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]},
            {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]},
            {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
        ]
    }
    

Isto será melhorado no futuro. Recomendamos o uso do formato parquet na integração do spark-milvus se a sua versão do Milvus for v2.3.7+, que suporta bulkinsert com o formato Parquet. Veja a demonstração no Github.

MilvusUtils

MilvusUtils contém várias funções utilitárias úteis. Atualmente só é suportado em Scala. Mais exemplos de uso estão na secção Uso Avançado.

MilvusUtils.readMilvusCollection

MilvusUtils.readMilvusCollection é uma interface simples para carregar uma coleção inteira do Milvus em um Dataframe do Spark. Ela envolve várias operações, incluindo a chamada ao Milvus SDK, a leitura do milvusbinlog e operações comuns de união/join.

val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)

MilvusUtils.bulkInsertFromSpark

MilvusUtils.bulkInsertFromSpark fornece uma maneira conveniente de importar arquivos de saída Spark para o Milvus em um grande lote. Ele envolve a API Bullkinsert do SDK do Milvus.

df.write.format("parquet").save(outputPath)
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputPath, "parquet")

Uso avançado

Nesta secção, encontrará exemplos de utilização avançada do Conector Spark-Milvus para análise e migração de dados. Para mais demonstrações, consulte exemplos.

MySQL -> incorporação -> Milvus

Nesta demonstração, iremos

  1. Ler dados do MySQL através do Conector Spark-MySQL,
  2. Gerar embedding (usando Word2Vec como exemplo), e
  3. Escrever dados incorporados no Milvus.

Para habilitar o Conector Spark-MySQL, é necessário adicionar a seguinte dependência ao seu ambiente Spark:

spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar,mysql-connector-j-x.x.x.jar
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{SaveMode, SparkSession}
import zilliztech.spark.milvus.MilvusOptions._

import org.apache.spark.ml.linalg.Vector

object Mysql2MilvusDemo  extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("Mysql2MilvusDemo")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "Milvus was created in 2019 with a singular goal: store, index, and manage massive embedding vectors generated by deep neural networks and other machine learning (ML) models."),
    (2, "As a database specifically designed to handle queries over input vectors, it is capable of indexing vectors on a trillion scale. "),
    (3, "Unlike existing relational databases which mainly deal with structured data following a pre-defined pattern, Milvus is designed from the bottom-up to handle embedding vectors converted from unstructured data."),
    (4, "As the Internet grew and evolved, unstructured data became more and more common, including emails, papers, IoT sensor data, Facebook photos, protein structures, and much more.")
  ).toDF("id", "text")

  // Write to MySQL Table
  sampleDF.write
    .mode(SaveMode.Append)
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "demo")
    .option("user", "root")
    .option("password", "123456")
    .save()

  // Read from MySQL Table
  val dfMysql = spark.read
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "demo")
    .option("user", "root")
    .option("password", "123456")
    .load()

  val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
  val tokenizedDf = tokenizer.transform(dfMysql)

  // Learn a mapping from words to Vectors.
  val word2Vec = new Word2Vec()
    .setInputCol("tokens")
    .setOutputCol("vectors")
    .setVectorSize(128)
    .setMinCount(0)
  val model = word2Vec.fit(tokenizedDf)

  val result = model.transform(tokenizedDf)

  val vectorToArrayUDF = udf((v: Vector) => v.toArray)
  // Apply the UDF to the DataFrame
  val resultDF = result.withColumn("embedding", vectorToArrayUDF($"vectors"))
  val milvusDf = resultDF.drop("tokens").drop("vectors")

  milvusDf.write.format("milvus")
    .option(MILVUS_HOST, "localhost")
    .option(MILVUS_PORT, "19530")
    .option(MILVUS_COLLECTION_NAME, "text_embedding")
    .option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
    .option(MILVUS_COLLECTION_VECTOR_DIM, "128")
    .option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
    .mode(SaveMode.Append)
    .save()
}

Milvus -> Transformar -> Milvus

Nesta demonstração, nós iremos

  1. Ler dados de uma coleção Milvus,
  2. Aplicar uma transformação (usando PCA como exemplo), e
  3. Escrever os dados transformados em outro Milvus através da API Bulkinsert.

O modelo PCA é um modelo de transformação que reduz a dimensionalidade dos vectores de incorporação, o que é uma operação comum na aprendizagem automática. Pode adicionar quaisquer outras operações de processamento, como filtragem, junção ou normalização, ao passo de transformação.

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}

import scala.collection.JavaConverters._

object TransformDemo extends App {
  val sparkConf = new SparkConf().setMaster("local")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()

  import spark.implicits._

  val host = "localhost"
  val port = 19530
  val user = "root"
  val password = "Milvus"
  val fs = "s3a://"
  val bucketName = "a-bucket"
  val rootPath = "files"
  val minioAK = "minioadmin"
  val minioSK = "minioadmin"
  val minioEndpoint = "localhost:9000"
  val collectionName = "hello_spark_milvus1"
  val targetCollectionName = "hello_spark_milvus2"

  val properties = Map(
    MilvusOptions.MILVUS_HOST -> host,
    MilvusOptions.MILVUS_PORT -> port.toString,
    MilvusOptions.MILVUS_COLLECTION_NAME -> collectionName,
    MilvusOptions.MILVUS_BUCKET -> bucketName,
    MilvusOptions.MILVUS_ROOTPATH -> rootPath,
    MilvusOptions.MILVUS_FS -> fs,
    MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
    MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
    MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
  )

  // 1, configurations
  val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))

  // 2, batch read milvus collection data to dataframe
  //  Schema: dim of `embeddings` is 8
  // +-+------------+------------+------------------+
  // | | field name | field type | other attributes |
  // +-+------------+------------+------------------+
  // |1|    "pk"    |    Int64   |  is_primary=True |
  // | |            |            |   auto_id=False  |
  // +-+------------+------------+------------------+
  // |2|  "random"  |    Double  |                  |
  // +-+------------+------------+------------------+
  // |3|"embeddings"| FloatVector|     dim=8        |
  // +-+------------+------------+------------------+
  val arrayToVectorUDF = udf((arr: Seq[Double]) => Vectors.dense(arr.toArray[Double]))
  val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
    .withColumn("embeddings_vec", arrayToVectorUDF($"embeddings"))
    .drop("embeddings")
  
  // 3. Use PCA to reduce dim of vector
  val dim = 4
  val pca = new PCA()
    .setInputCol("embeddings_vec")
    .setOutputCol("pca_vec")
    .setK(dim)
    .fit(collectionDF)
  val vectorToArrayUDF = udf((v: Vector) => v.toArray)
  // embeddings dim number reduce to 4
  // +-+------------+------------+------------------+
  // | | field name | field type | other attributes |
  // +-+------------+------------+------------------+
  // |1|    "pk"    |    Int64   |  is_primary=True |
  // | |            |            |   auto_id=False  |
  // +-+------------+------------+------------------+
  // |2|  "random"  |    Double  |                  |
  // +-+------------+------------+------------------+
  // |3|"embeddings"| FloatVector|     dim=4        |
  // +-+------------+------------+------------------+
  val pcaDf = pca.transform(collectionDF)
    .withColumn("embeddings", vectorToArrayUDF($"pca_vec"))
    .select("pk", "random", "embeddings")

  // 4. Write PCAed data to S3
  val outputPath = "s3a://a-bucket/result"
  pcaDf.write
    .mode("overwrite")
    .format("parquet")
    .save(outputPath)

  // 5. Config MilvusOptions of target table  
  val targetProperties = Map(
    MilvusOptions.MILVUS_HOST -> host,
    MilvusOptions.MILVUS_PORT -> port.toString,
    MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
    MilvusOptions.MILVUS_BUCKET -> bucketName,
    MilvusOptions.MILVUS_ROOTPATH -> rootPath,
    MilvusOptions.MILVUS_FS -> fs,
    MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
    MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
    MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
  )
  val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
  // 6. Bulkinsert Spark output files into milvus
  MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
}

Databricks -> Zilliz Cloud

Se estiver a utilizar o Zilliz Cloud (o serviço Milvus gerido), pode tirar partido da sua conveniente API de importação de dados. O Zilliz Cloud fornece ferramentas e documentação abrangentes para o ajudar a mover eficientemente os seus dados de várias fontes de dados, incluindo Spark e Databricks. Basta configurar um bucket S3 como intermediário e abrir o seu acesso à sua conta Zilliz Cloud. A API de importação de dados do Zilliz Cloud carregará automaticamente o lote completo de dados do bucket S3 para o seu cluster Zilliz Cloud.

Preparativos

  1. Carregue o tempo de execução do Spark adicionando um ficheiro jar ao seu Databricks Cluster.

    É possível instalar uma biblioteca de diferentes maneiras. Esta captura de tela mostra o upload de um jar do local para o cluster. Para obter mais informações, consulte Bibliotecas de cluster na documentação do Databricks.

    Install Databricks Library Instalar a biblioteca Databricks

  2. Crie um bucket S3 e configure-o como um local de armazenamento externo para o seu cluster Databricks.

    O Bulkinsert exigia que os dados fossem armazenados em um bucket temporário para que o Zilliz Cloud pudesse importar os dados em um lote. É possível criar um bucket S3 e configurá-lo como um local externo do Databricks. Consulte Locais externos para obter detalhes.

  3. Proteja suas credenciais do Databricks.

    Para obter mais detalhes, consulte as instruções no blogue Gerir credenciais de forma segura no Databricks.

Demonstração

Aqui está um trecho de código que mostra o processo de migração de dados em lote. Semelhante ao exemplo do Milvus acima, você só precisa substituir a credencial e o endereço do bucket S3.

// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write
  .mode("overwrite")
  .format("mjson")
  .save(outputPath)
// Specify Milvus options.
val targetProperties = Map(
  MilvusOptions.MILVUS_URI -> zilliz_uri,
  MilvusOptions.MILVUS_TOKEN -> zilliz_token,
  MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
  MilvusOptions.MILVUS_BUCKET -> bucketName,
  MilvusOptions.MILVUS_ROOTPATH -> rootPath,
  MilvusOptions.MILVUS_FS -> fs,
  MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
  MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
  MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "mjson")

Prática

Para o ajudar a começar rapidamente a utilizar o Conector Spark-Milvus, preparámos um bloco de notas que o orienta através dos processos de transferência de dados em fluxo contínuo e em lote, com o Milvus e o Zilliz Cloud.

Traduzido porDeepLogo

Feedback

Esta página foi útil?