🚀 جرب Zilliz Cloud، الـ Milvus المدارة بالكامل، مجاناً — تجربة أداء أسرع بـ 10 أضعاف! جرب الآن>>

milvus-logo
LFAI
الصفحة الرئيسية
  • الأدوات
  • Home
  • Docs
  • الأدوات

  • موصلات ميلفوس

  • سبارك

استخدام Apache Spark™ مع Milvus/Zilliz Cloud لخطوط أنابيب الذكاء الاصطناعي

يوفر موصّل Spark-Milvus Connector تكامل Apache Spark وDatabricks مع Milvus وZilliz Cloud. فهو يربط بين ميزات معالجة البيانات الضخمة القوية في Apache Spark وميزات التعلم الآلي (ML) في Apache Spark مع قدرات البحث المتجه المتطورة في Milvus. يتيح هذا التكامل سير عمل مبسّط للبحث المدعوم بالذكاء الاصطناعي والتحليلات المتقدمة وتدريب تعلّم الآلة والإدارة الفعالة للبيانات المتجهة واسعة النطاق.

Apache Spark هي عبارة عن منصة معالجة بيانات موزعة مصممة للتعامل مع مجموعات البيانات الضخمة بحسابات عالية السرعة. عند إقرانها مع Milvus أو Zilliz Cloud، فإنها تفتح إمكانيات جديدة لحالات الاستخدام مثل البحث الدلالي وأنظمة التوصيات وتحليلات البيانات القائمة على الذكاء الاصطناعي.

على سبيل المثال، يمكن ل Spark معالجة مجموعات البيانات الكبيرة على دفعات لإنشاء تضمينات عبر نماذج التعلم الآلي، ثم استخدام موصل Spark-Milvus لتخزين هذه التضمينات مباشرة في Milvus أو Zilliz Cloud. وبمجرد فهرستها، يمكن البحث في هذه البيانات أو تحليلها بسرعة، مما يؤدي إلى إنشاء خط أنابيب قوي للذكاء الاصطناعي وسير عمل البيانات الضخمة.

يدعم موصل Spark-Milvus مهام مثل الإدخال التكراري والجماعي للبيانات في Milvus، ومزامنة البيانات بين الأنظمة، والتحليلات المتقدمة على البيانات المتجهة المخزنة في Milvus. سيرشدك هذا الدليل إلى الخطوات اللازمة لتهيئة الرابط واستخدامه بفعالية لحالات الاستخدام مثل:

  • تحميل بيانات المتجهات بكفاءة في Milvus على دفعات كبيرة,
  • نقل البيانات بين ميلفوس وأنظمة التخزين أو قواعد البيانات الأخرى,
  • تحليل البيانات في Milvus من خلال الاستفادة من Spark MLlib وأدوات الذكاء الاصطناعي الأخرى.

البدء السريع

التحضير

يدعم موصل Spark-Milvus Connector لغتي البرمجة Scala و Python. يمكن للمستخدمين استخدامه مع Pyspark أو Spark-shell. لتشغيل هذا العرض التوضيحي، قم بإعداد بيئة Spark التي تحتوي على تبعية Spark-Milvus Connector في الخطوات التالية:

  1. تثبيت أباتشي سبارك (الإصدار >= 3.3.0)

    يمكنك تثبيت Apache Spark بالرجوع إلى الوثائق الرسمية.

  2. قم بتنزيل ملف جرة شرارة ميلفوس.

    wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar
    
  3. ابدأ وقت تشغيل Spark مع جرة شرارة ميلفوس كأحد التبعيات.

    لبدء وقت تشغيل Spark مع Spark-Milvus Connector، أضف الشرارة-ميلفوس التي تم تنزيلها كأحد التبعيات إلى الأمر.

    • بيسبارك

      ./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
      
    • شرارة قذيفة

      ./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
      

عرض توضيحي

في هذا العرض التوضيحي، نقوم بإنشاء نموذج Spark DataFrame مع بيانات متجهة ونكتبها إلى Milvus من خلال Spark-Milvus Connector. سيتم إنشاء مجموعة في Milvus تلقائيًا استنادًا إلى المخطط والخيارات المحددة.

from pyspark.sql import SparkSession

columns = ["id", "text", "vec"]
data = [(1, "a", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (2, "b", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (3, "c", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0]),
    (4, "d", [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0])]
sample_df = spark.sparkContext.parallelize(data).toDF(columns)
sample_df.write \
    .mode("append") \
    .option("milvus.host", "localhost") \
    .option("milvus.port", "19530") \
    .option("milvus.collection.name", "hello_spark_milvus") \
    .option("milvus.collection.vectorField", "vec") \
    .option("milvus.collection.vectorDim", "8") \
    .option("milvus.collection.primaryKeyField", "id") \
    .format("milvus") \
    .save()
import org.apache.spark.sql.{SaveMode, SparkSession}

object Hello extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("HelloSparkMilvus")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "a", Seq(1.0,2.0,3.0,4.0,5.0)),
    (2, "b", Seq(1.0,2.0,3.0,4.0,5.0)),
    (3, "c", Seq(1.0,2.0,3.0,4.0,5.0)),
    (4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
  ).toDF("id", "text", "vec")

  // set milvus options
  val milvusOptions = Map(
      "milvus.host" -> "localhost" -> uri,
      "milvus.port" -> "19530",
      "milvus.collection.name" -> "hello_spark_milvus",
      "milvus.collection.vectorField" -> "vec",
      "milvus.collection.vectorDim" -> "5",
      "milvus.collection.primaryKeyField", "id"
    )
    
  sampleDF.write.format("milvus")
    .options(milvusOptions)
    .mode(SaveMode.Append)
    .save()
}

بعد تنفيذ الكود أعلاه، يمكنك عرض البيانات المدرجة في ميلفوس باستخدام SDK أو Attu (لوحة معلومات ميلفوس). يمكنك العثور على مجموعة باسم hello_spark_milvus تم إنشاؤها مع 4 كيانات تم إدراجها بالفعل فيها.

الميزات والمفاهيم

خيارات ملفوس

في قسم البداية السريعة، عرضنا خيارات الإعداد أثناء العمليات مع ميلفوس. يتم تجريد هذه الخيارات كخيارات Milvus. يتم استخدامها لإنشاء اتصالات مع ميلفوس والتحكم في سلوكيات ميلفوس الأخرى. ليست كل الخيارات إلزامية.

مفتاح الخيارالقيمة الافتراضيةالوصف
milvus.hostlocalhostمضيف خادم ملفوس. راجع إدارة اتصالات Milvus للحصول على التفاصيل.
milvus.port19530منفذ خادم ميلفوس. راجع إدارة اتصالات Milvus للحصول على التفاصيل.
milvus.usernamerootاسم المستخدم لخادم ميلفوس. انظر إدارة اتصالات Milvus للحصول على التفاصيل.
milvus.passwordMilvusكلمة المرور لخادم ميلفوس. راجع إدارة اتصالات Milvus للحصول على التفاصيل.
milvus.uri--URI لخادم Milvus URI. راجع إدارة اتصالات ملفوس للحصول على التفاصيل.
milvus.token--الرمز المميز لخادم ميلفوس. راجع إدارة اتصالات ملفوس للحصول على التفاصيل.
milvus.database.namedefaultاسم قاعدة بيانات ملفوس للقراءة أو الكتابة.
milvus.collection.namehello_milvusاسم مجموعة Milvus المراد قراءتها أو كتابتها.
milvus.collection.primaryKeyFieldNoneاسم حقل المفتاح الأساسي في المجموعة. مطلوب في حالة عدم وجود المجموعة.
milvus.collection.vectorFieldNoneاسم الحقل المتجه في المجموعة. مطلوب إذا كانت المجموعة غير موجودة.
milvus.collection.vectorDimNoneبُعد الحقل المتجه في المجموعة. مطلوب إذا كانت المجموعة غير موجودة.
milvus.collection.autoIDfalseإذا لم تكن المجموعة غير موجودة، يحدد هذا الخيار ما إذا كان سيتم إنشاء معرفات للكيانات تلقائيًا. لمزيد من المعلومات، راجع إنشاء_مجموعة
milvus.bucketa-bucketاسم المجموعة في مخزن ميلفوس. يجب أن يكون هذا هو نفسه minio.bucketName في milvus.yaml.
milvus.rootpathfilesالمسار الجذر لتخزين Milvus. يجب أن يكون هذا هو نفسه minio.rootpath في milvus.yaml.
milvus.fss3a://نظام ملفات وحدة تخزين Milvus. تنطبق القيمة s3a:// على Spark مفتوح المصدر. استخدم s3:// لـ Databricks.
milvus.storage.endpointlocalhost:9000نقطة النهاية لتخزين Milvus. يجب أن يكون هذا هو نفسه minio.address:minio.port في milvus.yaml.
milvus.storage.userminioadminمستخدم وحدة تخزين Milvus. يجب أن يكون هذا هو نفسه minio.accessKeyID في milvus.yaml.
milvus.storage.passwordminioadminكلمة مرور مخزن ميلفوس. يجب أن تكون هي نفسها minio.secretAccessKey في milvus.yaml.
milvus.storage.useSSLfalseما إذا كان يجب استخدام SSL لتخزين Milvus. يجب أن يكون هذا هو نفسه minio.useSSL في milvus.yaml.

تنسيق بيانات Milvus

يدعم موصل Spark-Milvus Connector قراءة وكتابة البيانات بتنسيقات بيانات Milvus التالية:

  • milvus: تنسيق بيانات Milvus للتحويل السلس من Spark DataFrame إلى كيانات Milvus.
  • milvusbinlog: تنسيق بيانات Milvus لقراءة بيانات مدونة Milvus المدمجة.
  • mjson: تنسيق Milvus JSON لإدخال البيانات المجمعة في Milvus.

ميلفوس

في البداية السريعة، نستخدم تنسيق milvus لكتابة بيانات نموذجية في مجموعة Milvus. تنسيق milvus هو تنسيق بيانات جديد يدعم كتابة بيانات Spark DataFrame بسلاسة في مجموعات Milvus. يتم تحقيق ذلك من خلال استدعاءات دفعية إلى واجهة برمجة التطبيقات Insert API الخاصة بـ Milvus SDK. في حالة عدم وجود مجموعة في Milvus، سيتم إنشاء مجموعة جديدة بناءً على مخطط إطار البيانات. ومع ذلك، قد لا تدعم المجموعة التي تم إنشاؤها تلقائيًا جميع ميزات مخطط المجموعة. لذلك، يوصى بإنشاء مجموعة عبر SDK أولاً ثم استخدام شرارة ميلفوس للكتابة. لمزيد من المعلومات، يرجى الرجوع إلى العرض التوضيحي.

ميلفوسبينوغ

تنسيق البيانات الجديد milvusbinlog مخصص لقراءة بيانات Milvus binlog المدمجة في Milvus. Binlog هو تنسيق تخزين البيانات الداخلية لـ Milvus استناداً إلى الباركيه. لسوء الحظ، لا يمكن قراءتها من قبل مكتبة باركيه عادية، لذلك قمنا بتنفيذ تنسيق البيانات الجديد هذا لمساعدة وظيفة Spark على قراءتها. لا يوصى باستخدام تنسيق milvusbinlog مباشرةً إلا إذا كنت على دراية بتفاصيل التخزين الداخلي لـ Milvus. نقترح استخدام دالة MilvusUtils التي سيتم تقديمها في القسم التالي.

val df = spark.read
  .format("milvusbinlog")
  .load(path)
  .withColumnRenamed("val", "embedding")

مجسون

يوفر ميلفوس وظيفة بولكنسيرت لتحسين أداء الكتابة عند العمل مع مجموعات البيانات الكبيرة. ومع ذلك، فإن تنسيق JSON المستخدم من قبل Milvus يختلف قليلاً عن تنسيق إخراج JSON الافتراضي الخاص بـ Spark. لحل هذه المشكلة، نقدم تنسيق بيانات mjson لتوليد بيانات تلبي متطلبات Milvus. فيما يلي مثال يوضح الفرق بين JSON-lines و mjson:

  • JSON-lines:

    {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]}
    {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]}
    {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]}
    {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]}
    {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
    
  • mjson (مطلوب لـ Milvus Bulkinsert):

    {
        "rows":[
            {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]},
            {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]},
            {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]},
            {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]},
            {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
        ]
    }
    

سيتم تحسين هذا في المستقبل. نوصي باستخدام تنسيق الباركيه في تكامل شرارة ميلفوس إذا كان إصدار ميلفوس الخاص بك هو الإصدار 2.3.7+ الذي يدعم بولكنسيرت بتنسيق الباركيه. انظر العرض التوضيحي على Github.

MilvusUtils

يحتوي MilvusUtils على العديد من دوال الاستخدام المفيدة. وهي مدعومة حاليًا في سكالا فقط. المزيد من أمثلة الاستخدام في قسم الاستخدام المتقدم.

MilvusUtils.readMilvusCollection

MilvusUtils.readMilvusCollection هي واجهة بسيطة لتحميل مجموعة Milvus كاملة في إطار بيانات Spark. وهي تغلف العديد من العمليات، بما في ذلك استدعاء Milvus SDK، وقراءة milvusbinlog وعمليات الاتحاد/الربط الشائعة.

val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)

MilvusUtils.bulkInsertFromSpark

يوفر MilvusUtils.bulkInsertFertFromSpark طريقة ملائمة لاستيراد ملفات إخراج Spark إلى Milvus دفعة واحدة. وهي تلتف على واجهة برمجة تطبيقات Bullkinsert الخاصة بحزمة تطوير البرمجيات Milvus SDK.

df.write.format("parquet").save(outputPath)
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputPath, "parquet")

الاستخدام المتقدم

في هذا القسم، ستجد في هذا القسم أمثلة استخدام متقدمة لموصل Spark-Milvus لتحليل البيانات وترحيلها. لمزيد من العروض التوضيحية، انظر الأمثلة.

MySQL -> التضمين -> ميلفوس

في هذا العرض التوضيحي، سنقوم بما يلي

  1. قراءة البيانات من MySQL من خلال موصل Spark-MySQL,
  2. توليد التضمين (باستخدام Word2Vec كمثال)، و
  3. كتابة البيانات المضمنة في ملفوس.

لتمكين موصل Spark-MySQL، تحتاج إلى إضافة التبعية التالية إلى بيئة Spark الخاصة بك:

spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar,mysql-connector-j-x.x.x.jar
import org.apache.spark.ml.feature.{Tokenizer, Word2Vec}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{SaveMode, SparkSession}
import zilliztech.spark.milvus.MilvusOptions._

import org.apache.spark.ml.linalg.Vector

object Mysql2MilvusDemo  extends App {

  val spark = SparkSession.builder().master("local[*]")
    .appName("Mysql2MilvusDemo")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq(
    (1, "Milvus was created in 2019 with a singular goal: store, index, and manage massive embedding vectors generated by deep neural networks and other machine learning (ML) models."),
    (2, "As a database specifically designed to handle queries over input vectors, it is capable of indexing vectors on a trillion scale. "),
    (3, "Unlike existing relational databases which mainly deal with structured data following a pre-defined pattern, Milvus is designed from the bottom-up to handle embedding vectors converted from unstructured data."),
    (4, "As the Internet grew and evolved, unstructured data became more and more common, including emails, papers, IoT sensor data, Facebook photos, protein structures, and much more.")
  ).toDF("id", "text")

  // Write to MySQL Table
  sampleDF.write
    .mode(SaveMode.Append)
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "demo")
    .option("user", "root")
    .option("password", "123456")
    .save()

  // Read from MySQL Table
  val dfMysql = spark.read
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "demo")
    .option("user", "root")
    .option("password", "123456")
    .load()

  val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("tokens")
  val tokenizedDf = tokenizer.transform(dfMysql)

  // Learn a mapping from words to Vectors.
  val word2Vec = new Word2Vec()
    .setInputCol("tokens")
    .setOutputCol("vectors")
    .setVectorSize(128)
    .setMinCount(0)
  val model = word2Vec.fit(tokenizedDf)

  val result = model.transform(tokenizedDf)

  val vectorToArrayUDF = udf((v: Vector) => v.toArray)
  // Apply the UDF to the DataFrame
  val resultDF = result.withColumn("embedding", vectorToArrayUDF($"vectors"))
  val milvusDf = resultDF.drop("tokens").drop("vectors")

  milvusDf.write.format("milvus")
    .option(MILVUS_HOST, "localhost")
    .option(MILVUS_PORT, "19530")
    .option(MILVUS_COLLECTION_NAME, "text_embedding")
    .option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
    .option(MILVUS_COLLECTION_VECTOR_DIM, "128")
    .option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
    .mode(SaveMode.Append)
    .save()
}

ميلفوس -> تحويل -> ميلفوس

في هذا العرض التوضيحي، سنقوم بما يلي

  1. قراءة البيانات من مجموعة Milvus,
  2. تطبيق تحويل (باستخدام PCA كمثال)، و
  3. كتابة البيانات المحولة إلى ملفوس آخر عبر واجهة برمجة تطبيقات بولكنسيرت.

نموذج PCA هو نموذج تحويل يقلل من أبعاد متجهات التضمين، وهي عملية شائعة في التعلم الآلي. يمكنك إضافة أي عمليات معالجة أخرى، مثل التصفية أو الضم أو التطبيع، إلى خطوة التحويل.

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}

import scala.collection.JavaConverters._

object TransformDemo extends App {
  val sparkConf = new SparkConf().setMaster("local")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()

  import spark.implicits._

  val host = "localhost"
  val port = 19530
  val user = "root"
  val password = "Milvus"
  val fs = "s3a://"
  val bucketName = "a-bucket"
  val rootPath = "files"
  val minioAK = "minioadmin"
  val minioSK = "minioadmin"
  val minioEndpoint = "localhost:9000"
  val collectionName = "hello_spark_milvus1"
  val targetCollectionName = "hello_spark_milvus2"

  val properties = Map(
    MilvusOptions.MILVUS_HOST -> host,
    MilvusOptions.MILVUS_PORT -> port.toString,
    MilvusOptions.MILVUS_COLLECTION_NAME -> collectionName,
    MilvusOptions.MILVUS_BUCKET -> bucketName,
    MilvusOptions.MILVUS_ROOTPATH -> rootPath,
    MilvusOptions.MILVUS_FS -> fs,
    MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
    MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
    MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
  )

  // 1, configurations
  val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))

  // 2, batch read milvus collection data to dataframe
  //  Schema: dim of `embeddings` is 8
  // +-+------------+------------+------------------+
  // | | field name | field type | other attributes |
  // +-+------------+------------+------------------+
  // |1|    "pk"    |    Int64   |  is_primary=True |
  // | |            |            |   auto_id=False  |
  // +-+------------+------------+------------------+
  // |2|  "random"  |    Double  |                  |
  // +-+------------+------------+------------------+
  // |3|"embeddings"| FloatVector|     dim=8        |
  // +-+------------+------------+------------------+
  val arrayToVectorUDF = udf((arr: Seq[Double]) => Vectors.dense(arr.toArray[Double]))
  val collectionDF = MilvusUtils.readMilvusCollection(spark, milvusOptions)
    .withColumn("embeddings_vec", arrayToVectorUDF($"embeddings"))
    .drop("embeddings")
  
  // 3. Use PCA to reduce dim of vector
  val dim = 4
  val pca = new PCA()
    .setInputCol("embeddings_vec")
    .setOutputCol("pca_vec")
    .setK(dim)
    .fit(collectionDF)
  val vectorToArrayUDF = udf((v: Vector) => v.toArray)
  // embeddings dim number reduce to 4
  // +-+------------+------------+------------------+
  // | | field name | field type | other attributes |
  // +-+------------+------------+------------------+
  // |1|    "pk"    |    Int64   |  is_primary=True |
  // | |            |            |   auto_id=False  |
  // +-+------------+------------+------------------+
  // |2|  "random"  |    Double  |                  |
  // +-+------------+------------+------------------+
  // |3|"embeddings"| FloatVector|     dim=4        |
  // +-+------------+------------+------------------+
  val pcaDf = pca.transform(collectionDF)
    .withColumn("embeddings", vectorToArrayUDF($"pca_vec"))
    .select("pk", "random", "embeddings")

  // 4. Write PCAed data to S3
  val outputPath = "s3a://a-bucket/result"
  pcaDf.write
    .mode("overwrite")
    .format("parquet")
    .save(outputPath)

  // 5. Config MilvusOptions of target table  
  val targetProperties = Map(
    MilvusOptions.MILVUS_HOST -> host,
    MilvusOptions.MILVUS_PORT -> port.toString,
    MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
    MilvusOptions.MILVUS_BUCKET -> bucketName,
    MilvusOptions.MILVUS_ROOTPATH -> rootPath,
    MilvusOptions.MILVUS_FS -> fs,
    MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
    MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
    MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
  )
  val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
  // 6. Bulkinsert Spark output files into milvus
  MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
}

داتابريكس -> زيليز كلاود

إذا كنت تستخدم Zilliz Cloud (خدمة Milvus المُدارة)، يمكنك الاستفادة من واجهة برمجة تطبيقات استيراد البيانات الملائمة. توفر Zilliz Cloud أدوات ووثائق شاملة لمساعدتك على نقل بياناتك بكفاءة من مصادر بيانات مختلفة، بما في ذلك Spark وDatabricks. ما عليك سوى إعداد دلو S3 كوسيط وفتح وصوله إلى حساب Zilliz Cloud الخاص بك. ستقوم واجهة برمجة تطبيقات استيراد البيانات في زيليز كلاود بتحميل دفعة كاملة من البيانات تلقائيًا من دلو S3 إلى مجموعة زيليز كلاود الخاصة بك.

التحضيرات

  1. قم بتحميل وقت تشغيل Spark عن طريق إضافة ملف جرة إلى مجموعة Databricks Cluster الخاصة بك.

    يمكنك تثبيت مكتبة بطرق مختلفة. تُظهر لقطة الشاشة هذه تحميل جرة من المحلية إلى الكتلة. لمزيد من المعلومات، راجع مكتبات الكتلة في وثائق Databricks.

    Install Databricks Library تثبيت مكتبة داتابريكس

  2. قم بإنشاء دلو S3 وقم بتكوينه كموقع تخزين خارجي لمجموعة مكتبات Databricks الخاصة بك.

    يتطلب بولكنسيرت تخزين البيانات المطلوبة في دلو مؤقت بحيث يمكن لزيليز كلاود استيراد البيانات دفعة واحدة. يمكنك إنشاء دلو S3 وتهيئته كموقع خارجي لـ داتابريكس. يرجى الرجوع إلى المواقع الخارجية للحصول على التفاصيل.

  3. قم بتأمين بيانات اعتماد Databricks الخاصة بك.

    لمزيد من التفاصيل، ارجع إلى الإرشادات الموجودة في المدونة إدارة بيانات الاعتماد بشكل آمن في داتابريكس.

عرض توضيحي

إليك مقتطف رمز يعرض عملية ترحيل البيانات المجمعة. على غرار مثال ميلفوس أعلاه، تحتاج فقط إلى استبدال بيانات الاعتماد وعنوان دلو S3.

// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write
  .mode("overwrite")
  .format("mjson")
  .save(outputPath)
// Specify Milvus options.
val targetProperties = Map(
  MilvusOptions.MILVUS_URI -> zilliz_uri,
  MilvusOptions.MILVUS_TOKEN -> zilliz_token,
  MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
  MilvusOptions.MILVUS_BUCKET -> bucketName,
  MilvusOptions.MILVUS_ROOTPATH -> rootPath,
  MilvusOptions.MILVUS_FS -> fs,
  MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
  MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
  MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
  
// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "mjson")

المفكرة العملية

لمساعدتك على البدء سريعًا في استخدام موصل Spark-Milvus Connector، يمكنك الاطلاع على دفتر الملاحظات الذي يرشدك خلال أمثلة استيعاب البيانات المتدفقة والدفعية ل Spark إلى Milvus و Zilliz Cloud.

جرب Managed Milvus مجاناً

Zilliz Cloud خالي من المتاعب، ويعمل بواسطة Milvus ويعمل بسرعة 10 أضعاف.

ابدأ
التعليقات

هل كانت هذه الصفحة مفيدة؟