milvus-logo
LFAI
Casa
  • Integrazioni
    • Orchestrazione

Generazione Aumentata dal Recupero (RAG) con Milvus e BentoML

Open In Colab

Introduzione

Questa guida illustra come utilizzare un modello di embedding open-source e un modello di grande lingua su BentoCloud con il database vettoriale Milvus per costruire un'applicazione RAG (Retrieval Augmented Generation). BentoCloud è una piattaforma di inferenza AI per team di AI in rapida evoluzione, che offre un'infrastruttura completamente gestita su misura per l'inferenza di modelli. Funziona insieme a BentoML, un framework open-source per la gestione dei modelli, per facilitare la creazione e la distribuzione di servizi di modelli ad alte prestazioni. In questa demo, utilizziamo Milvus Lite come database vettoriale, che è la versione leggera di Milvus che può essere incorporata nelle applicazioni Python.

Prima di iniziare

Milvus Lite è disponibile su PyPI. È possibile installarlo tramite pip per Python 3.8+:

$ pip install -U pymilvus bentoml

Se si utilizza Google Colab, per abilitare le dipendenze appena installate, potrebbe essere necessario riavviare il runtime (fare clic sul menu "Runtime" nella parte superiore dello schermo e selezionare "Restart session" dal menu a discesa).

Dopo aver effettuato l'accesso a BentoCloud, possiamo interagire con i servizi BentoCloud distribuiti in Deployments, mentre l'END_POINT e l'API corrispondenti si trovano in Playground -> Python. È possibile scaricare i dati della città qui.

Servire gli embeddings con BentoML/BentoCloud

Per utilizzare questo endpoint, importare bentoml e impostare un client HTTP utilizzando SyncHTTPClient specificando l'endpoint e, facoltativamente, il token (se si attiva Endpoint Authorization su BentoCloud). In alternativa, è possibile utilizzare lo stesso modello servito da BentoML utilizzando il repository Sentence Transformers Embeddings.

import bentoml

BENTO_EMBEDDING_MODEL_END_POINT = "BENTO_EMBEDDING_MODEL_END_POINT"
BENTO_API_TOKEN = "BENTO_API_TOKEN"

embedding_client = bentoml.SyncHTTPClient(
    BENTO_EMBEDDING_MODEL_END_POINT, token=BENTO_API_TOKEN
)

Una volta che ci si connette all'embedding_client, occorre elaborare i dati. Abbiamo fornito diverse funzioni per eseguire la suddivisione e l'incorporazione dei dati.

Leggere i file e preelaborare il testo in un elenco di stringhe.

# naively chunk on newlines
def chunk_text(filename: str) -> list:
    with open(filename, "r") as f:
        text = f.read()
    sentences = text.split("\n")
    return sentences

Per prima cosa dobbiamo scaricare i dati della città.

import os
import requests
import urllib.request

# set up the data source
repo = "ytang07/bento_octo_milvus_RAG"
directory = "data"
save_dir = "./city_data"
api_url = f"https://api.github.com/repos/{repo}/contents/{directory}"


response = requests.get(api_url)
data = response.json()

if not os.path.exists(save_dir):
    os.makedirs(save_dir)

for item in data:
    if item["type"] == "file":
        file_url = item["download_url"]
        file_path = os.path.join(save_dir, item["name"])
        urllib.request.urlretrieve(file_url, file_path)

Quindi, elaboriamo ciascuno dei file disponibili.

# please upload your data directory under this file's folder
cities = os.listdir("city_data")
# store chunked text for each of the cities in a list of dicts
city_chunks = []
for city in cities:
    chunked = chunk_text(f"city_data/{city}")
    cleaned = []
    for chunk in chunked:
        if len(chunk) > 7:
            cleaned.append(chunk)
    mapped = {"city_name": city.split(".")[0], "chunks": cleaned}
    city_chunks.append(mapped)

Suddivide un elenco di stringhe in un elenco di incorporazioni, ciascuna raggruppata 25 stringhe di testo.

def get_embeddings(texts: list) -> list:
    if len(texts) > 25:
        splits = [texts[x : x + 25] for x in range(0, len(texts), 25)]
        embeddings = []
        for split in splits:
            embedding_split = embedding_client.encode(sentences=split)
            embeddings += embedding_split
        return embeddings
    return embedding_client.encode(
        sentences=texts,
    )

Ora dobbiamo abbinare embeddings e pezzi di testo. Dato che l'elenco di embeddings e l'elenco di frasi dovrebbero corrispondere per indice, è possibile scorrere enumerate attraverso entrambi gli elenchi per abbinarli.

entries = []
for city_dict in city_chunks:
    # No need for the embeddings list if get_embeddings already returns a list of lists
    embedding_list = get_embeddings(city_dict["chunks"])  # returns a list of lists
    # Now match texts with embeddings and city name
    for i, embedding in enumerate(embedding_list):
        entry = {
            "embedding": embedding,
            "sentence": city_dict["chunks"][
                i
            ],  # Assume "chunks" has the corresponding texts for the embeddings
            "city": city_dict["city_name"],
        }
        entries.append(entry)
    print(entries)

Inserire i dati in un database vettoriale per recuperarli

Una volta preparati gli embeddings e i dati, possiamo inserire i vettori insieme ai metadati in Milvus Lite per la successiva ricerca vettoriale. Il primo passo di questa sezione è avviare un client collegandosi a Milvus Lite. È sufficiente importare il modulo MilvusClient e inizializzare un client Milvus Lite che si connette al database vettoriale di Milvus Lite. La dimensione della dimensione deriva dalla dimensione del modello di incorporazione, ad esempio il modello Sentence Transformer all-MiniLM-L6-v2 produce vettori di 384 dimensioni.

from pymilvus import MilvusClient

COLLECTION_NAME = "Bento_Milvus_RAG"  # random name for your collection
DIMENSION = 384

# Initialize a Milvus Lite client
milvus_client = MilvusClient("milvus_demo.db")

Come per l'argomento di MilvusClient:

  • L'impostazione di uri come file locale, ad esempio./milvus.db, è il metodo più conveniente, poiché utilizza automaticamente Milvus Lite per memorizzare tutti i dati in questo file.
  • Se si dispone di una grande quantità di dati, è possibile configurare un server Milvus più performante su docker o kubernetes. In questa configurazione, utilizzare l'uri del server, ad esempiohttp://localhost:19530, come uri.
  • Se si desidera utilizzare Zilliz Cloud, il servizio cloud completamente gestito per Milvus, regolare uri e token, che corrispondono all'endpoint pubblico e alla chiave Api di Zilliz Cloud.

Oppure con la vecchia API connections.connect (non raccomandata):

from pymilvus import connections

connections.connect(uri="milvus_demo.db")

Creazione della raccolta Milvus Lite

La creazione di una collezione con Milvus Lite comporta due fasi: la prima è la definizione dello schema e la seconda è la definizione dell'indice. Per questa sezione, abbiamo bisogno di un modulo: DataType, che indica il tipo di dati da inserire in un campo. Dobbiamo anche usare due funzioni per creare lo schema e aggiungere campi. create_schema(): crea lo schema di una collezione, add_field(): aggiunge un campo allo schema di una collezione.

from pymilvus import MilvusClient, DataType, Collection

# Create schema
schema = MilvusClient.create_schema(
    auto_id=True,
    enable_dynamic_field=True,
)

# 3.2. Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=DIMENSION)

Ora che abbiamo creato il nostro schema e definito con successo il campo dati, dobbiamo definire l'indice. In termini di ricerca, un "indice" definisce il modo in cui i dati vengono mappati per essere recuperati. Per questo progetto, utilizziamo la scelta predefinita AUTOINDEX per indicizzare i dati.

Quindi, si crea la collezione con il nome, lo schema e l'indice precedentemente indicati. Infine, inseriamo i dati precedentemente elaborati.

# prepare index parameters
index_params = milvus_client.prepare_index_params()

# add index
index_params.add_index(
    field_name="embedding",
    index_type="AUTOINDEX",  # use autoindex instead of other complex indexing method
    metric_type="COSINE",  # L2, COSINE, or IP
)

# create collection
if milvus_client.has_collection(collection_name=COLLECTION_NAME):
    milvus_client.drop_collection(collection_name=COLLECTION_NAME)
milvus_client.create_collection(
    collection_name=COLLECTION_NAME, schema=schema, index_params=index_params
)

# Outside the loop, now you upsert all the entries at once
milvus_client.insert(collection_name=COLLECTION_NAME, data=entries)

Configurare l'LLM per RAG

Per creare un'applicazione RAG, dobbiamo distribuire un LLM su BentoCloud. Utilizziamo l'ultimo LLM Llama3. Una volta che è attivo e funzionante, è sufficiente copiare l'endpoint e il token di questo servizio modello e impostare un client per esso.

BENTO_LLM_END_POINT = "BENTO_LLM_END_POINT"

llm_client = bentoml.SyncHTTPClient(BENTO_LLM_END_POINT, token=BENTO_API_TOKEN)

Istruzioni LLM

Ora impostiamo le istruzioni LLM con il prompt, il contesto e la domanda. Ecco la funzione che si comporta come un LLM e che restituisce l'output del client in un formato stringa.

def dorag(question: str, context: str):

    prompt = (
        f"You are a helpful assistant. The user has a question. Answer the user question based only on the context: {context}. \n"
        f"The user question is {question}"
    )

    results = llm_client.generate(
        max_tokens=1024,
        prompt=prompt,
    )

    res = ""
    for result in results:
        res += result

    return res

Un esempio di RAG

Ora siamo pronti a porre una domanda. Questa funzione prende semplicemente una domanda e poi esegue una RAG per generare il contesto pertinente dalle informazioni di base. Quindi, si passano il contesto e la domanda a dorag() e si ottiene il risultato.

question = "What state is Cambridge in?"


def ask_a_question(question):
    embeddings = get_embeddings([question])
    res = milvus_client.search(
        collection_name=COLLECTION_NAME,
        data=embeddings,  # search for the one (1) embedding returned as a list of lists
        anns_field="embedding",  # Search across embeddings
        limit=5,  # get me the top 5 results
        output_fields=["sentence"],  # get the sentence/chunk and city
    )

    sentences = []
    for hits in res:
        for hit in hits:
            print(hit)
            sentences.append(hit["entity"]["sentence"])
    context = ". ".join(sentences)
    return context


context = ask_a_question(question=question)
print(context)

Implementare RAG

print(dorag(question=question, context=context))

Per la domanda di esempio che chiede in quale stato si trova Cambridge, possiamo stampare l'intera risposta da BentoML. Tuttavia, se ci prendiamo il tempo di analizzarla, il risultato è più gradevole e dovrebbe dirci che Cambridge si trova nel Massachusetts.