Primeros pasos con Dynamiq y Milvus
Dynamiq es un potente marco de Gen AI que agiliza el desarrollo de aplicaciones basadas en IA. Gracias a su sólida compatibilidad con la generación aumentada por recuperación (RAG) y los grandes agentes de modelos de lenguaje (LLM), Dynamiq permite a los desarrolladores crear sistemas inteligentes y dinámicos con facilidad y eficacia.
En este tutorial, exploraremos cómo utilizar Dynamiq sin problemas con Milvus, la base de datos vectorial de alto rendimiento creada especialmente para flujos de trabajo RAG. Milvus destaca en el almacenamiento, indexación y recuperación eficientes de incrustaciones vectoriales, lo que lo convierte en un componente indispensable para los sistemas de IA que exigen un acceso a datos contextuales rápido y preciso.
Esta guía paso a paso cubrirá dos flujos de trabajo principales de RAG:
Flujo de indexación de documentos: Aprenda a procesar archivos de entrada (por ejemplo, PDF), transformar su contenido en incrustaciones vectoriales y almacenarlos en Milvus. Aprovechar las capacidades de indexación de alto rendimiento de Milvus garantiza que sus datos estén listos para una rápida recuperación.
Flujo de recuperación de documentos: Descubra cómo consultar Milvus para incrustaciones de documentos relevantes y utilizarlos para generar respuestas perspicaces y conscientes del contexto con los agentes LLM de Dynamiq, creando una experiencia de usuario sin fisuras impulsada por la IA.
Al final de este tutorial, obtendrá una sólida comprensión de cómo Milvus y Dynamiq trabajan juntos para construir sistemas de IA escalables y conscientes del contexto adaptados a sus necesidades.
Preparación
Descarga de las bibliotecas necesarias
$ pip install dynamiq pymilvus
Si utilizas Google Colab, para habilitar las dependencias que acabas de instalar, es posible que tengas que reiniciar el tiempo de ejecución (haz clic en el menú "Tiempo de ejecución" en la parte superior de la pantalla y selecciona "Reiniciar sesión" en el menú desplegable).
Configurar el agente LLM
En este ejemplo utilizaremos OpenAI como LLM. Deberás preparar la clave api OPENAI_API_KEY
como variable de entorno.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
RAG - Flujo de indexación de documentos
Este tutorial muestra un flujo de trabajo RAG (Retrieval-Augmented Generation) para indexar documentos con Milvus como base de datos vectorial. El flujo de trabajo toma archivos PDF de entrada, los procesa en trozos más pequeños, genera incrustaciones vectoriales utilizando el modelo de incrustación de OpenAI y almacena las incrustaciones en una colección Milvus para una recuperación eficiente.
Al final de este flujo de trabajo, tendrá un sistema de indexación de documentos escalable y eficiente que soporta futuras tareas RAG como la búsqueda semántica y la respuesta a preguntas.
Importe las bibliotecas necesarias e inicie el flujo de trabajo
# Importing necessary libraries for the workflow
from io import BytesIO
from dynamiq import Workflow
from dynamiq.nodes import InputTransformer
from dynamiq.connections import (
OpenAI as OpenAIConnection,
Milvus as MilvusConnection,
MilvusDeploymentType,
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.writers import MilvusDocumentWriter
# Initialize the workflow
rag_wf = Workflow()
Definir nodo conversor de PDF
converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
converter_added = rag_wf.flow.add_nodes(
converter
) # Add node to the DAG (Directed Acyclic Graph)
Definir nodo divisor de documentos
document_splitter = DocumentSplitter(
split_by="sentence", # Splits documents into sentences
split_length=10,
split_overlap=1,
input_transformer=InputTransformer(
selector={
"documents": f"${[converter.id]}.output.documents",
},
),
).depends_on(
converter
) # Set dependency on the PDF converter
splitter_added = rag_wf.flow.add_nodes(document_splitter) # Add to the DAG
Definir nodo de incrustación
embedder = OpenAIDocumentEmbedder(
connection=OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"]),
input_transformer=InputTransformer(
selector={
"documents": f"${[document_splitter.id]}.output.documents",
},
),
).depends_on(
document_splitter
) # Set dependency on the splitter
document_embedder_added = rag_wf.flow.add_nodes(embedder) # Add to the DAG
Definir nodo Milvus Vector Store
vector_store = (
MilvusDocumentWriter(
connection=MilvusConnection(
deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
),
index_name="my_milvus_collection",
dimension=1536,
create_if_not_exist=True,
metric_type="COSINE",
)
.inputs(documents=embedder.outputs.documents) # Connect to embedder output
.depends_on(embedder) # Set dependency on the embedder
)
milvus_writer_added = rag_wf.flow.add_nodes(vector_store) # Add to the DAG
2024-11-19 22:14:03 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:03 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:04 - DEBUG - Created new connection using: 0bef2849fdb1458a85df8bb9dd27f51d
2024-11-19 22:14:04 - INFO - Collection my_milvus_collection does not exist. Creating a new collection.
2024-11-19 22:14:04 - DEBUG - Successfully created collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
Milvus ofrece dos tipos de despliegue, para diferentes casos de uso:
- MilvusDeploymentType.FILE
- Ideal para prototipos locales o almacenamiento de datos a pequeña escala.
- Establezca
uri
en una ruta de archivo local (por ejemplo,./milvus.db
) para aprovechar Milvus Lite, que almacena automáticamente todos los datos en el archivo especificado. - Esta es una opción conveniente para una rápida configuración y experimentación.
- MilvusTipoDespliegue.HOST
Diseñado para escenarios de datos a gran escala, como la gestión de más de un millón de vectores.
Servidor autoalojado
- Despliegue un servidor Milvus de alto rendimiento utilizando Docker o Kubernetes.
- Configure la dirección y el puerto del servidor como
uri
(por ejemplo,http://localhost:19530
). - Si la autenticación está habilitada:
- Proporcione
<your_username>:<your_password>
como eltoken
. - Si la autenticación está deshabilitada:
- Deje
token
sin configurar.
Zilliz Cloud (servicio gestionado)
- Para una experiencia Milvus totalmente gestionada y basada en la nube, utilice Zilliz Cloud.
- Configure
uri
ytoken
de acuerdo con el punto final público y la clave de API proporcionados en la consola de Zilliz Cloud.
Defina los datos de entrada y ejecute el flujo de trabajo
file_paths = ["./pdf_files/WhatisMilvus.pdf"]
input_data = {
"files": [BytesIO(open(path, "rb").read()) for path in file_paths],
"metadata": [{"filename": path} for path in file_paths],
}
# Run the workflow with the prepared input data
inserted_data = rag_wf.run(input_data=input_data)
/var/folders/09/d0hx80nj35sb5hxb5cpc1q180000gn/T/ipykernel_31319/3145804345.py:4: ResourceWarning: unclosed file <_io.BufferedReader name='./pdf_files/WhatisMilvus.pdf'>
BytesIO(open(path, "rb").read()) for path in file_paths
ResourceWarning: Enable tracemalloc to get the object allocation traceback
2024-11-19 22:14:09 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution started.
2024-11-19 22:14:09 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution succeeded in 58ms.
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution started.
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/websockets/legacy/__init__.py:6: DeprecationWarning: websockets.legacy is deprecated; see https://websockets.readthedocs.io/en/stable/howto/upgrade.html for upgrade instructions
warnings.warn( # deprecated in 14.0 - 2024-11-09
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/pydantic/fields.py:804: PydanticDeprecatedSince20: Using extra keyword arguments on `Field` is deprecated and will be removed. Use `json_schema_extra` instead. (Extra keys: 'is_accessible_to_agent'). Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.7/migration/
warn(
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution succeeded in 104ms.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution started.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution succeeded in 724ms.
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution succeeded in 66ms.
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution succeeded in 961ms.
2024-11-19 22:14:10 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 1.3s.
2024-11-19 22:14:10 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution succeeded in 1.3s.
A través de este flujo de trabajo, hemos implementado con éxito una canalización de indexación de documentos utilizando Milvus como base de datos vectorial y el modelo de incrustación de OpenAI para la representación semántica. Esta configuración permite una recuperación rápida y precisa basada en vectores, formando la base para flujos de trabajo RAG como la búsqueda semántica, la recuperación de documentos y las interacciones contextuales impulsadas por IA.
Con las capacidades de almacenamiento escalable de Milvus y la orquestación de Dynamiq, esta solución está lista tanto para prototipos como para despliegues de producción a gran escala. Ahora puede ampliar esta canalización para incluir tareas adicionales, como la respuesta a preguntas basada en la recuperación o la generación de contenidos impulsada por la IA.
Flujo de recuperación de documentos RAG
En este tutorial, implementamos un flujo de trabajo de recuperación de documentos con generación mejorada de recuperación (RAG). Este flujo de trabajo toma una consulta de usuario, genera una incrustación vectorial para ella, recupera los documentos más relevantes de una base de datos vectorial Milvus y utiliza un modelo de lenguaje amplio (LLM) para generar una respuesta detallada y consciente del contexto basada en los documentos recuperados.
Siguiendo este flujo de trabajo, creará una solución integral para la búsqueda semántica y la respuesta a preguntas, combinando la potencia de la recuperación de documentos basada en vectores con las capacidades de los LLM avanzados de OpenAI. Este enfoque permite respuestas eficientes e inteligentes a las consultas de los usuarios aprovechando el conocimiento almacenado en su base de datos de documentos.
Importe las bibliotecas necesarias e inicialice el flujo de trabajo
from dynamiq import Workflow
from dynamiq.connections import (
OpenAI as OpenAIConnection,
Milvus as MilvusConnection,
MilvusDeploymentType,
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.retrievers import MilvusDocumentRetriever
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Message, Prompt
# Initialize the workflow
retrieval_wf = Workflow()
Definir la conexión OpenAI y el incrustador de texto
# Establish OpenAI connection
openai_connection = OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"])
# Define the text embedder node
embedder = OpenAITextEmbedder(
connection=openai_connection,
model="text-embedding-3-small",
)
# Add the embedder node to the workflow
embedder_added = retrieval_wf.flow.add_nodes(embedder)
Definir el recuperador de documentos Milvus
document_retriever = (
MilvusDocumentRetriever(
connection=MilvusConnection(
deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
),
index_name="my_milvus_collection",
dimension=1536,
top_k=5,
)
.inputs(embedding=embedder.outputs.embedding) # Connect to embedder output
.depends_on(embedder) # Dependency on the embedder node
)
# Add the retriever node to the workflow
milvus_retriever_added = retrieval_wf.flow.add_nodes(document_retriever)
2024-11-19 22:14:19 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:19 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:19 - DEBUG - Created new connection using: 98d1132773af4298a894ad5925845fd2
2024-11-19 22:14:19 - INFO - Collection my_milvus_collection already exists. Skipping creation.
Definir la plantilla de solicitud
# Define the prompt template for the LLM
prompt_template = """
Please answer the question based on the provided context.
Question: {{ query }}
Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""
# Create the prompt object
prompt = Prompt(messages=[Message(content=prompt_template, role="user")])
Definir el generador de respuestas
answer_generator = (
OpenAI(
connection=openai_connection,
model="gpt-4o",
prompt=prompt,
)
.inputs(
documents=document_retriever.outputs.documents,
query=embedder.outputs.query,
)
.depends_on(
[document_retriever, embedder]
) # Dependencies on retriever and embedder
)
# Add the answer generator node to the workflow
answer_generator_added = retrieval_wf.flow.add_nodes(answer_generator)
Ejecutar el flujo de trabajo
# Run the workflow with a sample query
sample_query = "What is the Advanced Search Algorithms in Milvus?"
result = retrieval_wf.run(input_data={"query": sample_query})
answer = result.output.get(answer_generator.id).get("output", {}).get("content")
print(answer)
2024-11-19 22:14:22 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution started.
2024-11-19 22:14:22 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:22 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution started.
2024-11-19 22:14:23 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:23 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution succeeded in 474ms.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution started.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution succeeded in 23ms.
2024-11-19 22:14:23 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution started.
2024-11-19 22:14:24 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2024-11-19 22:14:24 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution succeeded in 1.8s.
2024-11-19 22:14:25 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 2.4s.
2024-11-19 22:14:25 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution succeeded in 2.4s.
The advanced search algorithms in Milvus include a variety of in-memory and on-disk indexing/search algorithms such as IVF (Inverted File), HNSW (Hierarchical Navigable Small World), and DiskANN. These algorithms have been deeply optimized to enhance performance, delivering 30%-70% better performance compared to popular implementations like FAISS and HNSWLib. These optimizations are part of Milvus's design to ensure high efficiency and scalability in handling vector data.