milvus-logo
LFAI
Home
  • Guia do utilizador

Preparar os dados de origem

Esta página aborda algo que deve considerar antes de começar a inserir dados em massa na sua coleção.

Antes de começar

A coleção de destino requer o mapeamento dos dados de origem para o seu esquema. O diagrama abaixo mostra como os dados de origem aceitáveis são mapeados para o esquema de uma coleção de destino.

Map data to schema Mapear dados para o esquema

Deve examinar cuidadosamente os seus dados e conceber o esquema da coleção de destino em conformidade.

Tomando como exemplo os dados JSON no diagrama acima, existem duas entidades na lista de linhas, cada linha com seis campos. O esquema da coleção inclui seletivamente quatro: id, vetor, scalar_1 e scalar_2.

Há mais dois aspectos a considerar ao conceber o esquema:

  • Se deve ser ativado o AutoID

    O campo id serve como o campo primário da coleção. Para fazer com que o campo primário seja incrementado automaticamente, pode ativar o AutoID no esquema. Neste caso, deve excluir o campo id de cada linha nos dados de origem.

  • Ativar ou não campos dinâmicos

    A coleção de destino também pode armazenar campos não incluídos no seu esquema predefinido se o esquema permitir campos dinâmicos. O campo $meta é um campo JSON reservado para armazenar campos dinâmicos e seus valores em pares de valores chave. No diagrama acima, os campos dynamic_field_1 e dynamic_field_2 e os valores serão guardados como pares de valores chave no campo $meta.

O código seguinte mostra como configurar o esquema para a coleção ilustrada no diagrama acima.

Para obter mais informações, consulte create_schema() e add_field() na referência do SDK.

Para obter mais informações, consulte CollectionSchema na referência do SDK.

from pymilvus import MilvusClient, DataType

# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
    auto_id=False,
    enable_dynamic_field=True
)

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="scalar_1", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="scalar_2", datatype=DataType.INT64)

schema.verify()
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;

// Define schema for the target collection
FieldType id = FieldType.newBuilder()
        .withName("id")
        .withDataType(DataType.Int64)
        .withPrimaryKey(true)
        .withAutoID(false)
        .build();

FieldType vector = FieldType.newBuilder()
        .withName("vector")
        .withDataType(DataType.FloatVector)
        .withDimension(768)
        .build();

FieldType scalar1 = FieldType.newBuilder()
        .withName("scalar_1")
        .withDataType(DataType.VarChar)
        .withMaxLength(512)
        .build();

FieldType scalar2 = FieldType.newBuilder()
        .withName("scalar_2")
        .withDataType(DataType.Int64)
        .build();

CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
        .withEnableDynamicField(true)
        .addFieldType(id)
        .addFieldType(vector)
        .addFieldType(scalar1)
        .addFieldType(scalar2)
        .build();

Configurar o BulkWriter

BulkWriter é uma ferramenta concebida para converter conjuntos de dados brutos num formato adequado para importação através da API de importação RESTful. Ele oferece dois tipos de gravadores:

  • LocalBulkWriter: Lê o conjunto de dados designado e o transforma em um formato fácil de usar.
  • RemoteBulkWriter: Executa a mesma tarefa que o LocalBulkWriter, mas transfere adicionalmente os arquivos de dados convertidos para um bucket de armazenamento de objetos remoto especificado.

RemoteBulkWriter difere de LocalBulkWriter porque RemoteBulkWriter transfere os arquivos de dados convertidos para um bucket de armazenamento de objetos de destino.

Configurar o LocalBulkWriter

Um LocalBulkWriter acrescenta linhas do conjunto de dados de origem e transfere-as para um ficheiro local do formato especificado.

from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
# Use `from pymilvus import LocalBulkWriter, BulkFileType` 
# when you use pymilvus earlier than 2.4.2 

writer = LocalBulkWriter(
    schema=schema,
    local_path='.',
    segment_size=512 * 1024 * 1024, # Default value
    file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;

LocalBulkWriterParam localBulkWriterParam = LocalBulkWriterParam.newBuilder()
    .withCollectionSchema(schema)
    .withLocalPath(".")
    .withChunkSize(512 * 1024 * 1024)
    .withFileType(BulkFileType.PARQUET)
    .build();

LocalBulkWriter localBulkWriter = new LocalBulkWriter(localBulkWriterParam);

Ao criar um LocalBulkWriter, deve:

  • Referenciar o esquema criado em schema.
  • Definir local_path como o diretório de saída.
  • Definir file_type para o tipo de ficheiro de saída.
  • Se o conjunto de dados contiver um grande número de registos, é aconselhável segmentar os dados, definindo segment_size para um valor adequado.

Para obter detalhes sobre as definições de parâmetros, consulte LocalBulkWriter na referência do SDK.

Ao criar um LocalBulkWriter, deve:

  • Referenciar o esquema criado em CollectionSchema().
  • Definir o diretório de saída em withLocalPath().
  • Definir o tipo de ficheiro de saída em withFileType().
  • Se o conjunto de dados contiver um grande número de registos, é aconselhável segmentar os dados, definindo withChunkSize() para um valor adequado.

Para obter detalhes sobre as definições de parâmetros, consulte LocalBulkWriter na referência do SDK.

Configurar o RemoteBulkWriter

Em vez de confirmar os dados anexados a um arquivo local, um RemoteBulkWriter confirma-os em um bucket remoto. Por conseguinte, deve configurar um objeto ConnectParam antes de criar um RemoteBulkWriter.

from pymilvus.bulk_writer import RemoteBulkWriter
# Use `from pymilvus import RemoteBulkWriter` 
# when you use pymilvus earlier than 2.4.2 

# Third-party constants
ACCESS_KEY="minioadmin"
SECRET_KEY="minioadmin"
BUCKET_NAME="milvus-bucket"

# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
    endpoint="localhost:9000", # the default MinIO service started along with Milvus
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    bucket_name=BUCKET_NAME,
    secure=False
)
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;

String ACCESS_KEY = "minioadmin";
String SECRET_KEY = "minioadmin";
String BUCKET_NAME = "milvus-bucket";

StorageConnectParam storageConnectParam = S3ConnectParam.newBuilder()
    .withEndpoint(MINIO_URI)
    .withAccessKey(ACCESS_KEY)
    .withSecretKey(SECRET_KEY)
    .withBucketName(BUCKET_NAME)
    .build();

Quando os parâmetros de ligação estiverem prontos, pode referenciá-los no RemoteBulkWriter da seguinte forma:

from pymilvus.bulk_writer import BulkFileType
# Use `from pymilvus import BulkFileType` 
# when you use pymilvus earlier than 2.4.2 

writer = RemoteBulkWriter(
    schema=schema,
    remote_path="/",
    connect_param=conn,
    file_type=BulkFileType.PARQUET
)
import io.milvus.bulkwriter.RemoteBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;

RemoteBulkWriterParam remoteBulkWriterParam = RemoteBulkWriterParam.newBuilder()
    .withCollectionSchema(schema)
    .withConnectParam(storageConnectParam)
    .withChunkSize(512 * 1024 * 1024)
    .withRemotePath("/")
    .withFileType(BulkFileType.PARQUET)
    .build();

RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(remoteBulkWriterParam);

Os parâmetros para criar um RemoteBulkWriter são praticamente os mesmos que os de um LocalBulkWriter, exceto connect_param. Para mais pormenores sobre as definições dos parâmetros, consulte RemoteBulkWriter e ConnectParam na referência do SDK.

Os parâmetros para criar um RemoteBulkWriter são praticamente os mesmos que os de um LocalBulkWriter, exceto StorageConnectParam. Para obter detalhes sobre as definições dos parâmetros, consulte RemoteBulkWriter e StorageConnectParam na referência do SDK.

Iniciar a escrita

Um BulkWriter tem dois métodos: append_row() adiciona uma linha de um conjunto de dados de origem e commit() confirma as linhas adicionadas em um arquivo local ou em um bucket remoto.

Um BulkWriter tem dois métodos: appendRow() adiciona uma linha a partir de um conjunto de dados de origem e commit() confirma as linhas adicionadas num ficheiro local ou num bucket remoto.

Para fins de demonstração, o código a seguir anexa dados gerados aleatoriamente.

import random
import string

def generate_random_str(length=5):
    letters = string.ascii_uppercase
    digits = string.digits
    
    return ''.join(random.choices(letters + digits, k=length))

for i in range(10000):
    writer.append_row({
        "id": i, 
        "vector": [random.uniform(-1, 1) for _ in range(768)],
        "scalar_1": generate_random_str(random.randint(1, 20)),
        "scalar_2": random.randint(0, 100)
    })
    
writer.commit()
import com.alibaba.fastjson.JSONObject;

for (int i = 0; i < 10000; i++) {
    JSONObject json = new JSONObject();
    json.put("id", i);
    json.put("vector", get_random_vector(768));
    json.put("scalar_1", get_random_string(20));
    json.put("scalar_2", (long) (Math.random() * 100));

    // localBulkWriter.appendRow(json);
    remoteBulkWriter.appendRow(json);
}

// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);

Uma vez que o esquema definido permite campos dinâmicos, também pode incluir campos não definidos pelo esquema nos dados a inserir da seguinte forma.

import random
import string

def generate_random_string(length=5):
    letters = string.ascii_uppercase
    digits = string.digits
    
    return ''.join(random.choices(letters + digits, k=length))

for i in range(10000):
    writer.append_row({
        "id": i, 
        "vector":[random.uniform(-1, 1) for _ in range(768)],
        "scalar_1": generate_random_string(),
        "scalar_2": random.randint(0, 100),
        "dynamic_field_1": random.choice([True, False]),
        "dynamic_field_2": random.randint(0, 100)
    })
    
writer.commit()
for (int i = 0; i < 10000; i++) {
    JSONObject json = new JSONObject();
    json.put("id", i);
    json.put("vector", get_random_vector(768));
    json.put("scalar_1", get_random_string(20));
    json.put("scalar_2", (long) (Math.random() * 100));
    json.put("dynamic_field_1", get_random_boolean());
    json.put("dynamic_field_2", (long) (Math.random() * 100));

    // localBulkWriter.appendRow(json);
    remoteBulkWriter.appendRow(json);
}

// localBulkWriter.commit(false);
remoteBulkWriter.commit(false);

Verificar os resultados

Para verificar os resultados, pode obter o caminho de saída real imprimindo a propriedade batch_files do escritor.

Para verificar os resultados, pode obter o caminho de saída real imprimindo o método getBatchFiles() do escritor.

print(writer.batch_files)

# [['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/1.parquet'],
#  ['d4220a9e-45be-4ccb-8cb5-bf09304b9f23/2.parquet']]
// localBulkWriter.getBatchFiles();
remoteBulkWriter.getBatchFiles();

// 

// Close the BulkWriter
try {
    localBulkWriter.close();
    remoteBulkWriter.close();            
} catch (Exception e) {
    // TODO: handle exception
    e.printStackTrace();
}

O BulkWriter gera um UUID, cria uma subpasta utilizando o UUID no diretório de saída fornecido e coloca todos os ficheiros gerados na subpasta. Clique aqui para descarregar os dados de amostra preparados.

As estruturas de pastas possíveis são as seguintes:

# JSON
├── folder
│   └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│       └── 1.json 

# Parquet
├── folder
│   └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
│       └── 1.parquet 

Traduzido porDeepLogo

Feedback

Esta página foi útil?