Open In Colab GitHub Repository

Introdução ao Dynamiq e ao Milvus

O Dynamiq é uma poderosa estrutura de IA de geração que simplifica o desenvolvimento de aplicativos alimentados por IA. Com suporte robusto para agentes de geração aumentada por recuperação (RAG) e modelo de linguagem grande (LLM), o Dynamiq permite que os desenvolvedores criem sistemas inteligentes e dinâmicos com facilidade e eficiência.

Neste tutorial, exploraremos como usar o Dynamiq com o Milvus, o banco de dados vetorial de alto desempenho criado especificamente para fluxos de trabalho RAG. O Milvus é excelente em armazenamento, indexação e recuperação eficientes de embeddings vetoriais, tornando-o um componente indispensável para sistemas de IA que exigem acesso rápido e preciso a dados contextuais.

Este guia passo a passo abrangerá dois fluxos de trabalho principais do RAG:

  • Fluxo de indexação de documentos: Saiba como processar ficheiros de entrada (por exemplo, PDFs), transformar o seu conteúdo em embeddings vectoriais e armazená-los no Milvus. A utilização das capacidades de indexação de alto desempenho do Milvus garante que os seus dados estão prontos para uma rápida recuperação.

  • Fluxo de recuperação de documentos: descubra como consultar o Milvus para obter embeddings de documentos relevantes e usá-los para gerar respostas perspicazes e conscientes do contexto com os agentes LLM da Dynamiq, criando uma experiência de utilizador com IA perfeita.

No final deste tutorial, obterá uma sólida compreensão de como o Milvus e o Dynamiq trabalham em conjunto para criar sistemas de IA escaláveis e sensíveis ao contexto, adaptados às suas necessidades.

Preparação

Descarregar as bibliotecas necessárias

$ pip install dynamiq pymilvus milvus-lite

Se estiver a utilizar o Google Colab, para ativar as dependências que acabou de instalar, poderá ter de reiniciar o tempo de execução (clique no menu "Tempo de execução" na parte superior do ecrã e selecione "Reiniciar sessão" no menu pendente).

Configurar o agente LLM

Neste exemplo, utilizaremos o OpenAI como LLM. Deve preparar a chave api OPENAI_API_KEY como uma variável de ambiente.

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

RAG - Fluxo de indexação de documentos

Este tutorial demonstra um fluxo de trabalho RAG (Retrieval-Augmented Generation) para indexação de documentos com o Milvus como base de dados vetorial. O fluxo de trabalho recebe arquivos PDF de entrada, processa-os em pedaços menores, gera embeddings de vetor usando o modelo de embedding do OpenAI e armazena os embeddings em uma coleção do Milvus para recuperação eficiente.

No final deste fluxo de trabalho, terá um sistema de indexação de documentos escalável e eficiente que suporta futuras tarefas RAG, como pesquisa semântica e resposta a perguntas.

Importar as bibliotecas necessárias e inicializar o fluxo de trabalho

# Importing necessary libraries for the workflow
from io import BytesIO
from dynamiq import Workflow
from dynamiq.nodes import InputTransformer
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Milvus as MilvusConnection,
    MilvusDeploymentType,
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.writers import MilvusDocumentWriter

# Initialize the workflow
rag_wf = Workflow()

Definir nó do conversor de PDF

converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
converter_added = rag_wf.flow.add_nodes(
    converter
)  # Add node to the DAG (Directed Acyclic Graph)

Definir nó divisor de documentos

document_splitter = DocumentSplitter(
    split_by="sentence",  # Splits documents into sentences
    split_length=10,
    split_overlap=1,
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[converter.id]}.output.documents",
        },
    ),
).depends_on(
    converter
)  # Set dependency on the PDF converter
splitter_added = rag_wf.flow.add_nodes(document_splitter)  # Add to the DAG

Definir nó de incorporação

embedder = OpenAIDocumentEmbedder(
    connection=OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"]),
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[document_splitter.id]}.output.documents",
        },
    ),
).depends_on(
    document_splitter
)  # Set dependency on the splitter
document_embedder_added = rag_wf.flow.add_nodes(embedder)  # Add to the DAG

Definir nó de armazenamento de vetor Milvus

vector_store = (
    MilvusDocumentWriter(
        connection=MilvusConnection(
            deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
        ),
        index_name="my_milvus_collection",
        dimension=1536,
        create_if_not_exist=True,
        metric_type="COSINE",
    )
    .inputs(documents=embedder.outputs.documents)  # Connect to embedder output
    .depends_on(embedder)  # Set dependency on the embedder
)
milvus_writer_added = rag_wf.flow.add_nodes(vector_store)  # Add to the DAG
2024-11-19 22:14:03 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:03 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:04 - DEBUG - Created new connection using: 0bef2849fdb1458a85df8bb9dd27f51d
2024-11-19 22:14:04 - INFO - Collection my_milvus_collection does not exist. Creating a new collection.
2024-11-19 22:14:04 - DEBUG - Successfully created collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection

O Milvus oferece dois tipos de implantação, que atendem a diferentes casos de uso:

  1. MilvusDeploymentType.FILE
  • Ideal para prototipagem local ou armazenamento de dados em pequena escala.
  • Defina o uri para um caminho de ficheiro local (por exemplo, ./milvus.db) para aproveitar o Milvus Lite, que armazena automaticamente todos os dados no ficheiro especificado.
  • Esta é uma opção conveniente para configuração e experimentação rápidas.
  1. MilvusDeploymentType.HOST
  • Concebido para cenários de dados em grande escala, como a gestão de mais de um milhão de vectores.

    Servidor auto-hospedado

    • Implante um servidor Milvus de alto desempenho usando Docker ou Kubernetes.
    • Configure o endereço e a porta do servidor como uri (por exemplo, http://localhost:19530).
    • Se a autenticação estiver activada:
    • Forneça <your_username>:<your_password> como o token.
    • Se a autenticação estiver desativada:
    • Deixar o token não definido.

    Zilliz Cloud (serviço gerido)

Definir dados de entrada e executar o fluxo de trabalho

file_paths = ["./pdf_files/WhatisMilvus.pdf"]
input_data = {
    "files": [BytesIO(open(path, "rb").read()) for path in file_paths],
    "metadata": [{"filename": path} for path in file_paths],
}

# Run the workflow with the prepared input data
inserted_data = rag_wf.run(input_data=input_data)
/var/folders/09/d0hx80nj35sb5hxb5cpc1q180000gn/T/ipykernel_31319/3145804345.py:4: ResourceWarning: unclosed file <_io.BufferedReader name='./pdf_files/WhatisMilvus.pdf'>
  BytesIO(open(path, "rb").read()) for path in file_paths
ResourceWarning: Enable tracemalloc to get the object allocation traceback
2024-11-19 22:14:09 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution started.
2024-11-19 22:14:09 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution succeeded in 58ms.
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution started.
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/websockets/legacy/__init__.py:6: DeprecationWarning: websockets.legacy is deprecated; see https://websockets.readthedocs.io/en/stable/howto/upgrade.html for upgrade instructions
  warnings.warn(  # deprecated in 14.0 - 2024-11-09
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/pydantic/fields.py:804: PydanticDeprecatedSince20: Using extra keyword arguments on `Field` is deprecated and will be removed. Use `json_schema_extra` instead. (Extra keys: 'is_accessible_to_agent'). Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.7/migration/
  warn(
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution succeeded in 104ms.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution started.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution succeeded in 724ms.
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution succeeded in 66ms.
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution succeeded in 961ms.
2024-11-19 22:14:10 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 1.3s.
2024-11-19 22:14:10 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution succeeded in 1.3s.

Através deste fluxo de trabalho, implementámos com êxito um pipeline de indexação de documentos utilizando o Milvus como base de dados vetorial e o modelo de incorporação do OpenAI para representação semântica. Esta configuração permite uma recuperação rápida e precisa baseada em vectores, formando a base para fluxos de trabalho RAG, como a pesquisa semântica, a recuperação de documentos e as interações contextuais orientadas para a IA.

Com os recursos de armazenamento escalonável do Milvus e a orquestração do Dynamiq, essa solução está pronta para implantações de prototipagem e produção em grande escala. Pode agora alargar este pipeline para incluir tarefas adicionais, como a resposta a perguntas baseadas na recuperação ou a geração de conteúdos orientados para a IA.

Fluxo de recuperação de documentos RAG

Neste tutorial, implementamos um fluxo de trabalho de recuperação de documentos RAG (Retrieval-Augmented Generation). Este fluxo de trabalho recebe uma consulta do utilizador, gera uma incorporação de vetor para a mesma, recupera os documentos mais relevantes de uma base de dados de vectores Milvus e utiliza um modelo de linguagem de grande dimensão (LLM) para gerar uma resposta detalhada e contextualizada com base nos documentos recuperados.

Ao seguir este fluxo de trabalho, criará uma solução completa para pesquisa semântica e resposta a perguntas, combinando o poder da recuperação de documentos baseada em vectores com as capacidades dos LLMs avançados da OpenAI. Esta abordagem permite respostas eficientes e inteligentes às perguntas dos utilizadores, tirando partido do conhecimento armazenado na sua base de dados de documentos.

Importar as bibliotecas necessárias e inicializar o fluxo de trabalho

from dynamiq import Workflow
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Milvus as MilvusConnection,
    MilvusDeploymentType,
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.retrievers import MilvusDocumentRetriever
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Message, Prompt

# Initialize the workflow
retrieval_wf = Workflow()

Definir a ligação OpenAI e o Text Embedder

# Establish OpenAI connection
openai_connection = OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"])

# Define the text embedder node
embedder = OpenAITextEmbedder(
    connection=openai_connection,
    model="text-embedding-3-small",
)

# Add the embedder node to the workflow
embedder_added = retrieval_wf.flow.add_nodes(embedder)

Definir o Milvus Document Retriever

document_retriever = (
    MilvusDocumentRetriever(
        connection=MilvusConnection(
            deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
        ),
        index_name="my_milvus_collection",
        dimension=1536,
        top_k=5,
    )
    .inputs(embedding=embedder.outputs.embedding)  # Connect to embedder output
    .depends_on(embedder)  # Dependency on the embedder node
)

# Add the retriever node to the workflow
milvus_retriever_added = retrieval_wf.flow.add_nodes(document_retriever)
2024-11-19 22:14:19 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:19 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:19 - DEBUG - Created new connection using: 98d1132773af4298a894ad5925845fd2
2024-11-19 22:14:19 - INFO - Collection my_milvus_collection already exists. Skipping creation.

Definir o modelo de prompt

# Define the prompt template for the LLM
prompt_template = """
Please answer the question based on the provided context.

Question: {{ query }}

Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""

# Create the prompt object
prompt = Prompt(messages=[Message(content=prompt_template, role="user")])

Definir o gerador de respostas

answer_generator = (
    OpenAI(
        connection=openai_connection,
        model="gpt-4o",
        prompt=prompt,
    )
    .inputs(
        documents=document_retriever.outputs.documents,
        query=embedder.outputs.query,
    )
    .depends_on(
        [document_retriever, embedder]
    )  # Dependencies on retriever and embedder
)

# Add the answer generator node to the workflow
answer_generator_added = retrieval_wf.flow.add_nodes(answer_generator)

Executar o fluxo de trabalho

# Run the workflow with a sample query
sample_query = "What is the Advanced Search Algorithms in Milvus?"

result = retrieval_wf.run(input_data={"query": sample_query})

answer = result.output.get(answer_generator.id).get("output", {}).get("content")
print(answer)
2024-11-19 22:14:22 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution started.
2024-11-19 22:14:22 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:22 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution started.
2024-11-19 22:14:23 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:23 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution succeeded in 474ms.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution started.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution succeeded in 23ms.
2024-11-19 22:14:23 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution started.
2024-11-19 22:14:24 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2024-11-19 22:14:24 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution succeeded in 1.8s.
2024-11-19 22:14:25 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 2.4s.
2024-11-19 22:14:25 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution succeeded in 2.4s.


The advanced search algorithms in Milvus include a variety of in-memory and on-disk indexing/search algorithms such as IVF (Inverted File), HNSW (Hierarchical Navigable Small World), and DiskANN. These algorithms have been deeply optimized to enhance performance, delivering 30%-70% better performance compared to popular implementations like FAISS and HNSWLib. These optimizations are part of Milvus's design to ensure high efficiency and scalability in handling vector data.