Erstellen Sie eine RAG mit Milvus und Unstructured
Unstructured bietet eine Plattform und Werkzeuge zur Aufnahme und Verarbeitung unstrukturierter Dokumente für Retrieval Augmented Generation (RAG) und Modell-Feinabstimmung. Es bietet sowohl eine no-code UI-Plattform als auch serverlose API-Dienste, die es den Benutzern ermöglichen, Daten auf von Unstructured gehosteten Rechenressourcen zu verarbeiten.
In diesem Tutorial werden wir Unstructured zum Einlesen von PDF-Dokumenten verwenden und dann Milvus zum Aufbau einer RAG-Pipeline nutzen.
Vorbereitung
Abhängigkeiten und Umgebung
$ pip install -qU "unstructured[pdf]" pymilvus openai
Installationsoptionen:
- Für die Verarbeitung aller Dokumentformate:
pip install "unstructured[all-docs]" - Für bestimmte Formate (z.B. PDF):
pip install "unstructured[pdf]" - Weitere Installationsoptionen finden Sie in der Unstructured-Dokumentation
Wenn Sie Google Colab verwenden, müssen Sie möglicherweise die Runtime neu starten, um die soeben installierten Abhängigkeiten zu aktivieren (klicken Sie auf das Menü "Runtime" am oberen Rand des Bildschirms und wählen Sie "Restart session" aus dem Dropdown-Menü).
Wir werden in diesem Beispiel OpenAI als LLM verwenden. Sie sollten den Api-Schlüssel OPENAI_API_KEY als Umgebungsvariable vorbereiten.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Vorbereiten der Milvus- und OpenAI-Clients
Sie können den Milvus-Client verwenden, um eine Milvus-Sammlung zu erstellen und Daten in sie einzufügen.
from pymilvus import MilvusClient, DataType
# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")
Wie beim Argument MilvusClient:
- Die Einstellung von
urials lokale Datei, z. B../milvus.db, ist die bequemste Methode, da sie automatisch Milvus Lite verwendet, um alle Daten in dieser Datei zu speichern. - Wenn Sie große Datenmengen haben, z. B. mehr als eine Million Vektoren, können Sie einen leistungsfähigeren Milvus-Server auf Docker oder Kubernetes einrichten. Bei dieser Einrichtung verwenden Sie bitte die Serveradresse und den Port als Uri, z. B.
http://localhost:19530. Wenn Sie die Authentifizierungsfunktion auf Milvus aktivieren, verwenden Sie ": " als Token, andernfalls setzen Sie das Token nicht. - Wenn Sie Zilliz Cloud, den vollständig verwalteten Cloud-Dienst für Milvus, verwenden möchten, passen Sie
uriundtokenan, die dem öffentlichen Endpunkt und dem Api-Schlüssel in Zilliz Cloud entsprechen.
Prüfen Sie, ob die Sammlung bereits existiert und löschen Sie sie, falls dies der Fall ist.
collection_name = "my_rag_collection"
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Bereiten Sie einen OpenAI-Client vor, um Einbettungen zu erzeugen und Antworten zu generieren.
from openai import OpenAI
openai_client = OpenAI()
def emb_text(text):
return (
openai_client.embeddings.create(input=text, model="text-embedding-3-small")
.data[0]
.embedding
)
Erzeugen Sie eine Testeinbettung und geben Sie deren Dimension und die ersten Elemente aus.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]
Milvus-Sammlung erstellen
Wir werden eine Sammlung mit dem folgenden Schema erstellen:
iddem Primärschlüssel, der ein eindeutiger Bezeichner für jedes Dokument ist.vector: die Einbettung des Dokuments.textder Textinhalt des Dokumentsmetadata: die Metadaten des Dokuments.
Dann erstellen wir einen AUTOINDEX Index für das Feld vector. Und dann erstellen wir die Sammlung.
# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="vector",
metric_type="COSINE",
index_type="AUTOINDEX",
)
milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params,
consistency_level="Strong",
)
milvus_client.load_collection(collection_name=collection_name)
Laden von Daten aus Unstructured
Unstructured bietet eine flexible und leistungsstarke Ingestion-Pipeline zur Verarbeitung verschiedener Dateitypen, einschließlich PDF, HTML usw. Wir werden eine lokale PDF-Datei partitionieren und chunken. Anschließend laden wir die Daten in Milvus.
import warnings
from unstructured.partition.auto import partition
warnings.filterwarnings("ignore")
elements = partition(
filename="./pdf_files/WhatisMilvus.pdf",
strategy="hi_res",
chunking_strategy="by_title",
) # Replace with the path to your PDF file
Schauen wir uns die partitionierten Elemente der PDF-Datei an. Jedes Element steht für einen Teil des Inhalts, der durch den Partitionierungsprozess von Unstructured extrahiert wurde.
for element in elements:
print(element)
break
What is Milvus?
Milvus is a high-performance, highly scalable vector database that runs efficiently across a wide range of environments, from a laptop to large-scale distributed systems. It is available as both open-source software and a cloud service.
Einfügen der Daten in Milvus.
data = []
for i, element in enumerate(elements):
data.append(
{
"id": i,
"vector": emb_text(element.text),
"text": element.text,
"metadata": element.metadata.to_dict(),
}
)
milvus_client.insert(collection_name=collection_name, data=data)
{'insert_count': 29, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28], 'cost': 0}
Abrufen und Erzeugen einer Antwort
Definieren Sie eine Funktion zum Abrufen relevanter Dokumente aus Milvus.
def retrieve_documents(question, top_k=3):
search_res = milvus_client.search(
collection_name=collection_name,
data=[emb_text(question)],
limit=top_k,
# search_params={"metric_type": "IP", "params": {}},
output_fields=["text"],
)
return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]
Definieren Sie eine Funktion, um eine Antwort unter Verwendung der abgerufenen Dokumente in der RAG-Pipeline zu erzeugen.
def generate_rag_response(question):
retrieved_docs = retrieve_documents(question)
context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
system_prompt = (
"You are an AI assistant. Provide answers based on the given context."
)
user_prompt = f"""
Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
Context:
{context}
Question: {question}
"""
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
return response.choices[0].message.content
Lassen Sie uns die RAG-Pipeline mit einer Beispielfrage testen.
question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus include a wide range of in-memory and on-disk indexing/search algorithms such as IVF, HNSW, and DiskANN. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.