milvus-logo
LFAI
Home
  • Integrationen
    • Datenquellen

Erstellen Sie eine RAG mit Milvus und Unstructured

Open In Colab GitHub Repository

Unstructured bietet eine Plattform und Werkzeuge zur Aufnahme und Verarbeitung unstrukturierter Dokumente für Retrieval Augmented Generation (RAG) und Modell-Feinabstimmung. Es bietet sowohl eine no-code UI-Plattform als auch serverlose API-Dienste, die es den Benutzern ermöglichen, Daten auf von Unstructured gehosteten Rechenressourcen zu verarbeiten.

In diesem Tutorial werden wir Unstructured zum Einlesen von PDF-Dokumenten verwenden und dann Milvus zum Aufbau einer RAG-Pipeline nutzen.

Vorbereitung

Abhängigkeiten und Umgebung

$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai

Wenn Sie Google Colab verwenden, müssen Sie möglicherweise die Runtime neu starten, um die soeben installierten Abhängigkeiten zu aktivieren (klicken Sie auf das Menü "Runtime" am oberen Rand des Bildschirms und wählen Sie "Restart session" aus dem Dropdown-Menü).

Sie können Ihre UNSTRUCTURED_API_KEY und UNSTRUCTURED_URL Umgebungsvariablen von hier beziehen.

Wir werden in diesem Beispiel OpenAI als LLM verwenden. Sie sollten den api-Schlüssel OPENAI_API_KEY als Umgebungsvariable vorbereiten.

import os


os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"

os.environ["OPENAI_API_KEY"] = "***********"

Vorbereiten der Milvus- und OpenAI-Clients

Sie können den Milvus-Client verwenden, um eine Milvus-Sammlung zu erstellen und Daten in sie einzufügen.

from pymilvus import MilvusClient, DataType

# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")  # TODO

Wie beim Argument MilvusClient:

  • Die Einstellung von uri als lokale Datei, z. B../milvus.db, ist die bequemste Methode, da sie automatisch Milvus Lite verwendet, um alle Daten in dieser Datei zu speichern.
  • Wenn Sie große Datenmengen haben, z. B. mehr als eine Million Vektoren, können Sie einen leistungsfähigeren Milvus-Server auf Docker oder Kubernetes einrichten. Bei dieser Einrichtung verwenden Sie bitte die Serveradresse und den Port als Uri, z. B.http://localhost:19530. Wenn Sie die Authentifizierungsfunktion auf Milvus aktivieren, verwenden Sie "<Ihr_Benutzername>:<Ihr_Passwort>" als Token, andernfalls setzen Sie das Token nicht.
  • Wenn Sie Zilliz Cloud, den vollständig verwalteten Cloud-Dienst für Milvus, verwenden möchten, passen Sie uri und token an, die dem öffentlichen Endpunkt und dem Api-Schlüssel in Zilliz Cloud entsprechen.

Prüfen Sie, ob die Sammlung bereits existiert und löschen Sie sie, falls dies der Fall ist.

collection_name = "my_rag_collection"

if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

Bereiten Sie einen OpenAI-Client vor, um Einbettungen zu erzeugen und Antworten zu generieren.

from openai import OpenAI

openai_client = OpenAI()


def emb_text(text):
    return (
        openai_client.embeddings.create(input=text, model="text-embedding-3-small")
        .data[0]
        .embedding
    )

Erzeugen Sie eine Testeinbettung und geben Sie deren Dimension und die ersten Elemente aus.

test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]

Milvus-Sammlung erstellen

Wir werden eine Sammlung mit dem folgenden Schema erstellen:

  • iddem Primärschlüssel, der ein eindeutiger Bezeichner für jedes Dokument ist.
  • vector: die Einbettung des Dokuments.
  • textder Textinhalt des Dokuments
  • metadata: die Metadaten des Dokuments.

Dann erstellen wir einen AUTOINDEX Index für das Feld vector. Und dann erstellen wir die Sammlung.

# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
    field_name="vector",
    metric_type="COSINE",
    index_type="AUTOINDEX",
)
milvus_client.create_collection(
    collection_name=collection_name,
    schema=schema,
    index_params=index_params,
    consistency_level="Strong",
)

milvus_client.load_collection(collection_name=collection_name)

Laden von Daten aus Unstructured

Unstructured bietet eine flexible und leistungsstarke Ingest-Pipeline zur Verarbeitung verschiedener Dateitypen, einschließlich PDF, HTML usw. Wir werden die Ingest-Funktionalität nutzen, um PDF-Dateien in einem lokalen Verzeichnis zu partitionieren. Und dann die Daten in Milvus laden.

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"

Pipeline.from_configs(
    context=ProcessorConfig(),
    indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
    downloader_config=LocalDownloaderConfig(),
    source_connection_config=LocalConnectionConfig(),
    partitioner_config=PartitionerConfig(
        partition_by_api=True,
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
        strategy="hi_res",
        additional_partition_args={
            "split_pdf_page": True,
            "split_pdf_concurrency_level": 15,
        },
    ),
    uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()


from unstructured.staging.base import elements_from_json


def load_processed_files(directory_path):
    elements = []
    for filename in os.listdir(directory_path):
        if filename.endswith(".json"):
            file_path = os.path.join(directory_path, filename)
            try:
                elements.extend(elements_from_json(filename=file_path))
            except IOError:
                print(f"Error: Could not read file {filename}.")

    return elements


elements = load_processed_files(directory_with_results)

Einfügen von Daten in Milvus.

data = []
for i, element in enumerate(elements):
    data.append(
        {
            "id": i,
            "vector": emb_text(element.text),
            "text": element.text,
            "metadata": element.metadata.to_dict(),
        }
    )
milvus_client.insert(collection_name=collection_name, data=data)

Abrufen und Erzeugen einer Antwort

Definieren Sie eine Funktion zum Abrufen relevanter Dokumente aus Milvus.

def retrieve_documents(question, top_k=3):
    search_res = milvus_client.search(
        collection_name=collection_name,
        data=[emb_text(question)],
        limit=top_k,
        # search_params={"metric_type": "IP", "params": {}},
        output_fields=["text"],
    )
    return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]

Definieren Sie eine Funktion, um eine Antwort unter Verwendung der abgerufenen Dokumente in der RAG-Pipeline zu erzeugen.

def generate_rag_response(question):
    retrieved_docs = retrieve_documents(question)
    context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
    system_prompt = (
        "You are an AI assistant. Provide answers based on the given context."
    )
    user_prompt = f"""
    Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
    
    Context:
    {context}
    
    Question: {question}
    """
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return response.choices[0].message.content

Lassen Sie uns die RAG-Pipeline mit einer Beispielfrage testen.

question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.

Übersetzt vonDeepLogo

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

War diese Seite hilfreich?