milvus-logo
LFAI
Home
  • Intégrations

Open In Colab GitHub Repository

Démarrer avec Dynamiq et Milvus

Dynamiq est un puissant framework Gen AI qui rationalise le développement d'applications basées sur l'IA. Avec un support robuste pour les agents RAG (retrieval-augmented generation) et LLM (large language model), Dynamiq permet aux développeurs de créer des systèmes intelligents et dynamiques avec facilité et efficacité.

Dans ce tutoriel, nous allons explorer comment utiliser Dynamiq avec Milvus, la base de données vectorielle de haute performance conçue pour les flux de travail RAG. Milvus excelle dans le stockage, l'indexation et la récupération efficaces des embeddings vectoriels, ce qui en fait un composant indispensable pour les systèmes d'IA qui exigent un accès rapide et précis aux données contextuelles.

Ce guide étape par étape couvrira deux flux de travail RAG fondamentaux :

  • Flux d'indexation de documents: apprenez à traiter les fichiers d'entrée (par exemple, les PDF), à transformer leur contenu en incorporations vectorielles et à les stocker dans Milvus. L'exploitation des capacités d'indexation hautes performances de Milvus garantit que vos données sont prêtes à être récupérées rapidement.

  • Flux d'extraction de documents: Découvrez comment interroger Milvus pour des encastrements de documents pertinents et les utiliser pour générer des réponses perspicaces et contextuelles avec les agents LLM de Dynamiq, créant ainsi une expérience utilisateur transparente alimentée par l'IA.

À la fin de ce tutoriel, vous aurez acquis une solide compréhension de la façon dont Milvus et Dynamiq travaillent ensemble pour construire des systèmes d'IA évolutifs et contextuels adaptés à vos besoins.

Préparation

Télécharger les bibliothèques nécessaires

$ pip install dynamiq pymilvus

Si vous utilisez Google Colab, pour activer les dépendances qui viennent d'être installées, vous devrez peut-être redémarrer le runtime (cliquez sur le menu "Runtime" en haut de l'écran, et sélectionnez "Restart session" dans le menu déroulant).

Configurer l'agent LLM

Nous utiliserons OpenAI comme LLM dans cet exemple. Vous devez préparer la clé api OPENAI_API_KEY en tant que variable d'environnement.

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

RAG - Flux d'indexation de documents

Ce tutoriel présente un flux de travail RAG (Retrieval-Augmented Generation) pour l'indexation de documents avec Milvus comme base de données vectorielle. Le flux de travail prend des fichiers PDF en entrée, les traite en plus petits morceaux, génère des embeddings vectoriels en utilisant le modèle d'embedding d'OpenAI, et stocke les embeddings dans une collection Milvus pour une récupération efficace.

À la fin de ce flux de travail, vous disposerez d'un système d'indexation de documents évolutif et efficace qui prendra en charge les futures tâches RAG telles que la recherche sémantique et la réponse aux questions.

Importer les bibliothèques requises et initialiser le flux de travail

# Importing necessary libraries for the workflow
from io import BytesIO
from dynamiq import Workflow
from dynamiq.nodes import InputTransformer
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Milvus as MilvusConnection,
    MilvusDeploymentType,
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.writers import MilvusDocumentWriter

# Initialize the workflow
rag_wf = Workflow()

Définir le nœud de conversion PDF

converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
converter_added = rag_wf.flow.add_nodes(
    converter
)  # Add node to the DAG (Directed Acyclic Graph)

Définir le nœud de séparation de documents

document_splitter = DocumentSplitter(
    split_by="sentence",  # Splits documents into sentences
    split_length=10,
    split_overlap=1,
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[converter.id]}.output.documents",
        },
    ),
).depends_on(
    converter
)  # Set dependency on the PDF converter
splitter_added = rag_wf.flow.add_nodes(document_splitter)  # Add to the DAG

Définir le nœud d'intégration

embedder = OpenAIDocumentEmbedder(
    connection=OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"]),
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[document_splitter.id]}.output.documents",
        },
    ),
).depends_on(
    document_splitter
)  # Set dependency on the splitter
document_embedder_added = rag_wf.flow.add_nodes(embedder)  # Add to the DAG

Définir le nœud Milvus Vector Store

vector_store = (
    MilvusDocumentWriter(
        connection=MilvusConnection(
            deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
        ),
        index_name="my_milvus_collection",
        dimension=1536,
        create_if_not_exist=True,
        metric_type="COSINE",
    )
    .inputs(documents=embedder.outputs.documents)  # Connect to embedder output
    .depends_on(embedder)  # Set dependency on the embedder
)
milvus_writer_added = rag_wf.flow.add_nodes(vector_store)  # Add to the DAG
2024-11-19 22:14:03 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:03 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:04 - DEBUG - Created new connection using: 0bef2849fdb1458a85df8bb9dd27f51d
2024-11-19 22:14:04 - INFO - Collection my_milvus_collection does not exist. Creating a new collection.
2024-11-19 22:14:04 - DEBUG - Successfully created collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection

Milvus propose deux types de déploiement, répondant à des cas d'utilisation différents :

  1. MilvusDeploymentType.FILE
  • Idéal pour le prototypage local ou le stockage de données à petite échelle.
  • Définissez uri sur un chemin de fichier local (par exemple, ./milvus.db) pour tirer parti de Milvus Lite, qui stocke automatiquement toutes les données dans le fichier spécifié.
  • Il s'agit d'une option pratique pour une installation et une expérimentation rapides.
  1. MilvusDeploymentType.HOST
  • Conçu pour les scénarios de données à grande échelle, tels que la gestion de plus d'un million de vecteurs.

    Serveur auto-hébergé

    • Déployez un serveur Milvus haute performance à l'aide de Docker ou de Kubernetes.
    • Configurez l'adresse et le port du serveur en tant que uri (par exemple, http://localhost:19530).
    • Si l'authentification est activée :
    • Fournir <your_username>:<your_password> comme token.
    • Si l'authentification est désactivée :
    • Ne pas configurer token.

    Zilliz Cloud (service géré)

Définir les données d'entrée et exécuter le flux de travail

file_paths = ["./pdf_files/WhatisMilvus.pdf"]
input_data = {
    "files": [BytesIO(open(path, "rb").read()) for path in file_paths],
    "metadata": [{"filename": path} for path in file_paths],
}

# Run the workflow with the prepared input data
inserted_data = rag_wf.run(input_data=input_data)
/var/folders/09/d0hx80nj35sb5hxb5cpc1q180000gn/T/ipykernel_31319/3145804345.py:4: ResourceWarning: unclosed file <_io.BufferedReader name='./pdf_files/WhatisMilvus.pdf'>
  BytesIO(open(path, "rb").read()) for path in file_paths
ResourceWarning: Enable tracemalloc to get the object allocation traceback
2024-11-19 22:14:09 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution started.
2024-11-19 22:14:09 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution succeeded in 58ms.
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution started.
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/websockets/legacy/__init__.py:6: DeprecationWarning: websockets.legacy is deprecated; see https://websockets.readthedocs.io/en/stable/howto/upgrade.html for upgrade instructions
  warnings.warn(  # deprecated in 14.0 - 2024-11-09
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/pydantic/fields.py:804: PydanticDeprecatedSince20: Using extra keyword arguments on `Field` is deprecated and will be removed. Use `json_schema_extra` instead. (Extra keys: 'is_accessible_to_agent'). Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.7/migration/
  warn(
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution succeeded in 104ms.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution started.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution succeeded in 724ms.
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution succeeded in 66ms.
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution succeeded in 961ms.
2024-11-19 22:14:10 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 1.3s.
2024-11-19 22:14:10 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution succeeded in 1.3s.

Grâce à ce flux de travail, nous avons réussi à mettre en œuvre un pipeline d'indexation de documents en utilisant Milvus comme base de données vectorielle et le modèle d'intégration d'OpenAI pour la représentation sémantique. Cette configuration permet une recherche vectorielle rapide et précise, constituant la base des flux de travail RAG tels que la recherche sémantique, la recherche de documents et les interactions contextuelles pilotées par l'IA.

Grâce aux capacités de stockage évolutives de Milvus et à l'orchestration de Dynamiq, cette solution est prête à la fois pour le prototypage et les déploiements de production à grande échelle. Vous pouvez maintenant étendre ce pipeline pour inclure des tâches supplémentaires telles que la réponse à des questions basées sur l'extraction ou la génération de contenu pilotée par l'IA.

Flux de recherche de documents RAG

Dans ce tutoriel, nous mettons en œuvre un flux de recherche de documents RAG (Retrieval-Augmented Generation). Ce flux de travail prend une requête utilisateur, génère une intégration vectorielle pour celle-ci, récupère les documents les plus pertinents à partir d'une base de données vectorielle Milvus, et utilise un grand modèle de langage (LLM) pour générer une réponse détaillée et contextuelle basée sur les documents récupérés.

En suivant ce flux de travail, vous créerez une solution de bout en bout pour la recherche sémantique et la réponse aux questions, en combinant la puissance de la recherche de documents basée sur les vecteurs avec les capacités des LLM avancés d'OpenAI. Cette approche permet des réponses efficaces et intelligentes aux requêtes des utilisateurs en exploitant les connaissances stockées dans votre base de données de documents.

Importer les bibliothèques requises et initialiser le flux de travail

from dynamiq import Workflow
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Milvus as MilvusConnection,
    MilvusDeploymentType,
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.retrievers import MilvusDocumentRetriever
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Message, Prompt

# Initialize the workflow
retrieval_wf = Workflow()

Définir la connexion OpenAI et l'intégrateur de texte

# Establish OpenAI connection
openai_connection = OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"])

# Define the text embedder node
embedder = OpenAITextEmbedder(
    connection=openai_connection,
    model="text-embedding-3-small",
)

# Add the embedder node to the workflow
embedder_added = retrieval_wf.flow.add_nodes(embedder)

Définir le récupérateur de documents Milvus

document_retriever = (
    MilvusDocumentRetriever(
        connection=MilvusConnection(
            deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
        ),
        index_name="my_milvus_collection",
        dimension=1536,
        top_k=5,
    )
    .inputs(embedding=embedder.outputs.embedding)  # Connect to embedder output
    .depends_on(embedder)  # Dependency on the embedder node
)

# Add the retriever node to the workflow
milvus_retriever_added = retrieval_wf.flow.add_nodes(document_retriever)
2024-11-19 22:14:19 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:19 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:19 - DEBUG - Created new connection using: 98d1132773af4298a894ad5925845fd2
2024-11-19 22:14:19 - INFO - Collection my_milvus_collection already exists. Skipping creation.

Définir le modèle d'invite

# Define the prompt template for the LLM
prompt_template = """
Please answer the question based on the provided context.

Question: {{ query }}

Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""

# Create the prompt object
prompt = Prompt(messages=[Message(content=prompt_template, role="user")])

Définir le générateur de réponses

answer_generator = (
    OpenAI(
        connection=openai_connection,
        model="gpt-4o",
        prompt=prompt,
    )
    .inputs(
        documents=document_retriever.outputs.documents,
        query=embedder.outputs.query,
    )
    .depends_on(
        [document_retriever, embedder]
    )  # Dependencies on retriever and embedder
)

# Add the answer generator node to the workflow
answer_generator_added = retrieval_wf.flow.add_nodes(answer_generator)

Exécuter le flux de travail

# Run the workflow with a sample query
sample_query = "What is the Advanced Search Algorithms in Milvus?"

result = retrieval_wf.run(input_data={"query": sample_query})

answer = result.output.get(answer_generator.id).get("output", {}).get("content")
print(answer)
2024-11-19 22:14:22 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution started.
2024-11-19 22:14:22 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:22 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution started.
2024-11-19 22:14:23 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:23 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution succeeded in 474ms.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution started.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution succeeded in 23ms.
2024-11-19 22:14:23 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution started.
2024-11-19 22:14:24 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2024-11-19 22:14:24 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution succeeded in 1.8s.
2024-11-19 22:14:25 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 2.4s.
2024-11-19 22:14:25 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution succeeded in 2.4s.


The advanced search algorithms in Milvus include a variety of in-memory and on-disk indexing/search algorithms such as IVF (Inverted File), HNSW (Hierarchical Navigable Small World), and DiskANN. These algorithms have been deeply optimized to enhance performance, delivering 30%-70% better performance compared to popular implementations like FAISS and HNSWLib. These optimizations are part of Milvus's design to ensure high efficiency and scalability in handling vector data.

Traduit parDeepL

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

Cette page a-t - elle été utile ?