Open In Colab GitHub Repository

DynamiqとMilvusをはじめよう

Dynamiqは、AIを搭載したアプリケーションの開発を効率化する強力なGen AIフレームワークです。検索拡張世代(RAG)と大規模言語モデル(LLM)エージェントを強力にサポートするDynamiqは、開発者が簡単かつ効率的にインテリジェントで動的なシステムを作成できるようにします。

このチュートリアルでは、RAGワークフローのために作られた高性能ベクトルデータベースであるMilvusとDynamiqをシームレスに使用する方法を探ります。Milvusは、ベクトル埋め込みデータの効率的な保存、インデックス付け、検索に優れており、高速かつ正確なコンテキストデータへのアクセスを必要とするAIシステムに不可欠なコンポーネントとなっています。

このステップバイステップガイドでは、2つのコアなRAGワークフローをカバーします:

  • ドキュメントインデキシングフロー:入力ファイル(PDFなど)を処理し、その内容をベクトル埋め込みデータに変換し、Milvusに格納する方法を学びます。Milvusの高性能なインデックス作成機能を活用することで、データを迅速に検索できる状態にします。

  • ドキュメント検索フロー: Milvusに関連するドキュメントの埋め込みをクエリし、DynamiqのLLMエージェントでコンテキストを考慮した洞察に満ちたレスポンスを生成するためにそれらを使用し、AIを活用したシームレスなユーザーエクスペリエンスを実現する方法をご覧ください。

このチュートリアルが終わる頃には、MilvusとDynamiqがどのように連携し、ニーズに合わせたスケーラブルでコンテキストを考慮したAIシステムを構築するのかについて、しっかりと理解できるようになるでしょう。

準備

必要なライブラリのダウンロード

$ pip install dynamiq pymilvus milvus-lite

Google Colabを使用している場合、インストールした依存関係を有効にするために、ランタイムを再起動する必要があります(画面上部の "Runtime "メニューをクリックし、ドロップダウンメニューから "Restart session "を選択してください)。

LLMエージェントの設定

この例では、LLMとしてOpenAIを使います。api key OPENAI_API_KEY を環境変数として用意してください。

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

RAG - ドキュメントのインデックス作成フロー

このチュートリアルでは、Milvusをベクターデータベースとして文書をインデックス化するRAG(Retrieval-Augmented Generation)ワークフローを示します。このワークフローは、入力されたPDFファイルを受け取り、より小さな塊に処理し、OpenAIの埋め込みモデルを使用してベクトル埋め込みを生成し、効率的な検索のために埋め込みをMilvusコレクションに保存します。

このワークフローが終了する頃には、セマンティック検索や質問応答のような将来のRAGタスクをサポートするスケーラブルで効率的なドキュメントインデキシングシステムを手に入れることができます。

必要なライブラリのインポートとワークフローの初期化

# Importing necessary libraries for the workflow
from io import BytesIO
from dynamiq import Workflow
from dynamiq.nodes import InputTransformer
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Milvus as MilvusConnection,
    MilvusDeploymentType,
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.writers import MilvusDocumentWriter

# Initialize the workflow
rag_wf = Workflow()

PDFコンバータノードの定義

converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
converter_added = rag_wf.flow.add_nodes(
    converter
)  # Add node to the DAG (Directed Acyclic Graph)

ドキュメント分割ノードの定義

document_splitter = DocumentSplitter(
    split_by="sentence",  # Splits documents into sentences
    split_length=10,
    split_overlap=1,
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[converter.id]}.output.documents",
        },
    ),
).depends_on(
    converter
)  # Set dependency on the PDF converter
splitter_added = rag_wf.flow.add_nodes(document_splitter)  # Add to the DAG

埋め込みノードの定義

embedder = OpenAIDocumentEmbedder(
    connection=OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"]),
    input_transformer=InputTransformer(
        selector={
            "documents": f"${[document_splitter.id]}.output.documents",
        },
    ),
).depends_on(
    document_splitter
)  # Set dependency on the splitter
document_embedder_added = rag_wf.flow.add_nodes(embedder)  # Add to the DAG

Milvusベクターストアノードの定義

vector_store = (
    MilvusDocumentWriter(
        connection=MilvusConnection(
            deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
        ),
        index_name="my_milvus_collection",
        dimension=1536,
        create_if_not_exist=True,
        metric_type="COSINE",
    )
    .inputs(documents=embedder.outputs.documents)  # Connect to embedder output
    .depends_on(embedder)  # Set dependency on the embedder
)
milvus_writer_added = rag_wf.flow.add_nodes(vector_store)  # Add to the DAG
2024-11-19 22:14:03 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:03 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:04 - DEBUG - Created new connection using: 0bef2849fdb1458a85df8bb9dd27f51d
2024-11-19 22:14:04 - INFO - Collection my_milvus_collection does not exist. Creating a new collection.
2024-11-19 22:14:04 - DEBUG - Successfully created collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection

Milvusは2つのデプロイメントタイプを提供しています:

  1. MilvusDeploymentType.FILE
  • ローカルのプロトタイピングや 小規模なデータ保存に最適です。
  • uri 、ローカルのファイルパス(例:./milvus.db)を設定することで、Milvus Liteが自動的に指定されたファイルにすべてのデータを保存します。
  • これは迅速なセットアップと 実験に便利なオプションです。
  1. MilvusDeploymentType.HOST
  • 100万以上のベクターを管理するような大規模データシナリオ用に設計されています。

    セルフホストサーバ

    • DockerまたはKubernetesを使用して高性能なMilvusサーバーをデプロイします。
    • サーバのアドレスとポートをuri (例:http://localhost:19530) に設定します。
    • 認証が有効な場合:
    • token<your_username>:<your_password> を指定する。
    • 認証が無効の場合:
    • token は未設定のままにします。

    Zillizクラウド(マネージドサービス)

入力データの定義とワークフローの実行

file_paths = ["./pdf_files/WhatisMilvus.pdf"]
input_data = {
    "files": [BytesIO(open(path, "rb").read()) for path in file_paths],
    "metadata": [{"filename": path} for path in file_paths],
}

# Run the workflow with the prepared input data
inserted_data = rag_wf.run(input_data=input_data)
/var/folders/09/d0hx80nj35sb5hxb5cpc1q180000gn/T/ipykernel_31319/3145804345.py:4: ResourceWarning: unclosed file <_io.BufferedReader name='./pdf_files/WhatisMilvus.pdf'>
  BytesIO(open(path, "rb").read()) for path in file_paths
ResourceWarning: Enable tracemalloc to get the object allocation traceback
2024-11-19 22:14:09 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution started.
2024-11-19 22:14:09 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution succeeded in 58ms.
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution started.
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/websockets/legacy/__init__.py:6: DeprecationWarning: websockets.legacy is deprecated; see https://websockets.readthedocs.io/en/stable/howto/upgrade.html for upgrade instructions
  warnings.warn(  # deprecated in 14.0 - 2024-11-09
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/pydantic/fields.py:804: PydanticDeprecatedSince20: Using extra keyword arguments on `Field` is deprecated and will be removed. Use `json_schema_extra` instead. (Extra keys: 'is_accessible_to_agent'). Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.7/migration/
  warn(
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution succeeded in 104ms.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution started.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution succeeded in 724ms.
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution succeeded in 66ms.
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution succeeded in 961ms.
2024-11-19 22:14:10 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 1.3s.
2024-11-19 22:14:10 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution succeeded in 1.3s.

このワークフローを通して、Milvusをベクトルデータベースとして使用し、OpenAIの埋め込みモデルをセマンティック表現に使用したドキュメントインデキシングパイプラインの実装に成功した。このセットアップにより、高速かつ正確なベクトルベースの検索が可能になり、意味検索、文書検索、コンテキストAI主導型インタラクションのようなRAGワークフローの基盤が形成される。

Milvusのスケーラブルなストレージ機能とDynamiqのオーケストレーションにより、このソリューションはプロトタイピングと大規模な本番導入の両方に対応しています。このパイプラインを拡張して、検索ベースの質問応答やAI主導のコンテンツ生成などのタスクを追加することができます。

RAGドキュメント検索フロー

このチュートリアルでは、RAG(Retrieval-Augmented Generation)文書検索ワークフローを実装します。このワークフローでは、ユーザークエリを受け取り、それに対するベクトル埋め込みを生成し、Milvusベクトルデータベースから最も関連性の高い文書を検索し、大規模言語モデル(LLM)を使用して、検索された文書に基づいて詳細かつコンテキストを考慮した回答を生成します。

このワークフローに従うことで、ベクトルベースの文書検索のパワーとOpenAIの高度なLLMの機能を組み合わせた、意味検索と質問応答のためのエンドツーエンドのソリューションを作成することができます。このアプローチは、ドキュメントデータベースに保存された知識を活用することで、ユーザーからの問い合わせに対して効率的でインテリジェントな応答を可能にします。

必要なライブラリのインポートとワークフローの初期化

from dynamiq import Workflow
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Milvus as MilvusConnection,
    MilvusDeploymentType,
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.retrievers import MilvusDocumentRetriever
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Message, Prompt

# Initialize the workflow
retrieval_wf = Workflow()

OpenAI接続とテキストエンベッダーの定義

# Establish OpenAI connection
openai_connection = OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"])

# Define the text embedder node
embedder = OpenAITextEmbedder(
    connection=openai_connection,
    model="text-embedding-3-small",
)

# Add the embedder node to the workflow
embedder_added = retrieval_wf.flow.add_nodes(embedder)

Milvusドキュメントレトリバーの定義

document_retriever = (
    MilvusDocumentRetriever(
        connection=MilvusConnection(
            deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
        ),
        index_name="my_milvus_collection",
        dimension=1536,
        top_k=5,
    )
    .inputs(embedding=embedder.outputs.embedding)  # Connect to embedder output
    .depends_on(embedder)  # Dependency on the embedder node
)

# Add the retriever node to the workflow
milvus_retriever_added = retrieval_wf.flow.add_nodes(document_retriever)
2024-11-19 22:14:19 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:19 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:19 - DEBUG - Created new connection using: 98d1132773af4298a894ad5925845fd2
2024-11-19 22:14:19 - INFO - Collection my_milvus_collection already exists. Skipping creation.

プロンプトテンプレートの定義

# Define the prompt template for the LLM
prompt_template = """
Please answer the question based on the provided context.

Question: {{ query }}

Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""

# Create the prompt object
prompt = Prompt(messages=[Message(content=prompt_template, role="user")])

回答ジェネレータの定義

answer_generator = (
    OpenAI(
        connection=openai_connection,
        model="gpt-4o",
        prompt=prompt,
    )
    .inputs(
        documents=document_retriever.outputs.documents,
        query=embedder.outputs.query,
    )
    .depends_on(
        [document_retriever, embedder]
    )  # Dependencies on retriever and embedder
)

# Add the answer generator node to the workflow
answer_generator_added = retrieval_wf.flow.add_nodes(answer_generator)

ワークフローの実行

# Run the workflow with a sample query
sample_query = "What is the Advanced Search Algorithms in Milvus?"

result = retrieval_wf.run(input_data={"query": sample_query})

answer = result.output.get(answer_generator.id).get("output", {}).get("content")
print(answer)
2024-11-19 22:14:22 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution started.
2024-11-19 22:14:22 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:22 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution started.
2024-11-19 22:14:23 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:23 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution succeeded in 474ms.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution started.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution succeeded in 23ms.
2024-11-19 22:14:23 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution started.
2024-11-19 22:14:24 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2024-11-19 22:14:24 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution succeeded in 1.8s.
2024-11-19 22:14:25 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 2.4s.
2024-11-19 22:14:25 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution succeeded in 2.4s.


The advanced search algorithms in Milvus include a variety of in-memory and on-disk indexing/search algorithms such as IVF (Inverted File), HNSW (Hierarchical Navigable Small World), and DiskANN. These algorithms have been deeply optimized to enhance performance, delivering 30%-70% better performance compared to popular implementations like FAISS and HNSWLib. These optimizations are part of Milvus's design to ensure high efficiency and scalability in handling vector data.