Используйте Apache Spark™ с Milvus/Zilliz Cloud для конвейеров искусственного интеллекта
Коннектор Spark-Milvus Connector обеспечивает интеграцию Apache Spark и Databricks с Milvus и Zilliz Cloud. Он соединяет мощные функции Apache Spark по обработке больших данных и машинному обучению (ML) с современными возможностями векторного поиска Milvus. Эта интеграция позволяет оптимизировать рабочий процесс для поиска на основе искусственного интеллекта, расширенной аналитики, обучения ML и эффективного управления крупными векторными данными.
Apache Spark - это платформа распределенной обработки данных, предназначенная для работы с огромными массивами данных с высокой скоростью вычислений. В паре с Milvus или Zilliz Cloud она открывает новые возможности для таких сценариев использования, как семантический поиск, рекомендательные системы и аналитика данных на основе искусственного интеллекта.
Например, Spark может пакетно обрабатывать большие массивы данных для создания вкраплений с помощью ML-моделей, а затем использовать коннектор Spark-Milvus для хранения этих вкраплений непосредственно в Milvus или Zilliz Cloud. После индексации эти данные можно быстро искать или анализировать, создавая мощный конвейер для рабочих процессов ИИ и больших данных.
Коннектор Spark-Milvus поддерживает такие задачи, как итеративный и массовый ввод данных в Milvus, синхронизация данных между системами и расширенная аналитика векторных данных, хранящихся в Milvus. В этом руководстве вы узнаете, как настроить и эффективно использовать коннектор для таких задач, как:
- Эффективная загрузка векторных данных в Milvus большими партиями,
- перемещение данных между Milvus и другими системами хранения или базами данных,
- Анализ данных в Milvus с помощью Spark MLlib и других инструментов искусственного интеллекта.
Быстрый старт
Подготовка
Spark-Milvus Connector поддерживает языки программирования Scala и Python. Пользователи могут использовать его с Pyspark или Spark-shell. Чтобы запустить эту демонстрацию, настройте среду Spark, содержащую зависимость Spark-Milvus Connector, выполнив следующие действия:
Установите Apache Spark (версия >= 3.3.0).
Вы можете установить Apache Spark, обратившись к официальной документации.
Загрузите jar-файл spark-milvus.
wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
Запустите среду выполнения Spark с spark-milvus jar в качестве одной из зависимостей.
Чтобы запустить среду выполнения Spark с коннектором Spark-Milvus Connector, добавьте в команду загруженный spark-milvus в качестве зависимости.
pyspark
./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
spark-shell
./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
Демонстрация
В этой демонстрации мы создадим пример Spark DataFrame с векторными данными и запишем его в Milvus через Spark-Milvus Connector. Коллекция будет создана в Milvus автоматически на основе схемы и заданных параметров.
from pyspark.sql import SparkSession
columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
(4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
.mode("append") \
.option("milvus.host", "localhost") \
.option("milvus.port", "19530") \
.option("milvus.collection.name", "hello_spark_milvus") \
.option("milvus.collection.vectorField", "vec") \
.option("milvus.collection.vectorDim", "8") \
.option("milvus.collection.primaryKeyField", "id") \
.format("milvus") \
.save()
import org.apache.spark.sql.{SaveMode, SparkSession}
object Hello extends App {
val spark = SparkSession.builder().master("local[*]")
.appName("HelloSparkMilvus")
.getOrCreate()
import spark.implicits._
// Create DataFrame
val sampleDF = Seq(
(1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
(2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
(3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
(4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
).toDF("id", "text", "vec")
// set milvus options
val milvusOptions = Map(
"milvus.host" -> "localhost" -> uri,
"milvus.port" -> "19530",
"milvus.collection.name" -> "hello_spark_milvus",
"milvus.collection.vectorField" -> "vec",
"milvus.collection.vectorDim" -> "5",
"milvus.collection.primaryKeyField", "id"
)
sampleDF.write.format("milvus")
.options(milvusOptions)
.mode(SaveMode.Append)
.save()
}
После выполнения приведенного выше кода вы можете просмотреть вставленные данные в Milvus с помощью SDK или Attu (A Milvus Dashboard). Вы можете найти коллекцию с именем hello_spark_milvus
, в которую уже вставлены 4 сущности.
Особенности и концепции
Опции Milvus
В разделе "Быстрый старт " мы показали установку опций при работе с Milvus. Эти опции абстрагированы как опции Milvus. Они используются для создания соединений с Milvus и управления другим поведением Milvus. Не все опции являются обязательными.
Ключ опции | Значение по умолчанию | Описание |
---|---|---|
milvus.host | localhost | Хост сервера Milvus. Подробнее см. в разделе Управление подключениями Milvus. |
milvus.port | 19530 | Порт сервера Milvus. Подробнее см. в разделе Управление подключениями Milvus. |
milvus.username | root | Имя пользователя для сервера Milvus. Подробнее см. в разделе Управление подключениями Milvus. |
milvus.password | Milvus | Пароль для сервера Milvus. Подробнее см. в разделе Управление подключениями Milvus. |
milvus.uri | -- | URI сервера Milvus. Подробнее см. в разделе Управление подключениями Milvus. |
milvus.token | -- | Токен сервера Milvus. Подробнее см. в разделе Управление подключениями Milvus. |
milvus.database.name | default | Имя базы данных Milvus для чтения или записи. |
milvus.collection.name | hello_milvus | Имя коллекции Milvus для чтения или записи. |
milvus.collection.primaryKeyField | None | Имя поля первичного ключа в коллекции. Требуется, если коллекция не существует. |
milvus.collection.vectorField | None | Имя векторного поля в коллекции. Требуется, если коллекция не существует. |
milvus.collection.vectorDim | None | Размерность векторного поля в коллекции. Требуется, если коллекция не существует. |
milvus.collection.autoID | false | Если коллекция не существует, этот параметр указывает, нужно ли автоматически генерировать идентификаторы для сущностей. Дополнительные сведения см. в разделе create_collection |
milvus.bucket | a-bucket | Имя ведра в хранилище Milvus. Оно должно быть таким же, как minio.bucketName в milvus.yaml. |
milvus.rootpath | files | Корневой путь хранилища Milvus. Он должен быть таким же, как minio.rootpath в milvus.yaml. |
milvus.fs | s3a:// | Файловая система хранилища Milvus. Значение s3a:// применяется для Spark с открытым исходным кодом. Для Databricks используйте s3:// . |
milvus.storage.endpoint | localhost:9000 | Конечная точка хранилища Milvus. Это значение должно быть таким же, как minio.address :minio.port в milvus.yaml. |
milvus.storage.user | minioadmin | Пользователь хранилища Milvus. Это должно быть так же, как minio.accessKeyID в milvus.yaml. |
milvus.storage.password | minioadmin | Пароль хранилища Milvus. Пароль должен быть таким же, как minio.secretAccessKey в milvus.yaml. |
milvus.storage.useSSL | false | Использовать ли SSL для хранилища Milvus. Значение должно быть таким же, как minio.useSSL в milvus.yaml. |
Формат данных Milvus
Spark-Milvus Connector поддерживает чтение и запись данных в следующих форматах данных Milvus:
milvus
: Формат данных Milvus для плавного преобразования из Spark DataFrame в сущности Milvus.milvusbinlog
: Формат данных Milvus для чтения встроенных данных бинлога Milvus.mjson
: Формат Milvus JSON для массового ввода данных в Milvus.
milvus
В разделе "Быстрый старт" мы используем формат milvus для записи образцов данных в кластер Milvus. Формат milvus - это новый формат данных, который поддерживает беспрепятственную запись данных Spark DataFrame в коллекции Milvus. Это достигается с помощью пакетных вызовов Insert API в Milvus SDK. Если коллекция не существует в Milvus, новая коллекция будет создана на основе схемы Dataframe. Однако автоматически созданная коллекция может не поддерживать все возможности схемы коллекции. Поэтому рекомендуется сначала создать коллекцию через SDK, а затем использовать spark-milvus для записи. Для получения дополнительной информации обратитесь к демонстрации.
milvusbinlog
Новый формат данных milvusbinlog предназначен для чтения встроенных в Milvus данных binlog. Binlog - это внутренний формат хранения данных Milvus, основанный на parquet. К сожалению, он не может быть прочитан обычной библиотекой parquet, поэтому мы реализовали этот новый формат данных, чтобы помочь Spark job прочитать его. Не рекомендуется использовать milvusbinlog напрямую, если вы не знакомы с деталями внутреннего хранилища milvus. Мы предлагаем использовать функцию MilvusUtils, которая будет представлена в следующем разделе.
val df = spark.read
.format("milvusbinlog")
.load(path)
.withColumnRenamed("val", "embedding")
mjson
Milvus предоставляет функциональность Bulkinsert для повышения производительности записи при работе с большими наборами данных. Однако формат JSON, используемый Milvus, несколько отличается от формата JSON, используемого по умолчанию в Spark. Чтобы решить эту проблему, мы ввели формат данных mjson для генерации данных, соответствующих требованиям Milvus. Вот пример, показывающий разницу между JSON-lines и mjson:
JSON-lines:
{"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]} {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]} {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]} {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]} {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
mjson (требуется для Milvus Bulkinsert):
{ "rows":[ {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]}, {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]}, {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]}, {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]}, {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]} ] }
Это будет улучшено в будущем. Мы рекомендуем использовать формат parquet в интеграции spark-milvus, если ваша версия Milvus - v2.3.7+, которая поддерживает bulkinsert с форматом Parquet. Смотрите демонстрацию на Github.
MilvusUtils
MilvusUtils содержит несколько полезных функций util. В настоящее время он поддерживается только в Scala. Примеры использования приведены в разделе Advanced Usage.
MilvusUtils.readMilvusCollection
MilvusUtils.readMilvusCollection - это простой интерфейс для загрузки всей коллекции Milvus в Spark Dataframe. Он включает в себя различные операции, в том числе вызов Milvus SDK, чтение milvusbinlog и общие операции объединения/соединения.
val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
MilvusUtils.bulkInsertFromSpark
MilvusUtils.bulkInsertFromSpark предоставляет удобный способ импортировать выходные файлы Spark в Milvus большой партией. Он оборачивает API Bullkinsert из Milvus SDK.
df.write.format("parquet").save(outputPath)
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputPath, "parquet")
Расширенное использование
В этом разделе вы найдете примеры расширенного использования Spark-Milvus Connector для анализа и миграции данных. Дополнительные демонстрации смотрите в примерах.
MySQL -> встраивание -> Milvus
В этой демонстрации мы
- Прочитаем данные из MySQL через Spark-MySQL Connector,
- генерировать встраивание (на примере Word2Vec) и
- записывать встроенные данные в Milvus.
Чтобы включить Spark-MySQL Connector, вам нужно добавить следующую зависимость в ваше окружение Spark:
spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar,mysql-connector-j-x.x.x.jar
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{SaveMode, SparkSession}
import zilliztech.spark.milvus.MilvusOptions._
import org.apache.spark.ml.linalg.Vector
object Mysql2MilvusDemo extends App {
val spark = SparkSession.builder().master("local[*]")
.appName("Mysql2MilvusDemo")
.getOrCreate()
import spark.implicits._
// Create DataFrame
val sampleDF = Seq(
(1, "Milvus was created in 2019 with a singular goal: store, index, and manage massive embedding vectors generated by deep neural networks and other machine learning (ML) models."),
(2, "As a database specifically designed to handle queries over input vectors, it is capable of indexing vectors on a trillion scale. "),
(3, "Unlike existing relational databases which mainly deal with structured data following a pre-defined pattern, Milvus is designed from the bottom-up to handle embedding vectors converted from unstructured data."),
(4, "As the Internet grew and evolved, unstructured data became more and more common, including emails, papers, IoT sensor data, Facebook photos, protein structures, and much more.")
).toDF("id", "text")
// Write to MySQL Table
sampleDF.write
.mode(SaveMode.Append)
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "demo")
.option("user", "root")
.option("password", "123456")
.save()
// Read from MySQL Table
val dfMysql = spark.read
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "demo")
.option("user", "root")
.option("password", "123456")
.load()
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
val tokenizedDf = tokenizer.transform(dfMysql)
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("tokens")
.setOutputCol("vectors")
.setVectorSize(128)
.setMinCount(0)
val model = word2Vec.fit(tokenizedDf)
val result = model.transform(tokenizedDf)
val vectorToArrayUDF = udf((v: Vector) => v.toArray)
// Apply the UDF to the DataFrame
val resultDF = result.withColumn("embedding", vectorToArrayUDF($"vectors"))
val milvusDf = resultDF.drop("tokens").drop("vectors")
milvusDf.write.format("milvus")
.option(MILVUS_HOST, "localhost")
.option(MILVUS_PORT, "19530")
.option(MILVUS_COLLECTION_NAME, "text_embedding")
.option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
.option(MILVUS_COLLECTION_VECTOR_DIM, "128")
.option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
.mode(SaveMode.Append)
.save()
}
Milvus -> Transform -> Milvus
В этом демонстрационном примере мы
- Считывать данные из коллекции Milvus,
- Применим трансформацию (на примере PCA) и
- запишем преобразованные данные в другой Milvus с помощью Bulkinsert API.
Модель PCA - это модель трансформации, которая уменьшает размерность векторов вложения, что является распространенной операцией в машинном обучении. Вы можете добавить любые другие операции обработки, такие как фильтрация, объединение или нормализация, к шагу трансформации.
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}
import scala.collection.JavaConverters._
object TransformDemo extends App {
val sparkConf = new SparkConf().setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val host = "localhost"
val port = 19530
val user = "root"
val password = "Milvus"
val fs = "s3a://"
val bucketName = "a-bucket"
val rootPath = "files"
val minioAK = "minioadmin"
val minioSK = "minioadmin"
val minioEndpoint = "localhost:9000"
val collectionName = "hello_spark_milvus1"
val targetCollectionName = "hello_spark_milvus2"
val properties = Map(
MilvusOptions.MILVUS_HOST -> host,
MilvusOptions.MILVUS_PORT -> port.toString,
MilvusOptions.MILVUS_COLLECTION_NAME -> collectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
// 1, configurations
val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))
// 2, batch read milvus collection data to dataframe
// Schema: dim of `embeddings` is 8
// +-+------------+------------+------------------+
// | | field name | field type | other attributes |
// +-+------------+------------+------------------+
// |1| "pk" | Int64 | is_primary=True |
// | | | | auto_id=False |
// +-+------------+------------+------------------+
// |2| "random" | Double | |
// +-+------------+------------+------------------+
// |3|"embeddings"| FloatVector| dim=8 |
// +-+------------+------------+------------------+
val arrayToVectorUDF = udf((arr: Seq[Double]) => Vectors.dense(arr.toArray[Double]))
val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
.withColumn("embeddings_vec", arrayToVectorUDF($"embeddings"))
.drop("embeddings")
// 3. Use PCA to reduce dim of vector
val dim = 4
val pca = new PCA()
.setInputCol("embeddings_vec")
.setOutputCol("pca_vec")
.setK(dim)
.fit(collectionDF)
val vectorToArrayUDF = udf((v: Vector) => v.toArray)
// embeddings dim number reduce to 4
// +-+------------+------------+------------------+
// | | field name | field type | other attributes |
// +-+------------+------------+------------------+
// |1| "pk" | Int64 | is_primary=True |
// | | | | auto_id=False |
// +-+------------+------------+------------------+
// |2| "random" | Double | |
// +-+------------+------------+------------------+
// |3|"embeddings"| FloatVector| dim=4 |
// +-+------------+------------+------------------+
val pcaDf = pca.transform(collectionDF)
.withColumn("embeddings", vectorToArrayUDF($"pca_vec"))
.select("pk", "random", "embeddings")
// 4. Write PCAed data to S3
val outputPath = "s3a://a-bucket/result"
pcaDf.write
.mode("overwrite")
.format("parquet")
.save(outputPath)
// 5. Config MilvusOptions of target table
val targetProperties = Map(
MilvusOptions.MILVUS_HOST -> host,
MilvusOptions.MILVUS_PORT -> port.toString,
MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
// 6. Bulkinsert Spark output files into milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
}
Databricks -> Zilliz Cloud
Если вы используете Zilliz Cloud (управляемый сервис Milvus), вы можете воспользоваться его удобным API импорта данных. Zilliz Cloud предоставляет исчерпывающие инструменты и документацию, чтобы помочь вам эффективно перемещать данные из различных источников данных, включая Spark и Databricks. Просто настройте ведро S3 в качестве посредника и откройте к нему доступ в своей учетной записи Zilliz Cloud. API импорта данных Zilliz Cloud автоматически загрузит всю партию данных из ведра S3 в ваш кластер Zilliz Cloud.
Подготовительные работы
Загрузите среду выполнения Spark, добавив jar-файл в кластер Databricks.
Библиотеку можно установить разными способами. На этом скриншоте показана загрузка jar-файла из локальной сети в кластер. Дополнительные сведения см. в разделе Кластерные библиотеки в документации Databricks.
Установка библиотеки Databricks
Создайте ведро S3 и настройте его как внешнее хранилище для кластера Databricks.
Для Bulkinsert требуется хранить данные во временном ведре, чтобы Zilliz Cloud мог импортировать их в пакетном режиме. Вы можете создать ведро S3 и настроить его как внешнее хранилище Databricks. Подробности см. в разделе Внешние местоположения.
Защитите учетные данные Databricks.
Для получения более подробной информации см. инструкции в блоге Securely Managing Credentials in Databricks.
Демонстрация
Вот фрагмент кода, демонстрирующий процесс пакетной миграции данных. Аналогично приведенному выше примеру Milvus, вам просто нужно заменить учетные данные и адрес ведра S3.
// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write
.mode("overwrite")
.format("mjson")
.save(outputPath)
// Specify Milvus options.
val targetProperties = Map(
MilvusOptions.MILVUS_URI -> zilliz_uri,
MilvusOptions.MILVUS_TOKEN -> zilliz_token,
MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
MilvusOptions.MILVUS_BUCKET -> bucketName,
MilvusOptions.MILVUS_ROOTPATH -> rootPath,
MilvusOptions.MILVUS_FS -> fs,
MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "mjson")
Практический блокнот
Чтобы быстро освоить коннектор Spark-Milvus Connector, вы можете просмотреть блокнот, в котором рассмотрены примеры потокового и пакетного переноса данных из Spark в Milvus и Zilliz Cloud.