Построение RAG с помощью Milvus и Unstructured
Unstructured предоставляет платформу и инструменты для получения и обработки неструктурированных документов для создания расширенного поиска (RAG) и точной настройки модели. Она предлагает как платформу с пользовательским интерфейсом без кода, так и бессерверные API-сервисы, позволяющие пользователям обрабатывать данные на вычислительных ресурсах, размещенных на Unstructured.
В этом руководстве мы будем использовать Unstructured для получения PDF-документов, а затем с помощью Milvus построим конвейер RAG.
Подготовка
Зависимости и среда
$ pip install -qU "unstructured[pdf]" pymilvus milvus-lite openai
Параметры установки:
- Для обработки всех форматов документов:
pip install "unstructured[all-docs]" - Для обработки определенных форматов (например, PDF):
pip install "unstructured[pdf]" - Дополнительные варианты установки см. в документации по Unstructured.
Если вы используете Google Colab, для включения только что установленных зависимостей вам может потребоваться перезапустить среду выполнения (нажмите на меню "Runtime" в верхней части экрана и выберите "Restart session" из выпадающего меню).
В этом примере мы будем использовать OpenAI в качестве LLM. Вам следует подготовить api ключ OPENAI_API_KEY в качестве переменной окружения.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Подготовка клиентов Milvus и OpenAI
Вы можете использовать клиент Milvus для создания коллекции Milvus и вставки в нее данных.
from pymilvus import MilvusClient, DataType
# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")
Что касается аргумента MilvusClient:
- Установка
uriв качестве локального файла, например./milvus.db, является наиболее удобным методом, так как он автоматически использует Milvus Lite для хранения всех данных в этом файле. - Если у вас большой объем данных, скажем, более миллиона векторов, вы можете настроить более производительный сервер Milvus на Docker или Kubernetes. В этом случае используйте адрес и порт сервера в качестве uri, например,
http://localhost:19530. Если вы включите функцию аутентификации на Milvus, используйте ": " в качестве токена, в противном случае не задавайте токен. - Если вы хотите использовать Zilliz Cloud, полностью управляемый облачный сервис для Milvus, настройте
uriиtoken, которые соответствуют публичной конечной точке и ключу Api в Zilliz Cloud.
Проверьте, не существует ли уже коллекция, и удалите ее, если она существует.
collection_name = "my_rag_collection"
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Подготовьте клиент OpenAI для генерации вкраплений и ответов.
from openai import OpenAI
openai_client = OpenAI()
def emb_text(text):
return (
openai_client.embeddings.create(input=text, model="text-embedding-3-small")
.data[0]
.embedding
)
Сгенерируйте тестовый эмбеддинг и выведите его размерность и первые несколько элементов.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]
Создание коллекции Milvus
Мы создадим коллекцию со следующей схемой:
id: первичный ключ, который является уникальным идентификатором для каждого документа.vector: вложенность документа.text: текстовое содержимое документа.metadata: метаданные документа.
Затем мы создаем индекс AUTOINDEX по полю vector. А затем создаем коллекцию.
# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="vector",
metric_type="COSINE",
index_type="AUTOINDEX",
)
milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params,
consistency_level="Bounded",
)
milvus_client.load_collection(collection_name=collection_name)
Загрузка данных из Unstructured
Unstructured предоставляет гибкий и мощный конвейер ввода данных для обработки различных типов файлов, включая PDF, HTML и т. д. Мы разделим и раздробим локальный PDF-файл. А затем загрузим данные в Milvus.
import warnings
from unstructured.partition.auto import partition
warnings.filterwarnings("ignore")
elements = partition(
filename="./pdf_files/WhatisMilvus.pdf",
strategy="hi_res",
chunking_strategy="by_title",
) # Replace with the path to your PDF file
Давайте рассмотрим разделенные элементы из PDF-файла. Каждый элемент представляет собой фрагмент содержимого, извлеченный в процессе разбиения Unstructured.
for element in elements:
print(element)
break
What is Milvus?
Milvus is a high-performance, highly scalable vector database that runs efficiently across a wide range of environments, from a laptop to large-scale distributed systems. It is available as both open-source software and a cloud service.
Вставка данных в Milvus.
data = []
for i, element in enumerate(elements):
data.append(
{
"id": i,
"vector": emb_text(element.text),
"text": element.text,
"metadata": element.metadata.to_dict(),
}
)
milvus_client.insert(collection_name=collection_name, data=data)
{'insert_count': 29, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28], 'cost': 0}
Получение и генерация ответа
Определите функцию для получения соответствующих документов из Milvus.
def retrieve_documents(question, top_k=3):
search_res = milvus_client.search(
collection_name=collection_name,
data=[emb_text(question)],
limit=top_k,
# search_params={"metric_type": "IP", "params": {}},
output_fields=["text"],
)
return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]
Определите функцию для генерации ответа с использованием полученных документов в конвейере RAG.
def generate_rag_response(question):
retrieved_docs = retrieve_documents(question)
context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
system_prompt = (
"You are an AI assistant. Provide answers based on the given context."
)
user_prompt = f"""
Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
Context:
{context}
Question: {question}
"""
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
return response.choices[0].message.content
Давайте протестируем конвейер RAG на примере вопроса.
question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus include a wide range of in-memory and on-disk indexing/search algorithms such as IVF, HNSW, and DiskANN. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.