Quelldaten vorbereiten
Auf dieser Seite werden einige Punkte besprochen, die Sie beachten sollten, bevor Sie mit dem Einfügen von Massendaten in Ihre Sammlung beginnen.
Bevor Sie beginnen
Die Zielsammlung erfordert eine Zuordnung der Quelldaten zu ihrem Schema. Das folgende Diagramm zeigt, wie akzeptable Quelldaten dem Schema einer Zielsammlung zugeordnet werden.
Zuordnen von Daten zum Schema
Sie sollten Ihre Daten sorgfältig prüfen und das Schema der Zielsammlung entsprechend gestalten.
Nehmen wir die JSON-Daten im obigen Diagramm als Beispiel: Es gibt zwei Entitäten in der Zeilenliste, wobei jede Zeile sechs Felder hat. Das Schema der Sammlung enthält selektiv vier: id, vector, scalar_1 und scalar_2.
Beim Entwurf des Schemas sind noch zwei weitere Dinge zu berücksichtigen:
Ob AutoID aktiviert werden soll.
Das Feld id dient als Primärfeld der Sammlung. Um das Primärfeld automatisch zu inkrementieren, können Sie AutoID im Schema aktivieren. In diesem Fall sollten Sie das id-Feld aus jeder Zeile der Quelldaten ausschließen.
Dynamische Felder aktivieren
Die Zielsammlung kann auch Felder speichern, die nicht in ihrem vordefinierten Schema enthalten sind, wenn das Schema dynamische Felder zulässt. Das $meta-Feld ist ein reserviertes JSON-Feld, um dynamische Felder und ihre Werte in Schlüssel-Wert-Paaren zu speichern. Im obigen Diagramm werden die Felder dynamic_field_1 und dynamic_field_2 und die Werte als Schlüssel-Wert-Paare im $meta-Feld gespeichert.
Der folgende Code zeigt, wie Sie das Schema für die im obigen Diagramm dargestellte Sammlung einrichten.
Weitere Informationen finden Sie unter create_schema()
und add_field()
in der SDK-Referenz.
Um weitere Informationen zu erhalten, siehe CollectionSchema
in der SDK-Referenz.
from pymilvus import MilvusClient, DataType
# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="scalar_1", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="scalar_2", datatype=DataType.INT64)
schema.verify()
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;
// Define schema for the target collection
FieldType id = FieldType.newBuilder()
.withName("id")
.withDataType(DataType.Int64)
.withPrimaryKey(true)
.withAutoID(false)
.build();
FieldType vector = FieldType.newBuilder()
.withName("vector")
.withDataType(DataType.FloatVector)
.withDimension(768)
.build();
FieldType scalar1 = FieldType.newBuilder()
.withName("scalar_1")
.withDataType(DataType.VarChar)
.withMaxLength(512)
.build();
FieldType scalar2 = FieldType.newBuilder()
.withName("scalar_2")
.withDataType(DataType.Int64)
.build();
CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
.withEnableDynamicField(true)
.addFieldType(id)
.addFieldType(vector)
.addFieldType(scalar1)
.addFieldType(scalar2)
.build();
BulkWriter einrichten
BulkWriter ist ein Tool zur Konvertierung von Rohdatensätzen in ein Format, das für den Import über die RESTful Import API geeignet ist. Es bietet zwei Arten von Writern:
- LocalBulkWriter: Liest den angegebenen Datensatz und wandelt ihn in ein einfach zu verwendendes Format um.
- RemoteBulkWriter: Führt dieselbe Aufgabe wie der LocalBulkWriter aus, überträgt die konvertierten Datendateien jedoch zusätzlich in einen angegebenen Remote Object Storage Bucket.
RemoteBulkWriter unterscheidet sich von LocalBulkWriter dadurch, dass RemoteBulkWriter die konvertierten Datendateien in einen Zielobjektspeicherbereich überträgt.
LocalBulkWriter einrichten
Ein LocalBulkWriter fügt Zeilen aus dem Quelldatensatz an und überträgt sie in eine lokale Datei des angegebenen Formats.
from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
# Use `from pymilvus import LocalBulkWriter, BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = LocalBulkWriter(
schema=schema,
local_path='.',
segment_size=512 * 1024 * 1024, # Default value
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
LocalBulkWriterParam localBulkWriterParam = LocalBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withLocalPath(".")
.withChunkSize(512 * 1024 * 1024)
.withFileType(BulkFileType.PARQUET)
.build();
LocalBulkWriter localBulkWriter = new LocalBulkWriter(localBulkWriterParam);
Wenn Sie einen LocalBulkWriter erstellen, sollten Sie:
- Verweisen Sie auf das erstellte Schema in
schema
. - Setzen Sie
local_path
auf das Ausgabeverzeichnis. - Setzen Sie
file_type
auf den Ausgabedateityp. - Wenn Ihr Datensatz eine große Anzahl von Datensätzen enthält, sollten Sie Ihre Daten segmentieren, indem Sie
segment_size
auf einen geeigneten Wert setzen.
Einzelheiten zu den Parametereinstellungen finden Sie unter LocalBulkWriter in der SDK-Referenz.
Wenn Sie einen LocalBulkWriter erstellen, sollten Sie:
- Verweisen Sie auf das erstellte Schema in
CollectionSchema()
. - Legen Sie das Ausgabeverzeichnis in
withLocalPath()
fest. - Legen Sie den Ausgabedateityp in
withFileType()
fest. - Wenn Ihr Datensatz eine große Anzahl von Datensätzen enthält, empfiehlt es sich, Ihre Daten zu segmentieren, indem Sie
withChunkSize()
auf einen geeigneten Wert setzen.
Einzelheiten zu den Parametereinstellungen finden Sie unter LocalBulkWriter in der SDK-Referenz.
RemoteBulkWriter einrichten
Anstatt angehängte Daten in eine lokale Datei zu übertragen, überträgt ein RemoteBulkWriter sie in einen entfernten Bucket. Daher sollten Sie ein ConnectParam-Objekt einrichten, bevor Sie einen RemoteBulkWriter erstellen.
from pymilvus.bulk_writer import RemoteBulkWriter
# Use `from pymilvus import RemoteBulkWriter`
# when you use pymilvus earlier than 2.4.2
# Third-party constants
ACCESS_KEY="minioadmin"
SECRET_KEY="minioadmin"
BUCKET_NAME="milvus-bucket"
# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
endpoint="localhost:9000", # the default MinIO service started along with Milvus
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
bucket_name=BUCKET_NAME,
secure=False
)
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
String ACCESS_KEY = "minioadmin";
String SECRET_KEY = "minioadmin";
String BUCKET_NAME = "milvus-bucket";
StorageConnectParam storageConnectParam = S3ConnectParam.newBuilder()
.withEndpoint(MINIO_URI)
.withAccessKey(ACCESS_KEY)
.withSecretKey(SECRET_KEY)
.withBucketName(BUCKET_NAME)
.build();
Sobald die Verbindungsparameter fertig sind, kann man sie im RemoteBulkWriter wie folgt referenzieren:
from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType`
# when you use pymilvus earlier than 2.4.2
writer = RemoteBulkWriter(
schema=schema,
remote_path="/",
connect_param=conn,
file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;
RemoteBulkWriterParam remoteBulkWriterParam = RemoteBulkWriterParam.newBuilder()
.withCollectionSchema(schema)
.withConnectParam(storageConnectParam)
.withChunkSize(512 * 1024 * 1024)
.withRemotePath("/")
.withFileType(BulkFileType.PARQUET)
.build();
RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(remoteBulkWriterParam);
Die Parameter für die Erstellung eines RemoteBulkWriters sind fast dieselben wie die für einen LocalBulkWriter, mit Ausnahme von connect_param
. Details zu den Parametereinstellungen finden Sie unter RemoteBulkWriter und ConnectParam in der SDK-Referenz.
Die Parameter für die Erstellung eines RemoteBulkWriters sind fast die gleichen wie die für einen LocalBulkWriter, außer StorageConnectParam
. Einzelheiten zu den Parametereinstellungen finden Sie unter RemoteBulkWriter und StorageConnectParam in der SDK-Referenz.
Schreiben starten
Ein BulkWriter hat zwei Methoden: append_row()
fügt eine Zeile aus einem Quelldatensatz hinzu, und commit()
überträgt hinzugefügte Zeilen in eine lokale Datei oder einen Remote-Bucket.
Ein BulkWriter hat zwei Methoden: appendRow()
fügt eine Zeile aus einem Quelldatensatz hinzu, und commit()
überträgt hinzugefügte Zeilen in eine lokale Datei oder einen Remote-Bucket.
Zu Demonstrationszwecken fügt der folgende Code zufällig generierte Daten hinzu.
import random
import string
def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits
return ''.join(random.choices(letters + digits, k=length))
for i in range(10000):
writer.append_row({
"id": i,
"vector": [random.uniform(-1, 1) for _ in range(768)],
"scalar_1": generate_random_str(random.randint(1, 20)),
"scalar_2": random.randint(0, 100)
})
writer.commit()
import com.alibaba.fastjson.JSONObject;
for (int i = 0; i < 10000; i++) {
JSONObject json = new JSONObject();
json.put("id", i);
json.put("vector", get_random_vector(768));
json.put("scalar_1", get_random_string(20));
json.put("scalar_2", (long) (Math.random() * 100));
// localBulkWriter.appendRow(json);
remoteBulkWriter.appendRow(json);
}
// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);
Da das definierte Schema dynamische Felder zulässt, können Sie auch nicht-schemadefinierte Felder in die einzufügenden Daten aufnehmen, wie im Folgenden beschrieben.
import random
import string
def generate_random_string(length=5):
letters = string.ascii_uppercase
digits = string.digits
return ''.join(random.choices(letters + digits, k=length))
for i in range(10000):
writer.append_row({
"id": i,
"vector":[random.uniform(-1, 1) for _ in range(768)],
"scalar_1": generate_random_string(),
"scalar_2": random.randint(0, 100),
"dynamic_field_1": random.choice([True, False]),
"dynamic_field_2": random.randint(0, 100)
})
writer.commit()
for (int i = 0; i < 10000; i++) {
JSONObject json = new JSONObject();
json.put("id", i);
json.put("vector", get_random_vector(768));
json.put("scalar_1", get_random_string(20));
json.put("scalar_2", (long) (Math.random() * 100));
json.put("dynamic_field_1", get_random_boolean());
json.put("dynamic_field_2", (long) (Math.random() * 100));
// localBulkWriter.appendRow(json);
remoteBulkWriter.appendRow(json);
}
// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);
Überprüfen der Ergebnisse
Um die Ergebnisse zu überprüfen, können Sie den tatsächlichen Ausgabepfad ermitteln, indem Sie die Eigenschaft batch_files
des Writers ausgeben.
Um die Ergebnisse zu überprüfen, können Sie den tatsächlichen Ausgabepfad ermitteln, indem Sie die Methode getBatchFiles()
des Writers ausdrucken.
print(writer.batch_files)
# [['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/1.parquet'],
# ['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/2.parquet']]
// localBulkWriter.getBatchFiles();
remoteBulkWriter.getBatchFiles();
//
// Close the BulkWriter
try {
localBulkWriter.close();
remoteBulkWriter.close();
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
BulkWriter generiert eine UUID, erstellt einen Unterordner mit der UUID im angegebenen Ausgabeverzeichnis und legt alle generierten Dateien in diesem Unterordner ab. Klicken Sie hier, um die vorbereiteten Beispieldaten herunterzuladen.
Mögliche Ordnerstrukturen sind wie folgt:
# JSON
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.json
# Parquet
├── folder
│ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│ └── 1.parquet