milvus-logo
LFAI
Home
  • Integrationen

Open In Colab GitHub Repository

Erste Schritte mit Dynamiq und Milvus

Dynamiq ist ein leistungsstarkes Gen AI-Framework, das die Entwicklung von KI-gestützten Anwendungen vereinfacht. Mit robuster Unterstützung für Retrieval-Augmented Generation (RAG) und Large Language Model (LLM) Agenten ermöglicht Dynamiq Entwicklern die einfache und effiziente Erstellung intelligenter, dynamischer Systeme.

In diesem Tutorial erfahren Sie, wie Sie Dynamiq nahtlos mit Milvus, der leistungsstarken Vektordatenbank, die speziell für RAG-Workflows entwickelt wurde, einsetzen können. Milvus zeichnet sich durch eine effiziente Speicherung, Indizierung und Abfrage von Vektoreinbettungen aus und ist damit eine unverzichtbare Komponente für KI-Systeme, die einen schnellen und präzisen kontextbezogenen Datenzugriff erfordern.

Diese Schritt-für-Schritt-Anleitung behandelt zwei zentrale RAG-Workflows:

  • Document Indexing Flow: Lernen Sie, wie Sie Eingabedateien (z.B. PDFs) verarbeiten, ihren Inhalt in Vektoreinbettungen umwandeln und sie in Milvus speichern. Die Nutzung der leistungsstarken Indizierungsfunktionen von Milvus stellt sicher, dass Ihre Daten für einen schnellen Abruf bereit sind.

  • Document Retrieval Flow: Erfahren Sie, wie Sie Milvus nach relevanten Dokumenteneinbettungen abfragen und diese nutzen, um mit den LLM-Agenten von Dynamiq aufschlussreiche, kontextbezogene Antworten zu generieren und so eine nahtlose KI-gestützte Benutzererfahrung zu schaffen.

Am Ende dieses Tutorials werden Sie ein solides Verständnis dafür erlangen, wie Milvus und Dynamiq zusammenarbeiten, um skalierbare, kontextbezogene KI-Systeme zu entwickeln, die auf Ihre Bedürfnisse zugeschnitten sind.

Vorbereitung

Herunterladen der erforderlichen Bibliotheken

$ pip install dynamiq pymilvus

Wenn Sie Google Colab verwenden, müssen Sie möglicherweise die Runtime neu starten, um die soeben installierten Abhängigkeiten zu aktivieren (klicken Sie auf das Menü Runtime" am oberen Rand des Bildschirms und wählen Sie Sitzung neu starten" aus dem Dropdown-Menü).

Konfigurieren Sie den LLM-Agenten

Wir werden in diesem Beispiel OpenAI als LLM verwenden. Sie sollten den api-Schlüssel OPENAI_API_KEY als Umgebungsvariable vorbereiten.

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

RAG - Fluss der Dokumentenindizierung

Dieses Tutorial demonstriert einen Retrieval-Augmented Generation (RAG) Workflow zur Indizierung von Dokumenten mit Milvus als Vektordatenbank. Der Workflow nimmt PDF-Eingabedateien, verarbeitet sie in kleinere Teile, erzeugt Vektoreinbettungen mit dem Einbettungsmodell von OpenAI und speichert die Einbettungen in einer Milvus-Sammlung für eine effiziente Suche.

Am Ende dieses Workflows werden Sie über ein skalierbares und effizientes Dokumentenindizierungssystem verfügen, das zukünftige RAG-Aufgaben wie semantische Suche und Fragenbeantwortung unterstützt.

Erforderliche Bibliotheken importieren und Workflow initialisieren

# Importing necessary libraries for the workflow
from io import BytesIO
from dynamiq import Workflow
from dynamiq.nodes import InputTransformer
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Milvus as MilvusConnection,
    MilvusDeploymentType,
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.writers import MilvusDocumentWriter

# Initialize the workflow
rag_wf = Workflow()

PDF-Konverter-Knoten definieren

converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
converter_added = rag_wf.flow.add_nodes(
    converter
)  # Add node to the DAG (Directed Acyclic Graph)

Dokumentensplitter-Knoten definieren

document_splitter = DocumentSplitter(
    split_by="sentence",  # Splits documents into sentences
    split_length=10,
    split_overlap=1,
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[converter.id]}.output.documents",
        },
    ),
).depends_on(
    converter
)  # Set dependency on the PDF converter
splitter_added = rag_wf.flow.add_nodes(document_splitter)  # Add to the DAG

Einbettungs-Knoten definieren

embedder = OpenAIDocumentEmbedder(
    connection=OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"]),
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[document_splitter.id]}.output.documents",
        },
    ),
).depends_on(
    document_splitter
)  # Set dependency on the splitter
document_embedder_added = rag_wf.flow.add_nodes(embedder)  # Add to the DAG

Milvus-Vektorspeicher-Knoten definieren

vector_store = (
    MilvusDocumentWriter(
        connection=MilvusConnection(
            deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
        ),
        index_name="my_milvus_collection",
        dimension=1536,
        create_if_not_exist=True,
        metric_type="COSINE",
    )
    .inputs(documents=embedder.outputs.documents)  # Connect to embedder output
    .depends_on(embedder)  # Set dependency on the embedder
)
milvus_writer_added = rag_wf.flow.add_nodes(vector_store)  # Add to the DAG
2024-11-19 22:14:03 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:03 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:04 - DEBUG - Created new connection using: 0bef2849fdb1458a85df8bb9dd27f51d
2024-11-19 22:14:04 - INFO - Collection my_milvus_collection does not exist. Creating a new collection.
2024-11-19 22:14:04 - DEBUG - Successfully created collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection

Milvus bietet zwei Deployment-Typen an, die unterschiedliche Anwendungsfälle abdecken:

  1. MilvusDeploymentType.FILE
  • Ideal für das lokale Prototyping oder die Speicherung kleinerer Datenmengen.
  • Setzen Sie uri auf einen lokalen Dateipfad (z. B. ./milvus.db), um Milvus Lite zu nutzen, das alle Daten automatisch in der angegebenen Datei speichert.
  • Dies ist eine praktische Option für die schnelle Einrichtung und das Experimentieren.
  1. MilvusDeploymentType.HOST
  • Konzipiert für umfangreiche Datenszenarien, z. B. die Verwaltung von über einer Million Vektoren.

    Selbstgehosteter Server

    • Stellen Sie einen leistungsstarken Milvus-Server mit Docker oder Kubernetes bereit.
    • Konfigurieren Sie die Adresse und den Port des Servers als uri (z. B. http://localhost:19530).
    • Wenn die Authentifizierung aktiviert ist:
    • Geben Sie <your_username>:<your_password> als token an.
    • Wenn die Authentifizierung deaktiviert ist:
    • Lassen Sie die token unverändert.

    Zilliz Cloud (Verwalteter Dienst)

Definieren Sie die Eingabedaten und führen Sie den Workflow aus

file_paths = ["./pdf_files/WhatisMilvus.pdf"]
input_data = {
    "files": [BytesIO(open(path, "rb").read()) for path in file_paths],
    "metadata": [{"filename": path} for path in file_paths],
}

# Run the workflow with the prepared input data
inserted_data = rag_wf.run(input_data=input_data)
/var/folders/09/d0hx80nj35sb5hxb5cpc1q180000gn/T/ipykernel_31319/3145804345.py:4: ResourceWarning: unclosed file <_io.BufferedReader name='./pdf_files/WhatisMilvus.pdf'>
  BytesIO(open(path, "rb").read()) for path in file_paths
ResourceWarning: Enable tracemalloc to get the object allocation traceback
2024-11-19 22:14:09 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution started.
2024-11-19 22:14:09 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution succeeded in 58ms.
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution started.
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/websockets/legacy/__init__.py:6: DeprecationWarning: websockets.legacy is deprecated; see https://websockets.readthedocs.io/en/stable/howto/upgrade.html for upgrade instructions
  warnings.warn(  # deprecated in 14.0 - 2024-11-09
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/pydantic/fields.py:804: PydanticDeprecatedSince20: Using extra keyword arguments on `Field` is deprecated and will be removed. Use `json_schema_extra` instead. (Extra keys: 'is_accessible_to_agent'). Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.7/migration/
  warn(
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution succeeded in 104ms.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution started.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution succeeded in 724ms.
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution succeeded in 66ms.
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution succeeded in 961ms.
2024-11-19 22:14:10 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 1.3s.
2024-11-19 22:14:10 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution succeeded in 1.3s.

Mit diesem Workflow haben wir erfolgreich eine Dokumentenindizierungspipeline implementiert, die Milvus als Vektordatenbank und das Einbettungsmodell von OpenAI für die semantische Darstellung verwendet. Dieser Aufbau ermöglicht ein schnelles und genaues vektorbasiertes Retrieval und bildet die Grundlage für RAG-Workflows wie semantische Suche, Dokumentenretrieval und kontextbezogene KI-gesteuerte Interaktionen.

Mit den skalierbaren Speichermöglichkeiten von Milvus und der Orchestrierung von Dynamiq ist diese Lösung sowohl für das Prototyping als auch für groß angelegte Produktionsimplementierungen geeignet. Sie können diese Pipeline nun erweitern, um zusätzliche Aufgaben wie Retrieval-basierte Fragebeantwortung oder KI-gesteuerte Inhaltsgenerierung einzubeziehen.

Ablauf der RAG-Dokumentenrecherche

In diesem Tutorial implementieren wir einen RAG-Workflow (Retrieval-Augmented Generation) zum Abrufen von Dokumenten. Dieser Workflow nimmt eine Benutzeranfrage, generiert eine Vektoreinbettung für sie, ruft die relevantesten Dokumente aus einer Milvus-Vektordatenbank ab und verwendet ein großes Sprachmodell (LLM), um eine detaillierte und kontextbezogene Antwort auf der Grundlage der abgerufenen Dokumente zu generieren.

Wenn Sie diesen Arbeitsablauf befolgen, schaffen Sie eine End-to-End-Lösung für die semantische Suche und die Beantwortung von Fragen, indem Sie die Leistungsfähigkeit der vektorbasierten Dokumentensuche mit den Fähigkeiten der fortschrittlichen LLMs von OpenAI kombinieren. Dieser Ansatz ermöglicht effiziente und intelligente Antworten auf Benutzeranfragen, indem er das gespeicherte Wissen in Ihrer Dokumentendatenbank nutzt.

Erforderliche Bibliotheken importieren und Workflow initialisieren

from dynamiq import Workflow
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Milvus as MilvusConnection,
    MilvusDeploymentType,
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.retrievers import MilvusDocumentRetriever
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Message, Prompt

# Initialize the workflow
retrieval_wf = Workflow()

Definieren Sie die OpenAI-Verbindung und den Text Embedder

# Establish OpenAI connection
openai_connection = OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"])

# Define the text embedder node
embedder = OpenAITextEmbedder(
    connection=openai_connection,
    model="text-embedding-3-small",
)

# Add the embedder node to the workflow
embedder_added = retrieval_wf.flow.add_nodes(embedder)

Milvus Document Retriever definieren

document_retriever = (
    MilvusDocumentRetriever(
        connection=MilvusConnection(
            deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
        ),
        index_name="my_milvus_collection",
        dimension=1536,
        top_k=5,
    )
    .inputs(embedding=embedder.outputs.embedding)  # Connect to embedder output
    .depends_on(embedder)  # Dependency on the embedder node
)

# Add the retriever node to the workflow
milvus_retriever_added = retrieval_wf.flow.add_nodes(document_retriever)
2024-11-19 22:14:19 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:19 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:19 - DEBUG - Created new connection using: 98d1132773af4298a894ad5925845fd2
2024-11-19 22:14:19 - INFO - Collection my_milvus_collection already exists. Skipping creation.

Definieren Sie die Prompt-Vorlage

# Define the prompt template for the LLM
prompt_template = """
Please answer the question based on the provided context.

Question: {{ query }}

Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""

# Create the prompt object
prompt = Prompt(messages=[Message(content=prompt_template, role="user")])

Definieren Sie den Antwortgenerator

answer_generator = (
    OpenAI(
        connection=openai_connection,
        model="gpt-4o",
        prompt=prompt,
    )
    .inputs(
        documents=document_retriever.outputs.documents,
        query=embedder.outputs.query,
    )
    .depends_on(
        [document_retriever, embedder]
    )  # Dependencies on retriever and embedder
)

# Add the answer generator node to the workflow
answer_generator_added = retrieval_wf.flow.add_nodes(answer_generator)

Ausführen des Workflows

# Run the workflow with a sample query
sample_query = "What is the Advanced Search Algorithms in Milvus?"

result = retrieval_wf.run(input_data={"query": sample_query})

answer = result.output.get(answer_generator.id).get("output", {}).get("content")
print(answer)
2024-11-19 22:14:22 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution started.
2024-11-19 22:14:22 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:22 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution started.
2024-11-19 22:14:23 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:23 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution succeeded in 474ms.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution started.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution succeeded in 23ms.
2024-11-19 22:14:23 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution started.
2024-11-19 22:14:24 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2024-11-19 22:14:24 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution succeeded in 1.8s.
2024-11-19 22:14:25 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 2.4s.
2024-11-19 22:14:25 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution succeeded in 2.4s.


The advanced search algorithms in Milvus include a variety of in-memory and on-disk indexing/search algorithms such as IVF (Inverted File), HNSW (Hierarchical Navigable Small World), and DiskANN. These algorithms have been deeply optimized to enhance performance, delivering 30%-70% better performance compared to popular implementations like FAISS and HNSWLib. These optimizations are part of Milvus's design to ensure high efficiency and scalability in handling vector data.

Übersetzt vonDeepL

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

War diese Seite hilfreich?