Criar um RAG com Milvus e Unstructured
O Unstructured fornece uma plataforma e ferramentas para ingerir e processar documentos não estruturados para Retrieval Augmented Generation (RAG) e afinação de modelos. Ele oferece uma plataforma de interface do usuário sem código e serviços de API sem servidor, permitindo que os usuários processem dados em recursos de computação hospedados pelo Unstructured.
Neste tutorial, usaremos o Unstructured para ingerir documentos PDF e, em seguida, usaremos o Milvus para criar um pipeline RAG.
Preparação
Dependências e ambiente
$ pip install -qU "unstructured[pdf]" pymilvus milvus-lite openai
Opções de instalação:
- Para processar todos os formatos de documento:
pip install "unstructured[all-docs]" - Para formatos específicos (por exemplo, PDF):
pip install "unstructured[pdf]" - Para obter mais opções de instalação, consulte a documentação do Unstructured
Se estiver a utilizar o Google Colab, para ativar as dependências acabadas de instalar, poderá ter de reiniciar o tempo de execução (clique no menu "Tempo de execução" na parte superior do ecrã e selecione "Reiniciar sessão" no menu pendente).
Neste exemplo, vamos utilizar o OpenAI como LLM. Deve preparar a chave api OPENAI_API_KEY como uma variável de ambiente.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
Preparar os clientes Milvus e OpenAI
Pode utilizar o cliente Milvus para criar uma coleção Milvus e inserir dados na mesma.
from pymilvus import MilvusClient, DataType
# Initialize Milvus client
milvus_client = MilvusClient(uri="./milvus_demo.db")
Quanto ao argumento de MilvusClient:
- Definir o
uricomo um ficheiro local, por exemplo,./milvus.db, é o método mais conveniente, uma vez que utiliza automaticamente o Milvus Lite para armazenar todos os dados neste ficheiro. - Se tiver uma grande escala de dados, digamos mais de um milhão de vectores, pode configurar um servidor Milvus mais eficiente em Docker ou Kubernetes. Nesta configuração, use o endereço e a porta do servidor como seu uri, por exemplo,
http://localhost:19530. Se ativar a funcionalidade de autenticação no Milvus, utilize ": " como token, caso contrário não defina o token. - Se pretender utilizar o Zilliz Cloud, o serviço de nuvem totalmente gerido para o Milvus, ajuste
urietoken, que correspondem ao Public Endpoint e à chave Api no Zilliz Cloud.
Verifique se a coleção já existe e elimine-a se existir.
collection_name = "my_rag_collection"
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Prepare um cliente OpenAI para gerar embeddings e gerar respostas.
from openai import OpenAI
openai_client = OpenAI()
def emb_text(text):
return (
openai_client.embeddings.create(input=text, model="text-embedding-3-small")
.data[0]
.embedding
)
Gerar um embedding de teste e imprimir a sua dimensão e os primeiros elementos.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]
Criar coleção Milvus
Vamos criar uma coleção com o seguinte esquema:
id: a chave primária, que é um identificador único para cada documento.vectora incorporação do documento.textO conteúdo do texto do documento.metadata: os metadados do documento.
Em seguida, criamos um índice AUTOINDEX no campo vector. E depois criamos a coleção.
# Create schema
schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="vector",
metric_type="COSINE",
index_type="AUTOINDEX",
)
milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params,
consistency_level="Bounded",
)
milvus_client.load_collection(collection_name=collection_name)
Carregar dados do Unstructured
O Unstructured fornece um pipeline de ingestão flexível e poderoso para processar vários tipos de arquivo, incluindo PDF, HTML e muito mais. Vamos particionar e dividir um arquivo PDF local. E, em seguida, carregaremos os dados no Milvus.
import warnings
from unstructured.partition.auto import partition
warnings.filterwarnings("ignore")
elements = partition(
filename="./pdf_files/WhatisMilvus.pdf",
strategy="hi_res",
chunking_strategy="by_title",
) # Replace with the path to your PDF file
Vamos examinar os elementos particionados do ficheiro PDF. Cada elemento representa um pedaço de conteúdo extraído pelo processo de particionamento do Unstructured.
for element in elements:
print(element)
break
What is Milvus?
Milvus is a high-performance, highly scalable vector database that runs efficiently across a wide range of environments, from a laptop to large-scale distributed systems. It is available as both open-source software and a cloud service.
Inserir dados no Milvus.
data = []
for i, element in enumerate(elements):
data.append(
{
"id": i,
"vector": emb_text(element.text),
"text": element.text,
"metadata": element.metadata.to_dict(),
}
)
milvus_client.insert(collection_name=collection_name, data=data)
{'insert_count': 29, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28], 'cost': 0}
Recuperar e gerar resposta
Defina uma função para obter documentos relevantes do Milvus.
def retrieve_documents(question, top_k=3):
search_res = milvus_client.search(
collection_name=collection_name,
data=[emb_text(question)],
limit=top_k,
# search_params={"metric_type": "IP", "params": {}},
output_fields=["text"],
)
return [(res["entity"]["text"], res["distance"]) for res in search_res[0]]
Defina uma função para gerar uma resposta utilizando os documentos recuperados no pipeline RAG.
def generate_rag_response(question):
retrieved_docs = retrieve_documents(question)
context = "\n".join([f"Text: {doc[0]}\n" for doc in retrieved_docs])
system_prompt = (
"You are an AI assistant. Provide answers based on the given context."
)
user_prompt = f"""
Use the following pieces of information to answer the question. If the information is not in the context, say you don't know.
Context:
{context}
Question: {question}
"""
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
return response.choices[0].message.content
Vamos testar o pipeline RAG com uma pergunta de exemplo.
question = "What is the Advanced Search Algorithms in Milvus?"
answer = generate_rag_response(question)
print(f"Question: {question}")
print(f"Answer: {answer}")
Question: What is the Advanced Search Algorithms in Milvus?
Answer: The Advanced Search Algorithms in Milvus include a wide range of in-memory and on-disk indexing/search algorithms such as IVF, HNSW, and DiskANN. These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.