🚀 Попробуйте Zilliz Cloud, полностью управляемый Milvus, бесплатно — ощутите 10-кратное увеличение производительности! Попробовать сейчас>

milvus-logo
LFAI
Главная
  • Интеграции
  • Home
  • Docs
  • Интеграции

  • Источники данных

  • Неструктурированные

Построение RAG с помощью Milvus и Unstructured

Open In Colab GitHub Repository

Unstructured предоставляет платформу и инструменты для получения и обработки неструктурированных документов для создания расширенного поиска (RAG) и точной настройки модели. Она предлагает как платформу с пользовательским интерфейсом без кода, так и бессерверные API-сервисы, позволяющие пользователям обрабатывать данные на вычислительных ресурсах, размещенных на Unstructured.

В этом руководстве мы будем использовать Unstructured для получения PDF-документов, а затем с помощью Milvus построим конвейер RAG.

Подготовка

Зависимости и среда

$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai

Если вы используете Google Colab, то для включения только что установленных зависимостей вам может потребоваться перезапустить среду выполнения (нажмите на меню "Runtime" в верхней части экрана и выберите "Restart session" из выпадающего меню).

Переменные окружения UNSTRUCTURED_API_KEY и UNSTRUCTURED_URL можно получить здесь.

В этом примере мы будем использовать OpenAI в качестве LLM. Вы должны подготовить api ключ OPENAI_API_KEY в качестве переменной окружения.

import os


os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"

os.environ["OPENAI_API_KEY"] = "***********"

Подготовьте клиенты Milvus и OpenAI

Вы можете использовать клиент Milvus для создания коллекции Milvus и вставки в нее данных.

from pymilvus import MilvusClient, DataType

# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")  # TODO

Что касается аргумента MilvusClient:

  • Установка uri в качестве локального файла, например./milvus.db, является наиболее удобным методом, так как он автоматически использует Milvus Lite для хранения всех данных в этом файле.
  • Если у вас большой объем данных, скажем, более миллиона векторов, вы можете настроить более производительный сервер Milvus на Docker или Kubernetes. В этом случае используйте адрес и порт сервера в качестве uri, например,http://localhost:19530. Если вы включили функцию аутентификации на Milvus, используйте "<ваше_имя_пользователя>:<ваш_пароль>" в качестве токена, в противном случае не задавайте токен.
  • Если вы хотите использовать Zilliz Cloud, полностью управляемый облачный сервис для Milvus, настройте uri и token, которые соответствуют публичной конечной точке и ключу Api в Zilliz Cloud.

Проверьте, не существует ли уже коллекция, и удалите ее, если она существует.

collection_name = "my_rag_collection"

if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

Подготовьте клиент OpenAI для генерации вкраплений и ответов.

from openai import OpenAI

openai_client = OpenAI()


def emb_text(text):
    return (
        openai_client.embeddings.create(input=text, model="text-embedding-3-small")
        .data[0]
        .embedding
    )

Сгенерируйте тестовый эмбеддинг и выведите его размерность и первые несколько элементов.

test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]

Создание коллекции Milvus

Мы создадим коллекцию со следующей схемой:

  • id: первичный ключ, который является уникальным идентификатором для каждого документа.
  • vector: вложенность документа.
  • text: текстовое содержимое документа.
  • metadata: метаданные документа.

Затем мы создаем индекс AUTOINDEX по полю vector. А затем создаем коллекцию.

# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
    field_name="vector",
    metric_type="COSINE",
    index_type="AUTOINDEX",
)
milvus_client.create_collection(
    collection_name=collection_name,
    schema=schema,
    index_params=index_params,
    consistency_level="Strong",
)

milvus_client.load_collection(collection_name=collection_name)

Загрузка данных из Unstructured

Unstructured предоставляет гибкий и мощный конвейер для обработки различных типов файлов, включая PDF, HTML и т. д. Мы воспользуемся функцией ingest для разделения PDF-файлов в локальной директории. А затем загрузим данные в Milvus.

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"

Pipeline.from_configs(
    context=ProcessorConfig(),
    indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
    downloader_config=LocalDownloaderConfig(),
    source_connection_config=LocalConnectionConfig(),
    partitioner_config=PartitionerConfig(
        partition_by_api=True,
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
        strategy="hi_res",
        additional_partition_args={
            "split_pdf_page": True,
            "split_pdf_concurrency_level": 15,
        },
    ),
    uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()


from unstructured.staging.base import elements_from_json


def load_processed_files(directory_path):
    elements = []
    for filename in os.listdir(directory_path):
        if filename.endswith(".json"):
            file_path = os.path.join(directory_path, filename)
            try:
                elements.extend(elements_from_json(filename=file_path))
            except IOError:
                print(f"Error: Could not read file {filename}.")

    return elements


elements = load_processed_files(directory_with_results)

Вставка данных в Milvus.

data = []
for i, element in enumerate(elements):
    data.append(
        {
            "id": i,
            "vector": emb_text(element.text),
            "text": element.text,
            "metadata": element.metadata.to_dict(),
        }
    )
milvus_client.insert(collection_name=collection_name, data=data)

Получение и генерация ответа

Определите функцию для получения соответствующих документов из Milvus.

def retrieve_documents(question, top_k=3):
    search_res = milvus_client.search(
        collection_name=collection_name,
        data=[emb_text(question)],
        limit=top_k,
        # search_params={"metric_type": "IP", "params": {}},
        output_fields=["text"],
    )
    return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]

Определите функцию для генерации ответа с использованием полученных документов в конвейере RAG.

def generate_rag_response(question):
    retrieved_docs = retrieve_documents(question)
    context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
    system_prompt = (
        "You are an AI assistant. Provide answers based on the given context."
    )
    user_prompt = f"""
    Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
    
    Context:
    {context}
    
    Question: {question}
    """
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return response.choices[0].message.content

Давайте протестируем конвейер RAG на примере вопроса.

question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.

Попробуйте Managed Milvus бесплатно

Zilliz Cloud работает без проблем, поддерживается Milvus и в 10 раз быстрее.

Начать
Обратная связь

Была ли эта страница полезной?