Membangun RAG dengan Milvus dan Unstructured
Unstructured menyediakan platform dan alat untuk menelan dan memproses dokumen tak terstruktur untuk Retrieval Augmented Generation (RAG) dan penyempurnaan model. Platform ini menawarkan platform UI tanpa kode dan layanan API tanpa server, yang memungkinkan pengguna untuk memproses data pada sumber daya komputasi yang dihosting oleh Unstructured.
Dalam tutorial ini, kita akan menggunakan Unstructured untuk mengambil dokumen PDF dan kemudian menggunakan Milvus untuk membangun pipeline RAG.
Persiapan
Ketergantungan dan Lingkungan
$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai
Jika Anda menggunakan Google Colab, untuk mengaktifkan dependensi yang baru saja terinstal, Anda mungkin perlu memulai ulang runtime (klik menu "Runtime" di bagian atas layar, dan pilih "Restart session" dari menu tarik-turun).
Anda dapat memperoleh variabel lingkungan UNSTRUCTURED_API_KEY
dan UNSTRUCTURED_URL
dari sini.
Kita akan menggunakan OpenAI sebagai LLM dalam contoh ini. Anda harus menyiapkan kunci api OPENAI_API_KEY
sebagai variabel lingkungan.
import os
os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"
os.environ["OPENAI_API_KEY"] = "***********"
Menyiapkan klien Milvus dan OpenAI
Anda dapat menggunakan klien Milvus untuk membuat koleksi Milvus dan memasukkan data ke dalamnya.
from pymilvus import MilvusClient, DataType
# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db") # TODO
Adapun argumen dari MilvusClient
:
- Menetapkan
uri
sebagai file lokal, misalnya./milvus.db
, adalah metode yang paling mudah, karena secara otomatis menggunakan Milvus Lite untuk menyimpan semua data dalam file ini. - Jika Anda memiliki data dalam skala besar, misalnya lebih dari satu juta vektor, Anda dapat menyiapkan server Milvus yang lebih berkinerja tinggi di Docker atau Kubernetes. Dalam pengaturan ini, gunakan alamat dan port server sebagai uri Anda, misalnya
http://localhost:19530
. Jika Anda mengaktifkan fitur autentikasi pada Milvus, gunakan "<nama_user Anda>:<kata sandi Anda>" sebagai token, jika tidak, jangan setel token. - Jika Anda ingin menggunakan Zilliz Cloud, layanan cloud yang dikelola sepenuhnya untuk Milvus, sesuaikan
uri
dantoken
, yang sesuai dengan Public Endpoint dan Api key di Zilliz Cloud.
Periksa apakah koleksi sudah ada dan hapus jika sudah ada.
collection_name = "my_rag_collection"
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Siapkan klien OpenAI untuk menghasilkan embedding dan menghasilkan respons.
from openai import OpenAI
openai_client = OpenAI()
def emb_text(text):
return (
openai_client.embeddings.create(input=text, model="text-embedding-3-small")
.data[0]
.embedding
)
Hasilkan embedding uji coba dan cetak dimensi dan beberapa elemen pertama.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]
Membuat Koleksi Milvus
Kita akan membuat sebuah koleksi dengan skema berikut ini:
id
: kunci utama, yang merupakan pengenal unik untuk setiap dokumen.vector
: penyematan dokumen.text
: konten teks dari dokumen.metadata
: metadata dari dokumen.
Kemudian kita membangun sebuah indeks AUTOINDEX
pada bidang vector
. Dan kemudian buatlah koleksinya.
# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="vector",
metric_type="COSINE",
index_type="AUTOINDEX",
)
milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params,
consistency_level="Strong",
)
milvus_client.load_collection(collection_name=collection_name)
Memuat data dari Unstructured
Unstructured menyediakan pipeline ingestion yang fleksibel dan kuat untuk memproses berbagai jenis file, termasuk PDF, HTML, dan lainnya. Kita akan menggunakan fungsionalitas ingest untuk mempartisi file PDF di direktori lokal. Dan kemudian memasukkan data ke dalam Milvus.
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
LocalIndexerConfig,
LocalDownloaderConfig,
LocalConnectionConfig,
LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"
Pipeline.from_configs(
context=ProcessorConfig(),
indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
downloader_config=LocalDownloaderConfig(),
source_connection_config=LocalConnectionConfig(),
partitioner_config=PartitionerConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
strategy="hi_res",
additional_partition_args={
"split_pdf_page": True,
"split_pdf_concurrency_level": 15,
},
),
uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()
from unstructured.staging.base import elements_from_json
def load_processed_files(directory_path):
elements = []
for filename in os.listdir(directory_path):
if filename.endswith(".json"):
file_path = os.path.join(directory_path, filename)
try:
elements.extend(elements_from_json(filename=file_path))
except IOError:
print(f"Error: Could not read file {filename}.")
return elements
elements = load_processed_files(directory_with_results)
Memasukkan data ke dalam Milvus.
data = []
for i, element in enumerate(elements):
data.append(
{
"id": i,
"vector": emb_text(element.text),
"text": element.text,
"metadata": element.metadata.to_dict(),
}
)
milvus_client.insert(collection_name=collection_name, data=data)
Mengambil dan Menghasilkan Respon
Mendefinisikan sebuah fungsi untuk mengambil dokumen yang relevan dari Milvus.
def retrieve_documents(question, top_k=3):
search_res = milvus_client.search(
collection_name=collection_name,
data=[emb_text(question)],
limit=top_k,
# search_params={"metric_type": "IP", "params": {}},
output_fields=["text"],
)
return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]
Tentukan sebuah fungsi untuk menghasilkan respon menggunakan dokumen yang diambil dalam pipeline RAG.
def generate_rag_response(question):
retrieved_docs = retrieve_documents(question)
context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
system_prompt = (
"You are an AI assistant. Provide answers based on the given context."
)
user_prompt = f"""
Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
Context:
{context}
Question: {question}
"""
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
return response.choices[0].message.content
Mari kita uji pipeline RAG dengan sebuah contoh pertanyaan.
question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.