Construire un RAG avec Milvus et Unstructured
Unstructured fournit une plateforme et des outils pour ingérer et traiter des documents non structurés pour la génération augmentée par récupération (RAG) et l'affinement du modèle. Il offre à la fois une plateforme d'interface utilisateur sans code et des services API sans serveur, permettant aux utilisateurs de traiter les données sur les ressources de calcul hébergées par Unstructured.
Dans ce tutoriel, nous utiliserons Unstructured pour ingérer des documents PDF, puis Milvus pour construire un pipeline RAG.
Préparation
Dépendances et environnement
$ pip install -qU "unstructured-ingest[pdf]" unstructured pymilvus openai
Si vous utilisez Google Colab, pour activer les dépendances qui viennent d'être installées, vous devrez peut-être redémarrer le runtime (cliquez sur le menu "Runtime" en haut de l'écran, et sélectionnez "Restart session" dans le menu déroulant).
Vous pouvez obtenir vos variables d'environnement UNSTRUCTURED_API_KEY
et UNSTRUCTURED_URL
ici.
Nous utiliserons OpenAI comme LLM dans cet exemple. Vous devez préparer la clé api OPENAI_API_KEY
en tant que variable d'environnement.
import os
os.environ["UNSTRUCTURED_API_KEY"] = "***********"
os.environ["UNSTRUCTURED_URL"] = "***********"
os.environ["OPENAI_API_KEY"] = "***********"
Préparer les clients Milvus et OpenAI
Vous pouvez utiliser le client Milvus pour créer une collection Milvus et y insérer des données.
from pymilvus import MilvusClient, DataType
# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db") # TODO
Comme pour l'argument de MilvusClient
:
- Définir
uri
comme un fichier local, par exemple./milvus.db
, est la méthode la plus pratique, car elle utilise automatiquement Milvus Lite pour stocker toutes les données dans ce fichier. - Si vous avez des données à grande échelle, par exemple plus d'un million de vecteurs, vous pouvez configurer un serveur Milvus plus performant sur Docker ou Kubernetes. Dans cette configuration, veuillez utiliser l'adresse et le port du serveur comme uri, par exemple
http://localhost:19530
. Si vous activez la fonction d'authentification sur Milvus, utilisez "<votre_nom_d'utilisateur>:<votre_mot_de_passe>" comme jeton, sinon ne définissez pas le jeton. - Si vous souhaitez utiliser Zilliz Cloud, le service en nuage entièrement géré pour Milvus, réglez les paramètres
uri
ettoken
, qui correspondent au point de terminaison public et à la clé Api dans Zilliz Cloud.
Vérifier si la collection existe déjà et la supprimer si c'est le cas.
collection_name = "my_rag_collection"
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Préparez un client OpenAI pour générer des embeddings et des réponses.
from openai import OpenAI
openai_client = OpenAI()
def emb_text(text):
return (
openai_client.embeddings.create(input=text, model="text-embedding-3-small")
.data[0]
.embedding
)
Générer un embedding de test et imprimer sa dimension et ses premiers éléments.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]
Création de la collection Milvus
Nous allons créer une collection avec le schéma suivant :
id
la clé primaire, qui est un identifiant unique pour chaque document.vector
Le contenu du document : l'intégration du document.text
: le contenu textuel du document.metadata
les métadonnées du document.
Ensuite, nous construisons un index AUTOINDEX
sur le champ vector
. Puis nous créons la collection.
# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="vector",
metric_type="COSINE",
index_type="AUTOINDEX",
)
milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params,
consistency_level="Strong",
)
milvus_client.load_collection(collection_name=collection_name)
Charger des données à partir d'Unstructured
Unstructured fournit un pipeline d'ingestion flexible et puissant pour traiter différents types de fichiers, y compris PDF, HTML, etc. Nous utiliserons la fonctionnalité d'ingestion pour partitionner les fichiers PDF dans un répertoire local. Nous chargerons ensuite les données dans Milvus.
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
LocalIndexerConfig,
LocalDownloaderConfig,
LocalConnectionConfig,
LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
directory_with_pdfs = "./pdf_files"
directory_with_results = "./pdf_processed_outputs"
Pipeline.from_configs(
context=ProcessorConfig(),
indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),
downloader_config=LocalDownloaderConfig(),
source_connection_config=LocalConnectionConfig(),
partitioner_config=PartitionerConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
strategy="hi_res",
additional_partition_args={
"split_pdf_page": True,
"split_pdf_concurrency_level": 15,
},
),
uploader_config=LocalUploaderConfig(output_dir=directory_with_results),
).run()
from unstructured.staging.base import elements_from_json
def load_processed_files(directory_path):
elements = []
for filename in os.listdir(directory_path):
if filename.endswith(".json"):
file_path = os.path.join(directory_path, filename)
try:
elements.extend(elements_from_json(filename=file_path))
except IOError:
print(f"Error: Could not read file {filename}.")
return elements
elements = load_processed_files(directory_with_results)
Insérer des données dans Milvus.
data = []
for i, element in enumerate(elements):
data.append(
{
"id": i,
"vector": emb_text(element.text),
"text": element.text,
"metadata": element.metadata.to_dict(),
}
)
milvus_client.insert(collection_name=collection_name, data=data)
Récupérer et générer une réponse
Définir une fonction pour récupérer les documents pertinents dans Milvus.
def retrieve_documents(question, top_k=3):
search_res = milvus_client.search(
collection_name=collection_name,
data=[emb_text(question)],
limit=top_k,
# search_params={"metric_type": "IP", "params": {}},
output_fields=["text"],
)
return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]
Définir une fonction pour générer une réponse à l'aide des documents récupérés dans le pipeline RAG.
def generate_rag_response(question):
retrieved_docs = retrieve_documents(question)
context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
system_prompt = (
"You are an AI assistant. Provide answers based on the given context."
)
user_prompt = f"""
Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
Context:
{context}
Question: {question}
"""
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
return response.choices[0].message.content
Testons le pipeline RAG avec un exemple de question.
question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
INFO: HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.