Introdução ao Dynamiq e ao Milvus
O Dynamiq é uma poderosa estrutura de IA de geração que simplifica o desenvolvimento de aplicativos alimentados por IA. Com suporte robusto para agentes de geração aumentada por recuperação (RAG) e modelo de linguagem grande (LLM), o Dynamiq permite que os desenvolvedores criem sistemas inteligentes e dinâmicos com facilidade e eficiência.
Neste tutorial, exploraremos como usar o Dynamiq com o Milvus, o banco de dados vetorial de alto desempenho criado especificamente para fluxos de trabalho RAG. O Milvus é excelente em armazenamento, indexação e recuperação eficientes de embeddings vetoriais, tornando-o um componente indispensável para sistemas de IA que exigem acesso rápido e preciso a dados contextuais.
Este guia passo a passo abordará dois fluxos de trabalho principais do RAG:
Fluxo de indexação de documentos: Saiba como processar ficheiros de entrada (por exemplo, PDFs), transformar o seu conteúdo em embeddings vectoriais e armazená-los no Milvus. A utilização das capacidades de indexação de alto desempenho do Milvus garante que os seus dados estão prontos para uma rápida recuperação.
Fluxo de recuperação de documentos: descubra como consultar o Milvus para obter embeddings de documentos relevantes e usá-los para gerar respostas perspicazes e conscientes do contexto com os agentes LLM da Dynamiq, criando uma experiência de utilizador com IA perfeita.
No final deste tutorial, obterá uma sólida compreensão de como o Milvus e o Dynamiq trabalham em conjunto para criar sistemas de IA escaláveis e sensíveis ao contexto, adaptados às suas necessidades.
Preparação
Descarregar as bibliotecas necessárias
$ pip install dynamiq pymilvus
Se estiver a utilizar o Google Colab, para ativar as dependências que acabou de instalar, poderá ter de reiniciar o tempo de execução (clique no menu "Tempo de execução" na parte superior do ecrã e selecione "Reiniciar sessão" no menu pendente).
Configurar o agente LLM
Neste exemplo, vamos utilizar o OpenAI como LLM. Deve preparar a chave api OPENAI_API_KEY
como uma variável de ambiente.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
RAG - Fluxo de indexação de documentos
Este tutorial demonstra um fluxo de trabalho RAG (Retrieval-Augmented Generation) para indexar documentos com o Milvus como base de dados vetorial. O fluxo de trabalho recebe arquivos PDF de entrada, processa-os em pedaços menores, gera embeddings de vetor usando o modelo de embedding do OpenAI e armazena os embeddings em uma coleção do Milvus para recuperação eficiente.
No final deste fluxo de trabalho, terá um sistema de indexação de documentos escalável e eficiente que suporta futuras tarefas RAG, como pesquisa semântica e resposta a perguntas.
Importar as bibliotecas necessárias e inicializar o fluxo de trabalho
# Importing necessary libraries for the workflow
from io import BytesIO
from dynamiq import Workflow
from dynamiq.nodes import InputTransformer
from dynamiq.connections import (
OpenAI as OpenAIConnection,
Milvus as MilvusConnection,
MilvusDeploymentType,
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.writers import MilvusDocumentWriter
# Initialize the workflow
rag_wf = Workflow()
Definir nó do conversor de PDF
converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
converter_added = rag_wf.flow.add_nodes(
converter
) # Add node to the DAG (Directed Acyclic Graph)
Definir nó divisor de documentos
document_splitter = DocumentSplitter(
split_by="sentence", # Splits documents into sentences
split_length=10,
split_overlap=1,
input_transformer=InputTransformer(
selector={
"documents": f"${[converter.id]}.output.documents",
},
),
).depends_on(
converter
) # Set dependency on the PDF converter
splitter_added = rag_wf.flow.add_nodes(document_splitter) # Add to the DAG
Definir nó de incorporação
embedder = OpenAIDocumentEmbedder(
connection=OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"]),
input_transformer=InputTransformer(
selector={
"documents": f"${[document_splitter.id]}.output.documents",
},
),
).depends_on(
document_splitter
) # Set dependency on the splitter
document_embedder_added = rag_wf.flow.add_nodes(embedder) # Add to the DAG
Definir nó de armazenamento de vetor Milvus
vector_store = (
MilvusDocumentWriter(
connection=MilvusConnection(
deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
),
index_name="my_milvus_collection",
dimension=1536,
create_if_not_exist=True,
metric_type="COSINE",
)
.inputs(documents=embedder.outputs.documents) # Connect to embedder output
.depends_on(embedder) # Set dependency on the embedder
)
milvus_writer_added = rag_wf.flow.add_nodes(vector_store) # Add to the DAG
2024-11-19 22:14:03 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:03 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:04 - DEBUG - Created new connection using: 0bef2849fdb1458a85df8bb9dd27f51d
2024-11-19 22:14:04 - INFO - Collection my_milvus_collection does not exist. Creating a new collection.
2024-11-19 22:14:04 - DEBUG - Successfully created collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
O Milvus oferece dois tipos de implantação, que atendem a diferentes casos de uso:
- MilvusDeploymentType.FILE
- Ideal para prototipagem local ou armazenamento de dados em pequena escala.
- Defina o
uri
para um caminho de ficheiro local (por exemplo,./milvus.db
) para tirar partido do Milvus Lite, que armazena automaticamente todos os dados no ficheiro especificado. - Esta é uma opção conveniente para configuração e experimentação rápidas.
- MilvusDeploymentType.HOST
Concebido para cenários de dados em grande escala, como a gestão de mais de um milhão de vectores.
Servidor auto-hospedado
- Implante um servidor Milvus de alto desempenho usando Docker ou Kubernetes.
- Configure o endereço e a porta do servidor como
uri
(por exemplo,http://localhost:19530
). - Se a autenticação estiver activada:
- Forneça
<your_username>:<your_password>
como otoken
. - Se a autenticação estiver desativada:
- Deixar o endereço
token
não definido.
Zilliz Cloud (serviço gerido)
- Para uma experiência Milvus totalmente gerida e baseada na nuvem, utilize o Zilliz Cloud.
- Defina
uri
etoken
de acordo com o Public Endpoint e a chave API fornecidos na consola do Zilliz Cloud.
Definir dados de entrada e executar o fluxo de trabalho
file_paths = ["./pdf_files/WhatisMilvus.pdf"]
input_data = {
"files": [BytesIO(open(path, "rb").read()) for path in file_paths],
"metadata": [{"filename": path} for path in file_paths],
}
# Run the workflow with the prepared input data
inserted_data = rag_wf.run(input_data=input_data)
/var/folders/09/d0hx80nj35sb5hxb5cpc1q180000gn/T/ipykernel_31319/3145804345.py:4: ResourceWarning: unclosed file <_io.BufferedReader name='./pdf_files/WhatisMilvus.pdf'>
BytesIO(open(path, "rb").read()) for path in file_paths
ResourceWarning: Enable tracemalloc to get the object allocation traceback
2024-11-19 22:14:09 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution started.
2024-11-19 22:14:09 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution succeeded in 58ms.
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution started.
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/websockets/legacy/__init__.py:6: DeprecationWarning: websockets.legacy is deprecated; see https://websockets.readthedocs.io/en/stable/howto/upgrade.html for upgrade instructions
warnings.warn( # deprecated in 14.0 - 2024-11-09
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/pydantic/fields.py:804: PydanticDeprecatedSince20: Using extra keyword arguments on `Field` is deprecated and will be removed. Use `json_schema_extra` instead. (Extra keys: 'is_accessible_to_agent'). Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.7/migration/
warn(
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution succeeded in 104ms.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution started.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution succeeded in 724ms.
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution succeeded in 66ms.
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution succeeded in 961ms.
2024-11-19 22:14:10 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 1.3s.
2024-11-19 22:14:10 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution succeeded in 1.3s.
Através deste fluxo de trabalho, implementámos com êxito um pipeline de indexação de documentos utilizando o Milvus como base de dados vetorial e o modelo de incorporação do OpenAI para representação semântica. Esta configuração permite uma recuperação rápida e precisa baseada em vectores, formando a base para fluxos de trabalho RAG, como a pesquisa semântica, a recuperação de documentos e as interações contextuais orientadas para a IA.
Com os recursos de armazenamento escalonável do Milvus e a orquestração do Dynamiq, essa solução está pronta para implantações de prototipagem e produção em grande escala. Pode agora alargar este pipeline para incluir tarefas adicionais, como a resposta a perguntas baseadas na recuperação ou a geração de conteúdos orientados para a IA.
Fluxo de recuperação de documentos RAG
Neste tutorial, implementamos um fluxo de trabalho de recuperação de documentos RAG (Retrieval-Augmented Generation). Este fluxo de trabalho recebe uma consulta do utilizador, gera uma incorporação de vetor para a mesma, recupera os documentos mais relevantes de uma base de dados de vectores Milvus e utiliza um modelo de linguagem de grande dimensão (LLM) para gerar uma resposta detalhada e contextualizada com base nos documentos recuperados.
Ao seguir este fluxo de trabalho, criará uma solução completa para pesquisa semântica e resposta a perguntas, combinando o poder da recuperação de documentos baseada em vectores com as capacidades dos LLMs avançados da OpenAI. Esta abordagem permite respostas eficientes e inteligentes às perguntas dos utilizadores, tirando partido do conhecimento armazenado na sua base de dados de documentos.
Importar as bibliotecas necessárias e inicializar o fluxo de trabalho
from dynamiq import Workflow
from dynamiq.connections import (
OpenAI as OpenAIConnection,
Milvus as MilvusConnection,
MilvusDeploymentType,
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.retrievers import MilvusDocumentRetriever
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Message, Prompt
# Initialize the workflow
retrieval_wf = Workflow()
Definir a ligação OpenAI e o Text Embedder
# Establish OpenAI connection
openai_connection = OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"])
# Define the text embedder node
embedder = OpenAITextEmbedder(
connection=openai_connection,
model="text-embedding-3-small",
)
# Add the embedder node to the workflow
embedder_added = retrieval_wf.flow.add_nodes(embedder)
Definir o Milvus Document Retriever
document_retriever = (
MilvusDocumentRetriever(
connection=MilvusConnection(
deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
),
index_name="my_milvus_collection",
dimension=1536,
top_k=5,
)
.inputs(embedding=embedder.outputs.embedding) # Connect to embedder output
.depends_on(embedder) # Dependency on the embedder node
)
# Add the retriever node to the workflow
milvus_retriever_added = retrieval_wf.flow.add_nodes(document_retriever)
2024-11-19 22:14:19 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:19 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:19 - DEBUG - Created new connection using: 98d1132773af4298a894ad5925845fd2
2024-11-19 22:14:19 - INFO - Collection my_milvus_collection already exists. Skipping creation.
Definir o modelo de prompt
# Define the prompt template for the LLM
prompt_template = """
Please answer the question based on the provided context.
Question: {{ query }}
Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""
# Create the prompt object
prompt = Prompt(messages=[Message(content=prompt_template, role="user")])
Definir o gerador de respostas
answer_generator = (
OpenAI(
connection=openai_connection,
model="gpt-4o",
prompt=prompt,
)
.inputs(
documents=document_retriever.outputs.documents,
query=embedder.outputs.query,
)
.depends_on(
[document_retriever, embedder]
) # Dependencies on retriever and embedder
)
# Add the answer generator node to the workflow
answer_generator_added = retrieval_wf.flow.add_nodes(answer_generator)
Executar o fluxo de trabalho
# Run the workflow with a sample query
sample_query = "What is the Advanced Search Algorithms in Milvus?"
result = retrieval_wf.run(input_data={"query": sample_query})
answer = result.output.get(answer_generator.id).get("output", {}).get("content")
print(answer)
2024-11-19 22:14:22 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution started.
2024-11-19 22:14:22 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:22 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution started.
2024-11-19 22:14:23 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:23 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution succeeded in 474ms.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution started.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution succeeded in 23ms.
2024-11-19 22:14:23 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution started.
2024-11-19 22:14:24 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2024-11-19 22:14:24 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution succeeded in 1.8s.
2024-11-19 22:14:25 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 2.4s.
2024-11-19 22:14:25 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution succeeded in 2.4s.
The advanced search algorithms in Milvus include a variety of in-memory and on-disk indexing/search algorithms such as IVF (Inverted File), HNSW (Hierarchical Navigable Small World), and DiskANN. These algorithms have been deeply optimized to enhance performance, delivering 30%-70% better performance compared to popular implementations like FAISS and HNSWLib. These optimizations are part of Milvus's design to ensure high efficiency and scalability in handling vector data.