Erste Schritte mit Dynamiq und Milvus
Dynamiq ist ein leistungsstarkes Gen AI-Framework, das die Entwicklung von KI-gestützten Anwendungen vereinfacht. Mit robuster Unterstützung für Retrieval-Augmented Generation (RAG) und Large Language Model (LLM) Agenten ermöglicht Dynamiq Entwicklern die einfache und effiziente Erstellung intelligenter, dynamischer Systeme.
In diesem Tutorial erfahren Sie, wie Sie Dynamiq nahtlos mit Milvus, der leistungsstarken Vektordatenbank, die speziell für RAG-Workflows entwickelt wurde, einsetzen können. Milvus zeichnet sich durch eine effiziente Speicherung, Indizierung und Abfrage von Vektoreinbettungen aus und ist damit eine unverzichtbare Komponente für KI-Systeme, die einen schnellen und präzisen kontextbezogenen Datenzugriff erfordern.
Diese Schritt-für-Schritt-Anleitung behandelt zwei zentrale RAG-Workflows:
Document Indexing Flow: Lernen Sie, wie Sie Eingabedateien (z.B. PDFs) verarbeiten, ihren Inhalt in Vektoreinbettungen umwandeln und sie in Milvus speichern. Die Nutzung der leistungsstarken Indizierungsfunktionen von Milvus stellt sicher, dass Ihre Daten für einen schnellen Abruf bereit sind.
Document Retrieval Flow: Erfahren Sie, wie Sie Milvus nach relevanten Dokumenteneinbettungen abfragen und diese nutzen, um mit den LLM-Agenten von Dynamiq aufschlussreiche, kontextbezogene Antworten zu generieren und so eine nahtlose KI-gestützte Benutzererfahrung zu schaffen.
Am Ende dieses Tutorials werden Sie ein solides Verständnis dafür erlangen, wie Milvus und Dynamiq zusammenarbeiten, um skalierbare, kontextbezogene KI-Systeme zu entwickeln, die auf Ihre Bedürfnisse zugeschnitten sind.
Vorbereitung
Herunterladen der erforderlichen Bibliotheken
$ pip install dynamiq pymilvus
Wenn Sie Google Colab verwenden, müssen Sie möglicherweise die Runtime neu starten, um die soeben installierten Abhängigkeiten zu aktivieren (klicken Sie auf das Menü Runtime" am oberen Rand des Bildschirms und wählen Sie Sitzung neu starten" aus dem Dropdown-Menü).
Konfigurieren Sie den LLM-Agenten
Wir werden in diesem Beispiel OpenAI als LLM verwenden. Sie sollten den api-Schlüssel OPENAI_API_KEY
als Umgebungsvariable vorbereiten.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
RAG - Fluss der Dokumentenindizierung
Dieses Tutorial demonstriert einen Retrieval-Augmented Generation (RAG) Workflow zur Indizierung von Dokumenten mit Milvus als Vektordatenbank. Der Workflow nimmt PDF-Eingabedateien, verarbeitet sie in kleinere Teile, erzeugt Vektoreinbettungen mit dem Einbettungsmodell von OpenAI und speichert die Einbettungen in einer Milvus-Sammlung für eine effiziente Suche.
Am Ende dieses Workflows werden Sie über ein skalierbares und effizientes Dokumentenindizierungssystem verfügen, das zukünftige RAG-Aufgaben wie semantische Suche und Fragenbeantwortung unterstützt.
Erforderliche Bibliotheken importieren und Workflow initialisieren
# Importing necessary libraries for the workflow
from io import BytesIO
from dynamiq import Workflow
from dynamiq.nodes import InputTransformer
from dynamiq.connections import (
OpenAI as OpenAIConnection,
Milvus as MilvusConnection,
MilvusDeploymentType,
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.writers import MilvusDocumentWriter
# Initialize the workflow
rag_wf = Workflow()
PDF-Konverter-Knoten definieren
converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
converter_added = rag_wf.flow.add_nodes(
converter
) # Add node to the DAG (Directed Acyclic Graph)
Dokumentensplitter-Knoten definieren
document_splitter = DocumentSplitter(
split_by="sentence", # Splits documents into sentences
split_length=10,
split_overlap=1,
input_transformer=InputTransformer(
selector={
"documents": f"${[converter.id]}.output.documents",
},
),
).depends_on(
converter
) # Set dependency on the PDF converter
splitter_added = rag_wf.flow.add_nodes(document_splitter) # Add to the DAG
Einbettungs-Knoten definieren
embedder = OpenAIDocumentEmbedder(
connection=OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"]),
input_transformer=InputTransformer(
selector={
"documents": f"${[document_splitter.id]}.output.documents",
},
),
).depends_on(
document_splitter
) # Set dependency on the splitter
document_embedder_added = rag_wf.flow.add_nodes(embedder) # Add to the DAG
Milvus-Vektorspeicher-Knoten definieren
vector_store = (
MilvusDocumentWriter(
connection=MilvusConnection(
deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
),
index_name="my_milvus_collection",
dimension=1536,
create_if_not_exist=True,
metric_type="COSINE",
)
.inputs(documents=embedder.outputs.documents) # Connect to embedder output
.depends_on(embedder) # Set dependency on the embedder
)
milvus_writer_added = rag_wf.flow.add_nodes(vector_store) # Add to the DAG
2024-11-19 22:14:03 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:03 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:04 - DEBUG - Created new connection using: 0bef2849fdb1458a85df8bb9dd27f51d
2024-11-19 22:14:04 - INFO - Collection my_milvus_collection does not exist. Creating a new collection.
2024-11-19 22:14:04 - DEBUG - Successfully created collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
Milvus bietet zwei Deployment-Typen an, die unterschiedliche Anwendungsfälle abdecken:
- MilvusDeploymentType.FILE
- Ideal für das lokale Prototyping oder die Speicherung kleinerer Datenmengen.
- Setzen Sie
uri
auf einen lokalen Dateipfad (z. B../milvus.db
), um Milvus Lite zu nutzen, das alle Daten automatisch in der angegebenen Datei speichert. - Dies ist eine praktische Option für die schnelle Einrichtung und das Experimentieren.
- MilvusDeploymentType.HOST
Konzipiert für umfangreiche Datenszenarien, z. B. die Verwaltung von über einer Million Vektoren.
Selbstgehosteter Server
- Stellen Sie einen leistungsstarken Milvus-Server mit Docker oder Kubernetes bereit.
- Konfigurieren Sie die Adresse und den Port des Servers als
uri
(z. B.http://localhost:19530
). - Wenn die Authentifizierung aktiviert ist:
- Geben Sie
<your_username>:<your_password>
alstoken
an. - Wenn die Authentifizierung deaktiviert ist:
- Lassen Sie die
token
unverändert.
Zilliz Cloud (Verwalteter Dienst)
- Für eine vollständig verwaltete, Cloud-basierte Milvus-Erfahrung verwenden Sie Zilliz Cloud.
- Stellen Sie
uri
undtoken
entsprechend dem öffentlichen Endpunkt und dem API-Schlüssel ein, die in der Zilliz Cloud-Konsole bereitgestellt werden.
Definieren Sie die Eingabedaten und führen Sie den Workflow aus
file_paths = ["./pdf_files/WhatisMilvus.pdf"]
input_data = {
"files": [BytesIO(open(path, "rb").read()) for path in file_paths],
"metadata": [{"filename": path} for path in file_paths],
}
# Run the workflow with the prepared input data
inserted_data = rag_wf.run(input_data=input_data)
/var/folders/09/d0hx80nj35sb5hxb5cpc1q180000gn/T/ipykernel_31319/3145804345.py:4: ResourceWarning: unclosed file <_io.BufferedReader name='./pdf_files/WhatisMilvus.pdf'>
BytesIO(open(path, "rb").read()) for path in file_paths
ResourceWarning: Enable tracemalloc to get the object allocation traceback
2024-11-19 22:14:09 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution started.
2024-11-19 22:14:09 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution succeeded in 58ms.
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution started.
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/websockets/legacy/__init__.py:6: DeprecationWarning: websockets.legacy is deprecated; see https://websockets.readthedocs.io/en/stable/howto/upgrade.html for upgrade instructions
warnings.warn( # deprecated in 14.0 - 2024-11-09
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/pydantic/fields.py:804: PydanticDeprecatedSince20: Using extra keyword arguments on `Field` is deprecated and will be removed. Use `json_schema_extra` instead. (Extra keys: 'is_accessible_to_agent'). Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.7/migration/
warn(
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution succeeded in 104ms.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution started.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution succeeded in 724ms.
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution succeeded in 66ms.
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution succeeded in 961ms.
2024-11-19 22:14:10 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 1.3s.
2024-11-19 22:14:10 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution succeeded in 1.3s.
Mit diesem Workflow haben wir erfolgreich eine Dokumentenindizierungspipeline implementiert, die Milvus als Vektordatenbank und das Einbettungsmodell von OpenAI für die semantische Darstellung verwendet. Dieser Aufbau ermöglicht ein schnelles und genaues vektorbasiertes Retrieval und bildet die Grundlage für RAG-Workflows wie semantische Suche, Dokumentenretrieval und kontextbezogene KI-gesteuerte Interaktionen.
Mit den skalierbaren Speichermöglichkeiten von Milvus und der Orchestrierung von Dynamiq ist diese Lösung sowohl für das Prototyping als auch für groß angelegte Produktionsimplementierungen geeignet. Sie können diese Pipeline nun erweitern, um zusätzliche Aufgaben wie Retrieval-basierte Fragebeantwortung oder KI-gesteuerte Inhaltsgenerierung einzubeziehen.
Ablauf der RAG-Dokumentenrecherche
In diesem Tutorial implementieren wir einen RAG-Workflow (Retrieval-Augmented Generation) zum Abrufen von Dokumenten. Dieser Workflow nimmt eine Benutzeranfrage, generiert eine Vektoreinbettung für sie, ruft die relevantesten Dokumente aus einer Milvus-Vektordatenbank ab und verwendet ein großes Sprachmodell (LLM), um eine detaillierte und kontextbezogene Antwort auf der Grundlage der abgerufenen Dokumente zu generieren.
Wenn Sie diesen Arbeitsablauf befolgen, schaffen Sie eine End-to-End-Lösung für die semantische Suche und die Beantwortung von Fragen, indem Sie die Leistungsfähigkeit der vektorbasierten Dokumentensuche mit den Fähigkeiten der fortschrittlichen LLMs von OpenAI kombinieren. Dieser Ansatz ermöglicht effiziente und intelligente Antworten auf Benutzeranfragen, indem er das gespeicherte Wissen in Ihrer Dokumentendatenbank nutzt.
Erforderliche Bibliotheken importieren und Workflow initialisieren
from dynamiq import Workflow
from dynamiq.connections import (
OpenAI as OpenAIConnection,
Milvus as MilvusConnection,
MilvusDeploymentType,
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.retrievers import MilvusDocumentRetriever
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Message, Prompt
# Initialize the workflow
retrieval_wf = Workflow()
Definieren Sie die OpenAI-Verbindung und den Text Embedder
# Establish OpenAI connection
openai_connection = OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"])
# Define the text embedder node
embedder = OpenAITextEmbedder(
connection=openai_connection,
model="text-embedding-3-small",
)
# Add the embedder node to the workflow
embedder_added = retrieval_wf.flow.add_nodes(embedder)
Milvus Document Retriever definieren
document_retriever = (
MilvusDocumentRetriever(
connection=MilvusConnection(
deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
),
index_name="my_milvus_collection",
dimension=1536,
top_k=5,
)
.inputs(embedding=embedder.outputs.embedding) # Connect to embedder output
.depends_on(embedder) # Dependency on the embedder node
)
# Add the retriever node to the workflow
milvus_retriever_added = retrieval_wf.flow.add_nodes(document_retriever)
2024-11-19 22:14:19 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:19 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:19 - DEBUG - Created new connection using: 98d1132773af4298a894ad5925845fd2
2024-11-19 22:14:19 - INFO - Collection my_milvus_collection already exists. Skipping creation.
Definieren Sie die Prompt-Vorlage
# Define the prompt template for the LLM
prompt_template = """
Please answer the question based on the provided context.
Question: {{ query }}
Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""
# Create the prompt object
prompt = Prompt(messages=[Message(content=prompt_template, role="user")])
Definieren Sie den Antwortgenerator
answer_generator = (
OpenAI(
connection=openai_connection,
model="gpt-4o",
prompt=prompt,
)
.inputs(
documents=document_retriever.outputs.documents,
query=embedder.outputs.query,
)
.depends_on(
[document_retriever, embedder]
) # Dependencies on retriever and embedder
)
# Add the answer generator node to the workflow
answer_generator_added = retrieval_wf.flow.add_nodes(answer_generator)
Ausführen des Workflows
# Run the workflow with a sample query
sample_query = "What is the Advanced Search Algorithms in Milvus?"
result = retrieval_wf.run(input_data={"query": sample_query})
answer = result.output.get(answer_generator.id).get("output", {}).get("content")
print(answer)
2024-11-19 22:14:22 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution started.
2024-11-19 22:14:22 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:22 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution started.
2024-11-19 22:14:23 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:23 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution succeeded in 474ms.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution started.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution succeeded in 23ms.
2024-11-19 22:14:23 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution started.
2024-11-19 22:14:24 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2024-11-19 22:14:24 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution succeeded in 1.8s.
2024-11-19 22:14:25 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 2.4s.
2024-11-19 22:14:25 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution succeeded in 2.4s.
The advanced search algorithms in Milvus include a variety of in-memory and on-disk indexing/search algorithms such as IVF (Inverted File), HNSW (Hierarchical Navigable Small World), and DiskANN. These algorithms have been deeply optimized to enhance performance, delivering 30%-70% better performance compared to popular implementations like FAISS and HNSWLib. These optimizations are part of Milvus's design to ensure high efficiency and scalability in handling vector data.