🚀 免費嘗試 Zilliz Cloud,完全托管的 Milvus,體驗速度提升 10 倍!立即嘗試

milvus-logo
LFAI
主頁
  • 整合

使用 Milvus 和 Unstructured 建立 RAG

Open In Colab GitHub Repository

Unstructured提供了一個平台和工具,用於擷取和處理非結構化文檔,以進行資料檢索擴增生成 (Retrieval Augmented Generation, RAG) 和模型微調。它同時提供無程式碼 UI 平台和無伺服器 API 服務,讓使用者可以在 Unstructured 託管的計算資源上處理資料。

在本教程中,我們將使用 Unstructured 擷取 PDF 文件,然後再使用 Milvus 建立 RAG 管道。

準備工作

依賴與環境

$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai

如果您使用的是 Google Colab,為了啟用剛安裝的相依性,您可能需要重新啟動執行時(點選畫面上方的「Runtime」功能表,並從下拉式功能表中選擇「Restart session」)。

您可以從這裡取得UNSTRUCTURED_API_KEYUNSTRUCTURED_URL 環境變數。

在本範例中,我們將使用 OpenAI 作為 LLM。您應該準備api key OPENAI_API_KEY 作為環境變數。

import os


os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"

os.environ["OPENAI_API_KEY"] = "***********"

準備 Milvus 和 OpenAI 客戶端

您可以使用 Milvus 客戶端建立一個 Milvus 套件,並將資料插入其中。

from pymilvus import MilvusClient, DataType

# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")  # TODO

至於MilvusClient 的參數 :

  • uri 設定為本機檔案,例如./milvus.db ,是最方便的方法,因為它會自動利用Milvus Lite將所有資料儲存到這個檔案中。
  • 如果您有大規模的資料,例如超過一百萬個向量,您可以在Docker 或 Kubernetes 上架設效能更高的 Milvus 伺服器。在此設定中,請使用伺服器位址和連接埠作為您的 uri,例如http://localhost:19530 。如果您啟用 Milvus 上的驗證功能,請使用「<your_username>:<your_password>」作為令牌,否則請勿設定令牌。
  • 如果您要使用Zilliz Cloud(Milvus 的完全管理雲端服務),請調整uritoken ,這兩個項目對應 Zilliz Cloud 中的Public Endpoint 和 Api key

檢查資料集是否已存在,若已存在,請將其刪除。

collection_name = "my_rag_collection"

if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

準備 OpenAI 用戶端以產生嵌入並產生回應。

from openai import OpenAI

openai_client = OpenAI()


def emb_text(text):
    return (
        openai_client.embeddings.create(input=text, model="text-embedding-3-small")
        .data[0]
        .embedding
    )

產生測試嵌入,並列印其尺寸和前幾個元素。

test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]

建立 Milvus 套件

我們將以下列模式建立一個集合:

  • id:主鍵,這是每個文件的唯一識別碼。
  • vector:文件的嵌入。
  • text:文件的文字內容。
  • metadata:文件的元資料。

然後,我們在vector 欄位上建立AUTOINDEX 索引。然後建立集合。

# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
    field_name="vector",
    metric_type="COSINE",
    index_type="AUTOINDEX",
)
milvus_client.create_collection(
    collection_name=collection_name,
    schema=schema,
    index_params=index_params,
    consistency_level="Strong",
)

milvus_client.load_collection(collection_name=collection_name)

從 Unstructured 載入資料

Unstructured 提供了靈活而強大的攝取管道來處理各種檔案類型,包括 PDF、HTML 等等。 我們將使用攝取功能來分割本機目錄中的 PDF 檔案。然後將資料載入 Milvus。

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"

Pipeline.from_configs(
    context=ProcessorConfig(),
    indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
    downloader_config=LocalDownloaderConfig(),
    source_connection_config=LocalConnectionConfig(),
    partitioner_config=PartitionerConfig(
        partition_by_api=True,
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
        strategy="hi_res",
        additional_partition_args={
            "split_pdf_page": True,
            "split_pdf_concurrency_level": 15,
        },
    ),
    uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()


from unstructured.staging.base import elements_from_json


def load_processed_files(directory_path):
    elements = []
    for filename in os.listdir(directory_path):
        if filename.endswith(".json"):
            file_path = os.path.join(directory_path, filename)
            try:
                elements.extend(elements_from_json(filename=file_path))
            except IOError:
                print(f"Error: Could not read file {filename}.")

    return elements


elements = load_processed_files(directory_with_results)

將資料插入 Milvus。

data = []
for i, element in enumerate(elements):
    data.append(
        {
            "id": i,
            "vector": emb_text(element.text),
            "text": element.text,
            "metadata": element.metadata.to_dict(),
        }
    )
milvus_client.insert(collection_name=collection_name, data=data)

擷取並產生回應

定義一個函式從 Milvus 擷取相關文件。

def retrieve_documents(question, top_k=3):
    search_res = milvus_client.search(
        collection_name=collection_name,
        data=[emb_text(question)],
        limit=top_k,
        # search_params={"metric_type": "IP", "params": {}},
        output_fields=["text"],
    )
    return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]

定義一個函式,使用 RAG 管道中擷取的文件產生回應。

def generate_rag_response(question):
    retrieved_docs = retrieve_documents(question)
    context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
    system_prompt = (
        "You are an AI assistant. Provide answers based on the given context."
    )
    user_prompt = f"""
    Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
    
    Context:
    {context}
    
    Question: {question}
    """
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return response.choices[0].message.content

讓我們用範例問題來測試 RAG 管道。

question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.

免費嘗試托管的 Milvus

Zilliz Cloud 無縫接入,由 Milvus 提供動力,速度提升 10 倍。

開始使用
反饋

這個頁面有幫助嗎?