🚀 Попробуйте Zilliz Cloud, полностью управляемый Milvus, бесплатно — ощутите 10-кратное увеличение производительности! Попробовать сейчас>

milvus-logo
LFAI
Главная
  • Инструменты
  • Home
  • Docs
  • Инструменты

  • Разъемы Milvus

  • Искра

Используйте Apache Spark™ с Milvus/Zilliz Cloud для конвейеров искусственного интеллекта

Коннектор Spark-Milvus Connector обеспечивает интеграцию Apache Spark и Databricks с Milvus и Zilliz Cloud. Он соединяет мощные функции Apache Spark по обработке больших данных и машинному обучению (ML) с современными возможностями векторного поиска Milvus. Эта интеграция позволяет оптимизировать рабочий процесс для поиска на основе искусственного интеллекта, расширенной аналитики, обучения ML и эффективного управления крупными векторными данными.

Apache Spark - это платформа распределенной обработки данных, предназначенная для работы с огромными массивами данных с высокой скоростью вычислений. В паре с Milvus или Zilliz Cloud она открывает новые возможности для таких сценариев использования, как семантический поиск, рекомендательные системы и аналитика данных на основе искусственного интеллекта.

Например, Spark может пакетно обрабатывать большие массивы данных для создания вкраплений с помощью ML-моделей, а затем использовать коннектор Spark-Milvus для хранения этих вкраплений непосредственно в Milvus или Zilliz Cloud. После индексации эти данные можно быстро искать или анализировать, создавая мощный конвейер для рабочих процессов ИИ и больших данных.

Коннектор Spark-Milvus поддерживает такие задачи, как итеративный и массовый ввод данных в Milvus, синхронизация данных между системами и расширенная аналитика векторных данных, хранящихся в Milvus. В этом руководстве вы узнаете, как настроить и эффективно использовать коннектор для таких задач, как:

  • Эффективная загрузка векторных данных в Milvus большими партиями,
  • перемещение данных между Milvus и другими системами хранения или базами данных,
  • Анализ данных в Milvus с помощью Spark MLlib и других инструментов искусственного интеллекта.

Быстрый старт

Подготовка

Spark-Milvus Connector поддерживает языки программирования Scala и Python. Пользователи могут использовать его с Pyspark или Spark-shell. Чтобы запустить эту демонстрацию, настройте среду Spark, содержащую зависимость Spark-Milvus Connector, выполнив следующие действия:

  1. Установите Apache Spark (версия >= 3.3.0).

    Вы можете установить Apache Spark, обратившись к официальной документации.

  2. Загрузите jar-файл spark-milvus.

    wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
    
  3. Запустите среду выполнения Spark с spark-milvus jar в качестве одной из зависимостей.

    Чтобы запустить среду выполнения Spark с коннектором Spark-Milvus Connector, добавьте в команду загруженный spark-milvus в качестве зависимости.

    • pyspark

      ./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
      
    • spark-shell

      ./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
      

Демонстрация

В этой демонстрации мы создадим пример Spark DataFrame с векторными данными и запишем его в Milvus через Spark-Milvus Connector. Коллекция будет создана в Milvus автоматически на основе схемы и заданных параметров.

from pyspark.sql import SparkSession

columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
    .mode("append") \
    .option("milvus.host", "localhost") \
    .option("milvus.port", "19530") \
    .option("milvus.collection.name", "hello_spark_milvus") \
    .option("milvus.collection.vectorField", "vec") \
    .option("milvus.collection.vectorDim", "8") \
    .option("milvus.collection.primaryKeyField", "id") \
    .format("milvus") \
    .save()
import org.apache.spark.sql.{SaveMode, SparkSession}

object Hello extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("HelloSparkMilvus")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
    (2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
    (3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
    (4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
  ).toDF("id", "text", "vec")

  // set milvus options
  val milvusOptions = Map(
      "milvus.host" -> "localhost" -> uri,
      "milvus.port" -> "19530",
      "milvus.collection.name" -> "hello_spark_milvus",
      "milvus.collection.vectorField" -> "vec",
      "milvus.collection.vectorDim" -> "5",
      "milvus.collection.primaryKeyField", "id"
    )
    
  sampleDF.write.format("milvus")
    .options(milvusOptions)
    .mode(SaveMode.Append)
    .save()
}

После выполнения приведенного выше кода вы можете просмотреть вставленные данные в Milvus с помощью SDK или Attu (A Milvus Dashboard). Вы можете найти коллекцию с именем hello_spark_milvus, в которую уже вставлены 4 сущности.

Особенности и концепции

Опции Milvus

В разделе "Быстрый старт " мы показали установку опций при работе с Milvus. Эти опции абстрагированы как опции Milvus. Они используются для создания соединений с Milvus и управления другим поведением Milvus. Не все опции являются обязательными.

Ключ опцииЗначение по умолчаниюОписание
milvus.hostlocalhostХост сервера Milvus. Подробнее см. в разделе Управление подключениями Milvus.
milvus.port19530Порт сервера Milvus. Подробнее см. в разделе Управление подключениями Milvus.
milvus.usernamerootИмя пользователя для сервера Milvus. Подробнее см. в разделе Управление подключениями Milvus.
milvus.passwordMilvusПароль для сервера Milvus. Подробнее см. в разделе Управление подключениями Milvus.
milvus.uri--URI сервера Milvus. Подробнее см. в разделе Управление подключениями Milvus.
milvus.token--Токен сервера Milvus. Подробнее см. в разделе Управление подключениями Milvus.
milvus.database.namedefaultИмя базы данных Milvus для чтения или записи.
milvus.collection.namehello_milvusИмя коллекции Milvus для чтения или записи.
milvus.collection.primaryKeyFieldNoneИмя поля первичного ключа в коллекции. Требуется, если коллекция не существует.
milvus.collection.vectorFieldNoneИмя векторного поля в коллекции. Требуется, если коллекция не существует.
milvus.collection.vectorDimNoneРазмерность векторного поля в коллекции. Требуется, если коллекция не существует.
milvus.collection.autoIDfalseЕсли коллекция не существует, этот параметр указывает, нужно ли автоматически генерировать идентификаторы для сущностей. Дополнительные сведения см. в разделе create_collection
milvus.bucketa-bucketИмя ведра в хранилище Milvus. Оно должно быть таким же, как minio.bucketName в milvus.yaml.
milvus.rootpathfilesКорневой путь хранилища Milvus. Он должен быть таким же, как minio.rootpath в milvus.yaml.
milvus.fss3a://Файловая система хранилища Milvus. Значение s3a:// применяется для Spark с открытым исходным кодом. Для Databricks используйте s3://.
milvus.storage.endpointlocalhost:9000Конечная точка хранилища Milvus. Это значение должно быть таким же, как minio.address:minio.port в milvus.yaml.
milvus.storage.userminioadminПользователь хранилища Milvus. Это должно быть так же, как minio.accessKeyID в milvus.yaml.
milvus.storage.passwordminioadminПароль хранилища Milvus. Пароль должен быть таким же, как minio.secretAccessKey в milvus.yaml.
milvus.storage.useSSLfalseИспользовать ли SSL для хранилища Milvus. Значение должно быть таким же, как minio.useSSL в milvus.yaml.

Формат данных Milvus

Spark-Milvus Connector поддерживает чтение и запись данных в следующих форматах данных Milvus:

  • milvus: Формат данных Milvus для плавного преобразования из Spark DataFrame в сущности Milvus.
  • milvusbinlog: Формат данных Milvus для чтения встроенных данных бинлога Milvus.
  • mjson: Формат Milvus JSON для массового ввода данных в Milvus.

milvus

В разделе "Быстрый старт" мы используем формат milvus для записи образцов данных в кластер Milvus. Формат milvus - это новый формат данных, который поддерживает беспрепятственную запись данных Spark DataFrame в коллекции Milvus. Это достигается с помощью пакетных вызовов Insert API в Milvus SDK. Если коллекция не существует в Milvus, новая коллекция будет создана на основе схемы Dataframe. Однако автоматически созданная коллекция может не поддерживать все возможности схемы коллекции. Поэтому рекомендуется сначала создать коллекцию через SDK, а затем использовать spark-milvus для записи. Для получения дополнительной информации обратитесь к демонстрации.

milvusbinlog

Новый формат данных milvusbinlog предназначен для чтения встроенных в Milvus данных binlog. Binlog - это внутренний формат хранения данных Milvus, основанный на parquet. К сожалению, он не может быть прочитан обычной библиотекой parquet, поэтому мы реализовали этот новый формат данных, чтобы помочь Spark job прочитать его. Не рекомендуется использовать milvusbinlog напрямую, если вы не знакомы с деталями внутреннего хранилища milvus. Мы предлагаем использовать функцию MilvusUtils, которая будет представлена в следующем разделе.

val df = spark.read
  .format("milvusbinlog")
  .load(path)
  .withColumnRenamed("val", "embedding")

mjson

Milvus предоставляет функциональность Bulkinsert для повышения производительности записи при работе с большими наборами данных. Однако формат JSON, используемый Milvus, несколько отличается от формата JSON, используемого по умолчанию в Spark. Чтобы решить эту проблему, мы ввели формат данных mjson для генерации данных, соответствующих требованиям Milvus. Вот пример, показывающий разницу между JSON-lines и mjson:

  • JSON-lines:

    {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]}
    {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]}
    {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]}
    {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]}
    {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
    
  • mjson (требуется для Milvus Bulkinsert):

    {
        "rows":[
            {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]},
            {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]},
            {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]},
            {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]},
            {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
        ]
    }
    

Это будет улучшено в будущем. Мы рекомендуем использовать формат parquet в интеграции spark-milvus, если ваша версия Milvus - v2.3.7+, которая поддерживает bulkinsert с форматом Parquet. Смотрите демонстрацию на Github.

MilvusUtils

MilvusUtils содержит несколько полезных функций util. В настоящее время он поддерживается только в Scala. Примеры использования приведены в разделе Advanced Usage.

MilvusUtils.readMilvusCollection

MilvusUtils.readMilvusCollection - это простой интерфейс для загрузки всей коллекции Milvus в Spark Dataframe. Он включает в себя различные операции, в том числе вызов Milvus SDK, чтение milvusbinlog и общие операции объединения/соединения.

val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)

MilvusUtils.bulkInsertFromSpark

MilvusUtils.bulkInsertFromSpark предоставляет удобный способ импортировать выходные файлы Spark в Milvus большой партией. Он оборачивает API Bullkinsert из Milvus SDK.

df.write.format("parquet").save(outputPath)
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputPath, "parquet")

Расширенное использование

В этом разделе вы найдете примеры расширенного использования Spark-Milvus Connector для анализа и миграции данных. Дополнительные демонстрации смотрите в примерах.

MySQL -> встраивание -> Milvus

В этой демонстрации мы

  1. Прочитаем данные из MySQL через Spark-MySQL Connector,
  2. генерировать встраивание (на примере Word2Vec) и
  3. записывать встроенные данные в Milvus.

Чтобы включить Spark-MySQL Connector, вам нужно добавить следующую зависимость в ваше окружение Spark:

spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar,mysql-connector-j-x.x.x.jar
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{SaveMode, SparkSession}
import zilliztech.spark.milvus.MilvusOptions._

import org.apache.spark.ml.linalg.Vector

object Mysql2MilvusDemo  extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("Mysql2MilvusDemo")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "Milvus was created in 2019 with a singular goal: store, index, and manage massive embedding vectors generated by deep neural networks and other machine learning (ML) models."),
    (2, "As a database specifically designed to handle queries over input vectors, it is capable of indexing vectors on a trillion scale. "),
    (3, "Unlike existing relational databases which mainly deal with structured data following a pre-defined pattern, Milvus is designed from the bottom-up to handle embedding vectors converted from unstructured data."),
    (4, "As the Internet grew and evolved, unstructured data became more and more common, including emails, papers, IoT sensor data, Facebook photos, protein structures, and much more.")
  ).toDF("id", "text")

  // Write to MySQL Table
  sampleDF.write
    .mode(SaveMode.Append)
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "demo")
    .option("user", "root")
    .option("password", "123456")
    .save()

  // Read from MySQL Table
  val dfMysql = spark.read
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "demo")
    .option("user", "root")
    .option("password", "123456")
    .load()

  val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
  val tokenizedDf = tokenizer.transform(dfMysql)

  // Learn a mapping from words to Vectors.
  val word2Vec = new Word2Vec()
    .setInputCol("tokens")
    .setOutputCol("vectors")
    .setVectorSize(128)
    .setMinCount(0)
  val model = word2Vec.fit(tokenizedDf)

  val result = model.transform(tokenizedDf)

  val vectorToArrayUDF = udf((v: Vector) => v.toArray)
  // Apply the UDF to the DataFrame
  val resultDF = result.withColumn("embedding", vectorToArrayUDF($"vectors"))
  val milvusDf = resultDF.drop("tokens").drop("vectors")

  milvusDf.write.format("milvus")
    .option(MILVUS_HOST, "localhost")
    .option(MILVUS_PORT, "19530")
    .option(MILVUS_COLLECTION_NAME, "text_embedding")
    .option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
    .option(MILVUS_COLLECTION_VECTOR_DIM, "128")
    .option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
    .mode(SaveMode.Append)
    .save()
}

Milvus -> Transform -> Milvus

В этом демонстрационном примере мы

  1. Считывать данные из коллекции Milvus,
  2. Применим трансформацию (на примере PCA) и
  3. запишем преобразованные данные в другой Milvus с помощью Bulkinsert API.

Модель PCA - это модель трансформации, которая уменьшает размерность векторов вложения, что является распространенной операцией в машинном обучении. Вы можете добавить любые другие операции обработки, такие как фильтрация, объединение или нормализация, к шагу трансформации.

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}

import scala.collection.JavaConverters._

object TransformDemo extends App {
  val sparkConf = new SparkConf().setMaster("local")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()

  import spark.implicits._

  val host = "localhost"
  val port = 19530
  val user = "root"
  val password = "Milvus"
  val fs = "s3a://"
  val bucketName = "a-bucket"
  val rootPath = "files"
  val minioAK = "minioadmin"
  val minioSK = "minioadmin"
  val minioEndpoint = "localhost:9000"
  val collectionName = "hello_spark_milvus1"
  val targetCollectionName = "hello_spark_milvus2"

  val properties = Map(
    MilvusOptions.MILVUS_HOST -> host,
    MilvusOptions.MILVUS_PORT -> port.toString,
    MilvusOptions.MILVUS_COLLECTION_NAME -> collectionName,
    MilvusOptions.MILVUS_BUCKET -> bucketName,
    MilvusOptions.MILVUS_ROOTPATH -> rootPath,
    MilvusOptions.MILVUS_FS -> fs,
    MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
    MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
    MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
  )

  // 1, configurations
  val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))

  // 2, batch read milvus collection data to dataframe
  //  Schema: dim of `embeddings` is 8
  // +-+------------+------------+------------------+
  // | | field name | field type | other attributes |
  // +-+------------+------------+------------------+
  // |1|    "pk"    |    Int64   |  is_primary=True |
  // | |            |            |   auto_id=False  |
  // +-+------------+------------+------------------+
  // |2|  "random"  |    Double  |                  |
  // +-+------------+------------+------------------+
  // |3|"embeddings"| FloatVector|     dim=8        |
  // +-+------------+------------+------------------+
  val arrayToVectorUDF = udf((arr: Seq[Double]) => Vectors.dense(arr.toArray[Double]))
  val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
    .withColumn("embeddings_vec", arrayToVectorUDF($"embeddings"))
    .drop("embeddings")
  
  // 3. Use PCA to reduce dim of vector
  val dim = 4
  val pca = new PCA()
    .setInputCol("embeddings_vec")
    .setOutputCol("pca_vec")
    .setK(dim)
    .fit(collectionDF)
  val vectorToArrayUDF = udf((v: Vector) => v.toArray)
  // embeddings dim number reduce to 4
  // +-+------------+------------+------------------+
  // | | field name | field type | other attributes |
  // +-+------------+------------+------------------+
  // |1|    "pk"    |    Int64   |  is_primary=True |
  // | |            |            |   auto_id=False  |
  // +-+------------+------------+------------------+
  // |2|  "random"  |    Double  |                  |
  // +-+------------+------------+------------------+
  // |3|"embeddings"| FloatVector|     dim=4        |
  // +-+------------+------------+------------------+
  val pcaDf = pca.transform(collectionDF)
    .withColumn("embeddings", vectorToArrayUDF($"pca_vec"))
    .select("pk", "random", "embeddings")

  // 4. Write PCAed data to S3
  val outputPath = "s3a://a-bucket/result"
  pcaDf.write
    .mode("overwrite")
    .format("parquet")
    .save(outputPath)

  // 5. Config MilvusOptions of target table  
  val targetProperties = Map(
    MilvusOptions.MILVUS_HOST -> host,
    MilvusOptions.MILVUS_PORT -> port.toString,
    MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
    MilvusOptions.MILVUS_BUCKET -> bucketName,
    MilvusOptions.MILVUS_ROOTPATH -> rootPath,
    MilvusOptions.MILVUS_FS -> fs,
    MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
    MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
    MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
  )
  val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
  // 6. Bulkinsert Spark output files into milvus
  MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
}

Databricks -> Zilliz Cloud

Если вы используете Zilliz Cloud (управляемый сервис Milvus), вы можете воспользоваться его удобным API импорта данных. Zilliz Cloud предоставляет исчерпывающие инструменты и документацию, чтобы помочь вам эффективно перемещать данные из различных источников данных, включая Spark и Databricks. Просто настройте ведро S3 в качестве посредника и откройте к нему доступ в своей учетной записи Zilliz Cloud. API импорта данных Zilliz Cloud автоматически загрузит всю партию данных из ведра S3 в ваш кластер Zilliz Cloud.

Подготовительные работы

  1. Загрузите среду выполнения Spark, добавив jar-файл в кластер Databricks.

    Библиотеку можно установить разными способами. На этом скриншоте показана загрузка jar-файла из локальной сети в кластер. Дополнительные сведения см. в разделе Кластерные библиотеки в документации Databricks.

    Install Databricks Library Установка библиотеки Databricks

  2. Создайте ведро S3 и настройте его как внешнее хранилище для кластера Databricks.

    Для Bulkinsert требуется хранить данные во временном ведре, чтобы Zilliz Cloud мог импортировать их в пакетном режиме. Вы можете создать ведро S3 и настроить его как внешнее хранилище Databricks. Подробности см. в разделе Внешние местоположения.

  3. Защитите учетные данные Databricks.

    Для получения более подробной информации см. инструкции в блоге Securely Managing Credentials in Databricks.

Демонстрация

Вот фрагмент кода, демонстрирующий процесс пакетной миграции данных. Аналогично приведенному выше примеру Milvus, вам просто нужно заменить учетные данные и адрес ведра S3.

// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write
  .mode("overwrite")
  .format("mjson")
  .save(outputPath)
// Specify Milvus options.
val targetProperties = Map(
  MilvusOptions.MILVUS_URI -> zilliz_uri,
  MilvusOptions.MILVUS_TOKEN -> zilliz_token,
  MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
  MilvusOptions.MILVUS_BUCKET -> bucketName,
  MilvusOptions.MILVUS_ROOTPATH -> rootPath,
  MilvusOptions.MILVUS_FS -> fs,
  MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
  MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
  MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "mjson")

Практический блокнот

Чтобы быстро освоить коннектор Spark-Milvus Connector, вы можете просмотреть блокнот, в котором рассмотрены примеры потокового и пакетного переноса данных из Spark в Milvus и Zilliz Cloud.

Попробуйте Managed Milvus бесплатно

Zilliz Cloud работает без проблем, поддерживается Milvus и в 10 раз быстрее.

Начать
Обратная связь

Была ли эта страница полезной?