Costruire RAG con Milvus e SiliconFlow
SiliconFlow è impegnata nella realizzazione di una piattaforma AI Infra scalabile, standardizzata e ad alte prestazioni. SiliconCloud è una delle offerte di punta di SiliconFlow, descritta come una piattaforma Model as a Service (MaaS). Fornisce un ambiente completo per la distribuzione di vari modelli di IA, compresi i modelli linguistici di grandi dimensioni (LLM) e i modelli di embedding. SiliconCloud aggrega numerosi modelli open-source, consentendo agli utenti di accedere facilmente a queste risorse e di utilizzarle senza la necessità di un'ampia configurazione dell'infrastruttura.
In questo tutorial vi mostreremo come costruire una pipeline RAG (Retrieval-Augmented Generation) con Milvus e SiliconFlow.
Preparazione
Dipendenze e ambiente
$ pip install --upgrade pymilvus openai requests tqdm
Se si utilizza Google Colab, per abilitare le dipendenze appena installate potrebbe essere necessario riavviare il runtime (fare clic sul menu "Runtime" nella parte superiore dello schermo e selezionare "Restart session" dal menu a discesa).
SiliconFlow abilita l'API in stile OpenAI. È possibile accedere al sito web ufficiale e preparare la chiave api SILICON_FLOW_API_KEY
come variabile d'ambiente.
import os
os.environ["SILICON_FLOW_API_KEY"] = "***********"
Preparare i dati
Come conoscenza privata nel nostro RAG utilizziamo le pagine FAQ della Documentazione Milvus 2.4.x, che è una buona fonte di dati per una semplice pipeline RAG.
Scaricare il file zip ed estrarre i documenti nella cartella milvus_docs
.
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
Carichiamo tutti i file markdown dalla cartella milvus_docs/en/faq
. Per ogni documento, usiamo semplicemente "# " per separare il contenuto del file, che può separare approssimativamente il contenuto di ogni parte principale del file markdown.
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()
text_lines += file_text.split("# ")
Preparare il modello di incorporamento
Inizializziamo un client per preparare il modello di embedding. SiliconFlow abilita l'API in stile OpenAI e si può usare la stessa API, con piccoli aggiustamenti, per chiamare il modello di embedding e l'LLM.
from openai import OpenAI
siliconflow_client = OpenAI(
api_key=os.environ["SILICON_FLOW_API_KEY"], base_url="https://api.siliconflow.cn/v1"
)
Definire una funzione per generare embeddings di testo utilizzando il client. Utilizziamo il modello BAAI/bge-large-en-v1.5
come esempio.
def emb_text(text):
return (
siliconflow_client.embeddings.create(input=text, model="BAAI/bge-large-en-v1.5")
.data[0]
.embedding
)
Generare un embedding di prova e stamparne la dimensione e i primi elementi.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1024
[0.011475468054413795, 0.02982141077518463, 0.0038535362109541893, 0.035921916365623474, -0.0159175843000412, -0.014918108470737934, -0.018094222992658615, -0.002937349723652005, 0.030917132273316383, 0.03390815854072571]
Caricare i dati in Milvus
Creare la raccolta
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
Come per l'argomento di MilvusClient
:
- L'impostazione di
uri
come file locale, ad esempio./milvus.db
, è il metodo più conveniente, poiché utilizza automaticamente Milvus Lite per memorizzare tutti i dati in questo file. - Se si dispone di una grande quantità di dati, è possibile configurare un server Milvus più performante su docker o kubernetes. In questa configurazione, utilizzare l'uri del server, ad esempio
http://localhost:19530
, comeuri
. - Se si desidera utilizzare Zilliz Cloud, il servizio cloud completamente gestito per Milvus, regolare
uri
etoken
, che corrispondono all'endpoint pubblico e alla chiave Api di Zilliz Cloud.
Verificare se la raccolta esiste già e, in caso affermativo, eliminarla.
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Creare una nuova raccolta con i parametri specificati.
Se non si specifica alcun campo, Milvus creerà automaticamente un campo predefinito id
per la chiave primaria e un campo vector
per memorizzare i dati vettoriali. Un campo JSON riservato viene utilizzato per memorizzare campi non definiti dalla mappa e i loro valori.
milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # Inner product distance
consistency_level="Strong", # Strong consistency level
)
Inserire i dati
Si intersecano le righe di testo, si creano le incorporazioni e si inseriscono i dati in Milvus.
Ecco un nuovo campo text
, che è un campo non definito nello schema della collezione. Verrà aggiunto automaticamente al campo dinamico JSON riservato, che può essere trattato come un campo normale ad alto livello.
from tqdm import tqdm
data = []
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": emb_text(line), "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 100%|██████████| 72/72 [00:04<00:00, 16.97it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
Costruire la RAG
Recuperare i dati per una query
Specifichiamo una domanda frequente su Milvus.
question = "How is data stored in milvus?"
Cerchiamo la domanda nella raccolta e recuperiamo le prime tre corrispondenze semantiche.
search_res = milvus_client.search(
collection_name=collection_name,
data=[
emb_text(question)
], # Use the `emb_text` function to convert the question to an embedding vector
limit=3, # Return top 3 results
search_params={"metric_type": "IP", "params": {}}, # Inner product distance
output_fields=["text"], # Return the text field
)
Diamo un'occhiata ai risultati della ricerca della domanda
import json
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.833885133266449
],
[
"How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
0.812842607498169
],
[
"Does the query perform in memory? What are incremental data and historical data?\n\nYes. When a query request comes, Milvus searches both incremental data and historical data by loading them into memory. Incremental data are in the growing segments, which are buffered in memory before they reach the threshold to be persisted in storage engine, while historical data are from the sealed segments that are stored in the object storage. Incremental data and historical data together constitute the whole dataset to search.\n\n###",
0.7714196443557739
]
]
Utilizzare LLM per ottenere una risposta RAG
Convertire i documenti recuperati in un formato stringa.
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Definire i prompt del sistema e dell'utente per il Lanage Model. Questo prompt viene assemblato con i documenti recuperati da Milvus.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
Utilizzare il modello deepseek-ai/DeepSeek-V2.5
fornito da SiliconCloud per generare una risposta basata sui prompt.
response = siliconflow_client.chat.completions.create(
model="deepseek-ai/DeepSeek-V2.5",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
In Milvus, data is stored in two main categories: inserted data and metadata.
- **Inserted Data**: This includes vector data, scalar data, and collection-specific schema, which are stored in persistent storage as incremental logs. Milvus supports various object storage backends such as MinIO, AWS S3, Google Cloud Storage (GCS), Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage (COS).
- **Metadata**: This is generated within Milvus, with each module having its own metadata stored in etcd, a distributed key-value store.
Ottimo! Abbiamo costruito con successo una pipeline RAG con Milvus e SiliconFlow.