milvus-logo
LFAI
Home
  • Integrações
    • Orquestração

Geração Aumentada por Recuperação (RAG) com Milvus e BentoML

Open In Colab

Introdução

Este guia demonstra como utilizar um modelo de incorporação de código aberto e um modelo de grande linguagem no BentoCloud com a base de dados de vectores Milvus para construir uma aplicação RAG (Retrieval Augmented Generation). O BentoCloud é uma plataforma de inferência de IA para equipas de IA em rápida evolução, oferecendo uma infraestrutura totalmente gerida e adaptada à inferência de modelos. Funciona em conjunto com o BentoML, uma estrutura de serviço de modelo de código aberto, para facilitar a criação e a implementação de serviços de modelo de alto desempenho. Nesta demonstração, utilizamos o Milvus Lite como base de dados vetorial, que é a versão leve do Milvus que pode ser incorporada na sua aplicação Python.

Antes de começar

O Milvus Lite está disponível no PyPI. Pode instalá-lo através do pip para Python 3.8+:

$ pip install -U pymilvus bentoml

Se estiver a usar o Google Colab, para ativar as dependências acabadas de instalar, pode ser necessário reiniciar o runtime (Clique no menu "Runtime" no topo do ecrã, e selecione "Restart session" no menu dropdown).

Depois de entrar no BentoCloud, podemos interagir com os serviços BentoCloud implementados em Deployments, e o END_POINT e API correspondentes estão localizados em Playground -> Python. Pode descarregar os dados da cidade aqui.

Servir Embeddings com BentoML/BentoCloud

Para usar este endpoint, importe bentoml e configure um cliente HTTP usando o SyncHTTPClient especificando o endpoint e opcionalmente o token (se ativar Endpoint Authorization no BentoCloud). Em alternativa, pode usar o mesmo modelo servido através do BentoML usando o seu repositório Sentence Transformers Embeddings.

import bentoml

BENTO_EMBEDDING_MODEL_END_POINT = "BENTO_EMBEDDING_MODEL_END_POINT"
BENTO_API_TOKEN = "BENTO_API_TOKEN"

embedding_client = bentoml.SyncHTTPClient(
    BENTO_EMBEDDING_MODEL_END_POINT, token=BENTO_API_TOKEN
)

Depois de nos ligarmos ao embedding_client, precisamos de processar os nossos dados. Fornecemos várias funções para efetuar a divisão e a incorporação dos dados.

Ler ficheiros e pré-processar o texto numa lista de cadeias de caracteres.

# naively chunk on newlines
def chunk_text(filename: str) -> list:
    with open(filename, "r") as f:
        text = f.read()
    sentences = text.split("\n")
    return sentences

Primeiro, precisamos de descarregar os dados da cidade.

import os
import requests
import urllib.request

# set up the data source
repo = "ytang07/bento_octo_milvus_RAG"
directory = "data"
save_dir = "./city_data"
api_url = f"https://api.github.com/repos/{repo}/contents/{directory}"


response = requests.get(api_url)
data = response.json()

if not os.path.exists(save_dir):
    os.makedirs(save_dir)

for item in data:
    if item["type"] == "file":
        file_url = item["download_url"]
        file_path = os.path.join(save_dir, item["name"])
        urllib.request.urlretrieve(file_url, file_path)

Em seguida, processamos cada um dos ficheiros que temos.

# please upload your data directory under this file's folder
cities = os.listdir("city_data")
# store chunked text for each of the cities in a list of dicts
city_chunks = []
for city in cities:
    chunked = chunk_text(f"city_data/{city}")
    cleaned = []
    for chunk in chunked:
        if len(chunk) > 7:
            cleaned.append(chunk)
    mapped = {"city_name": city.split(".")[0], "chunks": cleaned}
    city_chunks.append(mapped)

Divide uma lista de strings numa lista de embeddings, cada uma agrupando 25 strings de texto.

def get_embeddings(texts: list) -> list:
    if len(texts) > 25:
        splits = [texts[x : x + 25] for x in range(0, len(texts), 25)]
        embeddings = []
        for split in splits:
            embedding_split = embedding_client.encode(sentences=split)
            embeddings += embedding_split
        return embeddings
    return embedding_client.encode(
        sentences=texts,
    )

Agora, precisamos de fazer corresponder os embeddings e os pedaços de texto. Uma vez que a lista de embeddings e a lista de frases devem corresponder por índice, podemos enumerate através de qualquer uma das listas para as fazer corresponder.

entries = []
for city_dict in city_chunks:
    # No need for the embeddings list if get_embeddings already returns a list of lists
    embedding_list = get_embeddings(city_dict["chunks"])  # returns a list of lists
    # Now match texts with embeddings and city name
    for i, embedding in enumerate(embedding_list):
        entry = {
            "embedding": embedding,
            "sentence": city_dict["chunks"][
                i
            ],  # Assume "chunks" has the corresponding texts for the embeddings
            "city": city_dict["city_name"],
        }
        entries.append(entry)
    print(entries)

Inserção de dados numa base de dados vetorial para recuperação

Com os nossos embeddings e dados preparados, podemos inserir os vectores juntamente com os metadados no Milvus Lite para pesquisa de vectores mais tarde. O primeiro passo nesta secção é iniciar um cliente ligando-se ao Milvus Lite. Basta importar o módulo MilvusClient e inicializar um cliente Milvus Lite que se liga à sua base de dados vetorial Milvus Lite. O tamanho da dimensão vem do tamanho do modelo de incorporação, por exemplo, o modelo Sentence Transformer all-MiniLM-L6-v2 produz vectores de 384 dimensões.

from pymilvus import MilvusClient

COLLECTION_NAME = "Bento_Milvus_RAG"  # random name for your collection
DIMENSION = 384

# Initialize a Milvus Lite client
milvus_client = MilvusClient("milvus_demo.db")

Quanto ao argumento de MilvusClient:

  • Definir o uri como um ficheiro local, por exemplo,./milvus.db, é o método mais conveniente, uma vez que utiliza automaticamente o Milvus Lite para armazenar todos os dados neste ficheiro.
  • Se tiver uma grande escala de dados, pode configurar um servidor Milvus mais eficiente em docker ou kubernetes. Nesta configuração, utilize o uri do servidor, por exemplo,http://localhost:19530, como o seu uri.
  • Se pretender utilizar o Zilliz Cloud, o serviço de nuvem totalmente gerido para o Milvus, ajuste os endereços uri e token, que correspondem ao Public Endpoint e à chave Api no Zilliz Cloud.

Ou com a antiga API connections.connect (não recomendado):

from pymilvus import connections

connections.connect(uri="milvus_demo.db")

Criar a sua coleção Milvus Lite

Criar uma coleção usando o Milvus Lite envolve dois passos: primeiro, definir o esquema, e segundo, definir o índice. Para esta secção, precisamos de um módulo: DataType diz-nos que tipo de dados estarão num campo. Também precisamos de utilizar duas funções para criar o esquema e adicionar campos. create_schema(): cria um esquema de coleção, add_field(): adiciona um campo ao esquema de uma coleção.

from pymilvus import MilvusClient, DataType, Collection

# Create schema
schema = MilvusClient.create_schema(
    auto_id=True,
    enable_dynamic_field=True,
)

# 3.2. Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=DIMENSION)

Agora que criámos o nosso esquema e definimos com êxito o campo de dados, temos de definir o índice. Em termos de pesquisa, um "índice" define como vamos mapear os nossos dados para recuperação. Utilizamos a opção predefinida AUTOINDEX para indexar os nossos dados para este projeto.

Em seguida, criamos a coleção com o nome, o esquema e o índice previamente fornecidos. Por fim, inserimos os dados previamente processados.

# prepare index parameters
index_params = milvus_client.prepare_index_params()

# add index
index_params.add_index(
    field_name="embedding",
    index_type="AUTOINDEX",  # use autoindex instead of other complex indexing method
    metric_type="COSINE",  # L2, COSINE, or IP
)

# create collection
if milvus_client.has_collection(collection_name=COLLECTION_NAME):
    milvus_client.drop_collection(collection_name=COLLECTION_NAME)
milvus_client.create_collection(
    collection_name=COLLECTION_NAME, schema=schema, index_params=index_params
)

# Outside the loop, now you upsert all the entries at once
milvus_client.insert(collection_name=COLLECTION_NAME, data=entries)

Configurar o LLM para o RAG

Para construir uma aplicação RAG, precisamos de implementar um LLM na BentoCloud. Vamos usar o último LLM Llama3. Quando estiver a funcionar, basta copiar o endpoint e o token deste serviço modelo e configurar um cliente para o mesmo.

BENTO_LLM_END_POINT = "BENTO_LLM_END_POINT"

llm_client = bentoml.SyncHTTPClient(BENTO_LLM_END_POINT, token=BENTO_API_TOKEN)

Instruções do LLM

Agora, configuramos as instruções LLM com o prompt, o contexto e a pergunta. Aqui está a função que se comporta como um LLM e retorna a saída do cliente em um formato de string.

def dorag(question: str, context: str):

    prompt = (
        f"You are a helpful assistant. The user has a question. Answer the user question based only on the context: {context}. \n"
        f"The user question is {question}"
    )

    results = llm_client.generate(
        max_tokens=1024,
        prompt=prompt,
    )

    res = ""
    for result in results:
        res += result

    return res

Um exemplo de RAG

Agora estamos prontos para fazer uma pergunta. Esta função simplesmente recebe uma pergunta e depois faz RAG para gerar o contexto relevante a partir da informação de fundo. Depois, passamos o contexto e a pergunta para dorag() e obtemos o resultado.

question = "What state is Cambridge in?"


def ask_a_question(question):
    embeddings = get_embeddings([question])
    res = milvus_client.search(
        collection_name=COLLECTION_NAME,
        data=embeddings,  # search for the one (1) embedding returned as a list of lists
        anns_field="embedding",  # Search across embeddings
        limit=5,  # get me the top 5 results
        output_fields=["sentence"],  # get the sentence/chunk and city
    )

    sentences = []
    for hits in res:
        for hit in hits:
            print(hit)
            sentences.append(hit["entity"]["sentence"])
    context = ". ".join(sentences)
    return context


context = ask_a_question(question=question)
print(context)

Implementar RAG

print(dorag(question=question, context=context))

Para o exemplo da pergunta sobre o estado em que se encontra Cambridge, podemos imprimir toda a resposta do BentoML. No entanto, se nos dermos ao trabalho de a analisar, fica mais agradável e deve dizer-nos que Cambridge está localizada em Massachusetts.