RAG mit Milvus und DeepSeek aufbauen
DeepSeek ermöglicht Entwicklern die Erstellung und Skalierung von KI-Anwendungen mit leistungsstarken Sprachmodellen. Es bietet effiziente Inferenz, flexible APIs und fortschrittliche Mixture-of-Experts (MoE)-Architekturen für robuste Schlussfolgerungen und Retrieval-Aufgaben.
In diesem Tutorial zeigen wir Ihnen, wie Sie eine Retrieval-Augmented Generation (RAG)-Pipeline mit Milvus und DeepSeek erstellen.
Vorbereitung
Abhängigkeiten und Umgebung
! pip install --upgrade pymilvus[model] openai requests tqdm
Wenn Sie Google Colab verwenden, müssen Sie möglicherweise die Laufzeitumgebung neu starten, um die gerade installierten Abhängigkeiten zu aktivieren (klicken Sie auf das Menü "Laufzeit" am oberen Bildschirmrand und wählen Sie "Sitzung neu starten" aus dem Dropdown-Menü).
DeepSeek aktiviert die OpenAI-ähnliche API. Sie können sich auf der offiziellen Website anmelden und den API-Schlüssel DEEPSEEK_API_KEY
als Umgebungsvariable vorbereiten.
import os
os.environ["DEEPSEEK_API_KEY"] = "***********"
Bereiten Sie die Daten vor
Wir verwenden die FAQ-Seiten aus der Milvus-Dokumentation 2.4.x als privates Wissen in unserem RAG, was eine gute Datenquelle für eine einfache RAG-Pipeline ist.
Laden Sie die Zip-Datei herunter und entpacken Sie die Dokumente in den Ordner milvus_docs
.
! wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
! unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
Wir laden alle Markdown-Dateien aus dem Ordner milvus_docs/en/faq
. Für jedes Dokument verwenden wir einfach "# ", um den Inhalt in der Datei zu trennen, wodurch der Inhalt jedes Hauptteils der Markdown-Datei grob getrennt werden kann.
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()
text_lines += file_text.split("# ")
Vorbereiten des LLM und des Einbettungsmodells
DeepSeek ermöglicht die OpenAI-ähnliche API, und Sie können dieselbe API mit kleinen Anpassungen verwenden, um den LLM aufzurufen.
from openai import OpenAI
deepseek_client = OpenAI(
api_key=os.environ["DEEPSEEK_API_KEY"],
base_url="https://api.deepseek.com",
)
Definieren Sie ein Einbettungsmodell, um Texteinbettungen unter Verwendung von milvus_model
zu generieren. Wir verwenden das Modell DefaultEmbeddingFunction
als Beispiel, das ein vortrainiertes und leichtgewichtiges Einbettungsmodell ist.
from pymilvus import model as milvus_model
embedding_model = milvus_model.DefaultEmbeddingFunction()
Erzeugen Sie eine Testeinbettung und geben Sie deren Dimension und die ersten Elemente aus.
test_embedding = embedding_model.encode_queries(["This is a test"])[0]
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
768
[-0.04836066 0.07163023 -0.01130064 -0.03789345 -0.03320649 -0.01318448
-0.03041712 -0.02269499 -0.02317863 -0.00426028]
Laden Sie Daten in Milvus
Erstellen Sie die Sammlung
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
Wie für das Argument von
MilvusClient
:
- Die Einstellung von
uri
als lokale Datei, z. B../milvus.db
, ist die bequemste Methode, da Milvus Lite automatisch alle Daten in dieser Datei speichert.- Wenn Sie große Datenmengen haben, können Sie einen leistungsfähigeren Milvus-Server auf Docker oder Kubernetes einrichten. Bei dieser Einrichtung verwenden Sie bitte die Server-Uri, z. B.
http://localhost:19530
, alsuri
.- Wenn Sie Zilliz Cloud, den vollständig verwalteten Cloud-Service für Milvus, verwenden möchten, passen Sie
uri
undtoken
an, die dem öffentlichen Endpunkt und dem Api-Schlüssel in Zilliz Cloud entsprechen.
Prüfen Sie, ob die Sammlung bereits existiert und löschen Sie sie, wenn dies der Fall ist.
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Erstellen Sie eine neue Sammlung mit den angegebenen Parametern.
Wenn wir keine Feldinformationen angeben, erstellt Milvus automatisch ein Standardfeld id
für den Primärschlüssel und ein Feld vector
zum Speichern der Vektordaten. Ein reserviertes JSON-Feld wird verwendet, um nicht schema-definierte Felder und ihre Werte zu speichern.
milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # Inner product distance
consistency_level="Strong", # Strong consistency level
)
Daten einfügen
Iterieren Sie durch die Textzeilen, erstellen Sie Einbettungen und fügen Sie dann die Daten in Milvus ein.
Hier ist ein neues Feld text
, das ein nicht definiertes Feld im Sammelschema ist. Es wird automatisch dem reservierten dynamischen JSON-Feld hinzugefügt, das auf hoher Ebene wie ein normales Feld behandelt werden kann.
from tqdm import tqdm
data = []
doc_embeddings = embedding_model.encode_documents(text_lines)
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": doc_embeddings[i], "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 0%| | 0/72 [00:00<?, ?it/s]huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
To disable this warning, you can either:
- Avoid using `tokenizers` before the fork if possible
- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Creating embeddings: 100%|██████████| 72/72 [00:00<00:00, 246522.36it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
RAG erstellen
Abrufen von Daten für eine Abfrage
Lassen Sie uns eine häufige Frage über Milvus angeben.
question = "How is data stored in milvus?"
Suchen Sie nach der Frage in der Sammlung und rufen Sie die semantischen Top-3-Treffer ab.
search_res = milvus_client.search(
collection_name=collection_name,
data=embedding_model.encode_queries(
[question]
), # Convert the question to an embedding vector
limit=3, # Return top 3 results
search_params={"metric_type": "IP", "params": {}}, # Inner product distance
output_fields=["text"], # Return the text field
)
Werfen wir einen Blick auf die Suchergebnisse der Abfrage
import json
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.6572665572166443
],
[
"How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
0.6312146186828613
],
[
"How does Milvus handle vector data types and precision?\n\nMilvus supports Binary, Float32, Float16, and BFloat16 vector types.\n\n- Binary vectors: Store binary data as sequences of 0s and 1s, used in image processing and information retrieval.\n- Float32 vectors: Default storage with a precision of about 7 decimal digits. Even Float64 values are stored with Float32 precision, leading to potential precision loss upon retrieval.\n- Float16 and BFloat16 vectors: Offer reduced precision and memory usage. Float16 is suitable for applications with limited bandwidth and storage, while BFloat16 balances range and efficiency, commonly used in deep learning to reduce computational requirements without significantly impacting accuracy.\n\n###",
0.6115777492523193
]
]
LLM verwenden, um eine RAG-Antwort zu erhalten
Konvertieren Sie die abgerufenen Dokumente in ein String-Format.
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Definieren Sie System- und Benutzer-Prompts für das Lanage Model. Diese Eingabeaufforderung wird mit den abgerufenen Dokumenten aus Milvus zusammengestellt.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
Verwenden Sie das von DeepSeek bereitgestellte deepseek-chat
Modell, um eine Antwort auf der Grundlage der Prompts zu generieren.
response = deepseek_client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
In Milvus, data is stored in two main categories: inserted data and metadata.
1. **Inserted Data**: This includes vector data, scalar data, and collection-specific schema. The inserted data is stored in persistent storage as incremental logs. Milvus supports various object storage backends for this purpose, such as MinIO, AWS S3, Google Cloud Storage (GCS), Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage (COS).
2. **Metadata**: Metadata is generated within Milvus and is specific to each Milvus module. This metadata is stored in etcd, a distributed key-value store.
Additionally, when data is inserted, it is first loaded into a message queue, and Milvus returns success at this stage. The data is then written to persistent storage as incremental logs by the data node. If the `flush()` function is called, the data node is forced to write all data in the message queue to persistent storage immediately.
Großartig! Wir haben erfolgreich eine RAG-Pipeline mit Milvus und DeepSeek aufgebaut.