Construir RAG con Milvus y SiliconFlow
SiliconFlow se ha comprometido a construir una plataforma de infraestructura de IA escalable, estandarizada y de alto rendimiento. SiliconCloud es una de las ofertas insignia de SiliconFlow, descrita como una plataforma de modelo como servicio (MaaS). Proporciona un entorno completo para desplegar varios modelos de IA, incluidos modelos de lenguaje de gran tamaño (LLM) y modelos de incrustación. SiliconCloud agrega numerosos modelos de código abierto, lo que permite a los usuarios acceder fácilmente a estos recursos y utilizarlos sin necesidad de configurar una gran infraestructura.
En este tutorial, le mostraremos cómo construir un pipeline RAG (Retrieval-Augmented Generation) con Milvus y SiliconFlow.
Preparación
Dependencias y entorno
$ pip install --upgrade pymilvus openai requests tqdm
Si estás utilizando Google Colab, para habilitar las dependencias que acabas de instalar, es posible que tengas que reiniciar el tiempo de ejecución (haz clic en el menú "Tiempo de ejecución" en la parte superior de la pantalla, y selecciona "Reiniciar sesión" en el menú desplegable).
SiliconFlow habilita la API de estilo OpenAI. Puedes acceder a su web oficial y preparar la clave api SILICON_FLOW_API_KEY
como variable de entorno.
import os
os.environ["SILICON_FLOW_API_KEY"] = "***********"
Preparar los datos
Utilizamos las páginas de preguntas frecuentes de la Documentación de Milvus 2.4.x como conocimiento privado en nuestra RAG, que es una buena fuente de datos para una canalización RAG sencilla.
Descargamos el archivo zip y extraemos los documentos a la carpeta milvus_docs
.
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
Cargamos todos los archivos markdown de la carpeta milvus_docs/en/faq
. Para cada documento, simplemente utilizamos "# " para separar el contenido en el archivo, lo que puede separar aproximadamente el contenido de cada parte principal del archivo markdown.
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()
text_lines += file_text.split("# ")
Preparar el modelo de incrustación
Inicializamos un cliente para preparar el modelo de incrustación. SiliconFlow habilita la API de estilo OpenAI, y puedes utilizar la misma API con pequeños ajustes para llamar al modelo de incrustación y al LLM.
from openai import OpenAI
siliconflow_client = OpenAI(
api_key=os.environ["SILICON_FLOW_API_KEY"], base_url="https://api.siliconflow.cn/v1"
)
Definir una función para generar incrustaciones de texto utilizando el cliente. Utilizamos el modelo BAAI/bge-large-en-v1.5
como ejemplo.
def emb_text(text):
return (
siliconflow_client.embeddings.create(input=text, model="BAAI/bge-large-en-v1.5")
.data[0]
.embedding
)
Genere una incrustación de prueba e imprima su dimensión y sus primeros elementos.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1024
[0.011475468054413795, 0.02982141077518463, 0.0038535362109541893, 0.035921916365623474, -0.0159175843000412, -0.014918108470737934, -0.018094222992658615, -0.002937349723652005, 0.030917132273316383, 0.03390815854072571]
Cargar los datos en Milvus
Crear la colección
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
Como para el argumento de MilvusClient
:
- Establecer el
uri
como un archivo local, por ejemplo./milvus.db
, es el método más conveniente, ya que utiliza automáticamente Milvus Lite para almacenar todos los datos en este archivo. - Si tiene una gran escala de datos, puede configurar un servidor Milvus más eficiente en docker o kubernetes. En esta configuración, por favor utilice la uri del servidor, por ejemplo
http://localhost:19530
, como suuri
. - Si desea utilizar Zilliz Cloud, el servicio en la nube totalmente gestionado para Milvus, ajuste
uri
ytoken
, que corresponden al punto final público y a la clave Api en Zilliz Cloud.
Compruebe si la colección ya existe y elimínela en caso afirmativo.
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Crear una nueva colección con los parámetros especificados.
Si no especificamos ninguna información de campo, Milvus creará automáticamente un campo id
por defecto para la clave primaria, y un campo vector
para almacenar los datos vectoriales. Se utiliza un campo JSON reservado para almacenar campos no definidos por el esquema y sus valores.
milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # Inner product distance
consistency_level="Strong", # Strong consistency level
)
Insertar datos
Iterar a través de las líneas de texto, crear incrustaciones, y luego insertar los datos en Milvus.
Aquí hay un nuevo campo text
, que es un campo no definido en el esquema de la colección. Se añadirá automáticamente al campo dinámico JSON reservado, que puede tratarse como un campo normal a alto nivel.
from tqdm import tqdm
data = []
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": emb_text(line), "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 100%|██████████| 72/72 [00:04<00:00, 16.97it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
Construir RAG
Recuperar datos para una consulta
Especifiquemos una pregunta frecuente sobre Milvus.
question = "How is data stored in milvus?"
Busquemos la pregunta en la colección y recuperemos las 3 primeras coincidencias semánticas.
search_res = milvus_client.search(
collection_name=collection_name,
data=[
emb_text(question)
], # Use the `emb_text` function to convert the question to an embedding vector
limit=3, # Return top 3 results
search_params={"metric_type": "IP", "params": {}}, # Inner product distance
output_fields=["text"], # Return the text field
)
Veamos los resultados de la consulta
import json
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.833885133266449
],
[
"How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
0.812842607498169
],
[
"Does the query perform in memory? What are incremental data and historical data?\n\nYes. When a query request comes, Milvus searches both incremental data and historical data by loading them into memory. Incremental data are in the growing segments, which are buffered in memory before they reach the threshold to be persisted in storage engine, while historical data are from the sealed segments that are stored in the object storage. Incremental data and historical data together constitute the whole dataset to search.\n\n###",
0.7714196443557739
]
]
Utilizar LLM para obtener una respuesta RAG
Convertir los documentos recuperados en un formato de cadena.
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Definir avisos de sistema y de usuario para el modelo de lenguaje. Este prompt se ensambla con los documentos recuperados de Milvus.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
Utilice el modelo deepseek-ai/DeepSeek-V2.5
proporcionado por SiliconCloud para generar una respuesta basada en las instrucciones.
response = siliconflow_client.chat.completions.create(
model="deepseek-ai/DeepSeek-V2.5",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
In Milvus, data is stored in two main categories: inserted data and metadata.
- **Inserted Data**: This includes vector data, scalar data, and collection-specific schema, which are stored in persistent storage as incremental logs. Milvus supports various object storage backends such as MinIO, AWS S3, Google Cloud Storage (GCS), Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage (COS).
- **Metadata**: This is generated within Milvus, with each module having its own metadata stored in etcd, a distributed key-value store.
Muy bien. Hemos construido con éxito una tubería RAG con Milvus y SiliconFlow.