Building a RAG Pipeline: Loading Data from S3 into Milvus
This tutorial walks you through building a Retrieval-Augmented Generation (RAG) pipeline using Milvus and Amazon S3. You will learn how to efficiently load documents from an S3 bucket, split them into manageable chunks, and store their vector embeddings in Milvus for fast, scalable retrieval. To streamline this process, we will use LangChain to load data from S3 and store it in Milvus.
Preparation
Dependencies and environment
$ pip install --upgrade --quiet pymilvus milvus-lite openai requests tqdm boto3 langchain langchain-core langchain-community langchain-text-splitters langchain-milvus langchain-openai bs4
If you are using Google Colab, to enable the dependencies just installed, you may need to restart the runtime (click the "Runtime" menu at the top of the screen, and select "Restart session" from the dropdown menu).
We will use OpenAI as the LLM in this example. You should prepare the API key OPENAI_API_KEY as an environment variable.
import os
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
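If the key is missing, the failure otherwise surfaces much later as an opaque authentication error. A small fail-fast check can help; the require_env helper below is a hypothetical convenience, not part of any library:

```python
import os

def require_env(name: str) -> str:
    """Return the value of an environment variable, or fail with a clear message."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value

# Falls back to a placeholder here so the snippet runs standalone
os.environ.setdefault("OPENAI_API_KEY", "your-openai-api-key")
api_key = require_env("OPENAI_API_KEY")
```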
S3 configuration
To load documents from S3, you need the following:
- AWS Access Key and Secret Key: Store them as environment variables to securely access your S3 bucket:
os.environ["AWS_ACCESS_KEY_ID"] = "your-aws-access-key-id"
os.environ["AWS_SECRET_ACCESS_KEY"] = "your-aws-secret-access-key"
- S3 Bucket and Document: Specify the bucket name and the document key as arguments to the S3FileLoader class.
from langchain_community.document_loaders import S3FileLoader
loader = S3FileLoader(
    bucket="milvus-s3-example",  # Replace with your S3 bucket name
    key="WhatIsMilvus.docx",  # Replace with your document file name
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)
- Load documents: Once configured, you can load the document from S3 into your pipeline:
documents = loader.load()
This step ensures that your documents are correctly loaded from S3 and ready for processing in the RAG pipeline.
Split documents into chunks
After loading the document, use LangChain's RecursiveCharacterTextSplitter to split the content into manageable chunks:
from langchain_text_splitters import RecursiveCharacterTextSplitter
# Initialize a RecursiveCharacterTextSplitter for splitting text into chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
# Split the documents into chunks using the text_splitter
docs = text_splitter.split_documents(documents)
# Let's take a look at the second chunk
docs[1]
Document(metadata={'source': 's3://milvus-s3-example/WhatIsMilvus.docx'}, page_content='Milvus offers three deployment modes, covering a wide range of data scales—from local prototyping in Jupyter Notebooks to massive Kubernetes clusters managing tens of billions of vectors: \n\nMilvus Lite is a Python library that can be easily integrated into your applications. As a lightweight version of Milvus, it’s ideal for quick prototyping in Jupyter Notebooks or running on edge devices with limited resources. Learn more.\nMilvus Standalone is a single-machine server deployment, with all components bundled into a single Docker image for convenient deployment. Learn more.\nMilvus Distributed can be deployed on Kubernetes clusters, featuring a cloud-native architecture designed for billion-scale or even larger scenarios. This architecture ensures redundancy in critical components. Learn more. \n\nWhat Makes Milvus so Fast\U0010fc00 \n\nMilvus was designed from day one to be a highly efficient vector database system. In most cases, Milvus outperforms other vector databases by 2-5x (see the VectorDBBench results). This high performance is the result of several key design decisions: \n\nHardware-aware Optimization: To accommodate Milvus in various hardware environments, we have optimized its performance specifically for many hardware architectures and platforms, including AVX512, SIMD, GPUs, and NVMe SSD. \n\nAdvanced Search Algorithms: Milvus supports a wide range of in-memory and on-disk indexing/search algorithms, including IVF, HNSW, DiskANN, and more, all of which have been deeply optimized. Compared to popular implementations like FAISS and HNSWLib, Milvus delivers 30%-70% better performance.')
At this stage, your documents are loaded from S3 and split into smaller chunks, ready for further processing in the Retrieval-Augmented Generation (RAG) pipeline.
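The chunk_size and chunk_overlap semantics can be made concrete with a minimal sliding-window sketch in plain Python. This is a simplification: the real RecursiveCharacterTextSplitter also tries to split on separators such as paragraph and sentence boundaries before falling back to a hard character cut.

```python
def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int):
    """Naive character-window splitter: each chunk starts
    chunk_size - chunk_overlap characters after the previous one."""
    step = chunk_size - chunk_overlap
    return [text[i : i + chunk_size] for i in range(0, len(text), step)]

# Consecutive chunks share chunk_overlap characters of context
chunks = sliding_window_chunks("abcdefghij", chunk_size=4, chunk_overlap=2)
print(chunks)  # ['abcd', 'cdef', 'efgh', 'ghij', 'ij']
```

The overlap preserves context that straddles a chunk boundary, so a sentence cut in half by one chunk is still fully contained in its neighbor.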
Build the RAG chain with the Milvus vector store
We will initialize a Milvus vector store with the documents, which loads them into the Milvus vector store and builds an index under the hood.
from langchain_milvus import Milvus
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vectorstore = Milvus.from_documents(
    documents=docs,
    embedding=embeddings,
    connection_args={
        "uri": "./milvus_demo.db",
    },
    drop_old=False,  # Set to True to drop the old Milvus collection if it exists
)
For the connection_args:

- Setting the uri as a local file, e.g. ./milvus.db, is the most convenient method, as it automatically utilizes Milvus Lite to store all data in this file.
- If you have a large amount of data, you can set up a more performant Milvus server on Docker or Kubernetes. In this setup, use the server address, e.g. http://localhost:19530, as your uri.
- If you want to use Zilliz Cloud, the fully managed cloud service for Milvus, adjust the uri and token, which correspond to the Public Endpoint and API key in Zilliz Cloud.
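For reference, the three options map to connection_args dictionaries like these (the Zilliz Cloud endpoint and key below are placeholders, not real values):

```python
# Milvus Lite: all data stored in a single local file
lite_args = {"uri": "./milvus_demo.db"}

# Milvus Standalone / Distributed server (Docker or Kubernetes)
server_args = {"uri": "http://localhost:19530"}

# Zilliz Cloud: Public Endpoint plus API key (placeholder values)
zilliz_args = {
    "uri": "https://your-public-endpoint.zillizcloud.com",
    "token": "your-api-key",
}
```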
Search the documents in the Milvus vector store using a test query. Let's take a look at the top-ranked result.
query = "How can Milvus be deployed"
vectorstore.similarity_search(query, k=1)
[Document(metadata={'pk': 455631712233193487, 'source': 's3://milvus-s3-example/WhatIsMilvus.docx'}, page_content='Milvus offers three deployment modes, covering a wide range of data scales—from local prototyping in Jupyter Notebooks to massive Kubernetes clusters managing tens of billions of vectors: \n\nMilvus Lite is a Python library that can be easily integrated into your applications. As a lightweight version of Milvus, it’s ideal for quick prototyping in Jupyter Notebooks or running on edge devices with limited resources. Learn more.\nMilvus Standalone is a single-machine server deployment, with all components bundled into a single Docker image for convenient deployment. Learn more.\nMilvus Distributed can be deployed on Kubernetes clusters, featuring a cloud-native architecture designed for billion-scale or even larger scenarios. This architecture ensures redundancy in critical components. Learn more. \n\nWhat Makes Milvus so Fast\U0010fc00 \n\nMilvus was designed from day one to be a highly efficient vector database system. In most cases, Milvus outperforms other vector databases by 2-5x (see the VectorDBBench results). This high performance is the result of several key design decisions: \n\nHardware-aware Optimization: To accommodate Milvus in various hardware environments, we have optimized its performance specifically for many hardware architectures and platforms, including AVX512, SIMD, GPUs, and NVMe SSD. \n\nAdvanced Search Algorithms: Milvus supports a wide range of in-memory and on-disk indexing/search algorithms, including IVF, HNSW, DiskANN, and more, all of which have been deeply optimized. Compared to popular implementations like FAISS and HNSWLib, Milvus delivers 30%-70% better performance.')]
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
# Initialize the OpenAI language model for response generation
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)
# Define the prompt template for generating AI responses
PROMPT_TEMPLATE = """
Human: You are an AI assistant, and you provide answers to questions by using fact-based and statistical information when possible.
Use the following pieces of information to provide a concise answer to the question enclosed in <question> tags.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
<context>
{context}
</context>
<question>
{question}
</question>
The response should be specific and use statistics or numbers when possible.
Assistant:"""
# Create a PromptTemplate instance with the defined template and input variables
prompt = PromptTemplate(
    template=PROMPT_TEMPLATE, input_variables=["context", "question"]
)
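As a quick sanity check, you can fill the template in directly. For these two variables, plain str.format behaves the same way as PromptTemplate; the template below is an abbreviated copy and the context/question values are illustrative:

```python
# Abbreviated copy of the tutorial's template, just to show variable substitution
PROMPT_TEMPLATE = """
Human: Use the following pieces of information to answer the question.
<context>
{context}
</context>
<question>
{question}
</question>
Assistant:"""

filled = PROMPT_TEMPLATE.format(
    context="Milvus offers three deployment modes.",
    question="How can Milvus be deployed?",
)
print(filled)
```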
# Convert the vector store to a retriever
retriever = vectorstore.as_retriever()
# Define a function to format the retrieved documents
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)
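format_docs simply concatenates the retrieved chunks with blank lines. Any object with a page_content attribute works, so a lightweight stand-in (SimpleNamespace here, not a real LangChain Document) is enough to illustrate it:

```python
from types import SimpleNamespace

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# SimpleNamespace stands in for LangChain's Document class
fake_docs = [
    SimpleNamespace(page_content="Milvus Lite is a Python library."),
    SimpleNamespace(page_content="Milvus Standalone runs in Docker."),
]
print(format_docs(fake_docs))
# Milvus Lite is a Python library.
#
# Milvus Standalone runs in Docker.
```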
Use LCEL (LangChain Expression Language) to build a RAG chain.
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
res = rag_chain.invoke(query)
res
'Milvus can be deployed in three different modes: Milvus Lite for local prototyping and edge devices, Milvus Standalone for single-machine server deployment, and Milvus Distributed for deployment on Kubernetes clusters. These deployment modes cover a wide range of data scales, from small-scale prototyping to massive clusters managing tens of billions of vectors.'