Erstellen Sie eine RAG mit Milvus und Unstructured
Unstructured bietet eine Plattform und Werkzeuge zur Aufnahme und Verarbeitung unstrukturierter Dokumente für Retrieval Augmented Generation (RAG) und Modell-Feinabstimmung. Es bietet sowohl eine no-code UI-Plattform als auch serverlose API-Dienste, die es den Benutzern ermöglichen, Daten auf von Unstructured gehosteten Rechenressourcen zu verarbeiten.
In diesem Tutorial werden wir Unstructured zum Einlesen von PDF-Dokumenten verwenden und dann Milvus zum Aufbau einer RAG-Pipeline nutzen.
Vorbereitung
Abhängigkeiten und Umgebung
$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai
Wenn Sie Google Colab verwenden, müssen Sie möglicherweise die Runtime neu starten, um die soeben installierten Abhängigkeiten zu aktivieren (klicken Sie auf das Menü "Runtime" am oberen Rand des Bildschirms und wählen Sie "Restart session" aus dem Dropdown-Menü).
Sie können Ihre UNSTRUCTURED_API_KEY
und UNSTRUCTURED_URL
Umgebungsvariablen von hier beziehen.
Wir werden in diesem Beispiel OpenAI als LLM verwenden. Sie sollten den api-Schlüssel OPENAI_API_KEY
als Umgebungsvariable vorbereiten.
import os
os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"
os.environ["OPENAI_API_KEY"] = "***********"
Vorbereiten der Milvus- und OpenAI-Clients
Sie können den Milvus-Client verwenden, um eine Milvus-Sammlung zu erstellen und Daten in sie einzufügen.
from pymilvus import MilvusClient, DataType
# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db") # TODO
Wie beim Argument MilvusClient
:
- Die Einstellung von
uri
als lokale Datei, z. B../milvus.db
, ist die bequemste Methode, da sie automatisch Milvus Lite verwendet, um alle Daten in dieser Datei zu speichern. - Wenn Sie große Datenmengen haben, z. B. mehr als eine Million Vektoren, können Sie einen leistungsfähigeren Milvus-Server auf Docker oder Kubernetes einrichten. Bei dieser Einrichtung verwenden Sie bitte die Serveradresse und den Port als Uri, z. B.
http://localhost:19530
. Wenn Sie die Authentifizierungsfunktion auf Milvus aktivieren, verwenden Sie "<Ihr_Benutzername>:<Ihr_Passwort>" als Token, andernfalls setzen Sie das Token nicht. - Wenn Sie Zilliz Cloud, den vollständig verwalteten Cloud-Dienst für Milvus, verwenden möchten, passen Sie
uri
undtoken
an, die dem öffentlichen Endpunkt und dem Api-Schlüssel in Zilliz Cloud entsprechen.
Prüfen Sie, ob die Sammlung bereits existiert und löschen Sie sie, falls dies der Fall ist.
collection_name = "my_rag_collection"
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Bereiten Sie einen OpenAI-Client vor, um Einbettungen zu erzeugen und Antworten zu generieren.
from openai import OpenAI
openai_client = OpenAI()
def emb_text(text):
return (
openai_client.embeddings.create(input=text, model="text-embedding-3-small")
.data[0]
.embedding
)
Erzeugen Sie eine Testeinbettung und geben Sie deren Dimension und die ersten Elemente aus.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]
Milvus-Sammlung erstellen
Wir werden eine Sammlung mit dem folgenden Schema erstellen:
id
dem Primärschlüssel, der ein eindeutiger Bezeichner für jedes Dokument ist.vector
: die Einbettung des Dokuments.text
der Textinhalt des Dokumentsmetadata
: die Metadaten des Dokuments.
Dann erstellen wir einen AUTOINDEX
Index für das Feld vector
. Und dann erstellen wir die Sammlung.
# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="vector",
metric_type="COSINE",
index_type="AUTOINDEX",
)
milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params,
consistency_level="Strong",
)
milvus_client.load_collection(collection_name=collection_name)
Laden von Daten aus Unstructured
Unstructured bietet eine flexible und leistungsstarke Ingest-Pipeline zur Verarbeitung verschiedener Dateitypen, einschließlich PDF, HTML usw. Wir werden die Ingest-Funktionalität nutzen, um PDF-Dateien in einem lokalen Verzeichnis zu partitionieren. Und dann die Daten in Milvus laden.
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
LocalIndexerConfig,
LocalDownloaderConfig,
LocalConnectionConfig,
LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"
Pipeline.from_configs(
context=ProcessorConfig(),
indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
downloader_config=LocalDownloaderConfig(),
source_connection_config=LocalConnectionConfig(),
partitioner_config=PartitionerConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
strategy="hi_res",
additional_partition_args={
"split_pdf_page": True,
"split_pdf_concurrency_level": 15,
},
),
uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()
from unstructured.staging.base import elements_from_json
def load_processed_files(directory_path):
elements = []
for filename in os.listdir(directory_path):
if filename.endswith(".json"):
file_path = os.path.join(directory_path, filename)
try:
elements.extend(elements_from_json(filename=file_path))
except IOError:
print(f"Error: Could not read file {filename}.")
return elements
elements = load_processed_files(directory_with_results)
Einfügen von Daten in Milvus.
data = []
for i, element in enumerate(elements):
data.append(
{
"id": i,
"vector": emb_text(element.text),
"text": element.text,
"metadata": element.metadata.to_dict(),
}
)
milvus_client.insert(collection_name=collection_name, data=data)
Abrufen und Erzeugen einer Antwort
Definieren Sie eine Funktion zum Abrufen relevanter Dokumente aus Milvus.
def retrieve_documents(question, top_k=3):
search_res = milvus_client.search(
collection_name=collection_name,
data=[emb_text(question)],
limit=top_k,
# search_params={"metric_type": "IP", "params": {}},
output_fields=["text"],
)
return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]
Definieren Sie eine Funktion, um eine Antwort unter Verwendung der abgerufenen Dokumente in der RAG-Pipeline zu erzeugen.
def generate_rag_response(question):
retrieved_docs = retrieve_documents(question)
context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
system_prompt = (
"You are an AI assistant. Provide answers based on the given context."
)
user_prompt = f"""
Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
Context:
{context}
Question: {question}
"""
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
return response.choices[0].message.content
Lassen Sie uns die RAG-Pipeline mit einer Beispielfrage testen.
question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.