Construire un RAG avec Milvus et Unstructured
Unstructured fournit une plateforme et des outils permettant d'ingérer et de traiter des documents non structurés pour la génération augmentée par récupération (RAG) et l'affinement du modèle. Il offre à la fois une plateforme d'interface utilisateur sans code et des services API sans serveur, permettant aux utilisateurs de traiter les données sur les ressources de calcul hébergées par Unstructured.
Dans ce tutoriel, nous utiliserons Unstructured pour ingérer des documents PDF, puis Milvus pour construire un pipeline RAG.
Préparation
Dépendances et environnement
$ pip install -qU "unstructured[pdf]" pymilvus milvus-lite openai
Options d'installation :
- Pour le traitement de tous les formats de documents :
pip install "unstructured[all-docs]" - Pour des formats spécifiques (par exemple, PDF) :
pip install "unstructured[pdf]" - Pour plus d'options d'installation, voir la documentation Unstructured.
Si vous utilisez Google Colab, pour activer les dépendances qui viennent d'être installées, vous devrez peut-être redémarrer le runtime (cliquez sur le menu "Runtime" en haut de l'écran, et sélectionnez "Restart session" dans le menu déroulant).
Nous utiliserons OpenAI comme LLM dans cet exemple. Vous devez préparer la clé api OPENAI_API_KEY en tant que variable d'environnement.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Préparer les clients Milvus et OpenAI
Vous pouvez utiliser le client Milvus pour créer une collection Milvus et y insérer des données.
from pymilvus import MilvusClient, DataType
# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")
Comme pour l'argument de MilvusClient:
- Définir
uricomme un fichier local, par exemple./milvus.db, est la méthode la plus pratique, car elle utilise automatiquement Milvus Lite pour stocker toutes les données dans ce fichier. - Si vous avez des données à grande échelle, par exemple plus d'un million de vecteurs, vous pouvez configurer un serveur Milvus plus performant sur Docker ou Kubernetes. Dans cette configuration, veuillez utiliser l'adresse et le port du serveur comme uri, par exemple
http://localhost:19530. Si vous activez la fonction d'authentification sur Milvus, utilisez ": " comme jeton, sinon ne définissez pas le jeton. - Si vous souhaitez utiliser Zilliz Cloud, le service en nuage entièrement géré pour Milvus, ajustez les valeurs
uriettoken, qui correspondent au point de terminaison public et à la clé Api dans Zilliz Cloud.
Vérifier si la collection existe déjà et la supprimer si c'est le cas.
collection_name = "my_rag_collection"
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Préparez un client OpenAI pour générer des embeddings et des réponses.
from openai import OpenAI
openai_client = OpenAI()
def emb_text(text):
return (
openai_client.embeddings.create(input=text, model="text-embedding-3-small")
.data[0]
.embedding
)
Générer un embedding de test et imprimer sa dimension et ses premiers éléments.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]
Création de la collection Milvus
Nous allons créer une collection avec le schéma suivant :
idla clé primaire, qui est un identifiant unique pour chaque document.vectorLe contenu du document : l'intégration du document.text: le contenu textuel du document.metadatales métadonnées du document.
Ensuite, nous construisons un index AUTOINDEX sur le champ vector. Puis nous créons la collection.
# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="vector",
metric_type="COSINE",
index_type="AUTOINDEX",
)
milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params,
consistency_level="Bounded",
)
milvus_client.load_collection(collection_name=collection_name)
Charger des données à partir d'Unstructured
Unstructured fournit un pipeline d'ingestion flexible et puissant pour traiter différents types de fichiers, y compris PDF, HTML, etc. Nous allons partitionner et diviser en morceaux un fichier PDF local. Puis nous chargerons les données dans Milvus.
import warnings
from unstructured.partition.auto import partition
warnings.filterwarnings("ignore")
elements = partition(
filename="./pdf_files/WhatisMilvus.pdf",
strategy="hi_res",
chunking_strategy="by_title",
) # Replace with the path to your PDF file
Examinons les éléments partitionnés du fichier PDF. Chaque élément représente un morceau de contenu extrait par le processus de partitionnement d'Unstructured.
for element in elements:
print(element)
break
What is Milvus?
Milvus is a high-performance, highly scalable vector database that runs efficiently across a wide range of environments, from a laptop to large-scale distributed systems. It is available as both open-source software and a cloud service.
Insérer les données dans Milvus.
data = []
for i, element in enumerate(elements):
data.append(
{
"id": i,
"vector": emb_text(element.text),
"text": element.text,
"metadata": element.metadata.to_dict(),
}
)
milvus_client.insert(collection_name=collection_name, data=data)
{'insert_count': 29, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28], 'cost': 0}
Récupérer et générer une réponse
Définir une fonction pour récupérer les documents pertinents dans Milvus.
def retrieve_documents(question, top_k=3):
search_res = milvus_client.search(
collection_name=collection_name,
data=[emb_text(question)],
limit=top_k,
# search_params={"metric_type": "IP", "params": {}},
output_fields=["text"],
)
return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]
Définir une fonction pour générer une réponse à l'aide des documents récupérés dans le pipeline RAG.
def generate_rag_response(question):
retrieved_docs = retrieve_documents(question)
context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
system_prompt = (
"You are an AI assistant. Provide answers based on the given context."
)
user_prompt = f"""
Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
Context:
{context}
Question: {question}
"""
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
return response.choices[0].message.content
Testons le pipeline RAG avec un exemple de question.
question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus include a wide range of in-memory and on-disk indexing/search algorithms such as IVF, HNSW, and DiskANN. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.