🚀 Coba Zilliz Cloud, Milvus yang sepenuhnya terkelola, secara gratis—rasakan performa 10x lebih cepat! Coba Sekarang>>

milvus-logo
LFAI
Beranda
  • Peralatan
  • Home
  • Docs
  • Peralatan

  • Konektor Milvus

  • Percikan

Gunakan Apache Spark™ dengan Milvus/Zilliz Cloud untuk Pipeline AI

Konektor Spark-Milvus menyediakan integrasi Apache Spark dan Databricks dengan Milvus dan Zilliz Cloud. Ini menjembatani fitur pemrosesan data besar dan pembelajaran mesin (ML) Apache Spark yang kuat dengan kemampuan pencarian vektor canggih dari Milvus. Integrasi ini memungkinkan alur kerja yang efisien untuk pencarian bertenaga AI, analitik tingkat lanjut, pelatihan ML, dan manajemen data vektor skala besar yang efisien.

Apache Spark adalah platform pemrosesan data terdistribusi yang dirancang untuk menangani kumpulan data yang sangat besar dengan komputasi berkecepatan tinggi. Ketika dipasangkan dengan Milvus atau Zilliz Cloud, platform ini membuka kemungkinan-kemungkinan baru untuk kasus-kasus penggunaan seperti pencarian semantik, sistem rekomendasi, dan analisis data berbasis AI.

Sebagai contoh, Spark dapat memproses kumpulan data besar secara batch untuk menghasilkan embedding melalui model ML, kemudian menggunakan konektor Spark-Milvus untuk menyimpan embedding ini secara langsung di Milvus atau Zilliz Cloud. Setelah diindeks, data ini dapat dengan cepat dicari atau dianalisis, menciptakan pipeline yang kuat untuk AI dan alur kerja data besar.

Konektor Spark-Milvus mendukung tugas-tugas seperti memasukkan data secara berulang dan massal ke dalam Milvus, sinkronisasi data antar sistem, dan analisis lanjutan pada data vektor yang disimpan di Milvus. Panduan ini akan memandu Anda melalui langkah-langkah untuk mengonfigurasi dan menggunakan konektor secara efektif untuk kasus-kasus penggunaan seperti:

  • Memuat data vektor secara efisien ke dalam Milvus dalam jumlah besar,
  • Memindahkan data antara Milvus dan sistem penyimpanan atau basis data lainnya,
  • Menganalisis data di Milvus dengan memanfaatkan Spark MLlib dan alat AI lainnya.

Mulai cepat

Persiapan

Konektor Spark-Milvus mendukung bahasa pemrograman Scala dan Python. Pengguna dapat menggunakannya dengan Pyspark atau Spark-shell. Untuk menjalankan demo ini, siapkan lingkungan Spark yang berisi ketergantungan Spark-Milvus Connector dengan langkah-langkah berikut:

  1. Instal Apache Spark (versi >= 3.3.0)

    Anda dapat menginstal Apache Spark dengan merujuk pada dokumentasi resminya.

  2. Unduh berkas jar spark-milvus.

    wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
    
  3. Mulai runtime Spark dengan jar spark-milvus sebagai salah satu dependensi.

    Untuk memulai runtime Spark dengan Spark-Milvus Connector, tambahkan spark-milvus yang telah diunduh sebagai dependensi pada perintah.

    • pyspark

      ./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
      
    • spark-shell

      ./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
      

Demo

Dalam demo ini, kita membuat contoh Spark DataFrame dengan data vektor dan menuliskannya ke Milvus melalui Spark-Milvus Connector. Sebuah koleksi akan dibuat di Milvus secara otomatis berdasarkan skema dan opsi yang ditentukan.

from pyspark.sql import SparkSession

columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
    .mode("append") \
    .option("milvus.host", "localhost") \
    .option("milvus.port", "19530") \
    .option("milvus.collection.name", "hello_spark_milvus") \
    .option("milvus.collection.vectorField", "vec") \
    .option("milvus.collection.vectorDim", "8") \
    .option("milvus.collection.primaryKeyField", "id") \
    .format("milvus") \
    .save()
import org.apache.spark.sql.{SaveMode, SparkSession}

object Hello extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("HelloSparkMilvus")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
    (2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
    (3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
    (4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
  ).toDF("id", "text", "vec")

  // set milvus options
  val milvusOptions = Map(
      "milvus.host" -> "localhost" -> uri,
      "milvus.port" -> "19530",
      "milvus.collection.name" -> "hello_spark_milvus",
      "milvus.collection.vectorField" -> "vec",
      "milvus.collection.vectorDim" -> "5",
      "milvus.collection.primaryKeyField", "id"
    )
    
  sampleDF.write.format("milvus")
    .options(milvusOptions)
    .mode(SaveMode.Append)
    .save()
}

Setelah menjalankan kode di atas, Anda dapat melihat data yang telah dimasukkan di Milvus menggunakan SDK atau Attu (Dasbor Milvus). Anda dapat menemukan koleksi bernama hello_spark_milvus yang dibuat dengan 4 entitas yang telah dimasukkan ke dalamnya.

Fitur & konsep

Opsi-opsi Milvus

Pada bagian Mulai Cepat, kami telah menunjukkan opsi-opsi pengaturan selama operasi dengan Milvus. Opsi-opsi ini disarikan sebagai Opsi Milvus. Opsi-opsi ini digunakan untuk membuat koneksi ke Milvus dan mengontrol perilaku Milvus lainnya. Tidak semua opsi wajib dipilih.

Tombol OpsiNilai DefaultDeskripsi
milvus.hostlocalhostHost server Milvus. Lihat Mengelola Koneksi Milvus untuk detailnya.
milvus.port19530Port server Milvus. Lihat Mengelola Koneksi Milvus untuk detailnya.
milvus.usernamerootNama pengguna untuk server Milvus. Lihat Mengelola Sambungan Milvus untuk detailnya.
milvus.passwordMilvusKata sandi untuk server Milvus. Lihat Mengelola Sambungan Milvus untuk detailnya.
milvus.uri--URI server Milvus. Lihat Mengelola Koneksi Milvus untuk detailnya.
milvus.token--Token server Milvus. Lihat Mengelola Koneksi Milvus untuk detailnya.
milvus.database.namedefaultNama basis data Milvus yang akan dibaca atau ditulis.
milvus.collection.namehello_milvusNama koleksi Milvus yang akan dibaca atau ditulis.
milvus.collection.primaryKeyFieldNoneNama bidang kunci utama dalam koleksi. Diperlukan jika koleksi tidak ada.
milvus.collection.vectorFieldNoneNama bidang vektor dalam koleksi. Diperlukan jika koleksi tidak ada.
milvus.collection.vectorDimNoneDimensi bidang vektor dalam koleksi. Diperlukan jika koleksi tidak ada.
milvus.collection.autoIDfalseJika koleksi tidak ada, opsi ini menentukan apakah akan secara otomatis menghasilkan ID untuk entitas. Untuk informasi lebih lanjut, lihat create_collection
milvus.bucketa-bucketNama ember di penyimpanan Milvus. Ini harus sama dengan minio.bucketName di milvus.yaml.
milvus.rootpathfilesJalur root dari penyimpanan Milvus. Ini harus sama dengan minio.rootpath di milvus.yaml.
milvus.fss3a://Sistem berkas dari penyimpanan Milvus. Nilai s3a:// berlaku untuk Spark yang bersumber terbuka. Gunakan s3:// untuk Databricks.
milvus.storage.endpointlocalhost:9000Titik akhir penyimpanan Milvus. Ini harus sama dengan minio.address:minio.port di milvus.yaml.
milvus.storage.userminioadminPengguna dari penyimpanan Milvus. Ini harus sama dengan minio.accessKeyID di milvus.yaml.
milvus.storage.passwordminioadminKata sandi dari penyimpanan Milvus. Ini harus sama dengan minio.secretAccessKey di milvus.yaml.
milvus.storage.useSSLfalseApakah akan menggunakan SSL untuk penyimpanan Milvus. Ini harus sama dengan minio.useSSL di milvus.yaml.

Format data Milvus

Konektor Spark-Milvus mendukung pembacaan dan penulisan data dalam format data Milvus berikut ini:

  • milvus: Format data Milvus untuk konversi tanpa hambatan dari Spark DataFrame ke entitas Milvus.
  • milvusbinlog: Format data Milvus untuk membaca data binlog bawaan Milvus.
  • mjson: Format JSON Milvus untuk memasukkan data secara massal ke dalam Milvus.

milvus

Dalam Mulai cepat, kita menggunakan format milvus untuk menulis data sampel ke dalam klaster Milvus. Format milvus adalah format data baru yang mendukung penulisan data Spark DataFrame secara mulus ke dalam Koleksi Milvus. Hal ini dicapai dengan panggilan batch ke API Insert dari Milvus SDK. Jika koleksi tidak ada di Milvus, koleksi baru akan dibuat berdasarkan skema Dataframe. Namun, koleksi yang dibuat secara otomatis mungkin tidak mendukung semua fitur dari skema koleksi. Oleh karena itu, disarankan untuk membuat koleksi melalui SDK terlebih dahulu dan kemudian menggunakan spark-milvus untuk menulis. Untuk informasi lebih lanjut, silakan lihat demo.

milvusbinlog

Format data baru milvusbinlog digunakan untuk membaca data binlog bawaan Milvus. Binlog adalah format penyimpanan data internal Milvus yang berbasis parket. Sayangnya, format ini tidak dapat dibaca oleh pustaka parket biasa, sehingga kami mengimplementasikan format data baru ini untuk membantu pekerjaan Spark membacanya. Tidak disarankan untuk menggunakan milvusbinlog secara langsung kecuali jika Anda sudah terbiasa dengan detail penyimpanan internal Milvus. Kami menyarankan untuk menggunakan fungsi MilvusUtils yang akan diperkenalkan pada bagian selanjutnya.

val df = spark.read
  .format("milvusbinlog")
  .load(path)
  .withColumnRenamed("val", "embedding")

mjson

Milvus menyediakan fungsionalitas Bulkinsert untuk performa penulisan yang lebih baik ketika beroperasi dengan dataset besar. Namun, format JSON yang digunakan oleh Milvus sedikit berbeda dengan format keluaran JSON bawaan Spark. Untuk mengatasi hal ini, kami memperkenalkan format data mjson untuk menghasilkan data yang memenuhi persyaratan Milvus. Berikut ini adalah contoh yang menunjukkan perbedaan antara JSON-lines dan mjson:

  • Baris-baris JSON:

    {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]}
    {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]}
    {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]}
    {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]}
    {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
    
  • mjson (Diperlukan untuk Milvus Bulkinsert):

    {
        "rows":[
            {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]},
            {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]},
            {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]},
            {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]},
            {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
        ]
    }
    

Hal ini akan ditingkatkan di masa mendatang. Kami menyarankan untuk menggunakan format parquet dalam integrasi spark-milvus jika versi Milvus Anda adalah v2.3.7+ yang mendukung bulkinsert dengan format Parquet. Lihat Demo di Github.

MilvusUtils

MilvusUtils berisi beberapa fungsi util yang berguna. Saat ini hanya didukung di Scala. Contoh penggunaan lebih lanjut ada di bagian Penggunaan Lanjutan.

MilvusUtils.readMilvusCollection

MilvusUtils.readMilvusCollection adalah sebuah antarmuka sederhana untuk memuat seluruh koleksi Milvus ke dalam Spark Dataframe. Antarmuka ini membungkus berbagai operasi, termasuk memanggil Milvus SDK, membaca milvusbinlog, dan operasi gabungan/gabungan yang umum.

val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)

MilvusUtils.bulkInsertFromSpark

MilvusUtils.bulkInsertFromSpark menyediakan cara yang mudah untuk mengimpor file keluaran Spark ke Milvus dalam jumlah yang besar. Ini membungkus API Bullkinsert dari Milvus SDK.

df.write.format("parquet").save(outputPath)
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputPath, "parquet")

Penggunaan Tingkat Lanjut

Pada bagian ini, Anda akan menemukan contoh penggunaan lanjutan dari Spark-Milvus Connector untuk analisis dan migrasi data. Untuk lebih banyak demo, lihat contoh.

MySQL -> penyematan -> Milvus

Dalam demo ini, kita akan

  1. Membaca data dari MySQL melalui Konektor Spark-MySQL,
  2. Menghasilkan penyematan (menggunakan Word2Vec sebagai contoh), dan
  3. Menulis data yang disematkan ke dalam Milvus.

Untuk mengaktifkan Spark-MySQL Connector, Anda perlu menambahkan ketergantungan berikut ini ke lingkungan Spark Anda:

spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar,mysql-connector-j-x.x.x.jar
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{SaveMode, SparkSession}
import zilliztech.spark.milvus.MilvusOptions._

import org.apache.spark.ml.linalg.Vector

object Mysql2MilvusDemo  extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("Mysql2MilvusDemo")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "Milvus was created in 2019 with a singular goal: store, index, and manage massive embedding vectors generated by deep neural networks and other machine learning (ML) models."),
    (2, "As a database specifically designed to handle queries over input vectors, it is capable of indexing vectors on a trillion scale. "),
    (3, "Unlike existing relational databases which mainly deal with structured data following a pre-defined pattern, Milvus is designed from the bottom-up to handle embedding vectors converted from unstructured data."),
    (4, "As the Internet grew and evolved, unstructured data became more and more common, including emails, papers, IoT sensor data, Facebook photos, protein structures, and much more.")
  ).toDF("id", "text")

  // Write to MySQL Table
  sampleDF.write
    .mode(SaveMode.Append)
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "demo")
    .option("user", "root")
    .option("password", "123456")
    .save()

  // Read from MySQL Table
  val dfMysql = spark.read
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "demo")
    .option("user", "root")
    .option("password", "123456")
    .load()

  val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
  val tokenizedDf = tokenizer.transform(dfMysql)

  // Learn a mapping from words to Vectors.
  val word2Vec = new Word2Vec()
    .setInputCol("tokens")
    .setOutputCol("vectors")
    .setVectorSize(128)
    .setMinCount(0)
  val model = word2Vec.fit(tokenizedDf)

  val result = model.transform(tokenizedDf)

  val vectorToArrayUDF = udf((v: Vector) => v.toArray)
  // Apply the UDF to the DataFrame
  val resultDF = result.withColumn("embedding", vectorToArrayUDF($"vectors"))
  val milvusDf = resultDF.drop("tokens").drop("vectors")

  milvusDf.write.format("milvus")
    .option(MILVUS_HOST, "localhost")
    .option(MILVUS_PORT, "19530")
    .option(MILVUS_COLLECTION_NAME, "text_embedding")
    .option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
    .option(MILVUS_COLLECTION_VECTOR_DIM, "128")
    .option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
    .mode(SaveMode.Append)
    .save()
}

Milvus -> Transform -> Milvus

Dalam demo ini, kita akan

  1. Membaca data dari koleksi Milvus,
  2. Menerapkan transformasi (menggunakan PCA sebagai contoh), dan
  3. Menulis data yang telah ditransformasi ke Milvus lain melalui API Bulkinsert.

Model PCA adalah model tranformasi yang mengurangi dimensi vektor penyisipan, yang merupakan operasi umum dalam pembelajaran mesin. Anda dapat menambahkan operasi pemrosesan lainnya, seperti pemfilteran, penggabungan, atau normalisasi, ke dalam langkah transformasi.

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}

import scala.collection.JavaConverters._

object TransformDemo extends App {
  val sparkConf = new SparkConf().setMaster("local")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()

  import spark.implicits._

  val host = "localhost"
  val port = 19530
  val user = "root"
  val password = "Milvus"
  val fs = "s3a://"
  val bucketName = "a-bucket"
  val rootPath = "files"
  val minioAK = "minioadmin"
  val minioSK = "minioadmin"
  val minioEndpoint = "localhost:9000"
  val collectionName = "hello_spark_milvus1"
  val targetCollectionName = "hello_spark_milvus2"

  val properties = Map(
    MilvusOptions.MILVUS_HOST -> host,
    MilvusOptions.MILVUS_PORT -> port.toString,
    MilvusOptions.MILVUS_COLLECTION_NAME -> collectionName,
    MilvusOptions.MILVUS_BUCKET -> bucketName,
    MilvusOptions.MILVUS_ROOTPATH -> rootPath,
    MilvusOptions.MILVUS_FS -> fs,
    MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
    MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
    MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
  )

  // 1, configurations
  val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))

  // 2, batch read milvus collection data to dataframe
  //  Schema: dim of `embeddings` is 8
  // +-+------------+------------+------------------+
  // | | field name | field type | other attributes |
  // +-+------------+------------+------------------+
  // |1|    "pk"    |    Int64   |  is_primary=True |
  // | |            |            |   auto_id=False  |
  // +-+------------+------------+------------------+
  // |2|  "random"  |    Double  |                  |
  // +-+------------+------------+------------------+
  // |3|"embeddings"| FloatVector|     dim=8        |
  // +-+------------+------------+------------------+
  val arrayToVectorUDF = udf((arr: Seq[Double]) => Vectors.dense(arr.toArray[Double]))
  val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
    .withColumn("embeddings_vec", arrayToVectorUDF($"embeddings"))
    .drop("embeddings")
  
  // 3. Use PCA to reduce dim of vector
  val dim = 4
  val pca = new PCA()
    .setInputCol("embeddings_vec")
    .setOutputCol("pca_vec")
    .setK(dim)
    .fit(collectionDF)
  val vectorToArrayUDF = udf((v: Vector) => v.toArray)
  // embeddings dim number reduce to 4
  // +-+------------+------------+------------------+
  // | | field name | field type | other attributes |
  // +-+------------+------------+------------------+
  // |1|    "pk"    |    Int64   |  is_primary=True |
  // | |            |            |   auto_id=False  |
  // +-+------------+------------+------------------+
  // |2|  "random"  |    Double  |                  |
  // +-+------------+------------+------------------+
  // |3|"embeddings"| FloatVector|     dim=4        |
  // +-+------------+------------+------------------+
  val pcaDf = pca.transform(collectionDF)
    .withColumn("embeddings", vectorToArrayUDF($"pca_vec"))
    .select("pk", "random", "embeddings")

  // 4. Write PCAed data to S3
  val outputPath = "s3a://a-bucket/result"
  pcaDf.write
    .mode("overwrite")
    .format("parquet")
    .save(outputPath)

  // 5. Config MilvusOptions of target table  
  val targetProperties = Map(
    MilvusOptions.MILVUS_HOST -> host,
    MilvusOptions.MILVUS_PORT -> port.toString,
    MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
    MilvusOptions.MILVUS_BUCKET -> bucketName,
    MilvusOptions.MILVUS_ROOTPATH -> rootPath,
    MilvusOptions.MILVUS_FS -> fs,
    MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
    MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
    MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
  )
  val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
  // 6. Bulkinsert Spark output files into milvus
  MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
}

Databricks -> Zilliz Cloud

Jika Anda menggunakan Zilliz Cloud (layanan Milvus yang dikelola), Anda dapat memanfaatkan API Impor Data yang nyaman. Zilliz Cloud menyediakan alat dan dokumentasi yang komprehensif untuk membantu Anda memindahkan data secara efisien dari berbagai sumber data, termasuk Spark dan Databricks. Cukup siapkan bucket S3 sebagai perantara dan buka aksesnya ke akun Zilliz Cloud Anda. API Impor Data Zilliz Cloud akan secara otomatis memuat seluruh data dari bucket S3 ke cluster Zilliz Cloud Anda.

Persiapan

  1. Muat runtime Spark dengan menambahkan file jar ke Databricks Cluster Anda.

    Anda dapat menginstal library dengan berbagai cara. Tangkapan layar ini menunjukkan pengunggahan jar dari lokal ke cluster. Untuk informasi lebih lanjut, lihat Cluster Libraries di dokumentasi Databricks.

    Install Databricks Library Menginstal Perpustakaan Databricks

  2. Buat bucket S3 dan konfigurasikan sebagai lokasi penyimpanan eksternal untuk cluster Databricks Anda.

    Bulkinsert membutuhkan data untuk disimpan di bucket sementara agar Zilliz Cloud dapat mengimpor data secara batch. Anda dapat membuat bucket S3 dan mengonfigurasikannya sebagai lokasi eksternal untuk databricks. Silakan lihat Lokasi eksternal untuk detailnya.

  3. Amankan kredensial Databricks Anda.

    Untuk lebih jelasnya, lihat petunjuk di blog Mengelola Kredensial dengan Aman di Databricks.

Demo

Berikut adalah cuplikan kode yang menampilkan proses migrasi data batch. Mirip dengan contoh Milvus di atas, Anda hanya perlu mengganti kredensial dan alamat bucket S3.

// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write
  .mode("overwrite")
  .format("mjson")
  .save(outputPath)
// Specify Milvus options.
val targetProperties = Map(
  MilvusOptions.MILVUS_URI -> zilliz_uri,
  MilvusOptions.MILVUS_TOKEN -> zilliz_token,
  MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
  MilvusOptions.MILVUS_BUCKET -> bucketName,
  MilvusOptions.MILVUS_ROOTPATH -> rootPath,
  MilvusOptions.MILVUS_FS -> fs,
  MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
  MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
  MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "mjson")

Buku Catatan Praktis

Untuk membantu Anda memulai dengan cepat menggunakan Spark-Milvus Connector, Anda dapat melihat buku catatan yang memandu Anda melalui contoh streaming dan konsumsi data batch untuk Spark ke Milvus dan Zilliz Cloud.

Coba Milvus yang Dikelola secara Gratis

Zilliz Cloud bebas masalah, didukung oleh Milvus dan 10x lebih cepat.

Mulai
Umpan balik

Apakah halaman ini bermanfaat?