Construir RAG con Milvus y Fireworks AI
Fireworks AI es una plataforma de inferencia de IA generativa que ofrece una velocidad y una preparación para la producción líderes en el sector para ejecutar y personalizar modelos. Fireworks AI proporciona una variedad de servicios de IA generativa, incluidos modelos sin servidor, despliegues bajo demanda y capacidades de ajuste fino. Ofrece un entorno completo para desplegar varios modelos de IA, incluidos modelos de lenguaje de gran tamaño (LLM) y modelos de incrustación. Fireworks AI agrega numerosos modelos, lo que permite a los usuarios acceder fácilmente a estos recursos y utilizarlos sin necesidad de una amplia configuración de la infraestructura.
En este tutorial, le mostraremos cómo construir una canalización RAG (Retrieval-Augmented Generation) con Milvus y Fireworks AI.
Preparación
Dependencias y entorno
$ pip install --upgrade pymilvus openai requests tqdm
Si estás utilizando Google Colab, para habilitar las dependencias que acabas de instalar, es posible que tengas que reiniciar el tiempo de ejecución (haz clic en el menú "Tiempo de ejecución" en la parte superior de la pantalla, y selecciona "Reiniciar sesión" en el menú desplegable).
Fireworks AI habilita la API al estilo OpenAI. Puedes acceder a su web oficial y preparar la clave api FIREWORKS_API_KEY
como variable de entorno.
import os
os.environ["FIREWORKS_API_KEY"] = "***********"
Preparar los datos
Utilizamos las páginas de preguntas frecuentes de la Documentación de Milvus 2.4.x como conocimiento privado en nuestra RAG, que es una buena fuente de datos para una canalización RAG sencilla.
Descargamos el archivo zip y extraemos los documentos a la carpeta milvus_docs
.
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
Cargamos todos los archivos markdown de la carpeta milvus_docs/en/faq
. Para cada documento, simplemente utilizamos "# " para separar el contenido en el archivo, lo que puede separar aproximadamente el contenido de cada parte principal del archivo markdown.
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()
text_lines += file_text.split("# ")
Preparar el LLM y el modelo de incrustación
Inicializamos un cliente para preparar el LLM y el modelo de incrustación. Fireworks AI habilita la API de estilo OpenAI, y puedes utilizar la misma API con pequeños ajustes para llamar al modelo de incrustación y al LLM.
from openai import OpenAI
fireworks_client = OpenAI(
api_key=os.environ["FIREWORKS_API_KEY"],
base_url="https://api.fireworks.ai/inference/v1",
)
Define una función para generar incrustaciones de texto utilizando el cliente. Usamos el modelo nomic-ai/nomic-embed-text-v1.5
como ejemplo.
def emb_text(text):
return (
fireworks_client.embeddings.create(
input=text, model="nomic-ai/nomic-embed-text-v1.5"
)
.data[0]
.embedding
)
Genere una incrustación de prueba e imprima su dimensión y sus primeros elementos.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
768
[0.04815673828125, 0.0261993408203125, -0.1749267578125, -0.03131103515625, 0.068115234375, -0.00621795654296875, 0.03955078125, -0.0210723876953125, 0.039703369140625, -0.0286102294921875]
Cargar los datos en Milvus
Crear la colección
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
Como para el argumento de MilvusClient
:
- Establecer el
uri
como un archivo local, por ejemplo./milvus.db
, es el método más conveniente, ya que utiliza automáticamente Milvus Lite para almacenar todos los datos en este archivo. - Si tiene una gran escala de datos, puede configurar un servidor Milvus más eficiente en docker o kubernetes. En esta configuración, por favor utilice la uri del servidor, por ejemplo
http://localhost:19530
, como suuri
. - Si desea utilizar Zilliz Cloud, el servicio en la nube totalmente gestionado para Milvus, ajuste
uri
ytoken
, que corresponden al punto final público y a la clave Api en Zilliz Cloud.
Compruebe si la colección ya existe y elimínela en caso afirmativo.
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Crear una nueva colección con los parámetros especificados.
Si no especificamos ninguna información de campo, Milvus creará automáticamente un campo id
por defecto para la clave primaria, y un campo vector
para almacenar los datos vectoriales. Se utiliza un campo JSON reservado para almacenar campos no definidos por el esquema y sus valores.
milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # Inner product distance
consistency_level="Strong", # Strong consistency level
)
Insertar datos
Iterar a través de las líneas de texto, crear incrustaciones, y luego insertar los datos en Milvus.
Aquí hay un nuevo campo text
, que es un campo no definido en el esquema de la colección. Se añadirá automáticamente al campo dinámico JSON reservado, que puede tratarse como un campo normal a alto nivel.
from tqdm import tqdm
data = []
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": emb_text(line), "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 100%|██████████| 72/72 [00:28<00:00, 2.51it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
Construir RAG
Recuperar datos para una consulta
Especifiquemos una pregunta frecuente sobre Milvus.
question = "How is data stored in milvus?"
Busquemos la pregunta en la colección y recuperemos las 3 primeras coincidencias semánticas.
search_res = milvus_client.search(
collection_name=collection_name,
data=[
emb_text(question)
], # Use the `emb_text` function to convert the question to an embedding vector
limit=3, # Return top 3 results
search_params={"metric_type": "IP", "params": {}}, # Inner product distance
output_fields=["text"], # Return the text field
)
Veamos los resultados de la consulta
import json
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.8334928750991821
],
[
"How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
0.746377170085907
],
[
"What is the maximum dataset size Milvus can handle?\n\n \nTheoretically, the maximum dataset size Milvus can handle is determined by the hardware it is run on, specifically system memory and storage:\n\n- Milvus loads all specified collections and partitions into memory before running queries. Therefore, memory size determines the maximum amount of data Milvus can query.\n- When new entities and and collection-related schema (currently only MinIO is supported for data persistence) are added to Milvus, system storage determines the maximum allowable size of inserted data.\n\n###",
0.7328270673751831
]
]
Utilizar LLM para obtener una respuesta RAG
Convertir los documentos recuperados en un formato de cadena.
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Definir avisos de sistema y de usuario para el modelo de lenguaje. Este prompt se ensambla con los documentos recuperados de Milvus.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
Utilice el modelo llama-v3p1-405b-instruct
proporcionado por Fireworks para generar una respuesta basada en las instrucciones.
response = fireworks_client.chat.completions.create(
model="accounts/fireworks/models/llama-v3p1-405b-instruct",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
According to the provided context, Milvus stores data in two ways:
1. Inserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental logs. This can be done using multiple object storage backends such as MinIO, AWS S3, Google Cloud Storage, Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage.
2. Metadata, which are generated within Milvus, are stored in etcd, with each Milvus module having its own metadata.
Additionally, when data is inserted, it is first loaded into a message queue, and then written to persistent storage as incremental logs by the data node. The `flush()` function can be used to force the data node to write all data in the message queue to persistent storage immediately.
¡Genial! Hemos construido con éxito una tubería RAG con Milvus y Fireworks AI.