milvus-logo
LFAI
홈페이지
  • 통합
    • 데이터 소스

Milvus와 Unstructured로 RAG 구축하기

Open In Colab GitHub Repository

Unstructured는 검색 증강 생성(RAG)과 모델 미세 조정을 위해 비정형 문서를 수집 및 처리할 수 있는 플랫폼과 도구를 제공합니다. 코드가 필요 없는 UI 플랫폼과 서버리스 API 서비스를 모두 제공하여 사용자가 Unstructured가 호스팅하는 컴퓨팅 리소스에서 데이터를 처리할 수 있습니다.

이 튜토리얼에서는 Unstructured를 사용하여 PDF 문서를 수집한 다음 Milvus를 사용하여 RAG 파이프라인을 구축합니다.

준비 사항

종속성 및 환경

$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai

Google Colab을 사용하는 경우 방금 설치한 종속 요소를 사용하려면 런타임을 다시 시작해야 할 수 있습니다(화면 상단의 "런타임" 메뉴를 클릭하고 드롭다운 메뉴에서 "세션 다시 시작"을 선택).

여기에서 UNSTRUCTURED_API_KEYUNSTRUCTURED_URL 환경 변수를 가져올 수 있습니다.

이 예제에서는 OpenAI를 LLM으로 사용하겠습니다. 환경 변수로 OPENAI_API_KEY API 키를 준비해야 합니다.

import os


os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"

os.environ["OPENAI_API_KEY"] = "***********"

Milvus 및 OpenAI 클라이언트 준비

Milvus 클라이언트를 사용하여 Milvus 컬렉션을 생성하고 데이터를 삽입할 수 있습니다.

from pymilvus import MilvusClient, DataType

# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")  # TODO

인수는 MilvusClient 입니다:

  • uri 을 로컬 파일(예:./milvus.db)로 설정하는 것이 가장 편리한 방법이며, 이 파일에 모든 데이터를 저장하기 위해 Milvus Lite를 자동으로 활용하기 때문입니다.
  • 백만 개 이상의 벡터와 같이 대량의 데이터가 있는 경우, Docker 또는 Kubernetes에 더 성능이 좋은 Milvus 서버를 설정할 수 있습니다. 이 설정에서는 서버 주소와 포트를 URI로 사용하세요(예:http://localhost:19530). Milvus에서 인증 기능을 활성화하는 경우 토큰으로 "<사용자 이름>:<사용자 비밀번호>"를 사용하고, 그렇지 않은 경우 토큰을 설정하지 마세요.
  • 밀버스의 완전 관리형 클라우드 서비스인 질리즈 클라우드를 사용하려면 질리즈 클라우드의 퍼블릭 엔드포인트와 API 키에 해당하는 uritoken 을 조정하세요.

컬렉션이 이미 존재하는지 확인하고 존재한다면 삭제합니다.

collection_name = "my_rag_collection"

if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

임베딩을 생성하고 응답을 생성할 OpenAI 클라이언트를 준비합니다.

from openai import OpenAI

openai_client = OpenAI()


def emb_text(text):
    return (
        openai_client.embeddings.create(input=text, model="text-embedding-3-small")
        .data[0]
        .embedding
    )

테스트 임베딩을 생성하고 해당 차원과 처음 몇 개의 요소를 인쇄합니다.

test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]

Milvus 컬렉션 만들기

다음 스키마로 컬렉션을 생성합니다:

  • id각 문서의 고유 식별자인 기본 키.
  • vector문서의 임베딩.
  • text문서의 텍스트 콘텐츠.
  • metadata문서의 메타데이터.

그런 다음 vector 필드에 AUTOINDEX 인덱스를 구축합니다. 그런 다음 컬렉션을 만듭니다.

# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
    field_name="vector",
    metric_type="COSINE",
    index_type="AUTOINDEX",
)
milvus_client.create_collection(
    collection_name=collection_name,
    schema=schema,
    index_params=index_params,
    consistency_level="Strong",
)

milvus_client.load_collection(collection_name=collection_name)

비정형에서 데이터 로드

비정형은 PDF, HTML 등 다양한 파일 유형을 처리할 수 있는 유연하고 강력한 수집 파이프라인을 제공합니다. 수집 기능을 사용하여 로컬 디렉터리에서 PDF 파일을 분할하겠습니다. 그런 다음 Milvus에 데이터를 로드합니다.

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"

Pipeline.from_configs(
    context=ProcessorConfig(),
    indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
    downloader_config=LocalDownloaderConfig(),
    source_connection_config=LocalConnectionConfig(),
    partitioner_config=PartitionerConfig(
        partition_by_api=True,
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
        strategy="hi_res",
        additional_partition_args={
            "split_pdf_page": True,
            "split_pdf_concurrency_level": 15,
        },
    ),
    uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()


from unstructured.staging.base import elements_from_json


def load_processed_files(directory_path):
    elements = []
    for filename in os.listdir(directory_path):
        if filename.endswith(".json"):
            file_path = os.path.join(directory_path, filename)
            try:
                elements.extend(elements_from_json(filename=file_path))
            except IOError:
                print(f"Error: Could not read file {filename}.")

    return elements


elements = load_processed_files(directory_with_results)

Milvus에 데이터를 삽입합니다.

data = []
for i, element in enumerate(elements):
    data.append(
        {
            "id": i,
            "vector": emb_text(element.text),
            "text": element.text,
            "metadata": element.metadata.to_dict(),
        }
    )
milvus_client.insert(collection_name=collection_name, data=data)

검색 및 응답 생성

Milvus에서 관련 문서를 검색하는 함수를 정의합니다.

def retrieve_documents(question, top_k=3):
    search_res = milvus_client.search(
        collection_name=collection_name,
        data=[emb_text(question)],
        limit=top_k,
        # search_params={"metric_type": "IP", "params": {}},
        output_fields=["text"],
    )
    return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]

RAG 파이프라인에서 검색된 문서를 사용하여 응답을 생성하는 함수를 정의합니다.

def generate_rag_response(question):
    retrieved_docs = retrieve_documents(question)
    context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
    system_prompt = (
        "You are an AI assistant. Provide answers based on the given context."
    )
    user_prompt = f"""
    Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
    
    Context:
    {context}
    
    Question: {question}
    """
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return response.choices[0].message.content

샘플 질문으로 RAG 파이프라인을 테스트해 보겠습니다.

question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.

번역DeepL

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
피드백

이 페이지가 도움이 되었나요?