Costruire RAG con Milvus e DeepSeek
DeepSeek consente agli sviluppatori di creare e scalare applicazioni di intelligenza artificiale con modelli linguistici ad alte prestazioni. Offre un'inferenza efficiente, API flessibili e architetture avanzate di Mixture-of-Experts (MoE) per compiti di ragionamento e recupero robusti.
In questo tutorial vi mostreremo come costruire una pipeline Retrieval-Augmented Generation (RAG) utilizzando Milvus e DeepSeek.
Preparazione
Dipendenze e ambiente
! pip install --upgrade pymilvus[model] openai requests tqdm
Se si utilizza Google Colab, per abilitare le dipendenze appena installate potrebbe essere necessario riavviare il runtime (fare clic sul menu "Runtime" nella parte superiore dello schermo e selezionare "Restart session" dal menu a discesa).
DeepSeek abilita l'API in stile OpenAI. È possibile accedere al suo sito web ufficiale e preparare la chiave api DEEPSEEK_API_KEY
come variabile d'ambiente.
import os
os.environ["DEEPSEEK_API_KEY"] = "***********"
Preparare i dati
Come conoscenza privata nel nostro RAG utilizziamo le pagine FAQ della documentazione di Milvus 2.4.x, che è una buona fonte di dati per una semplice pipeline RAG.
Scaricare il file zip ed estrarre i documenti nella cartella milvus_docs
.
! wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
! unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
Carichiamo tutti i file markdown dalla cartella milvus_docs/en/faq
. Per ogni documento, usiamo semplicemente "# " per separare il contenuto del file, che può separare approssimativamente il contenuto di ogni parte principale del file markdown.
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()
text_lines += file_text.split("# ")
Preparare il modello LLM e l'embedding
DeepSeek consente di utilizzare l'API in stile OpenAI, che può essere utilizzata con piccoli aggiustamenti per chiamare l'LLM.
from openai import OpenAI
deepseek_client = OpenAI(
api_key=os.environ["DEEPSEEK_API_KEY"],
base_url="https://api.deepseek.com",
)
Definire un modello di embedding per generare embeddings di testo utilizzando milvus_model
. Utilizziamo come esempio il modello DefaultEmbeddingFunction
, che è un modello di embedding pre-addestrato e leggero.
from pymilvus import model as milvus_model
embedding_model = milvus_model.DefaultEmbeddingFunction()
Generare un embedding di prova e stamparne la dimensione e i primi elementi.
test_embedding = embedding_model.encode_queries(["This is a test"])[0]
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
768
[-0.04836066 0.07163023 -0.01130064 -0.03789345 -0.03320649 -0.01318448
-0.03041712 -0.02269499 -0.02317863 -0.00426028]
Caricare i dati in Milvus
Creare la collezione
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
Come per l'argomento di
MilvusClient
:
- L'impostazione di
uri
come file locale, ad esempio./milvus.db
, è il metodo più conveniente, poiché utilizza automaticamente Milvus Lite per memorizzare tutti i dati in questo file.- Se si dispone di una grande quantità di dati, è possibile configurare un server Milvus più performante su docker o kubernetes. In questa configurazione, utilizzare l'uri del server, ad esempio
http://localhost:19530
, comeuri
.- Se si desidera utilizzare Zilliz Cloud, il servizio cloud completamente gestito per Milvus, regolare
uri
etoken
, che corrispondono all'endpoint pubblico e alla chiave Api di Zilliz Cloud.
Verificare se la raccolta esiste già e, in caso affermativo, eliminarla.
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Creare una nuova raccolta con i parametri specificati.
Se non si specifica alcun campo, Milvus creerà automaticamente un campo predefinito id
per la chiave primaria e un campo vector
per memorizzare i dati vettoriali. Un campo JSON riservato viene utilizzato per memorizzare campi non definiti da schemi e i loro valori.
milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # Inner product distance
consistency_level="Strong", # Strong consistency level
)
Inserire i dati
Si intersecano le righe di testo, si creano le incorporazioni e si inseriscono i dati in Milvus.
Ecco un nuovo campo text
, che è un campo non definito nello schema della collezione. Verrà aggiunto automaticamente al campo dinamico JSON riservato, che può essere trattato come un campo normale ad alto livello.
from tqdm import tqdm
data = []
doc_embeddings = embedding_model.encode_documents(text_lines)
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": doc_embeddings[i], "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 0%| | 0/72 [00:00<?, ?it/s]huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
To disable this warning, you can either:
- Avoid using `tokenizers` before the fork if possible
- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Creating embeddings: 100%|██████████| 72/72 [00:00<00:00, 246522.36it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
Costruire la RAG
Recuperare i dati per una query
Specifichiamo una domanda frequente su Milvus.
question = "How is data stored in milvus?"
Cerchiamo la domanda nella raccolta e recuperiamo le prime 3 corrispondenze semantiche.
search_res = milvus_client.search(
collection_name=collection_name,
data=embedding_model.encode_queries(
[question]
), # Convert the question to an embedding vector
limit=3, # Return top 3 results
search_params={"metric_type": "IP", "params": {}}, # Inner product distance
output_fields=["text"], # Return the text field
)
Diamo un'occhiata ai risultati della ricerca della domanda
import json
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.6572665572166443
],
[
"How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
0.6312146186828613
],
[
"How does Milvus handle vector data types and precision?\n\nMilvus supports Binary, Float32, Float16, and BFloat16 vector types.\n\n- Binary vectors: Store binary data as sequences of 0s and 1s, used in image processing and information retrieval.\n- Float32 vectors: Default storage with a precision of about 7 decimal digits. Even Float64 values are stored with Float32 precision, leading to potential precision loss upon retrieval.\n- Float16 and BFloat16 vectors: Offer reduced precision and memory usage. Float16 is suitable for applications with limited bandwidth and storage, while BFloat16 balances range and efficiency, commonly used in deep learning to reduce computational requirements without significantly impacting accuracy.\n\n###",
0.6115777492523193
]
]
Utilizzare LLM per ottenere una risposta RAG
Convertire i documenti recuperati in un formato stringa.
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Definire i prompt del sistema e dell'utente per il Lanage Model. Questo prompt viene assemblato con i documenti recuperati da Milvus.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
Utilizzare il modello deepseek-chat
fornito da DeepSeek per generare una risposta basata sui prompt.
response = deepseek_client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
In Milvus, data is stored in two main categories: inserted data and metadata.
1. **Inserted Data**: This includes vector data, scalar data, and collection-specific schema. The inserted data is stored in persistent storage as incremental logs. Milvus supports various object storage backends for this purpose, such as MinIO, AWS S3, Google Cloud Storage (GCS), Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage (COS).
2. **Metadata**: Metadata is generated within Milvus and is specific to each Milvus module. This metadata is stored in etcd, a distributed key-value store.
Additionally, when data is inserted, it is first loaded into a message queue, and Milvus returns success at this stage. The data is then written to persistent storage as incremental logs by the data node. If the `flush()` function is called, the data node is forced to write all data in the message queue to persistent storage immediately.
Ottimo! Abbiamo costruito con successo una pipeline RAG con Milvus e DeepSeek.