Open In Colab GitHub Repository

MilvusとHaystackによる全文検索

全文検索は、テキスト中の特定のキーワードやフレーズにマッチする文書を検索する伝統的な手法です。用語の頻度などから計算された関連性スコアに基づいて結果をランク付けする。セマンティック検索が意味や文脈を理解するのに優れているのに対し、全文検索は正確なキーワードマッチングに優れており、セマンティック検索を補完するのに有用である。BM25アルゴリズムは、フルテキスト検索におけるランキングに広く使用されており、RAG(Retrieval-Augmented Generation)において重要な役割を果たしている。

Milvus 2.5では、BM25を使ったネイティブな全文検索機能を導入した。このアプローチはテキストをBM25スコアを表すスパースベクトルに変換します。生テキストを入力するだけで、Milvusは自動的にスパースベクトルを生成し、保存します。

HaystackはこのMilvusの機能をサポートし、RAGアプリケーションに全文検索を簡単に追加できるようになりました。全文検索と密なベクトル意味検索を組み合わせることで、意味理解とキーワードマッチング精度の両方から恩恵を受けるハイブリッドアプローチを実現することができます。この組み合わせは検索精度を向上させ、より良い結果をユーザーに提供します。

このチュートリアルでは、HaystackとMilvusを使用してアプリケーションにフルテキスト検索とハイブリッド検索を実装する方法を示します。

Milvusベクターストアを使用するには、MilvusサーバをURI (オプションでTOKEN )で指定します。Milvusサーバーを立ち上げるには、Milvusインストールガイドに従うか、Zilliz Cloud(フルマネージドMilvus)を無料でお試しください。

  • 全文検索は現在、Milvus Standalone、Milvus Distributed、Zilliz Cloudで利用可能です。詳細は support@zilliz.com までお問い合わせください。
  • このチュートリアルを進める前に、全文検索とHaystack Milvus統合の基本的な使い方を理解してください。

前提条件

このノートブックを実行する前に、以下の依存関係がインストールされていることを確認してください:

$ pip install --upgrade --quiet pymilvus milvus-haystack

Google Colabを使用している場合、インストールしたばかりの依存関係を有効にするには、ランタイムを再起動する必要があるかもしれません(画面上部の "Runtime "メニューをクリックし、ドロップダウンメニューから "Restart session "を選択してください)。

OpenAIのモデルを使います。api key OPENAI_API_KEY を環境変数として用意してください。

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

データの準備

このノートブックで必要なパッケージをインポートします。次に、いくつかのサンプルドキュメントを用意します。

from haystack import Pipeline
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.writers import DocumentWriter
from haystack.utils import Secret
from milvus_haystack import MilvusDocumentStore, MilvusSparseEmbeddingRetriever
from haystack.document_stores.types import DuplicatePolicy
from milvus_haystack.function import BM25BuiltInFunction
from milvus_haystack import MilvusDocumentStore
from milvus_haystack.milvus_embedding_retriever import MilvusHybridRetriever

from haystack.utils import Secret
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack import Document

documents = [
    Document(content="Alice likes this apple", meta={"category": "fruit"}),
    Document(content="Bob likes swimming", meta={"category": "sport"}),
    Document(content="Charlie likes white dogs", meta={"category": "pets"}),
]

全文検索をRAGシステムに組み込むことで、セマンティック検索と正確で予測可能なキーワードベースの検索のバランスをとることができる。より良い検索結果を得るためには、全文検索とセマンティック検索を組み合わせることをお勧めしますが、全文検索のみを使用することもできます。ここではデモンストレーションのため、全文検索のみとハイブリッド検索を示します。

埋め込みなしのBM25検索

インデックス作成パイプラインの作成

全文検索 MilvusDocumentStore はbuiltin_function パラメータを受け付けます。このパラメータを通して、Milvusサーバ側でBM25アルゴリズムを実装しているBM25BuiltInFunction のインスタンスを渡すことができます。BM25 関数のインスタンスとして指定されたbuiltin_function を設定します。例えば

connection_args = {"uri": "http://localhost:19530"}
# connection_args = {"uri": YOUR_ZILLIZ_CLOUD_URI, "token": Secret.from_env_var("ZILLIZ_CLOUD_API_KEY")}

document_store = MilvusDocumentStore(
    connection_args=connection_args,
    sparse_vector_field="sparse_vector",  # The sparse vector field.
    text_field="text",
    builtin_function=[
        BM25BuiltInFunction(  # The BM25 function converts the text into a sparse vector.
            input_field_names="text",
            output_field_names="sparse_vector",
        )
    ],
    consistency_level="Bounded",  # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`).
    drop_old=True,  # Drop the old collection if it exists and recreate it.
)

connection_argsの場合:

  • dockerやkubernetes上でよりパフォーマンスの高いMilvusサーバーを構築することができます。このセットアップでは、サーバアドレス、例えばhttp://localhost:19530uri として使用してください。
  • MilvusのフルマネージドクラウドサービスであるZilliz Cloudを利用する場合は、Zilliz CloudのPublic EndpointとApi keyに対応するuritoken を調整してください。

Milvusドキュメントストアにドキュメントを書き込むためのインデックス作成パイプラインを構築する。

writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)

indexing_pipeline = Pipeline()
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.run({"writer": {"documents": documents}})
{'writer': {'documents_written': 3}}

検索パイプラインの作成

document_store のラッパーであるMilvusSparseEmbeddingRetriever を使って、Milvus ドキュメントストアからドキュメントを検索する検索パイプラインを作成する。

retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component(
    "retriever", MilvusSparseEmbeddingRetriever(document_store=document_store)
)

question = "Who likes swimming?"

retrieval_results = retrieval_pipeline.run({"retriever": {"query_text": question}})

retrieval_results["retriever"]["documents"][0]
Document(id=bd334348dd2087c785e99b5a0009f33d9b8b8198736f6415df5d92602d81fd3e, content: 'Bob likes swimming', meta: {'category': 'sport'}, score: 1.2039074897766113)

インデックス作成パイプラインの作成

ハイブリッド検索では、BM25関数を用いて全文検索を行い、密なベクトル場vector を指定して意味検索を行う。

document_store = MilvusDocumentStore(
    connection_args=connection_args,
    vector_field="vector",  # The dense vector field.
    sparse_vector_field="sparse_vector",  # The sparse vector field.
    text_field="text",
    builtin_function=[
        BM25BuiltInFunction(  # The BM25 function converts the text into a sparse vector.
            input_field_names="text",
            output_field_names="sparse_vector",
        )
    ],
    consistency_level="Bounded",  # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`).
    drop_old=True,  # Drop the old collection and recreate it.
)

文書を埋め込みに変換するインデックス作成パイプラインを作成する。その後、文書をmilvus文書ストアに書き出す。

writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)

indexing_pipeline = Pipeline()
indexing_pipeline.add_component("dense_doc_embedder", OpenAIDocumentEmbedder())
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.connect("dense_doc_embedder", "writer")
indexing_pipeline.run({"dense_doc_embedder": {"documents": documents}})

print("Number of documents:", document_store.count_documents())
Calculating embeddings: 100%|██████████| 1/1 [00:01<00:00,  1.15s/it]


Number of documents: 3

検索パイプラインの作成

document_store を含み、ハイブリッド検索に関するパラメータを受け取る、MilvusHybridRetriever を使用して、Milvus ドキュメントストアからドキュメントを検索する検索パイプラインを作成する。

# from pymilvus import WeightedRanker
retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("dense_text_embedder", OpenAITextEmbedder())
retrieval_pipeline.add_component(
    "retriever",
    MilvusHybridRetriever(
        document_store=document_store,
        # top_k=3,
        # reranker=WeightedRanker(0.5, 0.5),  # Default is RRFRanker()
    ),
)

retrieval_pipeline.connect("dense_text_embedder.embedding", "retriever.query_embedding")
<haystack.core.pipeline.pipeline.Pipeline object at 0x3383ad990>
🚅 Components
  - dense_text_embedder: OpenAITextEmbedder
  - retriever: MilvusHybridRetriever
🛤️ Connections
  - dense_text_embedder.embedding -> retriever.query_embedding (List[float])

MilvusHybridRetriever を使ってハイブリッド検索を行う場合、オプションでtopKとrerankerのパラメータを設定できる。これは自動的にベクトル埋め込みと組み込み関数を処理し、最後にリランカーを使って結果を絞り込む。検索プロセスの基本的な実装の詳細は、ユーザーからは見えないようになっている。

ハイブリッド検索の詳細については、ハイブリッド検索入門を参照してください。

question = "Who likes swimming?"

retrieval_results = retrieval_pipeline.run(
    {
        "dense_text_embedder": {"text": question},
        "retriever": {"query_text": question},
    }
)

retrieval_results["retriever"]["documents"][0]
Document(id=bd334348dd2087c785e99b5a0009f33d9b8b8198736f6415df5d92602d81fd3e, content: 'Bob likes swimming', meta: {'category': 'sport'}, score: 0.032786883413791656, embedding: vector of size 1536)

アナライザーのカスタマイズ

アナライザーは、文章をトークンに分割し、ステミングやストップワード除去のような字句解析を実行することで、全文検索に不可欠なものです。アナライザーは通常、言語固有です。Milvusのアナライザについてはこちらのガイドをご参照ください。

Milvusは2種類のアナライザをサポートしています:ビルトイン アナライザと カスタム アナライザです。デフォルトでは、BM25BuiltInFunction 標準の内蔵アナライザが使用されます。これは、句読点を含むテキストをトークン化する最も基本的なアナライザです。

別のアナライザーを使いたい場合や、アナライザーをカスタマイズしたい場合は、BM25BuiltInFunction の初期化でanalyzer_params パラメーターを渡します。

analyzer_params_custom = {
    "tokenizer": "standard",
    "filter": [
        "lowercase",  # Built-in filter
        {"type": "length", "max": 40},  # Custom filter
        {"type": "stop", "stop_words": ["of", "to"]},  # Custom filter
    ],
}

document_store = MilvusDocumentStore(
    connection_args=connection_args,
    vector_field="vector",
    sparse_vector_field="sparse_vector",
    text_field="text",
    builtin_function=[
        BM25BuiltInFunction(
            input_field_names="text",
            output_field_names="sparse_vector",
            analyzer_params=analyzer_params_custom,  # Custom analyzer parameters.
            enable_match=True,  # Whether to enable match.
        )
    ],
    consistency_level="Bounded",
    drop_old=True,
)

# write documents to the document store
writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("dense_doc_embedder", OpenAIDocumentEmbedder())
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.connect("dense_doc_embedder", "writer")
indexing_pipeline.run({"dense_doc_embedder": {"documents": documents}})
Calculating embeddings: 100%|██████████| 1/1 [00:00<00:00,  1.39it/s]





{'dense_doc_embedder': {'meta': {'model': 'text-embedding-ada-002-v2',
   'usage': {'prompt_tokens': 11, 'total_tokens': 11}}},
 'writer': {'documents_written': 3}}

Milvusコレクションのスキーマを見て、カスタマイズされた解析器が正しくセットアップされていることを確認することができます。

document_store.col.schema
{'auto_id': False, 'description': '', 'fields': [{'name': 'text', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535, 'enable_match': True, 'enable_analyzer': True, 'analyzer_params': {'tokenizer': 'standard', 'filter': ['lowercase', {'type': 'length', 'max': 40}, {'type': 'stop', 'stop_words': ['of', 'to']}]}}}, {'name': 'id', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}, 'is_primary': True, 'auto_id': False}, {'name': 'vector', 'description': '', 'type': <DataType.FLOAT_VECTOR: 101>, 'params': {'dim': 1536}}, {'name': 'sparse_vector', 'description': '', 'type': <DataType.SPARSE_FLOAT_VECTOR: 104>, 'is_function_output': True}], 'enable_dynamic_field': True, 'functions': [{'name': 'bm25_function_7b6e15a4', 'description': '', 'type': <FunctionType.BM25: 1>, 'input_field_names': ['text'], 'output_field_names': ['sparse_vector'], 'params': {}}]}

コンセプトの詳細、例えばanalyzer,tokenizer,filter,enable_match,analyzer_params については、アナライザーのドキュメントを参照してください。

RAGパイプラインでハイブリッド検索を使う

ここまで、HaystackとMilvusにおける基本的なBM25ビルドイン関数の使い方を学び、ロードしたdocument_store を準備した。ハイブリッド検索を使った最適化されたRAG実装を紹介しよう。

この図は、キーワードマッチングのためのBM25と意味検索のための密なベクトル検索を組み合わせたハイブリッド検索と再ランク付けのプロセスを示している。両方の方法からの結果はマージされ、再ランク付けされ、最終的な答えを生成するためにLLMに渡される。

ハイブリッド検索は精度と意味理解のバランスをとり、多様なクエリに対する精度と頑健性を向上させる。BM25全文検索とベクトル検索で候補を検索し、セマンティックでコンテキストを考慮した正確な検索を実現します。

ハイブリッド検索で最適化されたRAGの実装を試してみよう。

prompt_template = """Answer the following query based on the provided context. If the context does
                     not include an answer, reply with 'I don't know'.\n
                     Query: {{query}}
                     Documents:
                     {% for doc in documents %}
                        {{ doc.content }}
                     {% endfor %}
                     Answer:
                  """

rag_pipeline = Pipeline()
rag_pipeline.add_component("text_embedder", OpenAITextEmbedder())
rag_pipeline.add_component(
    "retriever", MilvusHybridRetriever(document_store=document_store, top_k=1)
)
rag_pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
rag_pipeline.add_component(
    "generator",
    OpenAIGenerator(
        api_key=Secret.from_token(os.getenv("OPENAI_API_KEY")),
        generation_kwargs={"temperature": 0},
    ),
)
rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "generator")

results = rag_pipeline.run(
    {
        "text_embedder": {"text": question},
        "retriever": {"query_text": question},
        "prompt_builder": {"query": question},
    }
)
print("RAG answer:", results["generator"]["replies"][0])
RAG answer: Bob likes swimming.

milvus-haystackの使い方については、milvus-haystackの公式リポジトリを参照してください。