milvus-logo
LFAI
Home
  • Intégrations
    • Sources de données

Construire un RAG avec Milvus et Unstructured

Open In Colab GitHub Repository

Unstructured fournit une plateforme et des outils pour ingérer et traiter des documents non structurés pour la génération augmentée par récupération (RAG) et l'affinement du modèle. Il offre à la fois une plateforme d'interface utilisateur sans code et des services API sans serveur, permettant aux utilisateurs de traiter les données sur les ressources de calcul hébergées par Unstructured.

Dans ce tutoriel, nous utiliserons Unstructured pour ingérer des documents PDF, puis Milvus pour construire un pipeline RAG.

Préparation

Dépendances et environnement

$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai

Si vous utilisez Google Colab, pour activer les dépendances qui viennent d'être installées, vous devrez peut-être redémarrer le runtime (cliquez sur le menu "Runtime" en haut de l'écran, et sélectionnez "Restart session" dans le menu déroulant).

Vous pouvez obtenir vos variables d'environnement UNSTRUCTURED_API_KEY et UNSTRUCTURED_URL ici.

Nous utiliserons OpenAI comme LLM dans cet exemple. Vous devez préparer la clé api OPENAI_API_KEY en tant que variable d'environnement.

import os


os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"

os.environ["OPENAI_API_KEY"] = "***********"

Préparer les clients Milvus et OpenAI

Vous pouvez utiliser le client Milvus pour créer une collection Milvus et y insérer des données.

from pymilvus import MilvusClient, DataType

# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")  # TODO

Comme pour l'argument de MilvusClient:

  • Définir uri comme un fichier local, par exemple./milvus.db, est la méthode la plus pratique, car elle utilise automatiquement Milvus Lite pour stocker toutes les données dans ce fichier.
  • Si vous avez des données à grande échelle, par exemple plus d'un million de vecteurs, vous pouvez configurer un serveur Milvus plus performant sur Docker ou Kubernetes. Dans cette configuration, veuillez utiliser l'adresse et le port du serveur comme uri, par exemplehttp://localhost:19530. Si vous activez la fonction d'authentification sur Milvus, utilisez "<votre_nom_d'utilisateur>:<votre_mot_de_passe>" comme jeton, sinon ne définissez pas le jeton.
  • Si vous souhaitez utiliser Zilliz Cloud, le service en nuage entièrement géré pour Milvus, réglez les paramètres uri et token, qui correspondent au point de terminaison public et à la clé Api dans Zilliz Cloud.

Vérifier si la collection existe déjà et la supprimer si c'est le cas.

collection_name = "my_rag_collection"

if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

Préparez un client OpenAI pour générer des embeddings et des réponses.

from openai import OpenAI

openai_client = OpenAI()


def emb_text(text):
    return (
        openai_client.embeddings.create(input=text, model="text-embedding-3-small")
        .data[0]
        .embedding
    )

Générer un embedding de test et imprimer sa dimension et ses premiers éléments.

test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]

Création de la collection Milvus

Nous allons créer une collection avec le schéma suivant :

  • idla clé primaire, qui est un identifiant unique pour chaque document.
  • vectorLe contenu du document : l'intégration du document.
  • text: le contenu textuel du document.
  • metadatales métadonnées du document.

Ensuite, nous construisons un index AUTOINDEX sur le champ vector. Puis nous créons la collection.

# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
    field_name="vector",
    metric_type="COSINE",
    index_type="AUTOINDEX",
)
milvus_client.create_collection(
    collection_name=collection_name,
    schema=schema,
    index_params=index_params,
    consistency_level="Strong",
)

milvus_client.load_collection(collection_name=collection_name)

Charger des données à partir d'Unstructured

Unstructured fournit un pipeline d'ingestion flexible et puissant pour traiter différents types de fichiers, y compris PDF, HTML, etc. Nous utiliserons la fonctionnalité d'ingestion pour partitionner les fichiers PDF dans un répertoire local. Nous chargerons ensuite les données dans Milvus.

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"

Pipeline.from_configs(
    context=ProcessorConfig(),
    indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
    downloader_config=LocalDownloaderConfig(),
    source_connection_config=LocalConnectionConfig(),
    partitioner_config=PartitionerConfig(
        partition_by_api=True,
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
        strategy="hi_res",
        additional_partition_args={
            "split_pdf_page": True,
            "split_pdf_concurrency_level": 15,
        },
    ),
    uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()


from unstructured.staging.base import elements_from_json


def load_processed_files(directory_path):
    elements = []
    for filename in os.listdir(directory_path):
        if filename.endswith(".json"):
            file_path = os.path.join(directory_path, filename)
            try:
                elements.extend(elements_from_json(filename=file_path))
            except IOError:
                print(f"Error: Could not read file {filename}.")

    return elements


elements = load_processed_files(directory_with_results)

Insérer des données dans Milvus.

data = []
for i, element in enumerate(elements):
    data.append(
        {
            "id": i,
            "vector": emb_text(element.text),
            "text": element.text,
            "metadata": element.metadata.to_dict(),
        }
    )
milvus_client.insert(collection_name=collection_name, data=data)

Récupérer et générer une réponse

Définir une fonction pour récupérer les documents pertinents dans Milvus.

def retrieve_documents(question, top_k=3):
    search_res = milvus_client.search(
        collection_name=collection_name,
        data=[emb_text(question)],
        limit=top_k,
        # search_params={"metric_type": "IP", "params": {}},
        output_fields=["text"],
    )
    return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]

Définir une fonction pour générer une réponse à l'aide des documents récupérés dans le pipeline RAG.

def generate_rag_response(question):
    retrieved_docs = retrieve_documents(question)
    context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
    system_prompt = (
        "You are an AI assistant. Provide answers based on the given context."
    )
    user_prompt = f"""
    Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
    
    Context:
    {context}
    
    Question: {question}
    """
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return response.choices[0].message.content

Testons le pipeline RAG avec un exemple de question.

question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.

Traduit parDeepLogo

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
Feedback

Cette page a-t - elle été utile ?