🚀 Coba Zilliz Cloud, Milvus yang sepenuhnya terkelola, secara gratis—rasakan performa 10x lebih cepat! Coba Sekarang>>

milvus-logo
LFAI
Beranda
  • Integrasi
  • Home
  • Docs
  • Integrasi

  • Sumber Data

  • Tidak terstruktur

Membangun RAG dengan Milvus dan Unstructured

Open In Colab GitHub Repository

Unstructured menyediakan platform dan alat untuk menelan dan memproses dokumen tak terstruktur untuk Retrieval Augmented Generation (RAG) dan penyempurnaan model. Platform ini menawarkan platform UI tanpa kode dan layanan API tanpa server, yang memungkinkan pengguna untuk memproses data pada sumber daya komputasi yang dihosting oleh Unstructured.

Dalam tutorial ini, kita akan menggunakan Unstructured untuk mengambil dokumen PDF dan kemudian menggunakan Milvus untuk membangun pipeline RAG.

Persiapan

Ketergantungan dan Lingkungan

$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai

Jika Anda menggunakan Google Colab, untuk mengaktifkan dependensi yang baru saja terinstal, Anda mungkin perlu memulai ulang runtime (klik menu "Runtime" di bagian atas layar, dan pilih "Restart session" dari menu tarik-turun).

Anda dapat memperoleh variabel lingkungan UNSTRUCTURED_API_KEY dan UNSTRUCTURED_URL dari sini.

Kita akan menggunakan OpenAI sebagai LLM dalam contoh ini. Anda harus menyiapkan kunci api OPENAI_API_KEY sebagai variabel lingkungan.

import os


os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"

os.environ["OPENAI_API_KEY"] = "***********"

Menyiapkan klien Milvus dan OpenAI

Anda dapat menggunakan klien Milvus untuk membuat koleksi Milvus dan memasukkan data ke dalamnya.

from pymilvus import MilvusClient, DataType

# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")  # TODO

Adapun argumen dari MilvusClient:

  • Menetapkan uri sebagai file lokal, misalnya./milvus.db, adalah metode yang paling mudah, karena secara otomatis menggunakan Milvus Lite untuk menyimpan semua data dalam file ini.
  • Jika Anda memiliki data dalam skala besar, misalnya lebih dari satu juta vektor, Anda dapat menyiapkan server Milvus yang lebih berkinerja tinggi di Docker atau Kubernetes. Dalam pengaturan ini, gunakan alamat dan port server sebagai uri Anda, misalnyahttp://localhost:19530. Jika Anda mengaktifkan fitur autentikasi pada Milvus, gunakan "<nama_user Anda>:<kata sandi Anda>" sebagai token, jika tidak, jangan setel token.
  • Jika Anda ingin menggunakan Zilliz Cloud, layanan cloud yang dikelola sepenuhnya untuk Milvus, sesuaikan uri dan token, yang sesuai dengan Public Endpoint dan Api key di Zilliz Cloud.

Periksa apakah koleksi sudah ada dan hapus jika sudah ada.

collection_name = "my_rag_collection"

if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

Siapkan klien OpenAI untuk menghasilkan embedding dan menghasilkan respons.

from openai import OpenAI

openai_client = OpenAI()


def emb_text(text):
    return (
        openai_client.embeddings.create(input=text, model="text-embedding-3-small")
        .data[0]
        .embedding
    )

Hasilkan embedding uji coba dan cetak dimensi dan beberapa elemen pertama.

test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]

Membuat Koleksi Milvus

Kita akan membuat sebuah koleksi dengan skema berikut ini:

  • id: kunci utama, yang merupakan pengenal unik untuk setiap dokumen.
  • vector: penyematan dokumen.
  • text: konten teks dari dokumen.
  • metadata: metadata dari dokumen.

Kemudian kita membangun sebuah indeks AUTOINDEX pada bidang vector. Dan kemudian buatlah koleksinya.

# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
    field_name="vector",
    metric_type="COSINE",
    index_type="AUTOINDEX",
)
milvus_client.create_collection(
    collection_name=collection_name,
    schema=schema,
    index_params=index_params,
    consistency_level="Strong",
)

milvus_client.load_collection(collection_name=collection_name)

Memuat data dari Unstructured

Unstructured menyediakan pipeline ingestion yang fleksibel dan kuat untuk memproses berbagai jenis file, termasuk PDF, HTML, dan lainnya. Kita akan menggunakan fungsionalitas ingest untuk mempartisi file PDF di direktori lokal. Dan kemudian memasukkan data ke dalam Milvus.

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"

Pipeline.from_configs(
    context=ProcessorConfig(),
    indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
    downloader_config=LocalDownloaderConfig(),
    source_connection_config=LocalConnectionConfig(),
    partitioner_config=PartitionerConfig(
        partition_by_api=True,
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
        strategy="hi_res",
        additional_partition_args={
            "split_pdf_page": True,
            "split_pdf_concurrency_level": 15,
        },
    ),
    uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()


from unstructured.staging.base import elements_from_json


def load_processed_files(directory_path):
    elements = []
    for filename in os.listdir(directory_path):
        if filename.endswith(".json"):
            file_path = os.path.join(directory_path, filename)
            try:
                elements.extend(elements_from_json(filename=file_path))
            except IOError:
                print(f"Error: Could not read file {filename}.")

    return elements


elements = load_processed_files(directory_with_results)

Memasukkan data ke dalam Milvus.

data = []
for i, element in enumerate(elements):
    data.append(
        {
            "id": i,
            "vector": emb_text(element.text),
            "text": element.text,
            "metadata": element.metadata.to_dict(),
        }
    )
milvus_client.insert(collection_name=collection_name, data=data)

Mengambil dan Menghasilkan Respon

Mendefinisikan sebuah fungsi untuk mengambil dokumen yang relevan dari Milvus.

def retrieve_documents(question, top_k=3):
    search_res = milvus_client.search(
        collection_name=collection_name,
        data=[emb_text(question)],
        limit=top_k,
        # search_params={"metric_type": "IP", "params": {}},
        output_fields=["text"],
    )
    return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]

Tentukan sebuah fungsi untuk menghasilkan respon menggunakan dokumen yang diambil dalam pipeline RAG.

def generate_rag_response(question):
    retrieved_docs = retrieve_documents(question)
    context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
    system_prompt = (
        "You are an AI assistant. Provide answers based on the given context."
    )
    user_prompt = f"""
    Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
    
    Context:
    {context}
    
    Question: {question}
    """
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return response.choices[0].message.content

Mari kita uji pipeline RAG dengan sebuah contoh pertanyaan.

question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.

Coba Milvus yang Dikelola secara Gratis

Zilliz Cloud bebas masalah, didukung oleh Milvus dan 10x lebih cepat.

Mulai
Umpan balik

Apakah halaman ini bermanfaat?