Open In Colab GitHub Repository

Costruire un RAG con Milvus e Unstructured

Unstructured fornisce una piattaforma e strumenti per l'ingestione e l'elaborazione di documenti non strutturati per la Retrieval Augmented Generation (RAG) e la messa a punto dei modelli. Offre sia una piattaforma UI senza codice che servizi API serverless, consentendo agli utenti di elaborare i dati su risorse di calcolo ospitate da Unstructured.

In questo tutorial, utilizzeremo Unstructured per ingerire documenti PDF e poi useremo Milvus per costruire una pipeline RAG.

Preparazione

Dipendenze e ambiente

$ pip install -qU "unstructured[pdf]" pymilvus milvus-lite openai

Opzioni di installazione:

  • Per elaborare tutti i formati di documenti: pip install "unstructured[all-docs]"
  • Per formati specifici (ad esempio, PDF): pip install "unstructured[pdf]"
  • Per ulteriori opzioni di installazione, consultare la documentazione di Unstructured.

Se si utilizza Google Colab, per abilitare le dipendenze appena installate potrebbe essere necessario riavviare il runtime (fare clic sul menu "Runtime" nella parte superiore dello schermo e selezionare "Restart session" dal menu a discesa).

In questo esempio utilizzeremo OpenAI come LLM. È necessario preparare la chiave api OPENAI_API_KEY come variabile d'ambiente.

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

Preparare i client Milvus e OpenAI

È possibile utilizzare il client Milvus per creare una raccolta Milvus e inserirvi i dati.

from pymilvus import MilvusClient, DataType

# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")

Come per l'argomento di MilvusClient:

  • L'impostazione di uri come file locale, ad esempio./milvus.db, è il metodo più conveniente, poiché utilizza automaticamente Milvus Lite per memorizzare tutti i dati in questo file.
  • Se si dispone di una grande quantità di dati, ad esempio più di un milione di vettori, è possibile configurare un server Milvus più performante su Docker o Kubernetes. In questa configurazione, utilizzare l'indirizzo e la porta del server come uri, ad esempiohttp://localhost:19530. Se si attiva la funzione di autenticazione su Milvus, utilizzare ":" come token, altrimenti non impostare il token.
  • Se si desidera utilizzare Zilliz Cloud, il servizio cloud completamente gestito per Milvus, impostare uri e token, che corrispondono all'endpoint pubblico e alla chiave Api di Zilliz Cloud.

Verificare se la collezione esiste già e, in caso affermativo, eliminarla.

collection_name = "my_rag_collection"

if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

Preparare un client OpenAI per generare embedding e generare risposte.

from openai import OpenAI

openai_client = OpenAI()


def emb_text(text):
    return (
        openai_client.embeddings.create(input=text, model="text-embedding-3-small")
        .data[0]
        .embedding
    )

Generare un embedding di prova e stamparne la dimensione e i primi elementi.

test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]

Creare la collezione Milvus

Creeremo una collezione con il seguente schema:

  • id: la chiave primaria, che è un identificatore unico per ogni documento.
  • vector: l'incorporazione del documento.
  • text: il contenuto testuale del documento.
  • metadata: i metadati del documento.

Poi costruiamo un indice AUTOINDEX sul campo vector. Quindi si crea la raccolta.

# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
    field_name="vector",
    metric_type="COSINE",
    index_type="AUTOINDEX",
)
milvus_client.create_collection(
    collection_name=collection_name,
    schema=schema,
    index_params=index_params,
    consistency_level="Bounded",
)

milvus_client.load_collection(collection_name=collection_name)

Caricare i dati da Unstructured

Unstructured fornisce una pipeline di ingestione flessibile e potente per elaborare vari tipi di file, tra cui PDF, HTML e altri ancora. Partizioniamo e dividiamo un file PDF locale. Poi caricheremo i dati in Milvus.

import warnings
from unstructured.partition.auto import partition

warnings.filterwarnings("ignore")

elements = partition(
    filename="./pdf_files/WhatisMilvus.pdf",
    strategy="hi_res",
    chunking_strategy="by_title",
)  # Replace with the path to your PDF file

Esaminiamo gli elementi partizionati del file PDF. Ogni elemento rappresenta un pezzo di contenuto estratto dal processo di partizione di Unstructured.

for element in elements:
    print(element)
    break
What is Milvus?

Milvus is a high-performance, highly scalable vector database that runs efficiently across a wide range of environments, from a laptop to large-scale distributed systems. It is available as both open-source software and a cloud service.

Inserire i dati in Milvus.

data = []
for i, element in enumerate(elements):
    data.append(
        {
            "id": i,
            "vector": emb_text(element.text),
            "text": element.text,
            "metadata": element.metadata.to_dict(),
        }
    )
milvus_client.insert(collection_name=collection_name, data=data)
{'insert_count': 29, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28], 'cost': 0}

Recupero e generazione della risposta

Definire una funzione per recuperare i documenti rilevanti da Milvus.

def retrieve_documents(question, top_k=3):
    search_res = milvus_client.search(
        collection_name=collection_name,
        data=[emb_text(question)],
        limit=top_k,
        # search_params={"metric_type": "IP", "params": {}},
        output_fields=["text"],
    )
    return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]

Definire una funzione per generare una risposta utilizzando i documenti recuperati nella pipeline RAG.

def generate_rag_response(question):
    retrieved_docs = retrieve_documents(question)
    context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
    system_prompt = (
        "You are an AI assistant. Provide answers based on the given context."
    )
    user_prompt = f"""
    Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
    
    Context:
    {context}
    
    Question: {question}
    """
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return response.choices[0].message.content

Testiamo la pipeline RAG con una domanda di esempio.

question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus include a wide range of in-memory and on-disk indexing/search algorithms such as IVF, HNSW, and DiskANN. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

Questa pagina è stata utile?