Начало работы с Dynamiq и Milvus
Dynamiq - это мощный фреймворк Gen AI, который упрощает разработку приложений на базе ИИ. Благодаря надежной поддержке генерации с расширенным поиском (RAG) и агентов с большой языковой моделью (LLM) Dynamiq позволяет разработчикам легко и эффективно создавать интеллектуальные динамические системы.
В этом руководстве мы рассмотрим, как использовать Dynamiq с Milvus, высокопроизводительной векторной базой данных, специально созданной для рабочих процессов RAG. Milvus отличается эффективным хранением, индексацией и поиском векторных вкраплений, что делает ее незаменимым компонентом для систем ИИ, требующих быстрого и точного контекстного доступа к данным.
В этом пошаговом руководстве мы рассмотрим два основных рабочих процесса RAG:
Поток индексирования документов: узнайте, как обрабатывать входные файлы (например, PDF), преобразовывать их содержимое в векторные вкрапления и сохранять их в Milvus. Использование высокопроизводительных возможностей индексирования Milvus гарантирует, что ваши данные будут готовы к быстрому поиску.
Поток поиска документов: Узнайте, как запрашивать у Milvus соответствующие вкрапления документов и использовать их для генерации проницательных, учитывающих контекст ответов с помощью LLM-агентов Dynamiq, создавая бесшовный пользовательский опыт, основанный на искусственном интеллекте.
К концу этого урока вы получите полное представление о том, как Milvus и Dynamiq работают вместе для создания масштабируемых, контекстно-зависимых систем ИИ, отвечающих вашим потребностям.
Подготовка
Загрузите необходимые библиотеки
$ pip install dynamiq pymilvus
Если вы используете Google Colab, для включения только что установленных зависимостей вам может потребоваться перезапустить среду выполнения (нажмите на меню "Runtime" в верхней части экрана и выберите "Restart session" из выпадающего меню).
Настройка агента LLM
В этом примере мы будем использовать OpenAI в качестве LLM. Вам следует подготовить api ключ OPENAI_API_KEY
в качестве переменной окружения.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
RAG - поток индексирования документов
В этом руководстве демонстрируется рабочий процесс Retrieval-Augmented Generation (RAG) для индексирования документов с использованием Milvus в качестве векторной базы данных. Рабочий процесс принимает входные PDF-файлы, обрабатывает их на более мелкие фрагменты, генерирует векторные вкрапления, используя модель вкраплений OpenAI, и сохраняет вкрапления в коллекции Milvus для эффективного поиска.
К концу этого рабочего процесса вы получите масштабируемую и эффективную систему индексирования документов, которая будет поддерживать такие будущие задачи RAG, как семантический поиск и ответы на вопросы.
Импорт необходимых библиотек и инициализация рабочего процесса
# Importing necessary libraries for the workflow
from io import BytesIO
from dynamiq import Workflow
from dynamiq.nodes import InputTransformer
from dynamiq.connections import (
OpenAI as OpenAIConnection,
Milvus as MilvusConnection,
MilvusDeploymentType,
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.writers import MilvusDocumentWriter
# Initialize the workflow
rag_wf = Workflow()
Определите узел PDF-конвертера
converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
converter_added = rag_wf.flow.add_nodes(
converter
) # Add node to the DAG (Directed Acyclic Graph)
Определите узел разделителя документов
document_splitter = DocumentSplitter(
split_by="sentence", # Splits documents into sentences
split_length=10,
split_overlap=1,
input_transformer=InputTransformer(
selector={
"documents": f"${[converter.id]}.output.documents",
},
),
).depends_on(
converter
) # Set dependency on the PDF converter
splitter_added = rag_wf.flow.add_nodes(document_splitter) # Add to the DAG
Определите узел встраивания
embedder = OpenAIDocumentEmbedder(
connection=OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"]),
input_transformer=InputTransformer(
selector={
"documents": f"${[document_splitter.id]}.output.documents",
},
),
).depends_on(
document_splitter
) # Set dependency on the splitter
document_embedder_added = rag_wf.flow.add_nodes(embedder) # Add to the DAG
Определите узел хранения векторов Milvus
vector_store = (
MilvusDocumentWriter(
connection=MilvusConnection(
deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
),
index_name="my_milvus_collection",
dimension=1536,
create_if_not_exist=True,
metric_type="COSINE",
)
.inputs(documents=embedder.outputs.documents) # Connect to embedder output
.depends_on(embedder) # Set dependency on the embedder
)
milvus_writer_added = rag_wf.flow.add_nodes(vector_store) # Add to the DAG
2024-11-19 22:14:03 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:03 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:04 - DEBUG - Created new connection using: 0bef2849fdb1458a85df8bb9dd27f51d
2024-11-19 22:14:04 - INFO - Collection my_milvus_collection does not exist. Creating a new collection.
2024-11-19 22:14:04 - DEBUG - Successfully created collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
Milvus предлагает два типа развертывания, предназначенных для различных случаев использования:
- MilvusDeploymentType.FILE
- Идеально подходит для локального прототипирования или хранения небольших объемов данных.
- Установите
uri
на путь к локальному файлу (например,./milvus.db
), чтобы использовать Milvus Lite, который автоматически сохраняет все данные в указанном файле. - Это удобный вариант для быстрой настройки и экспериментов.
- MilvusDeploymentType.HOST
Предназначен для сценариев с большими объемами данных, например, для управления более чем миллионом векторов.
Самостоятельно размещаемый сервер
- Разверните высокопроизводительный сервер Milvus с помощью Docker или Kubernetes.
- Настройте адрес и порт сервера как
uri
(например,http://localhost:19530
). - Если включена аутентификация:
- Укажите
<your_username>:<your_password>
в качествеtoken
. - Если аутентификация отключена:
- Оставьте значение
token
без изменений.
Zilliz Cloud (управляемая услуга)
- Для полностью управляемого облачного сервиса Milvus используйте Zilliz Cloud.
- Установите
uri
иtoken
в соответствии с публичной конечной точкой и ключом API, указанными в консоли Zilliz Cloud.
Определение входных данных и запуск рабочего процесса
file_paths = ["./pdf_files/WhatisMilvus.pdf"]
input_data = {
"files": [BytesIO(open(path, "rb").read()) for path in file_paths],
"metadata": [{"filename": path} for path in file_paths],
}
# Run the workflow with the prepared input data
inserted_data = rag_wf.run(input_data=input_data)
/var/folders/09/d0hx80nj35sb5hxb5cpc1q180000gn/T/ipykernel_31319/3145804345.py:4: ResourceWarning: unclosed file <_io.BufferedReader name='./pdf_files/WhatisMilvus.pdf'>
BytesIO(open(path, "rb").read()) for path in file_paths
ResourceWarning: Enable tracemalloc to get the object allocation traceback
2024-11-19 22:14:09 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution started.
2024-11-19 22:14:09 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution succeeded in 58ms.
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution started.
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/websockets/legacy/__init__.py:6: DeprecationWarning: websockets.legacy is deprecated; see https://websockets.readthedocs.io/en/stable/howto/upgrade.html for upgrade instructions
warnings.warn( # deprecated in 14.0 - 2024-11-09
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/pydantic/fields.py:804: PydanticDeprecatedSince20: Using extra keyword arguments on `Field` is deprecated and will be removed. Use `json_schema_extra` instead. (Extra keys: 'is_accessible_to_agent'). Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.7/migration/
warn(
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution succeeded in 104ms.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution started.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution succeeded in 724ms.
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution succeeded in 66ms.
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution succeeded in 961ms.
2024-11-19 22:14:10 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 1.3s.
2024-11-19 22:14:10 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution succeeded in 1.3s.
С помощью этого рабочего процесса мы успешно реализовали конвейер индексации документов, используя Milvus в качестве векторной базы данных и модель встраивания OpenAI для семантического представления. Эта система обеспечивает быстрый и точный векторный поиск, формируя основу для таких рабочих процессов RAG, как семантический поиск, поиск документов и контекстное взаимодействие, управляемое ИИ.
Благодаря масштабируемым возможностям хранения Milvus и оркестровке Dynamiq это решение готово как для прототипирования, так и для крупномасштабных производственных развертываний. Теперь вы можете расширить этот конвейер, включив в него дополнительные задачи, такие как ответы на вопросы на основе поиска или генерация контента на основе ИИ.
Поток извлечения документов RAG
В этом учебном пособии мы реализуем рабочий процесс поиска документов с дополненной генерацией (Retrieval-Augmented Generation, RAG). Этот рабочий процесс принимает запрос пользователя, генерирует для него векторное вложение, извлекает наиболее релевантные документы из векторной базы данных Milvus и использует большую языковую модель (LLM) для создания подробного и учитывающего контекст ответа на основе извлеченных документов.
Следуя этому процессу, вы создадите комплексное решение для семантического поиска и ответов на вопросы, сочетающее в себе мощь векторного поиска документов и возможности передовых LLM OpenAI. Такой подход позволяет эффективно и интеллектуально отвечать на запросы пользователей, используя знания, хранящиеся в вашей базе данных документов.
Импорт необходимых библиотек и инициализация рабочего процесса
from dynamiq import Workflow
from dynamiq.connections import (
OpenAI as OpenAIConnection,
Milvus as MilvusConnection,
MilvusDeploymentType,
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.retrievers import MilvusDocumentRetriever
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Message, Prompt
# Initialize the workflow
retrieval_wf = Workflow()
Определите подключение OpenAI и устройство встраивания текста
# Establish OpenAI connection
openai_connection = OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"])
# Define the text embedder node
embedder = OpenAITextEmbedder(
connection=openai_connection,
model="text-embedding-3-small",
)
# Add the embedder node to the workflow
embedder_added = retrieval_wf.flow.add_nodes(embedder)
Определите ретривер документов Milvus
document_retriever = (
MilvusDocumentRetriever(
connection=MilvusConnection(
deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
),
index_name="my_milvus_collection",
dimension=1536,
top_k=5,
)
.inputs(embedding=embedder.outputs.embedding) # Connect to embedder output
.depends_on(embedder) # Dependency on the embedder node
)
# Add the retriever node to the workflow
milvus_retriever_added = retrieval_wf.flow.add_nodes(document_retriever)
2024-11-19 22:14:19 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:19 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:19 - DEBUG - Created new connection using: 98d1132773af4298a894ad5925845fd2
2024-11-19 22:14:19 - INFO - Collection my_milvus_collection already exists. Skipping creation.
Определите шаблон подсказки
# Define the prompt template for the LLM
prompt_template = """
Please answer the question based on the provided context.
Question: {{ query }}
Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""
# Create the prompt object
prompt = Prompt(messages=[Message(content=prompt_template, role="user")])
Определите генератор ответов
answer_generator = (
OpenAI(
connection=openai_connection,
model="gpt-4o",
prompt=prompt,
)
.inputs(
documents=document_retriever.outputs.documents,
query=embedder.outputs.query,
)
.depends_on(
[document_retriever, embedder]
) # Dependencies on retriever and embedder
)
# Add the answer generator node to the workflow
answer_generator_added = retrieval_wf.flow.add_nodes(answer_generator)
Запустите рабочий процесс
# Run the workflow with a sample query
sample_query = "What is the Advanced Search Algorithms in Milvus?"
result = retrieval_wf.run(input_data={"query": sample_query})
answer = result.output.get(answer_generator.id).get("output", {}).get("content")
print(answer)
2024-11-19 22:14:22 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution started.
2024-11-19 22:14:22 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:22 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution started.
2024-11-19 22:14:23 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:23 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution succeeded in 474ms.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution started.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution succeeded in 23ms.
2024-11-19 22:14:23 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution started.
2024-11-19 22:14:24 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2024-11-19 22:14:24 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution succeeded in 1.8s.
2024-11-19 22:14:25 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 2.4s.
2024-11-19 22:14:25 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution succeeded in 2.4s.
The advanced search algorithms in Milvus include a variety of in-memory and on-disk indexing/search algorithms such as IVF (Inverted File), HNSW (Hierarchical Navigable Small World), and DiskANN. These algorithms have been deeply optimized to enhance performance, delivering 30%-70% better performance compared to popular implementations like FAISS and HNSWLib. These optimizations are part of Milvus's design to ensure high efficiency and scalability in handling vector data.