milvus-logo
LFAI
Home
  • Integrações

Open In Colab GitHub Repository

Introdução ao Dynamiq e ao Milvus

O Dynamiq é uma poderosa estrutura de IA de geração que simplifica o desenvolvimento de aplicativos alimentados por IA. Com suporte robusto para agentes de geração aumentada por recuperação (RAG) e modelo de linguagem grande (LLM), o Dynamiq permite que os desenvolvedores criem sistemas inteligentes e dinâmicos com facilidade e eficiência.

Neste tutorial, exploraremos como usar o Dynamiq com o Milvus, o banco de dados vetorial de alto desempenho criado especificamente para fluxos de trabalho RAG. O Milvus é excelente em armazenamento, indexação e recuperação eficientes de embeddings vetoriais, tornando-o um componente indispensável para sistemas de IA que exigem acesso rápido e preciso a dados contextuais.

Este guia passo a passo abordará dois fluxos de trabalho principais do RAG:

  • Fluxo de indexação de documentos: Saiba como processar ficheiros de entrada (por exemplo, PDFs), transformar o seu conteúdo em embeddings vectoriais e armazená-los no Milvus. A utilização das capacidades de indexação de alto desempenho do Milvus garante que os seus dados estão prontos para uma rápida recuperação.

  • Fluxo de recuperação de documentos: descubra como consultar o Milvus para obter embeddings de documentos relevantes e usá-los para gerar respostas perspicazes e conscientes do contexto com os agentes LLM da Dynamiq, criando uma experiência de utilizador com IA perfeita.

No final deste tutorial, obterá uma sólida compreensão de como o Milvus e o Dynamiq trabalham em conjunto para criar sistemas de IA escaláveis e sensíveis ao contexto, adaptados às suas necessidades.

Preparação

Descarregar as bibliotecas necessárias

$ pip install dynamiq pymilvus

Se estiver a utilizar o Google Colab, para ativar as dependências que acabou de instalar, poderá ter de reiniciar o tempo de execução (clique no menu "Tempo de execução" na parte superior do ecrã e selecione "Reiniciar sessão" no menu pendente).

Configurar o agente LLM

Neste exemplo, vamos utilizar o OpenAI como LLM. Deve preparar a chave api OPENAI_API_KEY como uma variável de ambiente.

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

RAG - Fluxo de indexação de documentos

Este tutorial demonstra um fluxo de trabalho RAG (Retrieval-Augmented Generation) para indexar documentos com o Milvus como base de dados vetorial. O fluxo de trabalho recebe arquivos PDF de entrada, processa-os em pedaços menores, gera embeddings de vetor usando o modelo de embedding do OpenAI e armazena os embeddings em uma coleção do Milvus para recuperação eficiente.

No final deste fluxo de trabalho, terá um sistema de indexação de documentos escalável e eficiente que suporta futuras tarefas RAG, como pesquisa semântica e resposta a perguntas.

Importar as bibliotecas necessárias e inicializar o fluxo de trabalho

# Importing necessary libraries for the workflow
from io import BytesIO
from dynamiq import Workflow
from dynamiq.nodes import InputTransformer
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Milvus as MilvusConnection,
    MilvusDeploymentType,
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.writers import MilvusDocumentWriter

# Initialize the workflow
rag_wf = Workflow()

Definir nó do conversor de PDF

converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
converter_added = rag_wf.flow.add_nodes(
    converter
)  # Add node to the DAG (Directed Acyclic Graph)

Definir nó divisor de documentos

document_splitter = DocumentSplitter(
    split_by="sentence",  # Splits documents into sentences
    split_length=10,
    split_overlap=1,
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[converter.id]}.output.documents",
        },
    ),
).depends_on(
    converter
)  # Set dependency on the PDF converter
splitter_added = rag_wf.flow.add_nodes(document_splitter)  # Add to the DAG

Definir nó de incorporação

embedder = OpenAIDocumentEmbedder(
    connection=OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"]),
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[document_splitter.id]}.output.documents",
        },
    ),
).depends_on(
    document_splitter
)  # Set dependency on the splitter
document_embedder_added = rag_wf.flow.add_nodes(embedder)  # Add to the DAG

Definir nó de armazenamento de vetor Milvus

vector_store = (
    MilvusDocumentWriter(
        connection=MilvusConnection(
            deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
        ),
        index_name="my_milvus_collection",
        dimension=1536,
        create_if_not_exist=True,
        metric_type="COSINE",
    )
    .inputs(documents=embedder.outputs.documents)  # Connect to embedder output
    .depends_on(embedder)  # Set dependency on the embedder
)
milvus_writer_added = rag_wf.flow.add_nodes(vector_store)  # Add to the DAG
2024-11-19 22:14:03 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:03 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:04 - DEBUG - Created new connection using: 0bef2849fdb1458a85df8bb9dd27f51d
2024-11-19 22:14:04 - INFO - Collection my_milvus_collection does not exist. Creating a new collection.
2024-11-19 22:14:04 - DEBUG - Successfully created collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection

O Milvus oferece dois tipos de implantação, que atendem a diferentes casos de uso:

  1. MilvusDeploymentType.FILE
  • Ideal para prototipagem local ou armazenamento de dados em pequena escala.
  • Defina o uri para um caminho de ficheiro local (por exemplo, ./milvus.db) para tirar partido do Milvus Lite, que armazena automaticamente todos os dados no ficheiro especificado.
  • Esta é uma opção conveniente para configuração e experimentação rápidas.
  1. MilvusDeploymentType.HOST
  • Concebido para cenários de dados em grande escala, como a gestão de mais de um milhão de vectores.

    Servidor auto-hospedado

    • Implante um servidor Milvus de alto desempenho usando Docker ou Kubernetes.
    • Configure o endereço e a porta do servidor como uri (por exemplo, http://localhost:19530).
    • Se a autenticação estiver activada:
    • Forneça <your_username>:<your_password> como o token.
    • Se a autenticação estiver desativada:
    • Deixar o endereço token não definido.

    Zilliz Cloud (serviço gerido)

Definir dados de entrada e executar o fluxo de trabalho

file_paths = ["./pdf_files/WhatisMilvus.pdf"]
input_data = {
    "files": [BytesIO(open(path, "rb").read()) for path in file_paths],
    "metadata": [{"filename": path} for path in file_paths],
}

# Run the workflow with the prepared input data
inserted_data = rag_wf.run(input_data=input_data)
/var/folders/09/d0hx80nj35sb5hxb5cpc1q180000gn/T/ipykernel_31319/3145804345.py:4: ResourceWarning: unclosed file <_io.BufferedReader name='./pdf_files/WhatisMilvus.pdf'>
  BytesIO(open(path, "rb").read()) for path in file_paths
ResourceWarning: Enable tracemalloc to get the object allocation traceback
2024-11-19 22:14:09 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution started.
2024-11-19 22:14:09 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution succeeded in 58ms.
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution started.
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/websockets/legacy/__init__.py:6: DeprecationWarning: websockets.legacy is deprecated; see https://websockets.readthedocs.io/en/stable/howto/upgrade.html for upgrade instructions
  warnings.warn(  # deprecated in 14.0 - 2024-11-09
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/pydantic/fields.py:804: PydanticDeprecatedSince20: Using extra keyword arguments on `Field` is deprecated and will be removed. Use `json_schema_extra` instead. (Extra keys: 'is_accessible_to_agent'). Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.7/migration/
  warn(
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution succeeded in 104ms.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution started.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution succeeded in 724ms.
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution succeeded in 66ms.
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution succeeded in 961ms.
2024-11-19 22:14:10 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 1.3s.
2024-11-19 22:14:10 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution succeeded in 1.3s.

Através deste fluxo de trabalho, implementámos com êxito um pipeline de indexação de documentos utilizando o Milvus como base de dados vetorial e o modelo de incorporação do OpenAI para representação semântica. Esta configuração permite uma recuperação rápida e precisa baseada em vectores, formando a base para fluxos de trabalho RAG, como a pesquisa semântica, a recuperação de documentos e as interações contextuais orientadas para a IA.

Com os recursos de armazenamento escalonável do Milvus e a orquestração do Dynamiq, essa solução está pronta para implantações de prototipagem e produção em grande escala. Pode agora alargar este pipeline para incluir tarefas adicionais, como a resposta a perguntas baseadas na recuperação ou a geração de conteúdos orientados para a IA.

Fluxo de recuperação de documentos RAG

Neste tutorial, implementamos um fluxo de trabalho de recuperação de documentos RAG (Retrieval-Augmented Generation). Este fluxo de trabalho recebe uma consulta do utilizador, gera uma incorporação de vetor para a mesma, recupera os documentos mais relevantes de uma base de dados de vectores Milvus e utiliza um modelo de linguagem de grande dimensão (LLM) para gerar uma resposta detalhada e contextualizada com base nos documentos recuperados.

Ao seguir este fluxo de trabalho, criará uma solução completa para pesquisa semântica e resposta a perguntas, combinando o poder da recuperação de documentos baseada em vectores com as capacidades dos LLMs avançados da OpenAI. Esta abordagem permite respostas eficientes e inteligentes às perguntas dos utilizadores, tirando partido do conhecimento armazenado na sua base de dados de documentos.

Importar as bibliotecas necessárias e inicializar o fluxo de trabalho

from dynamiq import Workflow
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Milvus as MilvusConnection,
    MilvusDeploymentType,
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.retrievers import MilvusDocumentRetriever
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Message, Prompt

# Initialize the workflow
retrieval_wf = Workflow()

Definir a ligação OpenAI e o Text Embedder

# Establish OpenAI connection
openai_connection = OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"])

# Define the text embedder node
embedder = OpenAITextEmbedder(
    connection=openai_connection,
    model="text-embedding-3-small",
)

# Add the embedder node to the workflow
embedder_added = retrieval_wf.flow.add_nodes(embedder)

Definir o Milvus Document Retriever

document_retriever = (
    MilvusDocumentRetriever(
        connection=MilvusConnection(
            deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
        ),
        index_name="my_milvus_collection",
        dimension=1536,
        top_k=5,
    )
    .inputs(embedding=embedder.outputs.embedding)  # Connect to embedder output
    .depends_on(embedder)  # Dependency on the embedder node
)

# Add the retriever node to the workflow
milvus_retriever_added = retrieval_wf.flow.add_nodes(document_retriever)
2024-11-19 22:14:19 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:19 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:19 - DEBUG - Created new connection using: 98d1132773af4298a894ad5925845fd2
2024-11-19 22:14:19 - INFO - Collection my_milvus_collection already exists. Skipping creation.

Definir o modelo de prompt

# Define the prompt template for the LLM
prompt_template = """
Please answer the question based on the provided context.

Question: {{ query }}

Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""

# Create the prompt object
prompt = Prompt(messages=[Message(content=prompt_template, role="user")])

Definir o gerador de respostas

answer_generator = (
    OpenAI(
        connection=openai_connection,
        model="gpt-4o",
        prompt=prompt,
    )
    .inputs(
        documents=document_retriever.outputs.documents,
        query=embedder.outputs.query,
    )
    .depends_on(
        [document_retriever, embedder]
    )  # Dependencies on retriever and embedder
)

# Add the answer generator node to the workflow
answer_generator_added = retrieval_wf.flow.add_nodes(answer_generator)

Executar o fluxo de trabalho

# Run the workflow with a sample query
sample_query = "What is the Advanced Search Algorithms in Milvus?"

result = retrieval_wf.run(input_data={"query": sample_query})

answer = result.output.get(answer_generator.id).get("output", {}).get("content")
print(answer)
2024-11-19 22:14:22 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution started.
2024-11-19 22:14:22 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:22 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution started.
2024-11-19 22:14:23 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:23 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution succeeded in 474ms.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution started.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution succeeded in 23ms.
2024-11-19 22:14:23 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution started.
2024-11-19 22:14:24 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2024-11-19 22:14:24 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution succeeded in 1.8s.
2024-11-19 22:14:25 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 2.4s.
2024-11-19 22:14:25 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution succeeded in 2.4s.


The advanced search algorithms in Milvus include a variety of in-memory and on-disk indexing/search algorithms such as IVF (Inverted File), HNSW (Hierarchical Navigable Small World), and DiskANN. These algorithms have been deeply optimized to enhance performance, delivering 30%-70% better performance compared to popular implementations like FAISS and HNSWLib. These optimizations are part of Milvus's design to ensure high efficiency and scalability in handling vector data.

Traduzido porDeepL

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

Esta página foi útil?